use axum::extract::multipart::Field;
use tokio_stream::Stream;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::future::Future;
use std::ops::Bound;
use std::pin::Pin;
use std::fmt::Debug;
use std::num::NonZeroUsize;
pub mod file;
#[derive(Debug, Deserialize, Serialize)]
pub struct Metadata {
/// Content type of the file. If None, the content-type is considered undefined.
pub content_type: Option<String>,
/// The original filename that was passed.
pub filename: Option<String>,
/// The recorded length of the file.
pub length: Option<NonZeroUsize>,
/// The e-tag of a file. Note: it must be a strong e-tag, for example, a hash.
pub etag: Option<String>,
}
impl From<&Field<'_>> for Metadata {
fn from(field: &Field<'_>) -> Self {
Self {
content_type: field.content_type()
.map(|i| i.to_owned()),
filename: field.file_name()
.map(|i| i.to_owned()),
length: None,
etag: None,
}
}
}
#[derive(Debug, Clone, Copy)]
pub enum ErrorKind {
Backend,
Permission,
Json,
NotFound,
Other,
}
#[derive(Debug)]
pub struct MediaStoreError {
kind: ErrorKind,
source: Option<Box<dyn std::error::Error + Send + Sync>>,
msg: String,
}
impl MediaStoreError {
pub fn kind(&self) -> ErrorKind {
self.kind
}
}
impl std::error::Error for MediaStoreError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.source
.as_ref()
.map(|i| i.as_ref() as &dyn std::error::Error)
}
}
impl std::fmt::Display for MediaStoreError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}: {}",
match self.kind {
ErrorKind::Backend => "media storage backend error",
ErrorKind::Permission => "permission denied",
ErrorKind::Json => "failed to parse json",
ErrorKind::NotFound => "blob not found",
ErrorKind::Other => "unknown media storage error",
},
self.msg
)
}
}
pub type Result<T> = std::result::Result<T, MediaStoreError>;
pub trait MediaStore: 'static + Send + Sync + Clone {
// Initialize self from a URL, possibly performing asynchronous initialization.
fn new(url: &'_ url::Url) -> impl Future<Output = Result<Self>> + Send;
fn write_streaming<T>(
&self,
domain: &str,
metadata: Metadata,
content: T,
) -> impl Future<Output = Result<String>> + Send
where
T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug;
fn read_streaming(
&self,
domain: &str,
filename: &str,
) -> impl Future<Output = Result<
(Metadata, Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>)
>> + Send;
fn stream_range(
&self,
domain: &str,
filename: &str,
range: (Bound<u64>, Bound<u64>)
) -> impl Future<Output = Result<Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>>> + Send { async move {
use futures::stream::TryStreamExt;
use tracing::debug;
let (metadata, mut stream) = self.read_streaming(domain, filename).await?;
let length = metadata.length.unwrap().get();
use Bound::*;
let (start, end): (usize, usize) = match range {
(Unbounded, Unbounded) => return Ok(stream),
(Included(start), Unbounded) => (start.try_into().unwrap(), length - 1),
(Unbounded, Included(end)) => (length - usize::try_from(end).unwrap(), length - 1),
(Included(start), Included(end)) => (start.try_into().unwrap(), end.try_into().unwrap()),
(_, _) => unreachable!()
};
stream = Box::pin(
stream.map_ok({
let mut bytes_skipped = 0usize;
let mut bytes_read = 0usize;
move |chunk| {
debug!("Skipped {}/{} bytes, chunk len {}", bytes_skipped, start, chunk.len());
let chunk = if bytes_skipped < start {
let need_to_skip = start - bytes_skipped;
if chunk.len() < need_to_skip {
return None
}
debug!("Skipping {} bytes", need_to_skip);
bytes_skipped += need_to_skip;
chunk.slice(need_to_skip..)
} else {
chunk
};
debug!("Read {} bytes from file, {} in this chunk", bytes_read, chunk.len());
bytes_read += chunk.len();
if bytes_read > length {
if bytes_read - length > chunk.len() {
return None
}
debug!("Truncating last {} bytes", bytes_read - length);
return Some(chunk.slice(..chunk.len() - (bytes_read - length)))
}
Some(chunk)
}
})
.try_skip_while(|x| std::future::ready(Ok(x.is_none())))
.try_take_while(|x| std::future::ready(Ok(x.is_some())))
.map_ok(|x| x.unwrap())
);
Ok(stream)
} }
/// Read metadata for a file.
///
/// The default implementation uses the `read_streaming` method
/// and drops the stream containing file content.
fn metadata(&self, domain: &str, filename: &str) -> impl Future<Output = Result<Metadata>> + Send { async move {
self.read_streaming(domain, filename)
.await
.map(|(meta, stream)| meta)
} }
fn delete(&self, domain: &str, filename: &str) -> impl Future<Output = Result<()>> + Send;
}