about summary refs log tree commit diff
path: root/src/media/storage/mod.rs
diff options
context:
space:
mode:
authorVika <vika@fireburn.ru>2025-04-09 23:31:02 +0300
committerVika <vika@fireburn.ru>2025-04-09 23:31:57 +0300
commit8826d9446e6c492db2243b9921e59ce496027bef (patch)
tree63738aa9001cb73b11cb0e974e93129bcdf1adbb /src/media/storage/mod.rs
parent519cadfbb298f50cbf819dde757037ab56e2863e (diff)
downloadkittybox-8826d9446e6c492db2243b9921e59ce496027bef.tar.zst
cargo fmt
Change-Id: I80e81ebba3f0cdf8c094451c9fe3ee4126b8c888
Diffstat (limited to 'src/media/storage/mod.rs')
-rw-r--r--src/media/storage/mod.rs169
1 files changed, 97 insertions, 72 deletions
diff --git a/src/media/storage/mod.rs b/src/media/storage/mod.rs
index 3583247..5658071 100644
--- a/src/media/storage/mod.rs
+++ b/src/media/storage/mod.rs
@@ -1,12 +1,12 @@
 use axum::extract::multipart::Field;
-use tokio_stream::Stream;
 use bytes::Bytes;
 use serde::{Deserialize, Serialize};
+use std::fmt::Debug;
 use std::future::Future;
+use std::num::NonZeroUsize;
 use std::ops::Bound;
 use std::pin::Pin;
-use std::fmt::Debug;
-use std::num::NonZeroUsize;
+use tokio_stream::Stream;
 
 pub mod file;
 
@@ -24,17 +24,14 @@ pub struct Metadata {
 impl From<&Field<'_>> for Metadata {
     fn from(field: &Field<'_>) -> Self {
         Self {
-            content_type: field.content_type()
-                .map(|i| i.to_owned()),
-            filename: field.file_name()
-                .map(|i| i.to_owned()),
+            content_type: field.content_type().map(|i| i.to_owned()),
+            filename: field.file_name().map(|i| i.to_owned()),
             length: None,
             etag: None,
         }
     }
 }
 
-
 #[derive(Debug, Clone, Copy)]
 pub enum ErrorKind {
     Backend,
@@ -95,88 +92,116 @@ pub trait MediaStore: 'static + Send + Sync + Clone {
         content: T,
     ) -> impl Future<Output = Result<String>> + Send
     where
-        T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug;
+        T: tokio_stream::Stream<
+                Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>,
+            > + Unpin
+            + Send
+            + Debug;
 
     #[allow(clippy::type_complexity)]
     fn read_streaming(
         &self,
         domain: &str,
         filename: &str,
-    ) -> impl Future<Output = Result<
-        (Metadata, Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>)
-        >> + Send;
+    ) -> impl Future<
+        Output = Result<(
+            Metadata,
+            Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>,
+        )>,
+    > + Send;
 
     fn stream_range(
         &self,
         domain: &str,
         filename: &str,
-        range: (Bound<u64>, Bound<u64>)
-    ) -> impl Future<Output = Result<Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>>> + Send { async move {
-        use futures::stream::TryStreamExt;
-        use tracing::debug;
-        let (metadata, mut stream) = self.read_streaming(domain, filename).await?;
-        let length = metadata.length.unwrap().get();
-
-        use Bound::*;
-        let (start, end): (usize, usize) = match range {
-            (Unbounded, Unbounded) => return Ok(stream),
-            (Included(start), Unbounded) => (start.try_into().unwrap(), length - 1),
-            (Unbounded, Included(end)) => (length - usize::try_from(end).unwrap(), length - 1),
-            (Included(start), Included(end)) => (start.try_into().unwrap(), end.try_into().unwrap()),
-            (_, _) => unreachable!()
-        };
-
-        stream = Box::pin(
-            stream.map_ok({
-                let mut bytes_skipped = 0usize;
-                let mut bytes_read = 0usize;
-
-                move |chunk| {
-                    debug!("Skipped {}/{} bytes, chunk len {}", bytes_skipped, start, chunk.len());
-                    let chunk = if bytes_skipped < start {
-                        let need_to_skip = start - bytes_skipped;
-                        if chunk.len() < need_to_skip {
-                            return None
-                        }
-                        debug!("Skipping {} bytes", need_to_skip);
-                        bytes_skipped += need_to_skip;
-
-                        chunk.slice(need_to_skip..)
-                    } else {
-                        chunk
-                    };
-
-                    debug!("Read {} bytes from file, {} in this chunk", bytes_read, chunk.len());
-                    bytes_read += chunk.len();
-
-                    if bytes_read > length {
-                        if bytes_read - length > chunk.len() {
-                            return None
-                        }
-                        debug!("Truncating last {} bytes", bytes_read - length);
-                        return Some(chunk.slice(..chunk.len() - (bytes_read - length)))
-                    }
-
-                    Some(chunk)
+        range: (Bound<u64>, Bound<u64>),
+    ) -> impl Future<Output = Result<Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>>> + Send
+    {
+        async move {
+            use futures::stream::TryStreamExt;
+            use tracing::debug;
+            let (metadata, mut stream) = self.read_streaming(domain, filename).await?;
+            let length = metadata.length.unwrap().get();
+
+            use Bound::*;
+            let (start, end): (usize, usize) = match range {
+                (Unbounded, Unbounded) => return Ok(stream),
+                (Included(start), Unbounded) => (start.try_into().unwrap(), length - 1),
+                (Unbounded, Included(end)) => (length - usize::try_from(end).unwrap(), length - 1),
+                (Included(start), Included(end)) => {
+                    (start.try_into().unwrap(), end.try_into().unwrap())
                 }
-            })
-                .try_skip_while(|x| std::future::ready(Ok(x.is_none())))
-                .try_take_while(|x| std::future::ready(Ok(x.is_some())))
-                .map_ok(|x| x.unwrap())
-        );
+                (_, _) => unreachable!(),
+            };
+
+            stream = Box::pin(
+                stream
+                    .map_ok({
+                        let mut bytes_skipped = 0usize;
+                        let mut bytes_read = 0usize;
+
+                        move |chunk| {
+                            debug!(
+                                "Skipped {}/{} bytes, chunk len {}",
+                                bytes_skipped,
+                                start,
+                                chunk.len()
+                            );
+                            let chunk = if bytes_skipped < start {
+                                let need_to_skip = start - bytes_skipped;
+                                if chunk.len() < need_to_skip {
+                                    return None;
+                                }
+                                debug!("Skipping {} bytes", need_to_skip);
+                                bytes_skipped += need_to_skip;
+
+                                chunk.slice(need_to_skip..)
+                            } else {
+                                chunk
+                            };
+
+                            debug!(
+                                "Read {} bytes from file, {} in this chunk",
+                                bytes_read,
+                                chunk.len()
+                            );
+                            bytes_read += chunk.len();
+
+                            if bytes_read > length {
+                                if bytes_read - length > chunk.len() {
+                                    return None;
+                                }
+                                debug!("Truncating last {} bytes", bytes_read - length);
+                                return Some(chunk.slice(..chunk.len() - (bytes_read - length)));
+                            }
+
+                            Some(chunk)
+                        }
+                    })
+                    .try_skip_while(|x| std::future::ready(Ok(x.is_none())))
+                    .try_take_while(|x| std::future::ready(Ok(x.is_some())))
+                    .map_ok(|x| x.unwrap()),
+            );
 
-        Ok(stream)
-    } }
+            Ok(stream)
+        }
+    }
 
     /// Read metadata for a file.
     ///
     /// The default implementation uses the `read_streaming` method
     /// and drops the stream containing file content.
-    fn metadata(&self, domain: &str, filename: &str) -> impl Future<Output = Result<Metadata>> + Send { async move {
-        self.read_streaming(domain, filename)
-            .await
-            .map(|(meta, _)| meta)
-    } }
+    fn metadata(
+        &self,
+        domain: &str,
+        filename: &str,
+    ) -> impl Future<Output = Result<Metadata>> + Send {
+        async move {
+            self.read_streaming(domain, filename)
+                .await
+                .map(|(meta, _)| meta)
+        }
+    }
 
     fn delete(&self, domain: &str, filename: &str) -> impl Future<Output = Result<()>> + Send;
 }