use super::{Metadata, ErrorKind, MediaStore, MediaStoreError, Result}; use async_trait::async_trait; use std::{path::PathBuf, fmt::Debug}; use tokio::fs::OpenOptions; use tokio::io::{BufReader, BufWriter, AsyncWriteExt}; use futures::{StreamExt, TryStreamExt}; use std::pin::Pin; use sha2::Digest; use futures::FutureExt; use tracing::{debug, error}; const BUF_CAPACITY: usize = 16 * 1024; #[derive(Clone)] pub struct FileStore { base: PathBuf, } impl From for MediaStoreError { fn from(source: tokio::io::Error) -> Self { Self { msg: format!("file I/O error: {}", source), kind: match source.kind() { std::io::ErrorKind::NotFound => ErrorKind::NotFound, _ => ErrorKind::Backend }, source: Some(Box::new(source)), } } } impl FileStore { pub fn new>(base: T) -> Self { Self { base: base.into() } } async fn mktemp(&self) -> Result<(PathBuf, BufWriter)> { use rand::{Rng, distributions::Alphanumeric}; tokio::fs::create_dir_all(self.base.as_path()).await?; loop { let filename = self.base.join(format!("temp.{}", { let string = rand::thread_rng() .sample_iter(&Alphanumeric) .take(16) .collect::>(); String::from_utf8(string).unwrap() })); match OpenOptions::new() .create_new(true) .write(true) .open(&filename) .await { // TODO: determine if BufWriter provides benefit here Ok(file) => return Ok((filename, BufWriter::with_capacity(BUF_CAPACITY, file))), Err(err) => match err.kind() { std::io::ErrorKind::AlreadyExists => continue, _ => return Err(err.into()) } } } } } #[async_trait] impl MediaStore for FileStore { #[tracing::instrument(skip(self))] async fn write_streaming( &self, domain: &str, mut metadata: Metadata, mut content: T, ) -> Result where T: tokio_stream::Stream> + Unpin + Send + Debug { let (tempfilepath, mut tempfile) = self.mktemp().await?; debug!("Temporary file opened for storing pending upload: {}", tempfilepath.display()); let mut hasher = sha2::Sha256::new(); let mut length: usize = 0; while let Some(chunk) = content.next().await { let chunk = std::sync::Arc::new(chunk.map_err(|err| MediaStoreError { kind: ErrorKind::Backend, source: Some(Box::new(err)), msg: "Failed to read a data chunk".to_owned() })?); debug!("Read {} bytes from the stream", chunk.len()); length += chunk.len(); let (write_result, _hasher) = tokio::join!( tempfile.write_all(&*chunk), { let chunk = chunk.clone(); tokio::task::spawn_blocking(move || { hasher.update(&*chunk); hasher }).map(|r| r.unwrap()) } ); if let Err(err) = write_result { error!("Error while writing pending upload: {}", err); drop(tempfile); // this is just cleanup, nothing fails if it fails // though temporary files might take up space on the hard drive // We'll clean them when maintenance time comes #[allow(unused_must_use)] { tokio::fs::remove_file(tempfilepath).await; } return Err(err.into()); } hasher = _hasher; } let hash = hasher.finalize(); debug!("Pending upload hash: {}", hex::encode(&hash)); let filename = format!( "{}/{}/{}/{}/{}", hex::encode([hash[0]]), hex::encode([hash[1]]), hex::encode([hash[2]]), hex::encode([hash[3]]), hex::encode(&hash[4..32]) ); metadata.length = Some(length); let domain_str = domain.to_string(); let filepath = self.base.join(domain_str.as_str()).join(&filename); let metafilename = filename.clone() + ".json"; let metapath = self.base.join(domain_str.as_str()).join(&metafilename); let metatemppath = self.base.join(domain_str.as_str()).join(metafilename + ".tmp"); debug!("File path: {}, metadata: {}", filepath.display(), metapath.display()); { let parent = filepath.parent().unwrap(); tokio::fs::create_dir_all(parent).await?; } let mut meta = OpenOptions::new() .create_new(true) .write(true) .open(&metatemppath) .await?; meta.write_all(&serde_json::to_vec(&metadata).unwrap()).await?; tokio::fs::rename(tempfilepath, filepath).await?; tokio::fs::rename(metatemppath, metapath).await?; Ok(filename) } #[tracing::instrument(skip(self))] async fn read_streaming( &self, domain: &str, filename: &str, ) -> Result<(Metadata, Pin> + Send>>)> { let path = self.base.join(format!("{}{}", domain, filename)); let metapath = self.base.join(format!("{}{}.json", domain, filename)); debug!("Path: {}, metadata: {}", path.display(), metapath.display()); let file = OpenOptions::new() .read(true) .open(path) .await?; let meta = serde_json::from_slice(&tokio::fs::read(metapath).await?) .map_err(|err| MediaStoreError { kind: ErrorKind::Json, msg: format!("{}", err), source: Some(Box::new(err)) })?; Ok((meta, Box::pin( tokio_util::io::ReaderStream::new( // TODO: determine if BufReader provides benefit here // From the logs it looks like we're reading 4KiB at a time // Buffering file contents seems to double download speed // How to benchmark this? tokio::io::BufReader::with_capacity(BUF_CAPACITY, file) ) // Sprinkle some salt in form of protective log wrapping .inspect_ok(|chunk| debug!("Read {} bytes from file", chunk.len())) ))) } async fn delete(&self, domain: &str, filename: &str) -> Result<()> { let path = self.base.join(format!("{}/{}", domain, filename)); Ok(tokio::fs::remove_file(path).await?) } } #[cfg(test)] mod tests { use super::{Metadata, FileStore, MediaStore}; use tokio::io::AsyncReadExt; #[tokio::test] async fn test_streaming_read_write() { let tempdir = tempdir::TempDir::new("file").expect("Failed to create tempdir"); let store = FileStore::new(tempdir.path()); let file: &[u8] = include_bytes!("../../../../README.md"); let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i)))); let metadata = Metadata { filename: Some("README.md".to_string()), content_type: "text/markdown".to_string(), length: None }; let filename = store.write_streaming( "fireburn.ru", metadata, stream ).await.unwrap(); let content = tokio::fs::read( tempdir.path() .join("fireburn.ru") .join(&filename) ).await.unwrap(); assert_eq!(content, file); let meta: Metadata = serde_json::from_slice(&tokio::fs::read( tempdir.path() .join("fireburn.ru") .join(filename.clone() + ".json") ).await.unwrap()).unwrap(); assert_eq!(&meta.content_type, "text/markdown"); assert_eq!(meta.filename.as_deref(), Some("README.md")); assert_eq!(meta.length, Some(file.len())); let (metadata, read_back) = { let (metadata, stream) = store.read_streaming( "fireburn.ru", &("/".to_string() + &filename) ).await.unwrap(); let mut reader = tokio_util::io::StreamReader::new(stream); let mut buf = Vec::default(); reader.read_to_end(&mut buf).await.unwrap(); (metadata, buf) }; assert_eq!(read_back, file); assert_eq!(&metadata.content_type, "text/markdown"); assert_eq!(meta.filename.as_deref(), Some("README.md")); assert_eq!(meta.length, Some(file.len())); } }