diff options
Diffstat (limited to 'src/media/storage/mod.rs')
-rw-r--r-- | src/media/storage/mod.rs | 177 |
1 files changed, 177 insertions, 0 deletions
diff --git a/src/media/storage/mod.rs b/src/media/storage/mod.rs new file mode 100644 index 0000000..020999c --- /dev/null +++ b/src/media/storage/mod.rs @@ -0,0 +1,177 @@ +use async_trait::async_trait; +use axum::extract::multipart::Field; +use tokio_stream::Stream; +use bytes::Bytes; +use serde::{Deserialize, Serialize}; +use std::ops::Bound; +use std::pin::Pin; +use std::fmt::Debug; +use std::num::NonZeroUsize; + +pub mod file; + +#[derive(Debug, Deserialize, Serialize)] +pub struct Metadata { + /// Content type of the file. If None, the content-type is considered undefined. + pub content_type: Option<String>, + /// The original filename that was passed. + pub filename: Option<String>, + /// The recorded length of the file. + pub length: Option<NonZeroUsize>, + /// The e-tag of a file. Note: it must be a strong e-tag, for example, a hash. + pub etag: Option<String>, +} +impl From<&Field<'_>> for Metadata { + fn from(field: &Field<'_>) -> Self { + Self { + content_type: field.content_type() + .map(|i| i.to_owned()), + filename: field.file_name() + .map(|i| i.to_owned()), + length: None, + etag: None, + } + } +} + + +#[derive(Debug, Clone, Copy)] +pub enum ErrorKind { + Backend, + Permission, + Json, + NotFound, + Other, +} + +#[derive(Debug)] +pub struct MediaStoreError { + kind: ErrorKind, + source: Option<Box<dyn std::error::Error + Send + Sync>>, + msg: String, +} + +impl MediaStoreError { + pub fn kind(&self) -> ErrorKind { + self.kind + } +} + +impl std::error::Error for MediaStoreError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + self.source + .as_ref() + .map(|i| i.as_ref() as &dyn std::error::Error) + } +} + +impl std::fmt::Display for MediaStoreError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}: {}", + match self.kind { + ErrorKind::Backend => "media storage backend error", + ErrorKind::Permission => "permission denied", + ErrorKind::Json => "failed to parse json", + ErrorKind::NotFound => "blob not found", + ErrorKind::Other => "unknown media storage error", + }, + self.msg + ) + } +} + +pub type Result<T> = std::result::Result<T, MediaStoreError>; + +#[async_trait] +pub trait MediaStore: 'static + Send + Sync + Clone { + async fn write_streaming<T>( + &self, + domain: &str, + metadata: Metadata, + content: T, + ) -> Result<String> + where + T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug; + + async fn read_streaming( + &self, + domain: &str, + filename: &str, + ) -> Result<(Metadata, Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>)>; + + async fn stream_range( + &self, + domain: &str, + filename: &str, + range: (Bound<u64>, Bound<u64>) + ) -> Result<Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>> { + use futures::stream::TryStreamExt; + use tracing::debug; + let (metadata, mut stream) = self.read_streaming(domain, filename).await?; + let length = metadata.length.unwrap().get(); + + use Bound::*; + let (start, end): (usize, usize) = match range { + (Unbounded, Unbounded) => return Ok(stream), + (Included(start), Unbounded) => (start.try_into().unwrap(), length - 1), + (Unbounded, Included(end)) => (length - usize::try_from(end).unwrap(), length - 1), + (Included(start), Included(end)) => (start.try_into().unwrap(), end.try_into().unwrap()), + (_, _) => unreachable!() + }; + + stream = Box::pin( + stream.map_ok({ + let mut bytes_skipped = 0usize; + let mut bytes_read = 0usize; + + move |chunk| { + debug!("Skipped {}/{} bytes, chunk len {}", bytes_skipped, start, chunk.len()); + let chunk = if bytes_skipped < start { + let need_to_skip = start - bytes_skipped; + if chunk.len() < need_to_skip { + return None + } + debug!("Skipping {} bytes", need_to_skip); + bytes_skipped += need_to_skip; + + chunk.slice(need_to_skip..) + } else { + chunk + }; + + debug!("Read {} bytes from file, {} in this chunk", bytes_read, chunk.len()); + bytes_read += chunk.len(); + + if bytes_read > length { + if bytes_read - length > chunk.len() { + return None + } + debug!("Truncating last {} bytes", bytes_read - length); + return Some(chunk.slice(..chunk.len() - (bytes_read - length))) + } + + Some(chunk) + } + }) + .try_skip_while(|x| std::future::ready(Ok(x.is_none()))) + .try_take_while(|x| std::future::ready(Ok(x.is_some()))) + .map_ok(|x| x.unwrap()) + ); + + return Ok(stream); + } + + /// Read metadata for a file. + /// + /// The default implementation uses the `read_streaming` method + /// and drops the stream containing file content. + async fn metadata(&self, domain: &str, filename: &str) -> Result<Metadata> { + self.read_streaming(domain, filename) + .await + .map(|(meta, stream)| meta) + } + + async fn delete(&self, domain: &str, filename: &str) -> Result<()>; +} |