diff options
Diffstat (limited to 'kittybox-rs/src/media/storage/mod.rs')
-rw-r--r-- | kittybox-rs/src/media/storage/mod.rs | 177 |
1 files changed, 0 insertions, 177 deletions
diff --git a/kittybox-rs/src/media/storage/mod.rs b/kittybox-rs/src/media/storage/mod.rs deleted file mode 100644 index 020999c..0000000 --- a/kittybox-rs/src/media/storage/mod.rs +++ /dev/null @@ -1,177 +0,0 @@ -use async_trait::async_trait; -use axum::extract::multipart::Field; -use tokio_stream::Stream; -use bytes::Bytes; -use serde::{Deserialize, Serialize}; -use std::ops::Bound; -use std::pin::Pin; -use std::fmt::Debug; -use std::num::NonZeroUsize; - -pub mod file; - -#[derive(Debug, Deserialize, Serialize)] -pub struct Metadata { - /// Content type of the file. If None, the content-type is considered undefined. - pub content_type: Option<String>, - /// The original filename that was passed. - pub filename: Option<String>, - /// The recorded length of the file. - pub length: Option<NonZeroUsize>, - /// The e-tag of a file. Note: it must be a strong e-tag, for example, a hash. - pub etag: Option<String>, -} -impl From<&Field<'_>> for Metadata { - fn from(field: &Field<'_>) -> Self { - Self { - content_type: field.content_type() - .map(|i| i.to_owned()), - filename: field.file_name() - .map(|i| i.to_owned()), - length: None, - etag: None, - } - } -} - - -#[derive(Debug, Clone, Copy)] -pub enum ErrorKind { - Backend, - Permission, - Json, - NotFound, - Other, -} - -#[derive(Debug)] -pub struct MediaStoreError { - kind: ErrorKind, - source: Option<Box<dyn std::error::Error + Send + Sync>>, - msg: String, -} - -impl MediaStoreError { - pub fn kind(&self) -> ErrorKind { - self.kind - } -} - -impl std::error::Error for MediaStoreError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - self.source - .as_ref() - .map(|i| i.as_ref() as &dyn std::error::Error) - } -} - -impl std::fmt::Display for MediaStoreError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{}: {}", - match self.kind { - ErrorKind::Backend => "media storage backend error", - ErrorKind::Permission => "permission denied", - ErrorKind::Json => "failed to parse json", - ErrorKind::NotFound => "blob not found", - ErrorKind::Other => "unknown media storage error", - }, - self.msg - ) - } -} - -pub type Result<T> = std::result::Result<T, MediaStoreError>; - -#[async_trait] -pub trait MediaStore: 'static + Send + Sync + Clone { - async fn write_streaming<T>( - &self, - domain: &str, - metadata: Metadata, - content: T, - ) -> Result<String> - where - T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug; - - async fn read_streaming( - &self, - domain: &str, - filename: &str, - ) -> Result<(Metadata, Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>)>; - - async fn stream_range( - &self, - domain: &str, - filename: &str, - range: (Bound<u64>, Bound<u64>) - ) -> Result<Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>> { - use futures::stream::TryStreamExt; - use tracing::debug; - let (metadata, mut stream) = self.read_streaming(domain, filename).await?; - let length = metadata.length.unwrap().get(); - - use Bound::*; - let (start, end): (usize, usize) = match range { - (Unbounded, Unbounded) => return Ok(stream), - (Included(start), Unbounded) => (start.try_into().unwrap(), length - 1), - (Unbounded, Included(end)) => (length - usize::try_from(end).unwrap(), length - 1), - (Included(start), Included(end)) => (start.try_into().unwrap(), end.try_into().unwrap()), - (_, _) => unreachable!() - }; - - stream = Box::pin( - stream.map_ok({ - let mut bytes_skipped = 0usize; - let mut bytes_read = 0usize; - - move |chunk| { - debug!("Skipped {}/{} bytes, chunk len {}", bytes_skipped, start, chunk.len()); - let chunk = if bytes_skipped < start { - let need_to_skip = start - bytes_skipped; - if chunk.len() < need_to_skip { - return None - } - debug!("Skipping {} bytes", need_to_skip); - bytes_skipped += need_to_skip; - - chunk.slice(need_to_skip..) - } else { - chunk - }; - - debug!("Read {} bytes from file, {} in this chunk", bytes_read, chunk.len()); - bytes_read += chunk.len(); - - if bytes_read > length { - if bytes_read - length > chunk.len() { - return None - } - debug!("Truncating last {} bytes", bytes_read - length); - return Some(chunk.slice(..chunk.len() - (bytes_read - length))) - } - - Some(chunk) - } - }) - .try_skip_while(|x| std::future::ready(Ok(x.is_none()))) - .try_take_while(|x| std::future::ready(Ok(x.is_some()))) - .map_ok(|x| x.unwrap()) - ); - - return Ok(stream); - } - - /// Read metadata for a file. - /// - /// The default implementation uses the `read_streaming` method - /// and drops the stream containing file content. - async fn metadata(&self, domain: &str, filename: &str) -> Result<Metadata> { - self.read_streaming(domain, filename) - .await - .map(|(meta, stream)| meta) - } - - async fn delete(&self, domain: &str, filename: &str) -> Result<()>; -} |