author     Vika <vika@fireburn.ru>    2025-04-09 23:31:02 +0300
committer  Vika <vika@fireburn.ru>    2025-04-09 23:31:57 +0300
commit     8826d9446e6c492db2243b9921e59ce496027bef
tree       63738aa9001cb73b11cb0e974e93129bcdf1adbb  /src/media
parent     519cadfbb298f50cbf819dde757037ab56e2863e
download   kittybox-8826d9446e6c492db2243b9921e59ce496027bef.tar.zst
cargo fmt
Change-Id: I80e81ebba3f0cdf8c094451c9fe3ee4126b8c888
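The change is mechanical: import blocks are sorted, trailing commas added, and long expressions re-wrapped, with no intended change in behavior. It is presumably the output of running `cargo fmt` (rustfmt with its default settings) over the workspace.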
Diffstat (limited to 'src/media')
-rw-r--r--  src/media/mod.rs          |  83
-rw-r--r--  src/media/storage/file.rs | 322
-rw-r--r--  src/media/storage/mod.rs  | 169
3 files changed, 332 insertions(+), 242 deletions(-)
diff --git a/src/media/mod.rs b/src/media/mod.rs
index 6f263b6..7e52414 100644
--- a/src/media/mod.rs
+++ b/src/media/mod.rs
@@ -1,22 +1,23 @@
+use crate::indieauth::{backend::AuthBackend, User};
 use axum::{
-    extract::{multipart::Multipart, FromRef, Path, State}, response::{IntoResponse, Response}
+    extract::{multipart::Multipart, FromRef, Path, State},
+    response::{IntoResponse, Response},
 };
-use axum_extra::headers::{ContentLength, HeaderMapExt, HeaderValue, IfNoneMatch};
 use axum_extra::extract::Host;
+use axum_extra::headers::{ContentLength, HeaderMapExt, HeaderValue, IfNoneMatch};
 use axum_extra::TypedHeader;
-use kittybox_util::micropub::{Error as MicropubError, ErrorKind as ErrorType};
 use kittybox_indieauth::Scope;
-use crate::indieauth::{backend::AuthBackend, User};
+use kittybox_util::micropub::{Error as MicropubError, ErrorKind as ErrorType};
 
 pub mod storage;
-use storage::{MediaStore, MediaStoreError, Metadata, ErrorKind};
 pub use storage::file::FileStore;
+use storage::{ErrorKind, MediaStore, MediaStoreError, Metadata};
 
 impl From<MediaStoreError> for MicropubError {
     fn from(err: MediaStoreError) -> Self {
         Self::new(
             ErrorType::InternalServerError,
-            format!("media store error: {}", err)
+            format!("media store error: {}", err),
         )
     }
 }
@@ -25,13 +26,14 @@ impl From<MediaStoreError> for MicropubError {
 pub(crate) async fn upload<S: MediaStore, A: AuthBackend>(
     State(blobstore): State<S>,
     user: User<A>,
-    mut upload: Multipart
+    mut upload: Multipart,
 ) -> Response {
     if !user.check_scope(&Scope::Media) {
         return MicropubError::from_static(
             ErrorType::NotAuthorized,
-            "Interacting with the media storage requires the \"media\" scope."
-        ).into_response();
+            "Interacting with the media storage requires the \"media\" scope.",
+        )
+        .into_response();
     }
     let host = user.me.authority();
     let field = match upload.next_field().await {
@@ -39,27 +41,31 @@ pub(crate) async fn upload<S: MediaStore, A: AuthBackend>(
         Ok(None) => {
             return MicropubError::from_static(
                 ErrorType::InvalidRequest,
-                "Send multipart/form-data with one field named file"
-            ).into_response();
-        },
+                "Send multipart/form-data with one field named file",
+            )
+            .into_response();
+        }
         Err(err) => {
             return MicropubError::new(
                 ErrorType::InternalServerError,
-                format!("Error while parsing multipart/form-data: {}", err)
-            ).into_response();
-        },
+                format!("Error while parsing multipart/form-data: {}", err),
+            )
+            .into_response();
+        }
     };
     let metadata: Metadata = (&field).into();
     match blobstore.write_streaming(host, metadata, field).await {
         Ok(filename) => IntoResponse::into_response((
             axum::http::StatusCode::CREATED,
-            [
-                ("Location", user.me.join(
-                    &format!(".kittybox/media/uploads/{}", filename)
-                ).unwrap().as_str())
-            ]
+            [(
+                "Location",
+                user.me
+                    .join(&format!(".kittybox/media/uploads/{}", filename))
+                    .unwrap()
+                    .as_str(),
+            )],
         )),
-        Err(err) => MicropubError::from(err).into_response()
+        Err(err) => MicropubError::from(err).into_response(),
     }
 }
 
@@ -68,7 +74,7 @@ pub(crate) async fn serve<S: MediaStore>(
     Host(host): Host,
     Path(path): Path<String>,
     if_none_match: Option<TypedHeader<IfNoneMatch>>,
-    State(blobstore): State<S>
+    State(blobstore): State<S>,
 ) -> Response {
     use axum::http::StatusCode;
     tracing::debug!("Searching for file...");
@@ -77,7 +83,9 @@ pub(crate) async fn serve<S: MediaStore>(
             tracing::debug!("Metadata: {:?}", metadata);
 
             let etag = if let Some(etag) = metadata.etag {
-                let etag = format!("\"{}\"", etag).parse::<axum_extra::headers::ETag>().unwrap();
+                let etag = format!("\"{}\"", etag)
+                    .parse::<axum_extra::headers::ETag>()
+                    .unwrap();
 
                 if let Some(TypedHeader(if_none_match)) = if_none_match {
                     tracing::debug!("If-None-Match: {:?}", if_none_match);
@@ -85,12 +93,14 @@ pub(crate) async fn serve<S: MediaStore>(
                     // returns 304 when it doesn't match because it
                     // only matches when file is different
                     if !if_none_match.precondition_passes(&etag) {
-                        return StatusCode::NOT_MODIFIED.into_response()
+                        return StatusCode::NOT_MODIFIED.into_response();
                     }
                 }
 
                 Some(etag)
-            } else { None };
+            } else {
+                None
+            };
 
             let mut r = Response::builder();
             {
@@ -98,14 +108,16 @@ pub(crate) async fn serve<S: MediaStore>(
                 headers.insert(
                     "Content-Type",
                     HeaderValue::from_str(
-                        metadata.content_type
+                        metadata
+                            .content_type
                             .as_deref()
-                            .unwrap_or("application/octet-stream")
-                    ).unwrap()
+                            .unwrap_or("application/octet-stream"),
+                    )
+                    .unwrap(),
                 );
                 headers.insert(
                     axum::http::header::X_CONTENT_TYPE_OPTIONS,
-                    axum::http::HeaderValue::from_static("nosniff")
+                    axum::http::HeaderValue::from_static("nosniff"),
                 );
                 if let Some(length) = metadata.length {
                     headers.typed_insert(ContentLength(length.get().try_into().unwrap()));
@@ -117,23 +129,22 @@ pub(crate) async fn serve<S: MediaStore>(
             r.body(axum::body::Body::from_stream(stream))
                 .unwrap()
                 .into_response()
-        },
+        }
         Err(err) => match err.kind() {
-            ErrorKind::NotFound => {
-                IntoResponse::into_response(StatusCode::NOT_FOUND)
-            },
+            ErrorKind::NotFound => IntoResponse::into_response(StatusCode::NOT_FOUND),
            _ => {
                 tracing::error!("{}", err);
                 IntoResponse::into_response(StatusCode::INTERNAL_SERVER_ERROR)
            }
-        }
+        },
     }
 }
 
-pub fn router<St, A, M>() -> axum::Router<St> where
+pub fn router<St, A, M>() -> axum::Router<St>
+where
     A: AuthBackend + FromRef<St>,
     M: MediaStore + FromRef<St>,
-    St: Clone + Send + Sync + 'static
+    St: Clone + Send + Sync + 'static,
 {
     axum::Router::new()
         .route("/", axum::routing::post(upload::<M, A>))
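A note on the `serve` handler reformatted above: it implements HTTP conditional requests by comparing the stored file hash against the client's If-None-Match header. The sketch below is not code from this commit; it is a minimal, self-contained illustration of the same check using the `headers` crate (which axum_extra re-exports), with an invented helper name and made-up hashes.

    use headers::{ETag, IfNoneMatch};

    // True when the client's cached copy is still current, i.e. the
    // handler should answer 304 Not Modified instead of sending the body.
    fn not_modified(stored_hash: &str, if_none_match: &IfNoneMatch) -> bool {
        // Stored hashes are bare, but HTTP entity tags are quoted strings,
        // so the hash is wrapped in quotes before parsing, as in the diff.
        let etag: ETag = format!("\"{}\"", stored_hash).parse().unwrap();
        // `precondition_passes` returns true when the representation
        // differs from every tag the client sent; its negation means
        // "matched", hence 304.
        !if_none_match.precondition_passes(&etag)
    }

    fn main() {
        let cached: ETag = "\"abc123\"".parse().unwrap();
        let header = IfNoneMatch::from(cached);
        assert!(not_modified("abc123", &header));
        assert!(!not_modified("def456", &header));
    }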
diff --git a/src/media/storage/file.rs b/src/media/storage/file.rs
index 4cd0ece..5198a4c 100644
--- a/src/media/storage/file.rs
+++ b/src/media/storage/file.rs
@@ -1,12 +1,12 @@
-use super::{Metadata, ErrorKind, MediaStore, MediaStoreError, Result};
-use std::{path::PathBuf, fmt::Debug};
-use tokio::fs::OpenOptions;
-use tokio::io::{BufReader, BufWriter, AsyncWriteExt, AsyncSeekExt};
+use super::{ErrorKind, MediaStore, MediaStoreError, Metadata, Result};
+use futures::FutureExt;
 use futures::{StreamExt, TryStreamExt};
+use sha2::Digest;
 use std::ops::{Bound, Neg};
 use std::pin::Pin;
-use sha2::Digest;
-use futures::FutureExt;
+use std::{fmt::Debug, path::PathBuf};
+use tokio::fs::OpenOptions;
+use tokio::io::{AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter};
 use tracing::{debug, error};
 
 const BUF_CAPACITY: usize = 16 * 1024;
@@ -22,7 +22,7 @@ impl From<tokio::io::Error> for MediaStoreError {
             msg: format!("file I/O error: {}", source),
             kind: match source.kind() {
                 std::io::ErrorKind::NotFound => ErrorKind::NotFound,
-                _ => ErrorKind::Backend
+                _ => ErrorKind::Backend,
             },
             source: Some(Box::new(source)),
         }
@@ -40,7 +40,9 @@ impl FileStore {
 
 impl MediaStore for FileStore {
     async fn new(url: &'_ url::Url) -> Result<Self> {
-        Ok(Self { base: url.path().into() })
+        Ok(Self {
+            base: url.path().into(),
+        })
     }
 
     #[tracing::instrument(skip(self, content))]
@@ -51,10 +53,17 @@ impl MediaStore for FileStore {
         mut content: T,
     ) -> Result<String>
     where
-        T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug
+        T: tokio_stream::Stream<
+            Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>,
+        > + Unpin
+        + Send
+        + Debug,
     {
         let (tempfilepath, mut tempfile) = self.mktemp().await?;
-        debug!("Temporary file opened for storing pending upload: {}", tempfilepath.display());
+        debug!(
+            "Temporary file opened for storing pending upload: {}",
+            tempfilepath.display()
+        );
 
         let mut hasher = sha2::Sha256::new();
         let mut length: usize = 0;
@@ -62,7 +71,7 @@ impl MediaStore for FileStore {
             let chunk = chunk.map_err(|err| MediaStoreError {
                 kind: ErrorKind::Backend,
                 source: Some(Box::new(err)),
-                msg: "Failed to read a data chunk".to_owned()
+                msg: "Failed to read a data chunk".to_owned(),
             })?;
             debug!("Read {} bytes from the stream", chunk.len());
             length += chunk.len();
@@ -70,9 +79,7 @@ impl MediaStore for FileStore {
                 {
                     let chunk = chunk.clone();
                     let tempfile = &mut tempfile;
-                    async move {
-                        tempfile.write_all(&chunk).await
-                    }
+                    async move { tempfile.write_all(&chunk).await }
                 },
                 {
                     let chunk = chunk.clone();
@@ -80,7 +87,8 @@ impl MediaStore for FileStore {
                         hasher.update(&*chunk);
 
                         hasher
-                    }).map(|r| r.unwrap())
+                    })
+                    .map(|r| r.unwrap())
                 }
             );
             if let Err(err) = write_result {
@@ -90,7 +98,9 @@ impl MediaStore for FileStore {
             // though temporary files might take up space on the hard drive
             // We'll clean them when maintenance time comes
             #[allow(unused_must_use)]
-            { tokio::fs::remove_file(tempfilepath).await; }
+            {
+                tokio::fs::remove_file(tempfilepath).await;
+            }
             return Err(err.into());
         }
         hasher = _hasher;
@@ -113,10 +123,17 @@ impl MediaStore for FileStore {
         let filepath = self.base.join(domain_str.as_str()).join(&filename);
         let metafilename = filename.clone() + ".json";
         let metapath = self.base.join(domain_str.as_str()).join(&metafilename);
-        let metatemppath = self.base.join(domain_str.as_str()).join(metafilename + ".tmp");
+        let metatemppath = self
+            .base
+            .join(domain_str.as_str())
+            .join(metafilename + ".tmp");
         metadata.length = std::num::NonZeroUsize::new(length);
         metadata.etag = Some(hash);
-        debug!("File path: {}, metadata: {}", filepath.display(), metapath.display());
+        debug!(
+            "File path: {}, metadata: {}",
+            filepath.display(),
+            metapath.display()
+        );
         {
             let parent = filepath.parent().unwrap();
             tokio::fs::create_dir_all(parent).await?;
@@ -126,7 +143,8 @@ impl MediaStore for FileStore {
             .write(true)
             .open(&metatemppath)
             .await?;
-        meta.write_all(&serde_json::to_vec(&metadata).unwrap()).await?;
+        meta.write_all(&serde_json::to_vec(&metadata).unwrap())
+            .await?;
         tokio::fs::rename(tempfilepath, filepath).await?;
         tokio::fs::rename(metatemppath, metapath).await?;
         Ok(filename)
@@ -138,28 +156,31 @@ impl MediaStore for FileStore {
         &self,
         domain: &str,
         filename: &str,
-    ) -> Result<(Metadata, Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>)> {
+    ) -> Result<(
+        Metadata,
+        Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>,
+    )> {
         debug!("Domain: {}, filename: {}", domain, filename);
         let path = self.base.join(domain).join(filename);
         debug!("Path: {}", path.display());
-        let file = OpenOptions::new()
-            .read(true)
-            .open(path)
-            .await?;
+        let file = OpenOptions::new().read(true).open(path).await?;
 
         let meta = self.metadata(domain, filename).await?;
 
-        Ok((meta, Box::pin(
-            tokio_util::io::ReaderStream::new(
-                // TODO: determine if BufReader provides benefit here
-                // From the logs it looks like we're reading 4KiB at a time
-                // Buffering file contents seems to double download speed
-                // How to benchmark this?
-                BufReader::with_capacity(BUF_CAPACITY, file)
-            )
-            // Sprinkle some salt in form of protective log wrapping
-            .inspect_ok(|chunk| debug!("Read {} bytes from file", chunk.len()))
-        )))
+        Ok((
+            meta,
+            Box::pin(
+                tokio_util::io::ReaderStream::new(
+                    // TODO: determine if BufReader provides benefit here
+                    // From the logs it looks like we're reading 4KiB at a time
+                    // Buffering file contents seems to double download speed
+                    // How to benchmark this?
+                    BufReader::with_capacity(BUF_CAPACITY, file),
+                )
+                // Sprinkle some salt in form of protective log wrapping
+                .inspect_ok(|chunk| debug!("Read {} bytes from file", chunk.len())),
+            ),
+        ))
     }
 
     #[tracing::instrument(skip(self))]
@@ -167,12 +188,13 @@ impl MediaStore for FileStore {
         let metapath = self.base.join(domain).join(format!("{}.json", filename));
         debug!("Metadata path: {}", metapath.display());
 
-        let meta = serde_json::from_slice(&tokio::fs::read(metapath).await?)
-            .map_err(|err| MediaStoreError {
+        let meta = serde_json::from_slice(&tokio::fs::read(metapath).await?).map_err(|err| {
+            MediaStoreError {
                 kind: ErrorKind::Json,
                 msg: format!("{}", err),
-                source: Some(Box::new(err))
-            })?;
+                source: Some(Box::new(err)),
+            }
+        })?;
 
         Ok(meta)
     }
@@ -182,16 +204,14 @@ impl MediaStore for FileStore {
         &self,
         domain: &str,
         filename: &str,
-        range: (Bound<u64>, Bound<u64>)
-    ) -> Result<Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>> {
+        range: (Bound<u64>, Bound<u64>),
+    ) -> Result<Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>>
+    {
         let path = self.base.join(format!("{}/{}", domain, filename));
         let metapath = self.base.join(format!("{}/{}.json", domain, filename));
         debug!("Path: {}, metadata: {}", path.display(), metapath.display());
 
-        let mut file = OpenOptions::new()
-            .read(true)
-            .open(path)
-            .await?;
+        let mut file = OpenOptions::new().read(true).open(path).await?;
 
         let start = match range {
             (Bound::Included(bound), _) => {
@@ -202,45 +222,52 @@ impl MediaStore for FileStore {
             (Bound::Unbounded, Bound::Included(bound)) => {
                 // Seek to the end minus the bounded bytes
                 debug!("Seeking {} bytes back from the end...", bound);
-                file.seek(std::io::SeekFrom::End(i64::try_from(bound).unwrap().neg())).await?
-            },
+                file.seek(std::io::SeekFrom::End(i64::try_from(bound).unwrap().neg()))
+                    .await?
+            }
             (Bound::Unbounded, Bound::Unbounded) => 0,
-            (_, Bound::Excluded(_)) => unreachable!()
+            (_, Bound::Excluded(_)) => unreachable!(),
         };
 
-        let stream = Box::pin(tokio_util::io::ReaderStream::new(BufReader::with_capacity(BUF_CAPACITY, file)))
-            .map_ok({
-                let mut bytes_read = 0usize;
-                let len = match range {
-                    (_, Bound::Unbounded) => None,
-                    (Bound::Unbounded, Bound::Included(bound)) => Some(bound),
-                    (_, Bound::Included(bound)) => Some(bound + 1 - start),
-                    (_, Bound::Excluded(_)) => unreachable!()
-                };
-                move |chunk| {
-                    debug!("Read {} bytes from file, {} in this chunk", bytes_read, chunk.len());
-                    bytes_read += chunk.len();
-                    if let Some(len) = len.map(|len| len.try_into().unwrap()) {
-                        if bytes_read > len {
-                            if bytes_read - len > chunk.len() {
-                                return None
-                            }
-                            debug!("Truncating last {} bytes", bytes_read - len);
-                            return Some(chunk.slice(..chunk.len() - (bytes_read - len)))
-                        }
-                    }
-
-                    Some(chunk)
-                }
-            })
-            .try_take_while(|x| std::future::ready(Ok(x.is_some())))
-            // Will never panic, because the moment the stream yields
-            // a None, it is considered exhausted.
-            .map_ok(|x| x.unwrap());
-        return Ok(Box::pin(stream))
-    }
+        let stream = Box::pin(tokio_util::io::ReaderStream::new(BufReader::with_capacity(
+            BUF_CAPACITY,
+            file,
+        )))
+        .map_ok({
+            let mut bytes_read = 0usize;
+            let len = match range {
+                (_, Bound::Unbounded) => None,
+                (Bound::Unbounded, Bound::Included(bound)) => Some(bound),
+                (_, Bound::Included(bound)) => Some(bound + 1 - start),
+                (_, Bound::Excluded(_)) => unreachable!(),
+            };
+            move |chunk| {
+                debug!(
+                    "Read {} bytes from file, {} in this chunk",
+                    bytes_read,
+                    chunk.len()
+                );
+                bytes_read += chunk.len();
+                if let Some(len) = len.map(|len| len.try_into().unwrap()) {
+                    if bytes_read > len {
+                        if bytes_read - len > chunk.len() {
+                            return None;
+                        }
+                        debug!("Truncating last {} bytes", bytes_read - len);
+                        return Some(chunk.slice(..chunk.len() - (bytes_read - len)));
+                    }
+                }
+
+                Some(chunk)
+            }
+        })
+        .try_take_while(|x| std::future::ready(Ok(x.is_some())))
+        // Will never panic, because the moment the stream yields
+        // a None, it is considered exhausted.
+        .map_ok(|x| x.unwrap());
+        return Ok(Box::pin(stream));
+    }
 
     async fn delete(&self, domain: &str, filename: &str) -> Result<()> {
         let path = self.base.join(format!("{}/{}", domain, filename));
@@ -251,7 +278,7 @@ impl MediaStore for FileStore {
 
 #[cfg(test)]
 mod tests {
-    use super::{Metadata, FileStore, MediaStore};
+    use super::{FileStore, MediaStore, Metadata};
     use std::ops::Bound;
     use tokio::io::AsyncReadExt;
 
@@ -259,10 +286,15 @@ mod tests {
     #[tracing_test::traced_test]
     async fn test_ranges() {
         let tempdir = tempfile::tempdir().expect("Failed to create tempdir");
-        let store = FileStore { base: tempdir.path().to_path_buf() };
+        let store = FileStore {
+            base: tempdir.path().to_path_buf(),
+        };
 
         let file: &[u8] = include_bytes!("./file.rs");
-        let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i))));
+        let stream = tokio_stream::iter(
+            file.chunks(100)
+                .map(|i| Ok(bytes::Bytes::copy_from_slice(i))),
+        );
         let metadata = Metadata {
             filename: Some("file.rs".to_string()),
             content_type: Some("text/plain".to_string()),
@@ -271,28 +303,30 @@ mod tests {
         };
 
         // write through the interface
-        let filename = store.write_streaming(
-            "fireburn.ru",
-            metadata, stream
-        ).await.unwrap();
+        let filename = store
+            .write_streaming("fireburn.ru", metadata, stream)
+            .await
+            .unwrap();
         tracing::debug!("Writing complete.");
 
         // Ensure the file is there
-        let content = tokio::fs::read(
-            tempdir.path()
-                .join("fireburn.ru")
-                .join(&filename)
-        ).await.unwrap();
+        let content = tokio::fs::read(tempdir.path().join("fireburn.ru").join(&filename))
+            .await
+            .unwrap();
         assert_eq!(content, file);
 
         tracing::debug!("Reading range from the start...");
         // try to read range
         let range = {
-            let stream = store.stream_range(
-                "fireburn.ru", &filename,
-                (Bound::Included(0), Bound::Included(299))
-            ).await.unwrap();
+            let stream = store
+                .stream_range(
+                    "fireburn.ru",
+                    &filename,
+                    (Bound::Included(0), Bound::Included(299)),
+                )
+                .await
+                .unwrap();
 
             let mut reader = tokio_util::io::StreamReader::new(stream);
 
@@ -308,10 +342,14 @@ mod tests {
 
         tracing::debug!("Reading range from the middle...");
         let range = {
-            let stream = store.stream_range(
-                "fireburn.ru", &filename,
-                (Bound::Included(150), Bound::Included(449))
-            ).await.unwrap();
+            let stream = store
+                .stream_range(
+                    "fireburn.ru",
+                    &filename,
+                    (Bound::Included(150), Bound::Included(449)),
+                )
+                .await
+                .unwrap();
 
             let mut reader = tokio_util::io::StreamReader::new(stream);
 
@@ -326,13 +364,17 @@ mod tests {
 
         tracing::debug!("Reading range from the end...");
         let range = {
-            let stream = store.stream_range(
-                "fireburn.ru", &filename,
-                // Note: the `headers` crate parses bounds in a
-                // non-standard way, where unbounded start actually
-                // means getting things from the end...
-                (Bound::Unbounded, Bound::Included(300))
-            ).await.unwrap();
+            let stream = store
+                .stream_range(
+                    "fireburn.ru",
+                    &filename,
+                    // Note: the `headers` crate parses bounds in a
+                    // non-standard way, where unbounded start actually
+                    // means getting things from the end...
+                    (Bound::Unbounded, Bound::Included(300)),
+                )
+                .await
+                .unwrap();
 
             let mut reader = tokio_util::io::StreamReader::new(stream);
 
@@ -343,15 +385,19 @@ mod tests {
         };
 
         assert_eq!(range.len(), 300);
-        assert_eq!(range.as_slice(), &file[file.len()-300..file.len()]);
+        assert_eq!(range.as_slice(), &file[file.len() - 300..file.len()]);
 
         tracing::debug!("Reading the whole file...");
         // try to read range
         let range = {
-            let stream = store.stream_range(
-                "fireburn.ru", &("/".to_string() + &filename),
-                (Bound::Unbounded, Bound::Unbounded)
-            ).await.unwrap();
+            let stream = store
+                .stream_range(
+                    "fireburn.ru",
+                    &("/".to_string() + &filename),
+                    (Bound::Unbounded, Bound::Unbounded),
+                )
+                .await
+                .unwrap();
 
             let mut reader = tokio_util::io::StreamReader::new(stream);
 
@@ -365,15 +411,19 @@ mod tests {
 
         assert_eq!(range.as_slice(), file);
     }
-
     #[tokio::test]
     #[tracing_test::traced_test]
     async fn test_streaming_read_write() {
         let tempdir = tempfile::tempdir().expect("Failed to create tempdir");
-        let store = FileStore { base: tempdir.path().to_path_buf() };
+        let store = FileStore {
+            base: tempdir.path().to_path_buf(),
+        };
 
         let file: &[u8] = include_bytes!("./file.rs");
-        let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i))));
+        let stream = tokio_stream::iter(
+            file.chunks(100)
+                .map(|i| Ok(bytes::Bytes::copy_from_slice(i))),
+        );
         let metadata = Metadata {
             filename: Some("style.css".to_string()),
             content_type: Some("text/css".to_string()),
@@ -382,27 +432,32 @@ mod tests {
         };
 
         // write through the interface
-        let filename = store.write_streaming(
-            "fireburn.ru",
-            metadata, stream
-        ).await.unwrap();
-        println!("{}, {}", filename, tempdir.path()
-            .join("fireburn.ru")
-            .join(&filename)
-            .display());
-        let content = tokio::fs::read(
-            tempdir.path()
-                .join("fireburn.ru")
-                .join(&filename)
-        ).await.unwrap();
+        let filename = store
+            .write_streaming("fireburn.ru", metadata, stream)
+            .await
+            .unwrap();
+        println!(
+            "{}, {}",
+            filename,
+            tempdir.path().join("fireburn.ru").join(&filename).display()
+        );
+        let content = tokio::fs::read(tempdir.path().join("fireburn.ru").join(&filename))
+            .await
+            .unwrap();
         assert_eq!(content, file);
 
         // check internal metadata format
-        let meta: Metadata = serde_json::from_slice(&tokio::fs::read(
-            tempdir.path()
-                .join("fireburn.ru")
-                .join(filename.clone() + ".json")
-        ).await.unwrap()).unwrap();
+        let meta: Metadata = serde_json::from_slice(
+            &tokio::fs::read(
+                tempdir
+                    .path()
+                    .join("fireburn.ru")
+                    .join(filename.clone() + ".json"),
+            )
+            .await
+            .unwrap(),
+        )
+        .unwrap();
         assert_eq!(meta.content_type.as_deref(), Some("text/css"));
         assert_eq!(meta.filename.as_deref(), Some("style.css"));
         assert_eq!(meta.length.map(|i| i.get()), Some(file.len()));
@@ -410,10 +465,10 @@ mod tests {
 
         // read back the data using the interface
         let (metadata, read_back) = {
-            let (metadata, stream) = store.read_streaming(
-                "fireburn.ru",
-                &filename
-            ).await.unwrap();
+            let (metadata, stream) = store
+                .read_streaming("fireburn.ru", &filename)
+                .await
+                .unwrap();
 
             let mut reader = tokio_util::io::StreamReader::new(stream);
             let mut buf = Vec::default();
@@ -427,6 +482,5 @@ mod tests {
         assert_eq!(meta.filename.as_deref(), Some("style.css"));
         assert_eq!(meta.length.map(|i| i.get()), Some(file.len()));
         assert!(meta.etag.is_some());
-
     }
 }
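One detail worth noting in `write_streaming` above: each chunk is appended to a temporary file and fed into a SHA-256 hasher concurrently via `futures::join!`, and the resulting hash later serves as both the filename and the ETag. The following sketch reproduces that step in isolation. It is not code from this commit: the hunks only show the closure bodies, so the `tokio::task::spawn_blocking` wrapper is an assumption inferred from the `.map(|r| r.unwrap())` visible in the diff, and the function name is invented.

    use bytes::Bytes;
    use futures::FutureExt;
    use sha2::{Digest, Sha256};
    use tokio::io::AsyncWriteExt;

    async fn write_and_hash_chunk(
        tempfile: &mut tokio::fs::File,
        hasher: Sha256,
        chunk: Bytes,
    ) -> std::io::Result<Sha256> {
        let (write_result, hasher) = futures::join!(
            {
                // The write gets its own (cheap, reference-counted) clone
                // of the chunk, mirroring the diff.
                let chunk = chunk.clone();
                async move { tempfile.write_all(&chunk).await }
            },
            // Hashing is CPU-bound, so it is pushed off the async threads;
            // the JoinHandle's Result is unwrapped as in the diff.
            tokio::task::spawn_blocking(move || {
                let mut hasher = hasher;
                hasher.update(&*chunk);
                hasher
            })
            .map(|r| r.unwrap())
        );
        write_result?;
        Ok(hasher)
    }

Writing and hashing the same chunk concurrently means the upload is read exactly once and never buffered whole in memory; cloning a Bytes handle only bumps a reference count.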
diff --git a/src/media/storage/mod.rs b/src/media/storage/mod.rs
index 3583247..5658071 100644
--- a/src/media/storage/mod.rs
+++ b/src/media/storage/mod.rs
@@ -1,12 +1,12 @@
 use axum::extract::multipart::Field;
-use tokio_stream::Stream;
 use bytes::Bytes;
 use serde::{Deserialize, Serialize};
+use std::fmt::Debug;
 use std::future::Future;
+use std::num::NonZeroUsize;
 use std::ops::Bound;
 use std::pin::Pin;
-use std::fmt::Debug;
-use std::num::NonZeroUsize;
+use tokio_stream::Stream;
 
 pub mod file;
 
@@ -24,17 +24,14 @@ pub struct Metadata {
 impl From<&Field<'_>> for Metadata {
     fn from(field: &Field<'_>) -> Self {
         Self {
-            content_type: field.content_type()
-                .map(|i| i.to_owned()),
-            filename: field.file_name()
-                .map(|i| i.to_owned()),
+            content_type: field.content_type().map(|i| i.to_owned()),
+            filename: field.file_name().map(|i| i.to_owned()),
             length: None,
             etag: None,
         }
     }
 }
 
-
 #[derive(Debug, Clone, Copy)]
 pub enum ErrorKind {
     Backend,
@@ -95,88 +92,116 @@ pub trait MediaStore: 'static + Send + Sync + Clone {
         content: T,
     ) -> impl Future<Output = Result<String>> + Send
     where
-        T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug;
+        T: tokio_stream::Stream<
+            Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>,
+        > + Unpin
+        + Send
+        + Debug;
 
     #[allow(clippy::type_complexity)]
     fn read_streaming(
         &self,
         domain: &str,
         filename: &str,
-    ) -> impl Future<Output = Result<
-        (Metadata, Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>)
-    >> + Send;
+    ) -> impl Future<
+        Output = Result<(
+            Metadata,
+            Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>,
+        )>,
+    > + Send;
 
     fn stream_range(
         &self,
         domain: &str,
         filename: &str,
-        range: (Bound<u64>, Bound<u64>)
-    ) -> impl Future<Output = Result<Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>>> + Send { async move {
-        use futures::stream::TryStreamExt;
-        use tracing::debug;
-        let (metadata, mut stream) = self.read_streaming(domain, filename).await?;
-        let length = metadata.length.unwrap().get();
-
-        use Bound::*;
-        let (start, end): (usize, usize) = match range {
-            (Unbounded, Unbounded) => return Ok(stream),
-            (Included(start), Unbounded) => (start.try_into().unwrap(), length - 1),
-            (Unbounded, Included(end)) => (length - usize::try_from(end).unwrap(), length - 1),
-            (Included(start), Included(end)) => (start.try_into().unwrap(), end.try_into().unwrap()),
-            (_, _) => unreachable!()
-        };
-
-        stream = Box::pin(
-            stream.map_ok({
-                let mut bytes_skipped = 0usize;
-                let mut bytes_read = 0usize;
-
-                move |chunk| {
-                    debug!("Skipped {}/{} bytes, chunk len {}", bytes_skipped, start, chunk.len());
-                    let chunk = if bytes_skipped < start {
-                        let need_to_skip = start - bytes_skipped;
-                        if chunk.len() < need_to_skip {
-                            return None
-                        }
-                        debug!("Skipping {} bytes", need_to_skip);
-                        bytes_skipped += need_to_skip;
-
-                        chunk.slice(need_to_skip..)
-                    } else {
-                        chunk
-                    };
-
-                    debug!("Read {} bytes from file, {} in this chunk", bytes_read, chunk.len());
-                    bytes_read += chunk.len();
-
-                    if bytes_read > length {
-                        if bytes_read - length > chunk.len() {
-                            return None
-                        }
-                        debug!("Truncating last {} bytes", bytes_read - length);
-                        return Some(chunk.slice(..chunk.len() - (bytes_read - length)))
-                    }
-
-                    Some(chunk)
-                }
-            })
-            .try_skip_while(|x| std::future::ready(Ok(x.is_none())))
-            .try_take_while(|x| std::future::ready(Ok(x.is_some())))
-            .map_ok(|x| x.unwrap())
-        );
-
-        Ok(stream)
-    } }
+        range: (Bound<u64>, Bound<u64>),
+    ) -> impl Future<Output = Result<Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>>> + Send
+    {
+        async move {
+            use futures::stream::TryStreamExt;
+            use tracing::debug;
+            let (metadata, mut stream) = self.read_streaming(domain, filename).await?;
+            let length = metadata.length.unwrap().get();
+
+            use Bound::*;
+            let (start, end): (usize, usize) = match range {
+                (Unbounded, Unbounded) => return Ok(stream),
+                (Included(start), Unbounded) => (start.try_into().unwrap(), length - 1),
+                (Unbounded, Included(end)) => (length - usize::try_from(end).unwrap(), length - 1),
+                (Included(start), Included(end)) => {
+                    (start.try_into().unwrap(), end.try_into().unwrap())
+                }
+                (_, _) => unreachable!(),
+            };
+
+            stream = Box::pin(
+                stream
+                    .map_ok({
+                        let mut bytes_skipped = 0usize;
+                        let mut bytes_read = 0usize;
+
+                        move |chunk| {
+                            debug!(
+                                "Skipped {}/{} bytes, chunk len {}",
+                                bytes_skipped,
+                                start,
+                                chunk.len()
+                            );
+                            let chunk = if bytes_skipped < start {
+                                let need_to_skip = start - bytes_skipped;
+                                if chunk.len() < need_to_skip {
+                                    return None;
+                                }
+                                debug!("Skipping {} bytes", need_to_skip);
+                                bytes_skipped += need_to_skip;
+
+                                chunk.slice(need_to_skip..)
+                            } else {
+                                chunk
+                            };
+
+                            debug!(
+                                "Read {} bytes from file, {} in this chunk",
+                                bytes_read,
+                                chunk.len()
+                            );
+                            bytes_read += chunk.len();
+
+                            if bytes_read > length {
+                                if bytes_read - length > chunk.len() {
+                                    return None;
+                                }
+                                debug!("Truncating last {} bytes", bytes_read - length);
+                                return Some(chunk.slice(..chunk.len() - (bytes_read - length)));
+                            }
+
+                            Some(chunk)
+                        }
+                    })
+                    .try_skip_while(|x| std::future::ready(Ok(x.is_none())))
+                    .try_take_while(|x| std::future::ready(Ok(x.is_some())))
+                    .map_ok(|x| x.unwrap()),
+            );
+
+            Ok(stream)
+        }
+    }
 
     /// Read metadata for a file.
     ///
     /// The default implementation uses the `read_streaming` method
     /// and drops the stream containing file content.
-    fn metadata(&self, domain: &str, filename: &str) -> impl Future<Output = Result<Metadata>> + Send { async move {
-        self.read_streaming(domain, filename)
-            .await
-            .map(|(meta, _)| meta)
-    } }
+    fn metadata(
+        &self,
+        domain: &str,
+        filename: &str,
+    ) -> impl Future<Output = Result<Metadata>> + Send {
+        async move {
+            self.read_streaming(domain, filename)
+                .await
+                .map(|(meta, _)| meta)
+        }
+    }
 
     fn delete(&self, domain: &str, filename: &str) -> impl Future<Output = Result<()>> + Send;
 }
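The reformatted `stream_range` default implementation above is the subtlest code in this diff: it slices a byte range out of a stream of variable-size chunks by mapping chunks to `None` outside the range, then using `try_skip_while`/`try_take_while` as fences before unwrapping. Below is a minimal, self-contained sketch of the same idiom, not code from this repository: the sample data and bounds are invented, and the accumulation of skipped bytes when an entire chunk is dropped is made explicit here.

    use bytes::Bytes;
    use futures::stream::{self, StreamExt, TryStreamExt};

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        let chunks: Vec<std::io::Result<Bytes>> = vec![
            Ok(Bytes::from_static(b"hello ")),
            Ok(Bytes::from_static(b"wonderful ")),
            Ok(Bytes::from_static(b"world")),
        ];
        let start = 6usize; // skip "hello "
        let len = 9usize; // keep "wonderful"

        let ranged = stream::iter(chunks)
            .map_ok({
                let mut bytes_skipped = 0usize;
                let mut bytes_read = 0usize;
                move |chunk| {
                    // Drop or slice chunks until `start` bytes are consumed.
                    let chunk = if bytes_skipped < start {
                        let need_to_skip = start - bytes_skipped;
                        if chunk.len() < need_to_skip {
                            bytes_skipped += chunk.len();
                            return None;
                        }
                        bytes_skipped += need_to_skip;
                        chunk.slice(need_to_skip..)
                    } else {
                        chunk
                    };
                    bytes_read += chunk.len();
                    // Truncate the chunk that crosses the end of the range.
                    if bytes_read > len {
                        if bytes_read - len >= chunk.len() {
                            return None;
                        }
                        return Some(chunk.slice(..chunk.len() - (bytes_read - len)));
                    }
                    Some(chunk)
                }
            })
            // Leading `None`s are skipped; the first trailing `None` ends
            // the stream, so every surviving item is a `Some`.
            .try_skip_while(|x| std::future::ready(Ok(x.is_none())))
            .try_take_while(|x| std::future::ready(Ok(x.is_some())))
            .map_ok(|x| x.unwrap());

        let body: Vec<u8> = ranged
            .try_fold(Vec::new(), |mut acc, chunk| async move {
                acc.extend_from_slice(&chunk);
                Ok(acc)
            })
            .await?;
        assert_eq!(body, b"wonderful");
        Ok(())
    }

The `try_take_while(... x.is_some())` fence is what makes the final `map_ok(|x| x.unwrap())` safe: once a `None` appears after data, the stream is considered exhausted, so no `None` can ever reach the unwrap.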