about summary refs log tree commit diff
path: root/src/media/storage/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/media/storage/mod.rs')
-rw-r--r--src/media/storage/mod.rs177
1 files changed, 177 insertions, 0 deletions
diff --git a/src/media/storage/mod.rs b/src/media/storage/mod.rs
new file mode 100644
index 0000000..020999c
--- /dev/null
+++ b/src/media/storage/mod.rs
@@ -0,0 +1,177 @@
+use async_trait::async_trait;
+use axum::extract::multipart::Field;
+use tokio_stream::Stream;
+use bytes::Bytes;
+use serde::{Deserialize, Serialize};
+use std::ops::Bound;
+use std::pin::Pin;
+use std::fmt::Debug;
+use std::num::NonZeroUsize;
+
+pub mod file;
+
+#[derive(Debug, Deserialize, Serialize)]
+pub struct Metadata {
+    /// Content type of the file. If None, the content-type is considered undefined.
+    pub content_type: Option<String>,
+    /// The original filename that was passed.
+    pub filename: Option<String>,
+    /// The recorded length of the file.
+    pub length: Option<NonZeroUsize>,
+    /// The e-tag of a file. Note: it must be a strong e-tag, for example, a hash.
+    pub etag: Option<String>,
+}
+impl From<&Field<'_>> for Metadata {
+    fn from(field: &Field<'_>) -> Self {
+        Self {
+            content_type: field.content_type()
+                .map(|i| i.to_owned()),
+            filename: field.file_name()
+                .map(|i| i.to_owned()),
+            length: None,
+            etag: None,
+        }
+    }
+}
+
+
+#[derive(Debug, Clone, Copy)]
+pub enum ErrorKind {
+    Backend,
+    Permission,
+    Json,
+    NotFound,
+    Other,
+}
+
+#[derive(Debug)]
+pub struct MediaStoreError {
+    kind: ErrorKind,
+    source: Option<Box<dyn std::error::Error + Send + Sync>>,
+    msg: String,
+}
+
+impl MediaStoreError {
+    pub fn kind(&self) -> ErrorKind {
+        self.kind
+    }
+}
+
+impl std::error::Error for MediaStoreError {
+    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+        self.source
+            .as_ref()
+            .map(|i| i.as_ref() as &dyn std::error::Error)
+    }
+}
+
+impl std::fmt::Display for MediaStoreError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "{}: {}",
+            match self.kind {
+                ErrorKind::Backend => "media storage backend error",
+                ErrorKind::Permission => "permission denied",
+                ErrorKind::Json => "failed to parse json",
+                ErrorKind::NotFound => "blob not found",
+                ErrorKind::Other => "unknown media storage error",
+            },
+            self.msg
+        )
+    }
+}
+
+pub type Result<T> = std::result::Result<T, MediaStoreError>;
+
+#[async_trait]
+pub trait MediaStore: 'static + Send + Sync + Clone {
+    async fn write_streaming<T>(
+        &self,
+        domain: &str,
+        metadata: Metadata,
+        content: T,
+    ) -> Result<String>
+    where
+        T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug;
+
+    async fn read_streaming(
+        &self,
+        domain: &str,
+        filename: &str,
+    ) -> Result<(Metadata, Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>)>;
+
+    async fn stream_range(
+        &self,
+        domain: &str,
+        filename: &str,
+        range: (Bound<u64>, Bound<u64>)
+    ) -> Result<Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>> {
+        use futures::stream::TryStreamExt;
+        use tracing::debug;
+        let (metadata, mut stream) = self.read_streaming(domain, filename).await?;
+        let length = metadata.length.unwrap().get();
+
+        use Bound::*;
+        let (start, end): (usize, usize) = match range {
+            (Unbounded, Unbounded) => return Ok(stream),
+            (Included(start), Unbounded) => (start.try_into().unwrap(), length - 1),
+            (Unbounded, Included(end)) => (length - usize::try_from(end).unwrap(), length - 1),
+            (Included(start), Included(end)) => (start.try_into().unwrap(), end.try_into().unwrap()),
+            (_, _) => unreachable!()
+        };
+
+        stream = Box::pin(
+            stream.map_ok({
+                let mut bytes_skipped = 0usize;
+                let mut bytes_read = 0usize;
+
+                move |chunk| {
+                    debug!("Skipped {}/{} bytes, chunk len {}", bytes_skipped, start, chunk.len());
+                    let chunk = if bytes_skipped < start {
+                        let need_to_skip = start - bytes_skipped;
+                        if chunk.len() < need_to_skip {
+                            return None
+                        }
+                        debug!("Skipping {} bytes", need_to_skip);
+                        bytes_skipped += need_to_skip;
+
+                        chunk.slice(need_to_skip..)
+                    } else {
+                        chunk
+                    };
+
+                    debug!("Read {} bytes from file, {} in this chunk", bytes_read, chunk.len());
+                    bytes_read += chunk.len();
+
+                    if bytes_read > length {
+                        if bytes_read - length > chunk.len() {
+                            return None
+                        }
+                        debug!("Truncating last {} bytes", bytes_read - length);
+                        return Some(chunk.slice(..chunk.len() - (bytes_read - length)))
+                    }
+
+                    Some(chunk)
+                }
+            })
+                .try_skip_while(|x| std::future::ready(Ok(x.is_none())))
+                .try_take_while(|x| std::future::ready(Ok(x.is_some())))
+                .map_ok(|x| x.unwrap())
+        );
+
+        return Ok(stream);
+    }
+
+    /// Read metadata for a file.
+    ///
+    /// The default implementation uses the `read_streaming` method
+    /// and drops the stream containing file content.
+    async fn metadata(&self, domain: &str, filename: &str) -> Result<Metadata> {
+        self.read_streaming(domain, filename)
+            .await
+            .map(|(meta, stream)| meta)
+    }
+
+    async fn delete(&self, domain: &str, filename: &str) -> Result<()>;
+}