about summary refs log tree commit diff
path: root/kittybox-rs/src/media/storage
diff options
context:
space:
mode:
author Vika <vika@fireburn.ru> 2023-07-29 21:59:56 +0300
committer Vika <vika@fireburn.ru> 2023-07-29 21:59:56 +0300
commit 0617663b249f9ca488e5de652108b17d67fbaf45 (patch)
tree 11564b6c8fa37bf9203a0a4cc1c4e9cc088cb1a5 /kittybox-rs/src/media/storage
parent 26c2b79f6a6380ae3224e9309b9f3352f5717bd7 (diff)
download kittybox-0617663b249f9ca488e5de652108b17d67fbaf45.tar.zst
Moved the entire Kittybox tree into the root
Diffstat (limited to 'kittybox-rs/src/media/storage')
-rw-r--r-- kittybox-rs/src/media/storage/file.rs 434
-rw-r--r-- kittybox-rs/src/media/storage/mod.rs 177
2 files changed, 0 insertions, 611 deletions
diff --git a/kittybox-rs/src/media/storage/file.rs b/kittybox-rs/src/media/storage/file.rs
deleted file mode 100644
index 0aaaa3b..0000000
--- a/kittybox-rs/src/media/storage/file.rs
+++ /dev/null
@@ -1,434 +0,0 @@
-use super::{Metadata, ErrorKind, MediaStore, MediaStoreError, Result};
-use async_trait::async_trait;
-use std::{path::PathBuf, fmt::Debug};
-use tokio::fs::OpenOptions;
-use tokio::io::{BufReader, BufWriter, AsyncWriteExt, AsyncSeekExt};
-use futures::{StreamExt, TryStreamExt};
-use std::ops::{Bound, RangeBounds, Neg};
-use std::pin::Pin;
-use sha2::Digest;
-use futures::FutureExt;
-use tracing::{debug, error};
-
-const BUF_CAPACITY: usize = 16 * 1024;
-
-#[derive(Clone)]
-pub struct FileStore {
-    base: PathBuf,
-}
-
-impl From<tokio::io::Error> for MediaStoreError {
-    fn from(source: tokio::io::Error) -> Self {
-        Self {
-            msg: format!("file I/O error: {}", source),
-            kind: match source.kind() {
-                std::io::ErrorKind::NotFound => ErrorKind::NotFound,
-                _ => ErrorKind::Backend
-            },
-            source: Some(Box::new(source)),
-        }
-    }
-}
-
-impl FileStore {
-    pub fn new<T: Into<PathBuf>>(base: T) -> Self {
-        Self { base: base.into() }
-    }
-
-    async fn mktemp(&self) -> Result<(PathBuf, BufWriter<tokio::fs::File>)> {
-        kittybox_util::fs::mktemp(&self.base, "temp", 16)
-            .await
-            .map(|(name, file)| (name, BufWriter::new(file)))
-            .map_err(Into::into)
-    }
-}
-
-#[async_trait]
-impl MediaStore for FileStore {
-
-    #[tracing::instrument(skip(self, content))]
-    async fn write_streaming<T>(
-        &self,
-        domain: &str,
-        mut metadata: Metadata,
-        mut content: T,
-    ) -> Result<String>
-    where
-        T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug
-    {
-        let (tempfilepath, mut tempfile) = self.mktemp().await?;
-        debug!("Temporary file opened for storing pending upload: {}", tempfilepath.display());
-        let mut hasher = sha2::Sha256::new();
-        let mut length: usize = 0;
-
-        while let Some(chunk) = content.next().await {
-            let chunk = chunk.map_err(|err| MediaStoreError {
-                kind: ErrorKind::Backend,
-                source: Some(Box::new(err)),
-                msg: "Failed to read a data chunk".to_owned()
-            })?;
-            debug!("Read {} bytes from the stream", chunk.len());
-            length += chunk.len();
-            let (write_result, _hasher) = tokio::join!(
-                {
-                    let chunk = chunk.clone();
-                    let tempfile = &mut tempfile;
-                    async move {
-                        tempfile.write_all(&*chunk).await
-                    }
-                },
-                {
-                    let chunk = chunk.clone();
-                    tokio::task::spawn_blocking(move || {
-                        hasher.update(&*chunk);
-
-                        hasher
-                    }).map(|r| r.unwrap())
-                }
-            );
-            if let Err(err) = write_result {
-                error!("Error while writing pending upload: {}", err);
-                drop(tempfile);
-                // this is just cleanup, nothing fails if it fails
-                // though temporary files might take up space on the hard drive
-                // We'll clean them when maintenance time comes
-                #[allow(unused_must_use)]
-                { tokio::fs::remove_file(tempfilepath).await; }
-                return Err(err.into());
-            }
-            hasher = _hasher;
-        }
-        // Manually flush the buffer and drop the handle to close the file
-        tempfile.flush().await?;
-        tempfile.into_inner().sync_all().await?;
-
-        let hash = hasher.finalize();
-        debug!("Pending upload hash: {}", hex::encode(&hash));
-        let filename = format!(
-            "{}/{}/{}/{}/{}",
-            hex::encode([hash[0]]),
-            hex::encode([hash[1]]),
-            hex::encode([hash[2]]),
-            hex::encode([hash[3]]),
-            hex::encode(&hash[4..32])
-        );
-        let domain_str = domain.to_string();
-        let filepath = self.base.join(domain_str.as_str()).join(&filename);
-        let metafilename = filename.clone() + ".json";
-        let metapath = self.base.join(domain_str.as_str()).join(&metafilename);
-        let metatemppath = self.base.join(domain_str.as_str()).join(metafilename + ".tmp");
-        metadata.length = std::num::NonZeroUsize::new(length);
-        metadata.etag = Some(hex::encode(&hash));
-        debug!("File path: {}, metadata: {}", filepath.display(), metapath.display());
-        {
-            let parent = filepath.parent().unwrap();
-            tokio::fs::create_dir_all(parent).await?;            
-        }
-        let mut meta = OpenOptions::new()
-            .create_new(true)
-            .write(true)
-            .open(&metatemppath)
-            .await?;
-        meta.write_all(&serde_json::to_vec(&metadata).unwrap()).await?;
-        tokio::fs::rename(tempfilepath, filepath).await?;
-        tokio::fs::rename(metatemppath, metapath).await?;
-        Ok(filename)
-    }
-
-    #[tracing::instrument(skip(self))]
-    async fn read_streaming(
-        &self,
-        domain: &str,
-        filename: &str,
-    ) -> Result<(Metadata, Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>)> {
-        debug!("Domain: {}, filename: {}", domain, filename);
-        let path = self.base.join(domain).join(filename);
-        debug!("Path: {}", path.display());
-
-        let file = OpenOptions::new()
-            .read(true)
-            .open(path)
-            .await?;
-        let meta = self.metadata(domain, filename).await?;
-
-        Ok((meta, Box::pin(
-            tokio_util::io::ReaderStream::new(
-                // TODO: determine if BufReader provides benefit here
-                // From the logs it looks like we're reading 4KiB at a time
-                // Buffering file contents seems to double download speed
-                // How to benchmark this?
-                BufReader::with_capacity(BUF_CAPACITY, file)
-            )
-            // Sprinkle some salt in form of protective log wrapping
-                .inspect_ok(|chunk| debug!("Read {} bytes from file", chunk.len()))
-        )))
-    }
-
-    #[tracing::instrument(skip(self))]
-    async fn metadata(&self, domain: &str, filename: &str) -> Result<Metadata> {
-        let metapath = self.base.join(domain).join(format!("{}.json", filename));
-        debug!("Metadata path: {}", metapath.display());
-
-        let meta = serde_json::from_slice(&tokio::fs::read(metapath).await?)
-            .map_err(|err| MediaStoreError {
-                kind: ErrorKind::Json,
-                msg: format!("{}", err),
-                source: Some(Box::new(err))
-            })?;
-
-        Ok(meta)
-    }
-    
-    #[tracing::instrument(skip(self))]
-    async fn stream_range(
-        &self,
-        domain: &str,
-        filename: &str,
-        range: (Bound<u64>, Bound<u64>)
-    ) -> Result<Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>> {
-        let path = self.base.join(format!("{}/{}", domain, filename));
-        let metapath = self.base.join(format!("{}/{}.json", domain, filename));
-        debug!("Path: {}, metadata: {}", path.display(), metapath.display());
-
-        let mut file = OpenOptions::new()
-            .read(true)
-            .open(path)
-            .await?;
-
-        let start = match range {
-            (Bound::Included(bound), _) => {
-                debug!("Seeking {} bytes forward...", bound);
-                file.seek(std::io::SeekFrom::Start(bound)).await?
-            }
-            (Bound::Excluded(_), _) => unreachable!(),
-            (Bound::Unbounded, Bound::Included(bound)) => {
-                // Seek to the end minus the bounded bytes
-                debug!("Seeking {} bytes back from the end...", bound);
-                file.seek(std::io::SeekFrom::End(i64::try_from(bound).unwrap().neg())).await?
-            },
-            (Bound::Unbounded, Bound::Unbounded) => 0,
-            (_, Bound::Excluded(_)) => unreachable!()
-        };
-
-        let stream = Box::pin(tokio_util::io::ReaderStream::new(BufReader::with_capacity(BUF_CAPACITY, file)))
-            .map_ok({
-                let mut bytes_read = 0usize;
-                let len = match range {
-                    (_, Bound::Unbounded) => None,
-                    (Bound::Unbounded, Bound::Included(bound)) => Some(bound),
-                    (_, Bound::Included(bound)) => Some(bound + 1 - start),
-                    (_, Bound::Excluded(_)) => unreachable!()
-                };
-                move |chunk| {
-                    debug!("Read {} bytes from file, {} in this chunk", bytes_read, chunk.len());
-                    bytes_read += chunk.len();
-                    if let Some(len) = len.map(|len| len.try_into().unwrap()) {
-                        if bytes_read > len {
-                            if bytes_read - len > chunk.len() {
-                                return None
-                            }
-                            debug!("Truncating last {} bytes", bytes_read - len);
-                            return Some(chunk.slice(..chunk.len() - (bytes_read - len)))
-                        }
-                    }
-
-                    Some(chunk)
-                }
-            })
-            .try_take_while(|x| std::future::ready(Ok(x.is_some())))
-            // Will never panic, because the moment the stream yields
-            // a None, it is considered exhausted.
-            .map_ok(|x| x.unwrap());
-
-        return Ok(Box::pin(stream))
-    }
-
-
-    async fn delete(&self, domain: &str, filename: &str) -> Result<()> {
-        let path = self.base.join(format!("{}/{}", domain, filename));
-
-        Ok(tokio::fs::remove_file(path).await?)
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::{Metadata, FileStore, MediaStore};
-    use std::ops::Bound;
-    use tokio::io::AsyncReadExt;
-
-    #[tokio::test]
-    #[tracing_test::traced_test]
-    async fn test_ranges() {
-        let tempdir = tempfile::tempdir().expect("Failed to create tempdir");
-        let store = FileStore::new(tempdir.path());
-
-        let file: &[u8] = include_bytes!("./file.rs");
-        let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i))));
-        let metadata = Metadata {
-            filename: Some("file.rs".to_string()),
-            content_type: Some("text/plain".to_string()),
-            length: None,
-            etag: None,
-        };
-
-        // write through the interface
-        let filename = store.write_streaming(
-            "fireburn.ru",
-            metadata, stream
-        ).await.unwrap();
-
-        tracing::debug!("Writing complete.");
-
-        // Ensure the file is there
-        let content = tokio::fs::read(
-            tempdir.path()
-                .join("fireburn.ru")
-                .join(&filename)
-        ).await.unwrap();
-        assert_eq!(content, file);
-
-        tracing::debug!("Reading range from the start...");
-        // try to read range
-        let range = {
-            let stream = store.stream_range(
-                "fireburn.ru", &filename,
-                (Bound::Included(0), Bound::Included(299))
-            ).await.unwrap();
-
-            let mut reader = tokio_util::io::StreamReader::new(stream);
-
-            let mut buf = Vec::default();
-            reader.read_to_end(&mut buf).await.unwrap();
-
-            buf
-        };
-
-        assert_eq!(range.len(), 300);
-        assert_eq!(range.as_slice(), &file[..=299]);
-
-        tracing::debug!("Reading range from the middle...");
-
-        let range = {
-            let stream = store.stream_range(
-                "fireburn.ru", &filename,
-                (Bound::Included(150), Bound::Included(449))
-            ).await.unwrap();
-
-            let mut reader = tokio_util::io::StreamReader::new(stream);
-
-            let mut buf = Vec::default();
-            reader.read_to_end(&mut buf).await.unwrap();
-
-            buf
-        };
-
-        assert_eq!(range.len(), 300);
-        assert_eq!(range.as_slice(), &file[150..=449]);
-
-        tracing::debug!("Reading range from the end...");
-        let range = {
-            let stream = store.stream_range(
-                "fireburn.ru", &filename,
-                // Note: the `headers` crate parses bounds in a
-                // non-standard way, where unbounded start actually
-                // means getting things from the end...
-                (Bound::Unbounded, Bound::Included(300))
-            ).await.unwrap();
-
-            let mut reader = tokio_util::io::StreamReader::new(stream);
-
-            let mut buf = Vec::default();
-            reader.read_to_end(&mut buf).await.unwrap();
-
-            buf
-        };
-
-        assert_eq!(range.len(), 300);
-        assert_eq!(range.as_slice(), &file[file.len()-300..file.len()]);
-
-        tracing::debug!("Reading the whole file...");
-        // try to read range
-        let range = {
-            let stream = store.stream_range(
-                "fireburn.ru", &("/".to_string() + &filename),
-                (Bound::Unbounded, Bound::Unbounded)
-            ).await.unwrap();
-
-            let mut reader = tokio_util::io::StreamReader::new(stream);
-
-            let mut buf = Vec::default();
-            reader.read_to_end(&mut buf).await.unwrap();
-
-            buf
-        };
-
-        assert_eq!(range.len(), file.len());
-        assert_eq!(range.as_slice(), file);
-    }
-
-    
-    #[tokio::test]
-    #[tracing_test::traced_test]
-    async fn test_streaming_read_write() {
-        let tempdir = tempfile::tempdir().expect("Failed to create tempdir");
-        let store = FileStore::new(tempdir.path());
-
-        let file: &[u8] = include_bytes!("./file.rs");
-        let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i))));
-        let metadata = Metadata {
-            filename: Some("style.css".to_string()),
-            content_type: Some("text/css".to_string()),
-            length: None,
-            etag: None,
-        };
-
-        // write through the interface
-        let filename = store.write_streaming(
-            "fireburn.ru",
-            metadata, stream
-        ).await.unwrap();
-        println!("{}, {}", filename, tempdir.path()
-                 .join("fireburn.ru")
-                 .join(&filename)
-                 .display());
-        let content = tokio::fs::read(
-            tempdir.path()
-                .join("fireburn.ru")
-                .join(&filename)
-        ).await.unwrap();
-        assert_eq!(content, file);
-
-        // check internal metadata format
-        let meta: Metadata = serde_json::from_slice(&tokio::fs::read(
-            tempdir.path()
-                .join("fireburn.ru")
-                .join(filename.clone() + ".json")
-        ).await.unwrap()).unwrap();
-        assert_eq!(meta.content_type.as_deref(), Some("text/css"));
-        assert_eq!(meta.filename.as_deref(), Some("style.css"));
-        assert_eq!(meta.length.map(|i| i.get()), Some(file.len()));
-        assert!(meta.etag.is_some());
-
-        // read back the data using the interface
-        let (metadata, read_back) = {
-            let (metadata, stream) = store.read_streaming(
-                "fireburn.ru",
-                &filename
-            ).await.unwrap();
-            let mut reader = tokio_util::io::StreamReader::new(stream);
-
-            let mut buf = Vec::default();
-            reader.read_to_end(&mut buf).await.unwrap();
-
-            (metadata, buf)
-        };
-
-        assert_eq!(read_back, file);
-        assert_eq!(metadata.content_type.as_deref(), Some("text/css"));
-        assert_eq!(meta.filename.as_deref(), Some("style.css"));
-        assert_eq!(meta.length.map(|i| i.get()), Some(file.len()));
-        assert!(meta.etag.is_some());
-
-    }
-}
diff --git a/kittybox-rs/src/media/storage/mod.rs b/kittybox-rs/src/media/storage/mod.rs
deleted file mode 100644
index 020999c..0000000
--- a/kittybox-rs/src/media/storage/mod.rs
+++ /dev/null
@@ -1,177 +0,0 @@
-use async_trait::async_trait;
-use axum::extract::multipart::Field;
-use tokio_stream::Stream;
-use bytes::Bytes;
-use serde::{Deserialize, Serialize};
-use std::ops::Bound;
-use std::pin::Pin;
-use std::fmt::Debug;
-use std::num::NonZeroUsize;
-
-pub mod file;
-
-#[derive(Debug, Deserialize, Serialize)]
-pub struct Metadata {
-    /// Content type of the file. If None, the content-type is considered undefined.
-    pub content_type: Option<String>,
-    /// The original filename that was passed.
-    pub filename: Option<String>,
-    /// The recorded length of the file.
-    pub length: Option<NonZeroUsize>,
-    /// The e-tag of a file. Note: it must be a strong e-tag, for example, a hash.
-    pub etag: Option<String>,
-}
-impl From<&Field<'_>> for Metadata {
-    fn from(field: &Field<'_>) -> Self {
-        Self {
-            content_type: field.content_type()
-                .map(|i| i.to_owned()),
-            filename: field.file_name()
-                .map(|i| i.to_owned()),
-            length: None,
-            etag: None,
-        }
-    }
-}
-
-
-#[derive(Debug, Clone, Copy)]
-pub enum ErrorKind {
-    Backend,
-    Permission,
-    Json,
-    NotFound,
-    Other,
-}
-
-#[derive(Debug)]
-pub struct MediaStoreError {
-    kind: ErrorKind,
-    source: Option<Box<dyn std::error::Error + Send + Sync>>,
-    msg: String,
-}
-
-impl MediaStoreError {
-    pub fn kind(&self) -> ErrorKind {
-        self.kind
-    }
-}
-
-impl std::error::Error for MediaStoreError {
-    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
-        self.source
-            .as_ref()
-            .map(|i| i.as_ref() as &dyn std::error::Error)
-    }
-}
-
-impl std::fmt::Display for MediaStoreError {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(
-            f,
-            "{}: {}",
-            match self.kind {
-                ErrorKind::Backend => "media storage backend error",
-                ErrorKind::Permission => "permission denied",
-                ErrorKind::Json => "failed to parse json",
-                ErrorKind::NotFound => "blob not found",
-                ErrorKind::Other => "unknown media storage error",
-            },
-            self.msg
-        )
-    }
-}
-
-pub type Result<T> = std::result::Result<T, MediaStoreError>;
-
-#[async_trait]
-pub trait MediaStore: 'static + Send + Sync + Clone {
-    async fn write_streaming<T>(
-        &self,
-        domain: &str,
-        metadata: Metadata,
-        content: T,
-    ) -> Result<String>
-    where
-        T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug;
-
-    async fn read_streaming(
-        &self,
-        domain: &str,
-        filename: &str,
-    ) -> Result<(Metadata, Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>)>;
-
-    async fn stream_range(
-        &self,
-        domain: &str,
-        filename: &str,
-        range: (Bound<u64>, Bound<u64>)
-    ) -> Result<Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>> {
-        use futures::stream::TryStreamExt;
-        use tracing::debug;
-        let (metadata, mut stream) = self.read_streaming(domain, filename).await?;
-        let length = metadata.length.unwrap().get();
-
-        use Bound::*;
-        let (start, end): (usize, usize) = match range {
-            (Unbounded, Unbounded) => return Ok(stream),
-            (Included(start), Unbounded) => (start.try_into().unwrap(), length - 1),
-            (Unbounded, Included(end)) => (length - usize::try_from(end).unwrap(), length - 1),
-            (Included(start), Included(end)) => (start.try_into().unwrap(), end.try_into().unwrap()),
-            (_, _) => unreachable!()
-        };
-
-        stream = Box::pin(
-            stream.map_ok({
-                let mut bytes_skipped = 0usize;
-                let mut bytes_read = 0usize;
-
-                move |chunk| {
-                    debug!("Skipped {}/{} bytes, chunk len {}", bytes_skipped, start, chunk.len());
-                    let chunk = if bytes_skipped < start {
-                        let need_to_skip = start - bytes_skipped;
-                        if chunk.len() < need_to_skip {
-                            return None
-                        }
-                        debug!("Skipping {} bytes", need_to_skip);
-                        bytes_skipped += need_to_skip;
-
-                        chunk.slice(need_to_skip..)
-                    } else {
-                        chunk
-                    };
-
-                    debug!("Read {} bytes from file, {} in this chunk", bytes_read, chunk.len());
-                    bytes_read += chunk.len();
-
-                    if bytes_read > length {
-                        if bytes_read - length > chunk.len() {
-                            return None
-                        }
-                        debug!("Truncating last {} bytes", bytes_read - length);
-                        return Some(chunk.slice(..chunk.len() - (bytes_read - length)))
-                    }
-
-                    Some(chunk)
-                }
-            })
-                .try_skip_while(|x| std::future::ready(Ok(x.is_none())))
-                .try_take_while(|x| std::future::ready(Ok(x.is_some())))
-                .map_ok(|x| x.unwrap())
-        );
-
-        return Ok(stream);
-    }
-
-    /// Read metadata for a file.
-    ///
-    /// The default implementation uses the `read_streaming` method
-    /// and drops the stream containing file content.
-    async fn metadata(&self, domain: &str, filename: &str) -> Result<Metadata> {
-        self.read_streaming(domain, filename)
-            .await
-            .map(|(meta, stream)| meta)
-    }
-
-    async fn delete(&self, domain: &str, filename: &str) -> Result<()>;
-}