use async_trait::async_trait; use axum::extract::multipart::Field; use tokio_stream::Stream; use bytes::Bytes; use serde::{Deserialize, Serialize}; use std::ops::Bound; use std::pin::Pin; use std::fmt::Debug; use std::num::NonZeroUsize; pub mod file; #[derive(Debug, Deserialize, Serialize)] pub struct Metadata { /// Content type of the file. If None, the content-type is considered undefined. pub content_type: Option<String>, /// The original filename that was passed. pub filename: Option<String>, /// The recorded length of the file. pub length: Option<NonZeroUsize>, /// The e-tag of a file. Note: it must be a strong e-tag, for example, a hash. pub etag: Option<String>, } impl From<&Field<'_>> for Metadata { fn from(field: &Field<'_>) -> Self { Self { content_type: field.content_type() .map(|i| i.to_owned()), filename: field.file_name() .map(|i| i.to_owned()), length: None, etag: None, } } } #[derive(Debug, Clone, Copy)] pub enum ErrorKind { Backend, Permission, Json, NotFound, Other, } #[derive(Debug)] pub struct MediaStoreError { kind: ErrorKind, source: Option<Box<dyn std::error::Error + Send + Sync>>, msg: String, } impl MediaStoreError { pub fn kind(&self) -> ErrorKind { self.kind } } impl std::error::Error for MediaStoreError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { self.source .as_ref() .map(|i| i.as_ref() as &dyn std::error::Error) } } impl std::fmt::Display for MediaStoreError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, "{}: {}", match self.kind { ErrorKind::Backend => "media storage backend error", ErrorKind::Permission => "permission denied", ErrorKind::Json => "failed to parse json", ErrorKind::NotFound => "blob not found", ErrorKind::Other => "unknown media storage error", }, self.msg ) } } pub type Result<T> = std::result::Result<T, MediaStoreError>; #[async_trait] pub trait MediaStore: 'static + Send + Sync + Clone { async fn write_streaming<T>( &self, domain: &str, metadata: Metadata, content: T, ) -> Result<String> where T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug; async fn read_streaming( &self, domain: &str, filename: &str, ) -> Result<(Metadata, Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>)>; async fn stream_range( &self, domain: &str, filename: &str, range: (Bound<u64>, Bound<u64>) ) -> Result<Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>> { use futures::stream::TryStreamExt; use tracing::debug; let (metadata, mut stream) = self.read_streaming(domain, filename).await?; let length = metadata.length.unwrap().get(); use Bound::*; let (start, end): (usize, usize) = match range { (Unbounded, Unbounded) => return Ok(stream), (Included(start), Unbounded) => (start.try_into().unwrap(), length - 1), (Unbounded, Included(end)) => (length - usize::try_from(end).unwrap(), length - 1), (Included(start), Included(end)) => (start.try_into().unwrap(), end.try_into().unwrap()), (_, _) => unreachable!() }; stream = Box::pin( stream.map_ok({ let mut bytes_skipped = 0usize; let mut bytes_read = 0usize; move |chunk| { debug!("Skipped {}/{} bytes, chunk len {}", bytes_skipped, start, chunk.len()); let chunk = if bytes_skipped < start { let need_to_skip = start - bytes_skipped; if chunk.len() < need_to_skip { return None } debug!("Skipping {} bytes", need_to_skip); bytes_skipped += need_to_skip; chunk.slice(need_to_skip..) } else { chunk }; debug!("Read {} bytes from file, {} in this chunk", bytes_read, chunk.len()); bytes_read += chunk.len(); if bytes_read > length { if bytes_read - length > chunk.len() { return None } debug!("Truncating last {} bytes", bytes_read - length); return Some(chunk.slice(..chunk.len() - (bytes_read - length))) } Some(chunk) } }) .try_skip_while(|x| std::future::ready(Ok(x.is_none()))) .try_take_while(|x| std::future::ready(Ok(x.is_some()))) .map_ok(|x| x.unwrap()) ); return Ok(stream); } /// Read metadata for a file. /// /// The default implementation uses the `read_streaming` method /// and drops the stream containing file content. async fn metadata(&self, domain: &str, filename: &str) -> Result<Metadata> { self.read_streaming(domain, filename) .await .map(|(meta, stream)| meta) } async fn delete(&self, domain: &str, filename: &str) -> Result<()>; }