use super::{Metadata, ErrorKind, MediaStore, MediaStoreError, Result}; use async_trait::async_trait; use std::{path::PathBuf, fmt::Debug}; use tokio::fs::OpenOptions; use tokio::io::{BufReader, BufWriter, AsyncWriteExt}; use futures::{StreamExt, TryStreamExt}; use std::pin::Pin; use sha2::Digest; use futures::FutureExt; use tracing::{debug, error}; const BUF_CAPACITY: usize = 16 * 1024; #[derive(Clone)] pub struct FileStore { base: PathBuf, } impl From for MediaStoreError { fn from(source: tokio::io::Error) -> Self { Self { msg: format!("file I/O error: {}", source), kind: match source.kind() { std::io::ErrorKind::NotFound => ErrorKind::NotFound, _ => ErrorKind::Backend }, source: Some(Box::new(source)), } } } impl FileStore { pub fn new>(base: T) -> Self { Self { base: base.into() } } async fn mktemp(&self) -> Result<(PathBuf, BufWriter)> { kittybox_util::fs::mktemp(&self.base, "temp", 16) .await .map(|(name, file)| (name, BufWriter::new(file))) .map_err(Into::into) } } #[async_trait] impl MediaStore for FileStore { #[tracing::instrument(skip(self))] async fn write_streaming( &self, domain: &str, mut metadata: Metadata, mut content: T, ) -> Result where T: tokio_stream::Stream> + Unpin + Send + Debug { let (tempfilepath, mut tempfile) = self.mktemp().await?; debug!("Temporary file opened for storing pending upload: {}", tempfilepath.display()); let mut hasher = sha2::Sha256::new(); let mut length: usize = 0; while let Some(chunk) = content.next().await { // TODO consider getting rid of this Arc as we only need immutable references // I don't think we quite need refcounting here let chunk = std::sync::Arc::new(chunk.map_err(|err| MediaStoreError { kind: ErrorKind::Backend, source: Some(Box::new(err)), msg: "Failed to read a data chunk".to_owned() })?); debug!("Read {} bytes from the stream", chunk.len()); length += chunk.len(); let (write_result, _hasher) = tokio::join!( tempfile.write_all(&*chunk), { let chunk = chunk.clone(); tokio::task::spawn_blocking(move || { hasher.update(&*chunk); hasher }).map(|r| r.unwrap()) } ); if let Err(err) = write_result { error!("Error while writing pending upload: {}", err); drop(tempfile); // this is just cleanup, nothing fails if it fails // though temporary files might take up space on the hard drive // We'll clean them when maintenance time comes #[allow(unused_must_use)] { tokio::fs::remove_file(tempfilepath).await; } return Err(err.into()); } hasher = _hasher; } // Manually flush the buffer and drop the handle to close the file tempfile.flush().await?; tempfile.into_inner().sync_all().await?; let hash = hasher.finalize(); debug!("Pending upload hash: {}", hex::encode(&hash)); let filename = format!( "{}/{}/{}/{}/{}", hex::encode([hash[0]]), hex::encode([hash[1]]), hex::encode([hash[2]]), hex::encode([hash[3]]), hex::encode(&hash[4..32]) ); let domain_str = domain.to_string(); let filepath = self.base.join(domain_str.as_str()).join(&filename); let metafilename = filename.clone() + ".json"; let metapath = self.base.join(domain_str.as_str()).join(&metafilename); let metatemppath = self.base.join(domain_str.as_str()).join(metafilename + ".tmp"); metadata.length = std::num::NonZeroUsize::new(length); metadata.etag = Some(hex::encode(&hash)); debug!("File path: {}, metadata: {}", filepath.display(), metapath.display()); { let parent = filepath.parent().unwrap(); tokio::fs::create_dir_all(parent).await?; } let mut meta = OpenOptions::new() .create_new(true) .write(true) .open(&metatemppath) .await?; meta.write_all(&serde_json::to_vec(&metadata).unwrap()).await?; tokio::fs::rename(tempfilepath, filepath).await?; tokio::fs::rename(metatemppath, metapath).await?; Ok(filename) } #[tracing::instrument(skip(self))] async fn read_streaming( &self, domain: &str, filename: &str, ) -> Result<(Metadata, Pin> + Send>>)> { let path = self.base.join(format!("{}{}", domain, filename)); let metapath = self.base.join(format!("{}{}.json", domain, filename)); debug!("Path: {}, metadata: {}", path.display(), metapath.display()); let file = OpenOptions::new() .read(true) .open(path) .await?; let meta = serde_json::from_slice(&tokio::fs::read(metapath).await?) .map_err(|err| MediaStoreError { kind: ErrorKind::Json, msg: format!("{}", err), source: Some(Box::new(err)) })?; Ok((meta, Box::pin( tokio_util::io::ReaderStream::new( // TODO: determine if BufReader provides benefit here // From the logs it looks like we're reading 4KiB at a time // Buffering file contents seems to double download speed // How to benchmark this? BufReader::with_capacity(BUF_CAPACITY, file) ) // Sprinkle some salt in form of protective log wrapping .inspect_ok(|chunk| debug!("Read {} bytes from file", chunk.len())) ))) } async fn delete(&self, domain: &str, filename: &str) -> Result<()> { let path = self.base.join(format!("{}/{}", domain, filename)); Ok(tokio::fs::remove_file(path).await?) } } #[cfg(test)] mod tests { use super::{Metadata, FileStore, MediaStore}; use tokio::io::AsyncReadExt; #[tokio::test] #[tracing_test::traced_test] async fn test_streaming_read_write() { let tempdir = tempdir::TempDir::new("file").expect("Failed to create tempdir"); let store = FileStore::new(tempdir.path()); let tempdir_path = tempdir.into_path(); let file: &[u8] = include_bytes!("../../../companion-lite/style.css"); let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i)))); let metadata = Metadata { filename: Some("style.css".to_string()), content_type: Some("text/css".to_string()), length: None, etag: None, }; // write through the interface let filename = store.write_streaming( "fireburn.ru", metadata, stream ).await.unwrap(); println!("{}, {}", filename, tempdir_path .join("fireburn.ru") .join(&filename) .display()); let content = tokio::fs::read( tempdir_path .join("fireburn.ru") .join(&filename) ).await.unwrap(); assert_eq!(content, file); // check internal metadata format let meta: Metadata = serde_json::from_slice(&tokio::fs::read( tempdir_path .join("fireburn.ru") .join(filename.clone() + ".json") ).await.unwrap()).unwrap(); assert_eq!(meta.content_type.as_deref(), Some("text/css")); assert_eq!(meta.filename.as_deref(), Some("style.css")); assert_eq!(meta.length.map(|i| i.get()), Some(file.len())); assert!(meta.etag.is_some()); // read back the data using the interface let (metadata, read_back) = { let (metadata, stream) = store.read_streaming( "fireburn.ru", &("/".to_string() + &filename) ).await.unwrap(); let mut reader = tokio_util::io::StreamReader::new(stream); let mut buf = Vec::default(); reader.read_to_end(&mut buf).await.unwrap(); (metadata, buf) }; assert_eq!(read_back, file); assert_eq!(metadata.content_type.as_deref(), Some("text/css")); assert_eq!(meta.filename.as_deref(), Some("style.css")); assert_eq!(meta.length.map(|i| i.get()), Some(file.len())); assert!(meta.etag.is_some()); } }