diff options
author | Vika <vika@fireburn.ru> | 2025-04-09 23:31:02 +0300 |
---|---|---|
committer | Vika <vika@fireburn.ru> | 2025-04-09 23:31:57 +0300 |
commit | 8826d9446e6c492db2243b9921e59ce496027bef (patch) | |
tree | 63738aa9001cb73b11cb0e974e93129bcdf1adbb /src/media/storage/mod.rs | |
parent | 519cadfbb298f50cbf819dde757037ab56e2863e (diff) | |
download | kittybox-8826d9446e6c492db2243b9921e59ce496027bef.tar.zst |
cargo fmt
Change-Id: I80e81ebba3f0cdf8c094451c9fe3ee4126b8c888
Diffstat (limited to 'src/media/storage/mod.rs')
-rw-r--r-- | src/media/storage/mod.rs | 169 |
1 files changed, 97 insertions, 72 deletions
diff --git a/src/media/storage/mod.rs b/src/media/storage/mod.rs index 3583247..5658071 100644 --- a/src/media/storage/mod.rs +++ b/src/media/storage/mod.rs @@ -1,12 +1,12 @@ use axum::extract::multipart::Field; -use tokio_stream::Stream; use bytes::Bytes; use serde::{Deserialize, Serialize}; +use std::fmt::Debug; use std::future::Future; +use std::num::NonZeroUsize; use std::ops::Bound; use std::pin::Pin; -use std::fmt::Debug; -use std::num::NonZeroUsize; +use tokio_stream::Stream; pub mod file; @@ -24,17 +24,14 @@ pub struct Metadata { impl From<&Field<'_>> for Metadata { fn from(field: &Field<'_>) -> Self { Self { - content_type: field.content_type() - .map(|i| i.to_owned()), - filename: field.file_name() - .map(|i| i.to_owned()), + content_type: field.content_type().map(|i| i.to_owned()), + filename: field.file_name().map(|i| i.to_owned()), length: None, etag: None, } } } - #[derive(Debug, Clone, Copy)] pub enum ErrorKind { Backend, @@ -95,88 +92,116 @@ pub trait MediaStore: 'static + Send + Sync + Clone { content: T, ) -> impl Future<Output = Result<String>> + Send where - T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug; + T: tokio_stream::Stream< + Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>, + > + Unpin + + Send + + Debug; #[allow(clippy::type_complexity)] fn read_streaming( &self, domain: &str, filename: &str, - ) -> impl Future<Output = Result< - (Metadata, Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>) - >> + Send; + ) -> impl Future< + Output = Result<( + Metadata, + Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>, + )>, + > + Send; fn stream_range( &self, domain: &str, filename: &str, - range: (Bound<u64>, Bound<u64>) - ) -> impl Future<Output = Result<Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>>> + Send { async move { - use futures::stream::TryStreamExt; - use tracing::debug; - let (metadata, mut stream) = self.read_streaming(domain, filename).await?; - let length = metadata.length.unwrap().get(); - - use Bound::*; - let (start, end): (usize, usize) = match range { - (Unbounded, Unbounded) => return Ok(stream), - (Included(start), Unbounded) => (start.try_into().unwrap(), length - 1), - (Unbounded, Included(end)) => (length - usize::try_from(end).unwrap(), length - 1), - (Included(start), Included(end)) => (start.try_into().unwrap(), end.try_into().unwrap()), - (_, _) => unreachable!() - }; - - stream = Box::pin( - stream.map_ok({ - let mut bytes_skipped = 0usize; - let mut bytes_read = 0usize; - - move |chunk| { - debug!("Skipped {}/{} bytes, chunk len {}", bytes_skipped, start, chunk.len()); - let chunk = if bytes_skipped < start { - let need_to_skip = start - bytes_skipped; - if chunk.len() < need_to_skip { - return None - } - debug!("Skipping {} bytes", need_to_skip); - bytes_skipped += need_to_skip; - - chunk.slice(need_to_skip..) - } else { - chunk - }; - - debug!("Read {} bytes from file, {} in this chunk", bytes_read, chunk.len()); - bytes_read += chunk.len(); - - if bytes_read > length { - if bytes_read - length > chunk.len() { - return None - } - debug!("Truncating last {} bytes", bytes_read - length); - return Some(chunk.slice(..chunk.len() - (bytes_read - length))) - } - - Some(chunk) + range: (Bound<u64>, Bound<u64>), + ) -> impl Future<Output = Result<Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>>> + Send + { + async move { + use futures::stream::TryStreamExt; + use tracing::debug; + let (metadata, mut stream) = self.read_streaming(domain, filename).await?; + let length = metadata.length.unwrap().get(); + + use Bound::*; + let (start, end): (usize, usize) = match range { + (Unbounded, Unbounded) => return Ok(stream), + (Included(start), Unbounded) => (start.try_into().unwrap(), length - 1), + (Unbounded, Included(end)) => (length - usize::try_from(end).unwrap(), length - 1), + (Included(start), Included(end)) => { + (start.try_into().unwrap(), end.try_into().unwrap()) } - }) - .try_skip_while(|x| std::future::ready(Ok(x.is_none()))) - .try_take_while(|x| std::future::ready(Ok(x.is_some()))) - .map_ok(|x| x.unwrap()) - ); + (_, _) => unreachable!(), + }; + + stream = Box::pin( + stream + .map_ok({ + let mut bytes_skipped = 0usize; + let mut bytes_read = 0usize; + + move |chunk| { + debug!( + "Skipped {}/{} bytes, chunk len {}", + bytes_skipped, + start, + chunk.len() + ); + let chunk = if bytes_skipped < start { + let need_to_skip = start - bytes_skipped; + if chunk.len() < need_to_skip { + return None; + } + debug!("Skipping {} bytes", need_to_skip); + bytes_skipped += need_to_skip; + + chunk.slice(need_to_skip..) + } else { + chunk + }; + + debug!( + "Read {} bytes from file, {} in this chunk", + bytes_read, + chunk.len() + ); + bytes_read += chunk.len(); + + if bytes_read > length { + if bytes_read - length > chunk.len() { + return None; + } + debug!("Truncating last {} bytes", bytes_read - length); + return Some(chunk.slice(..chunk.len() - (bytes_read - length))); + } + + Some(chunk) + } + }) + .try_skip_while(|x| std::future::ready(Ok(x.is_none()))) + .try_take_while(|x| std::future::ready(Ok(x.is_some()))) + .map_ok(|x| x.unwrap()), + ); - Ok(stream) - } } + Ok(stream) + } + } /// Read metadata for a file. /// /// The default implementation uses the `read_streaming` method /// and drops the stream containing file content. - fn metadata(&self, domain: &str, filename: &str) -> impl Future<Output = Result<Metadata>> + Send { async move { - self.read_streaming(domain, filename) - .await - .map(|(meta, _)| meta) - } } + fn metadata( + &self, + domain: &str, + filename: &str, + ) -> impl Future<Output = Result<Metadata>> + Send { + async move { + self.read_streaming(domain, filename) + .await + .map(|(meta, _)| meta) + } + } fn delete(&self, domain: &str, filename: &str) -> impl Future<Output = Result<()>> + Send; } |