about summary refs log tree commit diff
path: root/kittybox-rs/src
diff options
context:
space:
mode:
Diffstat (limited to 'kittybox-rs/src')
-rw-r--r--kittybox-rs/src/media/storage/file.rs44
-rw-r--r--kittybox-rs/src/media/storage/mod.rs3
2 files changed, 36 insertions, 11 deletions
diff --git a/kittybox-rs/src/media/storage/file.rs b/kittybox-rs/src/media/storage/file.rs
index f01cea8..84edb84 100644
--- a/kittybox-rs/src/media/storage/file.rs
+++ b/kittybox-rs/src/media/storage/file.rs
@@ -1,12 +1,15 @@
 use super::{Metadata, ErrorKind, MediaStore, MediaStoreError, Result};
 use async_trait::async_trait;
-use std::path::PathBuf;
+use std::{path::PathBuf, fmt::Debug};
 use tokio::fs::OpenOptions;
-use tokio::io::AsyncWriteExt;
-use futures::StreamExt;
+use tokio::io::{BufReader, BufWriter, AsyncWriteExt};
+use futures::{StreamExt, TryStreamExt};
 use std::pin::Pin;
 use sha2::Digest;
 use futures::FutureExt;
+use tracing::{debug, error};
+
+const BUF_CAPACITY: usize = 16 * 1024;
 
 #[derive(Clone)]
 pub struct FileStore {
@@ -32,7 +35,7 @@ impl FileStore {
         Self { base: base.into() }
     }
 
-    async fn mktemp(&self) -> Result<(PathBuf, tokio::fs::File)> {
+    async fn mktemp(&self) -> Result<(PathBuf, BufWriter<tokio::fs::File>)> {
         use rand::{Rng, distributions::Alphanumeric};
         tokio::fs::create_dir_all(self.base.as_path()).await?;
         loop {
@@ -50,7 +53,8 @@ impl FileStore {
                 .open(&filename)
                 .await
             {
-                Ok(file) => return Ok((filename, file)),
+                // TODO: determine if BufWriter provides benefit here
+                Ok(file) => return Ok((filename, BufWriter::with_capacity(BUF_CAPACITY, file))),
                 Err(err) => match err.kind() {
                     std::io::ErrorKind::AlreadyExists => continue,
                     _ => return Err(err.into())
@@ -62,6 +66,8 @@ impl FileStore {
 
 #[async_trait]
 impl MediaStore for FileStore {
+
+    #[tracing::instrument(skip(self))]
     async fn write_streaming<T>(
         &self,
         domain: &str,
@@ -69,9 +75,10 @@ impl MediaStore for FileStore {
         mut content: T,
     ) -> Result<String>
     where
-        T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send
+        T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug
     {
         let (tempfilepath, mut tempfile) = self.mktemp().await?;
+        debug!("Temporary file opened for storing pending upload: {}", tempfilepath.display());
         let mut hasher = sha2::Sha256::new();
         let mut length: usize = 0;
 
@@ -81,6 +88,7 @@ impl MediaStore for FileStore {
                 source: Some(Box::new(err)),
                 msg: "Failed to read a data chunk".to_owned()
             })?);
+            debug!("Read {} bytes from the stream", chunk.len());
             length += chunk.len();
             let (write_result, _hasher) = tokio::join!(
                 tempfile.write_all(&*chunk),
@@ -94,6 +102,7 @@ impl MediaStore for FileStore {
                 }
             );
             if let Err(err) = write_result {
+                error!("Error while writing pending upload: {}", err);
                 drop(tempfile);
                 // this is just cleanup, nothing fails if it fails
                 // though temporary files might take up space on the hard drive
@@ -106,6 +115,7 @@ impl MediaStore for FileStore {
         }
 
         let hash = hasher.finalize();
+        debug!("Pending upload hash: {}", hex::encode(&hash));
         let filename = format!(
             "{}/{}/{}/{}/{}",
             hex::encode([hash[0]]),
@@ -118,7 +128,9 @@ impl MediaStore for FileStore {
         let domain_str = domain.to_string();
         let filepath = self.base.join(domain_str.as_str()).join(&filename);
         let metafilename = filename.clone() + ".json";
-        let metapath = self.base.join(domain_str.as_str()).join(metafilename);
+        let metapath = self.base.join(domain_str.as_str()).join(&metafilename);
+        let metatemppath = self.base.join(domain_str.as_str()).join(metafilename + ".tmp");
+        debug!("File path: {}, metadata: {}", filepath.display(), metapath.display());
         {
             let parent = filepath.parent().unwrap();
             tokio::fs::create_dir_all(parent).await?;            
@@ -126,13 +138,15 @@ impl MediaStore for FileStore {
         let mut meta = OpenOptions::new()
             .create_new(true)
             .write(true)
-            .open(&metapath)
+            .open(&metatemppath)
             .await?;
         meta.write_all(&serde_json::to_vec(&metadata).unwrap()).await?;
         tokio::fs::rename(tempfilepath, filepath).await?;
+        tokio::fs::rename(metatemppath, metapath).await?;
         Ok(filename)
     }
 
+    #[tracing::instrument(skip(self))]
     async fn read_streaming(
         &self,
         domain: &str,
@@ -140,7 +154,7 @@ impl MediaStore for FileStore {
     ) -> Result<(Metadata, Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>)> {
         let path = self.base.join(format!("{}{}", domain, filename));
         let metapath = self.base.join(format!("{}{}.json", domain, filename));
-        tracing::debug!("Path: {}, metadata: {}", path.display(), metapath.display());
+        debug!("Path: {}, metadata: {}", path.display(), metapath.display());
 
         let file = OpenOptions::new()
             .read(true)
@@ -153,7 +167,17 @@ impl MediaStore for FileStore {
                 source: Some(Box::new(err))
             })?;
 
-        Ok((meta, Box::pin(tokio_util::io::ReaderStream::new(file))))
+        Ok((meta, Box::pin(
+            tokio_util::io::ReaderStream::new(
+                // TODO: determine if BufReader provides benefit here
+                // From the logs it looks like we're reading 4KiB at a time
+                // Buffering file contents seems to double download speed
+                // How to benchmark this?
+                tokio::io::BufReader::with_capacity(BUF_CAPACITY, file)
+            )
+            // Sprinkle some salt in form of protective log wrapping
+                .inspect_ok(|chunk| debug!("Read {} bytes from file", chunk.len()))
+        )))
     }
 
     async fn delete(&self, domain: &str, filename: &str) -> Result<()> {
diff --git a/kittybox-rs/src/media/storage/mod.rs b/kittybox-rs/src/media/storage/mod.rs
index cb8b38f..5614437 100644
--- a/kittybox-rs/src/media/storage/mod.rs
+++ b/kittybox-rs/src/media/storage/mod.rs
@@ -4,6 +4,7 @@ use tokio_stream::Stream;
 use bytes::Bytes;
 use serde::{Deserialize, Serialize};
 use std::pin::Pin;
+use std::fmt::Debug;
 
 pub mod file;
 
@@ -86,7 +87,7 @@ pub trait MediaStore: 'static + Send + Sync + Clone {
         content: T,
     ) -> Result<String>
     where
-        T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send;
+        T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug;
 
     async fn read_streaming(
         &self,