about summary refs log blame commit diff
path: root/src/media/storage/mod.rs
blob: b6f6ad7e429d333e9c9c9092612c670a14532f2e (plain) (tree)
1
2
3
4
5
6
7
8
9


                                    
                        
                    
                  
                    
                           

             
                                        




                                                                                    
                                                                                  



                                              
                                       
                                       
                         



         


                             
             








                                                             




                                     














                                                                        
                                                          







                                                                  
                                                     
                                                                                   

                                                                           
              

                           
                                                    
         
                                                                                                                                           
 
                      
              
                     
                       

                                                                              
 
                    


                                       
                                                                                                                       



















































                                                                                                     
                  
       



                                                                   
                                                                                                                    

                                             
       
 
                                                                                              
 
use axum::extract::multipart::Field;
use tokio_stream::Stream;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::future::Future;
use std::ops::Bound;
use std::pin::Pin;
use std::fmt::Debug;
use std::num::NonZeroUsize;

pub mod file;

#[derive(Debug, Deserialize, Serialize)]
pub struct Metadata {
    /// Content type of the file. If None, the content-type is considered undefined.
    pub content_type: Option<String>,
    /// The original filename that was passed.
    pub filename: Option<String>,
    /// The recorded length of the file.
    pub length: Option<NonZeroUsize>,
    /// The e-tag of a file. Note: it must be a strong e-tag, for example, a hash.
    pub etag: Option<String>,
}
impl From<&Field<'_>> for Metadata {
    fn from(field: &Field<'_>) -> Self {
        Self {
            content_type: field.content_type()
                .map(|i| i.to_owned()),
            filename: field.file_name()
                .map(|i| i.to_owned()),
            length: None,
            etag: None,
        }
    }
}


#[derive(Debug, Clone, Copy)]
pub enum ErrorKind {
    Backend,
    Permission,
    Json,
    NotFound,
    Other,
}

#[derive(Debug)]
pub struct MediaStoreError {
    kind: ErrorKind,
    source: Option<Box<dyn std::error::Error + Send + Sync>>,
    msg: String,
}

impl MediaStoreError {
    pub fn kind(&self) -> ErrorKind {
        self.kind
    }
}

impl std::error::Error for MediaStoreError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        self.source
            .as_ref()
            .map(|i| i.as_ref() as &dyn std::error::Error)
    }
}

impl std::fmt::Display for MediaStoreError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}: {}",
            match self.kind {
                ErrorKind::Backend => "media storage backend error",
                ErrorKind::Permission => "permission denied",
                ErrorKind::Json => "failed to parse json",
                ErrorKind::NotFound => "blob not found",
                ErrorKind::Other => "unknown media storage error",
            },
            self.msg
        )
    }
}

pub type Result<T> = std::result::Result<T, MediaStoreError>;

pub trait MediaStore: 'static + Send + Sync + Clone {
    // Initialize self from a URL, possibly performing asynchronous initialization.
    fn new(url: &'_ url::Url) -> impl Future<Output = Result<Self>> + Send;

    fn write_streaming<T>(
        &self,
        domain: &str,
        metadata: Metadata,
        content: T,
    ) -> impl Future<Output = Result<String>> + Send
    where
        T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug;

    fn read_streaming(
        &self,
        domain: &str,
        filename: &str,
    ) -> impl Future<Output = Result<
        (Metadata, Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>)
        >> + Send;

    fn stream_range(
        &self,
        domain: &str,
        filename: &str,
        range: (Bound<u64>, Bound<u64>)
    ) -> impl Future<Output = Result<Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>>> + Send { async move {
        use futures::stream::TryStreamExt;
        use tracing::debug;
        let (metadata, mut stream) = self.read_streaming(domain, filename).await?;
        let length = metadata.length.unwrap().get();

        use Bound::*;
        let (start, end): (usize, usize) = match range {
            (Unbounded, Unbounded) => return Ok(stream),
            (Included(start), Unbounded) => (start.try_into().unwrap(), length - 1),
            (Unbounded, Included(end)) => (length - usize::try_from(end).unwrap(), length - 1),
            (Included(start), Included(end)) => (start.try_into().unwrap(), end.try_into().unwrap()),
            (_, _) => unreachable!()
        };

        stream = Box::pin(
            stream.map_ok({
                let mut bytes_skipped = 0usize;
                let mut bytes_read = 0usize;

                move |chunk| {
                    debug!("Skipped {}/{} bytes, chunk len {}", bytes_skipped, start, chunk.len());
                    let chunk = if bytes_skipped < start {
                        let need_to_skip = start - bytes_skipped;
                        if chunk.len() < need_to_skip {
                            return None
                        }
                        debug!("Skipping {} bytes", need_to_skip);
                        bytes_skipped += need_to_skip;

                        chunk.slice(need_to_skip..)
                    } else {
                        chunk
                    };

                    debug!("Read {} bytes from file, {} in this chunk", bytes_read, chunk.len());
                    bytes_read += chunk.len();

                    if bytes_read > length {
                        if bytes_read - length > chunk.len() {
                            return None
                        }
                        debug!("Truncating last {} bytes", bytes_read - length);
                        return Some(chunk.slice(..chunk.len() - (bytes_read - length)))
                    }

                    Some(chunk)
                }
            })
                .try_skip_while(|x| std::future::ready(Ok(x.is_none())))
                .try_take_while(|x| std::future::ready(Ok(x.is_some())))
                .map_ok(|x| x.unwrap())
        );

        Ok(stream)
    } }

    /// Read metadata for a file.
    ///
    /// The default implementation uses the `read_streaming` method
    /// and drops the stream containing file content.
    fn metadata(&self, domain: &str, filename: &str) -> impl Future<Output = Result<Metadata>> + Send { async move {
        self.read_streaming(domain, filename)
            .await
            .map(|(meta, stream)| meta)
    } }

    fn delete(&self, domain: &str, filename: &str) -> impl Future<Output = Result<()>> + Send;
}