about summary refs log tree commit diff
path: root/kittybox-rs/src/media/storage
diff options
context:
space:
mode:
authorVika <vika@fireburn.ru>2022-07-10 00:55:20 +0300
committerVika <vika@fireburn.ru>2022-07-10 00:55:20 +0300
commit1031d495d5b78d9b19dcdc414b6d7b0daf313bb2 (patch)
treea6ea6d347bc690c467df798f5576800d371b2254 /kittybox-rs/src/media/storage
parent2cbd19693115bf19da0ab888372cb1ff086967cd (diff)
downloadkittybox-1031d495d5b78d9b19dcdc414b6d7b0daf313bb2.tar.zst
media: media endpoint PoC
Supported features:
 - Streaming upload
 - Content-addressed storage
 - Metadata
   - MIME type (taken from Content-Type)
   - Length (I could use stat() for this one tho)
   - filename (for Content-Disposition: attachment, WIP)
Diffstat (limited to 'kittybox-rs/src/media/storage')
-rw-r--r--kittybox-rs/src/media/storage/file.rs215
-rw-r--r--kittybox-rs/src/media/storage/mod.rs60
2 files changed, 234 insertions, 41 deletions
diff --git a/kittybox-rs/src/media/storage/file.rs b/kittybox-rs/src/media/storage/file.rs
index ea1f010..176c2f4 100644
--- a/kittybox-rs/src/media/storage/file.rs
+++ b/kittybox-rs/src/media/storage/file.rs
@@ -1,8 +1,12 @@
-use super::{ErrorKind, MediaStore, MediaStoreError, Result};
+use super::{Metadata, ErrorKind, MediaStore, MediaStoreError, Result};
 use async_trait::async_trait;
 use std::path::PathBuf;
 use tokio::fs::OpenOptions;
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::io::AsyncWriteExt;
+use futures::StreamExt;
+use std::pin::Pin;
+use sha2::Digest;
+use futures::FutureExt;
 
 #[derive(Clone)]
 pub struct FileStore {
@@ -12,56 +16,209 @@ pub struct FileStore {
 impl From<tokio::io::Error> for MediaStoreError {
     fn from(source: tokio::io::Error) -> Self {
         Self {
+            msg: format!("file I/O error: {}", source),
+            kind: match source.kind() {
+                std::io::ErrorKind::NotFound => ErrorKind::NotFound,
+                _ => ErrorKind::Backend
+            },
             source: Some(Box::new(source)),
-            msg: "file I/O error".to_owned(),
-            kind: ErrorKind::Backend,
+        }
+    }
+}
+
+
+impl FileStore {
+    pub fn new<T: Into<PathBuf>>(base: T) -> Self {
+        Self { base: base.into() }
+    }
+
+    async fn mktemp(&self) -> Result<(PathBuf, tokio::fs::File)> {
+        use rand::{Rng, distributions::Alphanumeric};
+        tokio::fs::create_dir_all(self.base.as_path()).await?;
+        loop {
+            let filename = self.base.join(format!("temp.{}", {
+                let string = rand::thread_rng()
+                    .sample_iter(&Alphanumeric)
+                    .take(16)
+                    .collect::<Vec<u8>>();
+                String::from_utf8(string).unwrap()
+            }));
+
+            match OpenOptions::new()
+                .create_new(true)
+                .write(true)
+                .open(&filename)
+                .await
+            {
+                Ok(file) => return Ok((filename, file)),
+                Err(err) => match err.kind() {
+                    std::io::ErrorKind::AlreadyExists => continue,
+                    _ => return Err(err.into())
+                }
+            }
         }
     }
 }
 
 #[async_trait]
 impl MediaStore for FileStore {
-    async fn write_streaming(
+    async fn write_streaming<T>(
         &self,
-        domain: url::Host,
-        filename: &str,
-        content: axum::extract::multipart::Field<'_>,
-    ) -> Result<()> {
-        todo!()
+        domain: &str,
+        mut metadata: Metadata,
+        mut content: T,
+    ) -> Result<String>
+    where
+        T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send
+    {
+        let (tempfilepath, mut tempfile) = self.mktemp().await?;
+        let mut hasher = sha2::Sha256::new();
+        let mut length: usize = 0;
+
+        while let Some(chunk) = content.next().await {
+            let chunk = std::sync::Arc::new(chunk.map_err(|err| MediaStoreError {
+                kind: ErrorKind::Backend,
+                source: Some(Box::new(err)),
+                msg: "Failed to read a data chunk".to_owned()
+            })?);
+            length += chunk.len();
+            let (write_result, _hasher) = tokio::join!(
+                tempfile.write_all(&*chunk),
+                {
+                    let chunk = chunk.clone();
+                    tokio::task::spawn_blocking(move || {
+                        hasher.update(&*chunk);
+
+                        hasher
+                    }).map(|r| r.unwrap())
+                }
+            );
+            if let Err(err) = write_result {
+                drop(tempfile);
+                // this is just cleanup, nothing fails if it fails
+                // though temporary files might take up space on the hard drive
+                // We'll clean them when maintenance time comes
+                #[allow(unused_must_use)]
+                { tokio::fs::remove_file(tempfilepath).await; }
+                return Err(err.into());
+            }
+            hasher = _hasher;
+        }
+
+        let hash = hasher.finalize();
+        let filename = format!(
+            "{}/{}/{}/{}/{}",
+            hex::encode([hash[0]]),
+            hex::encode([hash[1]]),
+            hex::encode([hash[2]]),
+            hex::encode([hash[3]]),
+            hex::encode(&hash[4..32])
+        );
+        metadata.length = Some(length);
+        let domain_str = domain.to_string();
+        let filepath = self.base.join(domain_str.as_str()).join(&filename);
+        let metafilename = filename.clone() + ".json";
+        let metapath = self.base.join(domain_str.as_str()).join(metafilename);
+        {
+            let parent = filepath.parent().unwrap();
+            tokio::fs::create_dir_all(parent).await?;            
+        }
+        let mut meta = OpenOptions::new()
+            .create_new(true)
+            .write(true)
+            .open(&metapath)
+            .await?;
+        meta.write_all(&serde_json::to_vec(&metadata).unwrap()).await?;
+        tokio::fs::rename(tempfilepath, filepath).await?;
+        Ok(filename)
     }
 
     async fn read_streaming(
         &self,
-        domain: url::Host,
+        domain: &str,
         filename: &str,
-    ) -> Result<tokio_util::io::ReaderStream<Box<dyn tokio::io::AsyncRead>>> {
-        todo!()
-    }
-
-    async fn write(&self, domain: url::Host, filename: &str, content: &[u8]) -> Result<()> {
-        let path = self.base.join(format!("{}/{}", domain, filename));
+    ) -> Result<(Metadata, Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>)> {
+        let path = self.base.join(format!("{}{}", domain, filename));
+        let metapath = self.base.join(format!("{}{}.json", domain, filename));
+        tracing::debug!("Path: {}, metadata: {}", path.display(), metapath.display());
 
-        let mut file = OpenOptions::new()
-            .create_new(true)
-            .write(true)
+        let file = OpenOptions::new()
+            .read(true)
             .open(path)
             .await?;
+        let meta = serde_json::from_slice(&tokio::fs::read(metapath).await?)
+            .map_err(|err| MediaStoreError {
+                kind: ErrorKind::Json,
+                msg: format!("{}", err),
+                source: Some(Box::new(err))
+            })?;
 
-        Ok(file.write_all(content).await?)
+        Ok((meta, Box::pin(tokio_util::io::ReaderStream::new(file))))
     }
 
-    async fn read(&self, domain: url::Host, filename: &str) -> Result<Vec<u8>> {
+    async fn delete(&self, domain: &str, filename: &str) -> Result<()> {
         let path = self.base.join(format!("{}/{}", domain, filename));
 
-        let mut file = OpenOptions::new().read(true).open(path).await?;
+        Ok(tokio::fs::remove_file(path).await?)
+    }
+}
 
-        let mut buf: Vec<u8> = Vec::default();
-        file.read_to_end(&mut buf).await?;
+#[cfg(test)]
+mod tests {
+    use super::{Metadata, FileStore, MediaStore};
+    use tokio::io::AsyncReadExt;
 
-        Ok(buf)
-    }
+    #[tokio::test]
+    async fn test_streaming_read_write() {
+        let tempdir = tempdir::TempDir::new("file").expect("Failed to create tempdir");
+        let store = FileStore::new(tempdir.path());
+
+        let file: &[u8] = include_bytes!("../../../../README.md");
+        let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i))));
+        let metadata = Metadata {
+            filename: Some("README.md".to_string()),
+            content_type: "text/markdown".to_string(),
+            length: None
+        };
+
+        let filename = store.write_streaming(
+            "fireburn.ru",
+            metadata, stream
+        ).await.unwrap();
+
+        let content = tokio::fs::read(
+            tempdir.path()
+                .join("fireburn.ru")
+                .join(&filename)
+        ).await.unwrap();
+        assert_eq!(content, file);
+
+        let meta: Metadata = serde_json::from_slice(&tokio::fs::read(
+            tempdir.path()
+                .join("fireburn.ru")
+                .join(filename.clone() + ".json")
+        ).await.unwrap()).unwrap();
+        assert_eq!(&meta.content_type, "text/markdown");
+        assert_eq!(meta.filename.as_deref(), Some("README.md"));
+        assert_eq!(meta.length, Some(file.len()));
+
+        let (metadata, read_back) = {
+            let (metadata, stream) = store.read_streaming(
+                "fireburn.ru",
+                &filename
+            ).await.unwrap();
+            let mut reader = tokio_util::io::StreamReader::new(stream);
+
+            let mut buf = Vec::default();
+            reader.read_to_end(&mut buf).await.unwrap();
+
+            (metadata, buf)
+        };
+
+        assert_eq!(read_back, file);
+        assert_eq!(&metadata.content_type, "text/markdown");
+        assert_eq!(meta.filename.as_deref(), Some("README.md"));
+        assert_eq!(meta.length, Some(file.len()));
 
-    async fn delete(&self, domain: url::Host, filename: &str) -> Result<()> {
-        todo!()
     }
 }
diff --git a/kittybox-rs/src/media/storage/mod.rs b/kittybox-rs/src/media/storage/mod.rs
index ba880ab..cb8b38f 100644
--- a/kittybox-rs/src/media/storage/mod.rs
+++ b/kittybox-rs/src/media/storage/mod.rs
@@ -1,12 +1,39 @@
 use async_trait::async_trait;
+use axum::extract::multipart::Field;
+use tokio_stream::Stream;
+use bytes::Bytes;
+use serde::{Deserialize, Serialize};
+use std::pin::Pin;
 
 pub mod file;
 
+#[derive(Debug, Deserialize, Serialize)]
+pub struct Metadata {
+    pub(super) content_type: String,
+    pub(super) filename: Option<String>,
+    pub(super) length: Option<usize>
+}
+impl From<&Field<'_>> for Metadata {
+    fn from(field: &Field<'_>) -> Self {
+        Self {
+            content_type: field.content_type()
+                .map(|i| i.to_owned())
+                .or_else(|| Some("application/octet-stream".to_owned()))
+                .unwrap(),
+            filename: field.file_name()
+                .map(|i| i.to_owned()),
+            length: None
+        }
+    }
+}
+
+
 #[derive(Debug, Clone, Copy)]
 pub enum ErrorKind {
     Backend,
     Permission,
-    Conflict,
+    Json,
+    NotFound,
     Other,
 }
 
@@ -17,6 +44,12 @@ pub struct MediaStoreError {
     msg: String,
 }
 
+impl MediaStoreError {
+    pub fn kind(&self) -> ErrorKind {
+        self.kind
+    }
+}
+
 impl std::error::Error for MediaStoreError {
     fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
         self.source
@@ -33,7 +66,8 @@ impl std::fmt::Display for MediaStoreError {
             match self.kind {
                 ErrorKind::Backend => "media storage backend error",
                 ErrorKind::Permission => "permission denied",
-                ErrorKind::Conflict => "conflict with existing data",
+                ErrorKind::Json => "failed to parse json",
+                ErrorKind::NotFound => "blob not found",
                 ErrorKind::Other => "unknown media storage error",
             },
             self.msg
@@ -45,18 +79,20 @@ pub type Result<T> = std::result::Result<T, MediaStoreError>;
 
 #[async_trait]
 pub trait MediaStore: 'static + Send + Sync + Clone {
-    async fn write_streaming(
+    async fn write_streaming<T>(
         &self,
-        domain: url::Host,
-        filename: &str,
-        content: axum::extract::multipart::Field<'_>,
-    ) -> Result<()>;
-    async fn write(&self, domain: url::Host, filename: &str, content: &[u8]) -> Result<()>;
+        domain: &str,
+        metadata: Metadata,
+        content: T,
+    ) -> Result<String>
+    where
+        T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send;
+
     async fn read_streaming(
         &self,
-        domain: url::Host,
+        domain: &str,
         filename: &str,
-    ) -> Result<tokio_util::io::ReaderStream<Box<dyn tokio::io::AsyncRead>>>;
-    async fn read(&self, domain: url::Host, filename: &str) -> Result<Vec<u8>>;
-    async fn delete(&self, domain: url::Host, filename: &str) -> Result<()>;
+    ) -> Result<(Metadata, Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>)>;
+
+    async fn delete(&self, domain: &str, filename: &str) -> Result<()>;
 }