diff options
Diffstat (limited to 'kittybox-rs/src/media')
-rw-r--r-- | kittybox-rs/src/media/storage/file.rs | 44 | ||||
-rw-r--r-- | kittybox-rs/src/media/storage/mod.rs | 3 |
2 files changed, 36 insertions, 11 deletions
diff --git a/kittybox-rs/src/media/storage/file.rs b/kittybox-rs/src/media/storage/file.rs index f01cea8..84edb84 100644 --- a/kittybox-rs/src/media/storage/file.rs +++ b/kittybox-rs/src/media/storage/file.rs @@ -1,12 +1,15 @@ use super::{Metadata, ErrorKind, MediaStore, MediaStoreError, Result}; use async_trait::async_trait; -use std::path::PathBuf; +use std::{path::PathBuf, fmt::Debug}; use tokio::fs::OpenOptions; -use tokio::io::AsyncWriteExt; -use futures::StreamExt; +use tokio::io::{BufReader, BufWriter, AsyncWriteExt}; +use futures::{StreamExt, TryStreamExt}; use std::pin::Pin; use sha2::Digest; use futures::FutureExt; +use tracing::{debug, error}; + +const BUF_CAPACITY: usize = 16 * 1024; #[derive(Clone)] pub struct FileStore { @@ -32,7 +35,7 @@ impl FileStore { Self { base: base.into() } } - async fn mktemp(&self) -> Result<(PathBuf, tokio::fs::File)> { + async fn mktemp(&self) -> Result<(PathBuf, BufWriter<tokio::fs::File>)> { use rand::{Rng, distributions::Alphanumeric}; tokio::fs::create_dir_all(self.base.as_path()).await?; loop { @@ -50,7 +53,8 @@ impl FileStore { .open(&filename) .await { - Ok(file) => return Ok((filename, file)), + // TODO: determine if BufWriter provides benefit here + Ok(file) => return Ok((filename, BufWriter::with_capacity(BUF_CAPACITY, file))), Err(err) => match err.kind() { std::io::ErrorKind::AlreadyExists => continue, _ => return Err(err.into()) @@ -62,6 +66,8 @@ impl FileStore { #[async_trait] impl MediaStore for FileStore { + + #[tracing::instrument(skip(self))] async fn write_streaming<T>( &self, domain: &str, @@ -69,9 +75,10 @@ impl MediaStore for FileStore { mut content: T, ) -> Result<String> where - T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug { let (tempfilepath, mut tempfile) = self.mktemp().await?; + debug!("Temporary file opened for storing pending upload: {}", tempfilepath.display()); let mut hasher = sha2::Sha256::new(); let mut length: usize = 0; @@ -81,6 +88,7 @@ impl MediaStore for FileStore { source: Some(Box::new(err)), msg: "Failed to read a data chunk".to_owned() })?); + debug!("Read {} bytes from the stream", chunk.len()); length += chunk.len(); let (write_result, _hasher) = tokio::join!( tempfile.write_all(&*chunk), @@ -94,6 +102,7 @@ impl MediaStore for FileStore { } ); if let Err(err) = write_result { + error!("Error while writing pending upload: {}", err); drop(tempfile); // this is just cleanup, nothing fails if it fails // though temporary files might take up space on the hard drive @@ -106,6 +115,7 @@ impl MediaStore for FileStore { } let hash = hasher.finalize(); + debug!("Pending upload hash: {}", hex::encode(&hash)); let filename = format!( "{}/{}/{}/{}/{}", hex::encode([hash[0]]), @@ -118,7 +128,9 @@ impl MediaStore for FileStore { let domain_str = domain.to_string(); let filepath = self.base.join(domain_str.as_str()).join(&filename); let metafilename = filename.clone() + ".json"; - let metapath = self.base.join(domain_str.as_str()).join(metafilename); + let metapath = self.base.join(domain_str.as_str()).join(&metafilename); + let metatemppath = self.base.join(domain_str.as_str()).join(metafilename + ".tmp"); + debug!("File path: {}, metadata: {}", filepath.display(), metapath.display()); { let parent = filepath.parent().unwrap(); tokio::fs::create_dir_all(parent).await?; @@ -126,13 +138,15 @@ impl MediaStore for FileStore { let mut meta = OpenOptions::new() .create_new(true) .write(true) - .open(&metapath) + .open(&metatemppath) .await?; meta.write_all(&serde_json::to_vec(&metadata).unwrap()).await?; tokio::fs::rename(tempfilepath, filepath).await?; + tokio::fs::rename(metatemppath, metapath).await?; Ok(filename) } + #[tracing::instrument(skip(self))] async fn read_streaming( &self, domain: &str, @@ -140,7 +154,7 @@ impl MediaStore for FileStore { ) -> Result<(Metadata, Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>)> { let path = self.base.join(format!("{}{}", domain, filename)); let metapath = self.base.join(format!("{}{}.json", domain, filename)); - tracing::debug!("Path: {}, metadata: {}", path.display(), metapath.display()); + debug!("Path: {}, metadata: {}", path.display(), metapath.display()); let file = OpenOptions::new() .read(true) @@ -153,7 +167,17 @@ impl MediaStore for FileStore { source: Some(Box::new(err)) })?; - Ok((meta, Box::pin(tokio_util::io::ReaderStream::new(file)))) + Ok((meta, Box::pin( + tokio_util::io::ReaderStream::new( + // TODO: determine if BufReader provides benefit here + // From the logs it looks like we're reading 4KiB at a time + // Buffering file contents seems to double download speed + // How to benchmark this? + tokio::io::BufReader::with_capacity(BUF_CAPACITY, file) + ) + // Sprinkle some salt in form of protective log wrapping + .inspect_ok(|chunk| debug!("Read {} bytes from file", chunk.len())) + ))) } async fn delete(&self, domain: &str, filename: &str) -> Result<()> { diff --git a/kittybox-rs/src/media/storage/mod.rs b/kittybox-rs/src/media/storage/mod.rs index cb8b38f..5614437 100644 --- a/kittybox-rs/src/media/storage/mod.rs +++ b/kittybox-rs/src/media/storage/mod.rs @@ -4,6 +4,7 @@ use tokio_stream::Stream; use bytes::Bytes; use serde::{Deserialize, Serialize}; use std::pin::Pin; +use std::fmt::Debug; pub mod file; @@ -86,7 +87,7 @@ pub trait MediaStore: 'static + Send + Sync + Clone { content: T, ) -> Result<String> where - T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send; + T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug; async fn read_streaming( &self, |