use super::{Metadata, ErrorKind, MediaStore, MediaStoreError, Result}; use async_trait::async_trait; use std::{path::PathBuf, fmt::Debug}; use tokio::fs::OpenOptions; use tokio::io::{BufReader, BufWriter, AsyncWriteExt, AsyncSeekExt}; use futures::{StreamExt, TryStreamExt}; use std::ops::{Bound, RangeBounds, Neg}; use std::pin::Pin; use sha2::Digest; use futures::FutureExt; use tracing::{debug, error}; const BUF_CAPACITY: usize = 16 * 1024; #[derive(Clone)] pub struct FileStore { base: PathBuf, } impl From<tokio::io::Error> for MediaStoreError { fn from(source: tokio::io::Error) -> Self { Self { msg: format!("file I/O error: {}", source), kind: match source.kind() { std::io::ErrorKind::NotFound => ErrorKind::NotFound, _ => ErrorKind::Backend }, source: Some(Box::new(source)), } } } impl FileStore { async fn mktemp(&self) -> Result<(PathBuf, BufWriter<tokio::fs::File>)> { kittybox_util::fs::mktemp(&self.base, "temp", 16) .await .map(|(name, file)| (name, BufWriter::new(file))) .map_err(Into::into) } } #[async_trait] impl MediaStore for FileStore { async fn new(url: &'_ url::Url) -> Result<Self> { Ok(Self { base: url.path().into() }) } #[tracing::instrument(skip(self, content))] async fn write_streaming<T>( &self, domain: &str, mut metadata: Metadata, mut content: T, ) -> Result<String> where T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug { let (tempfilepath, mut tempfile) = self.mktemp().await?; debug!("Temporary file opened for storing pending upload: {}", tempfilepath.display()); let mut hasher = sha2::Sha256::new(); let mut length: usize = 0; while let Some(chunk) = content.next().await { let chunk = chunk.map_err(|err| MediaStoreError { kind: ErrorKind::Backend, source: Some(Box::new(err)), msg: "Failed to read a data chunk".to_owned() })?; debug!("Read {} bytes from the stream", chunk.len()); length += chunk.len(); let (write_result, _hasher) = tokio::join!( { let chunk = chunk.clone(); let tempfile = &mut tempfile; async move { tempfile.write_all(&*chunk).await } }, { let chunk = chunk.clone(); tokio::task::spawn_blocking(move || { hasher.update(&*chunk); hasher }).map(|r| r.unwrap()) } ); if let Err(err) = write_result { error!("Error while writing pending upload: {}", err); drop(tempfile); // this is just cleanup, nothing fails if it fails // though temporary files might take up space on the hard drive // We'll clean them when maintenance time comes #[allow(unused_must_use)] { tokio::fs::remove_file(tempfilepath).await; } return Err(err.into()); } hasher = _hasher; } // Manually flush the buffer and drop the handle to close the file tempfile.flush().await?; tempfile.into_inner().sync_all().await?; let hash = hasher.finalize(); debug!("Pending upload hash: {}", hex::encode(&hash)); let filename = format!( "{}/{}/{}/{}/{}", hex::encode([hash[0]]), hex::encode([hash[1]]), hex::encode([hash[2]]), hex::encode([hash[3]]), hex::encode(&hash[4..32]) ); let domain_str = domain.to_string(); let filepath = self.base.join(domain_str.as_str()).join(&filename); let metafilename = filename.clone() + ".json"; let metapath = self.base.join(domain_str.as_str()).join(&metafilename); let metatemppath = self.base.join(domain_str.as_str()).join(metafilename + ".tmp"); metadata.length = std::num::NonZeroUsize::new(length); metadata.etag = Some(hex::encode(&hash)); debug!("File path: {}, metadata: {}", filepath.display(), metapath.display()); { let parent = filepath.parent().unwrap(); tokio::fs::create_dir_all(parent).await?; } let mut meta = OpenOptions::new() .create_new(true) .write(true) .open(&metatemppath) .await?; meta.write_all(&serde_json::to_vec(&metadata).unwrap()).await?; tokio::fs::rename(tempfilepath, filepath).await?; tokio::fs::rename(metatemppath, metapath).await?; Ok(filename) } #[tracing::instrument(skip(self))] async fn read_streaming( &self, domain: &str, filename: &str, ) -> Result<(Metadata, Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>)> { debug!("Domain: {}, filename: {}", domain, filename); let path = self.base.join(domain).join(filename); debug!("Path: {}", path.display()); let file = OpenOptions::new() .read(true) .open(path) .await?; let meta = self.metadata(domain, filename).await?; Ok((meta, Box::pin( tokio_util::io::ReaderStream::new( // TODO: determine if BufReader provides benefit here // From the logs it looks like we're reading 4KiB at a time // Buffering file contents seems to double download speed // How to benchmark this? BufReader::with_capacity(BUF_CAPACITY, file) ) // Sprinkle some salt in form of protective log wrapping .inspect_ok(|chunk| debug!("Read {} bytes from file", chunk.len())) ))) } #[tracing::instrument(skip(self))] async fn metadata(&self, domain: &str, filename: &str) -> Result<Metadata> { let metapath = self.base.join(domain).join(format!("{}.json", filename)); debug!("Metadata path: {}", metapath.display()); let meta = serde_json::from_slice(&tokio::fs::read(metapath).await?) .map_err(|err| MediaStoreError { kind: ErrorKind::Json, msg: format!("{}", err), source: Some(Box::new(err)) })?; Ok(meta) } #[tracing::instrument(skip(self))] async fn stream_range( &self, domain: &str, filename: &str, range: (Bound<u64>, Bound<u64>) ) -> Result<Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>> { let path = self.base.join(format!("{}/{}", domain, filename)); let metapath = self.base.join(format!("{}/{}.json", domain, filename)); debug!("Path: {}, metadata: {}", path.display(), metapath.display()); let mut file = OpenOptions::new() .read(true) .open(path) .await?; let start = match range { (Bound::Included(bound), _) => { debug!("Seeking {} bytes forward...", bound); file.seek(std::io::SeekFrom::Start(bound)).await? } (Bound::Excluded(_), _) => unreachable!(), (Bound::Unbounded, Bound::Included(bound)) => { // Seek to the end minus the bounded bytes debug!("Seeking {} bytes back from the end...", bound); file.seek(std::io::SeekFrom::End(i64::try_from(bound).unwrap().neg())).await? }, (Bound::Unbounded, Bound::Unbounded) => 0, (_, Bound::Excluded(_)) => unreachable!() }; let stream = Box::pin(tokio_util::io::ReaderStream::new(BufReader::with_capacity(BUF_CAPACITY, file))) .map_ok({ let mut bytes_read = 0usize; let len = match range { (_, Bound::Unbounded) => None, (Bound::Unbounded, Bound::Included(bound)) => Some(bound), (_, Bound::Included(bound)) => Some(bound + 1 - start), (_, Bound::Excluded(_)) => unreachable!() }; move |chunk| { debug!("Read {} bytes from file, {} in this chunk", bytes_read, chunk.len()); bytes_read += chunk.len(); if let Some(len) = len.map(|len| len.try_into().unwrap()) { if bytes_read > len { if bytes_read - len > chunk.len() { return None } debug!("Truncating last {} bytes", bytes_read - len); return Some(chunk.slice(..chunk.len() - (bytes_read - len))) } } Some(chunk) } }) .try_take_while(|x| std::future::ready(Ok(x.is_some()))) // Will never panic, because the moment the stream yields // a None, it is considered exhausted. .map_ok(|x| x.unwrap()); return Ok(Box::pin(stream)) } async fn delete(&self, domain: &str, filename: &str) -> Result<()> { let path = self.base.join(format!("{}/{}", domain, filename)); Ok(tokio::fs::remove_file(path).await?) } } #[cfg(test)] mod tests { use super::{Metadata, FileStore, MediaStore}; use std::ops::Bound; use tokio::io::AsyncReadExt; #[tokio::test] #[tracing_test::traced_test] async fn test_ranges() { let tempdir = tempfile::tempdir().expect("Failed to create tempdir"); let store = FileStore { base: tempdir.path().to_path_buf() }; let file: &[u8] = include_bytes!("./file.rs"); let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i)))); let metadata = Metadata { filename: Some("file.rs".to_string()), content_type: Some("text/plain".to_string()), length: None, etag: None, }; // write through the interface let filename = store.write_streaming( "fireburn.ru", metadata, stream ).await.unwrap(); tracing::debug!("Writing complete."); // Ensure the file is there let content = tokio::fs::read( tempdir.path() .join("fireburn.ru") .join(&filename) ).await.unwrap(); assert_eq!(content, file); tracing::debug!("Reading range from the start..."); // try to read range let range = { let stream = store.stream_range( "fireburn.ru", &filename, (Bound::Included(0), Bound::Included(299)) ).await.unwrap(); let mut reader = tokio_util::io::StreamReader::new(stream); let mut buf = Vec::default(); reader.read_to_end(&mut buf).await.unwrap(); buf }; assert_eq!(range.len(), 300); assert_eq!(range.as_slice(), &file[..=299]); tracing::debug!("Reading range from the middle..."); let range = { let stream = store.stream_range( "fireburn.ru", &filename, (Bound::Included(150), Bound::Included(449)) ).await.unwrap(); let mut reader = tokio_util::io::StreamReader::new(stream); let mut buf = Vec::default(); reader.read_to_end(&mut buf).await.unwrap(); buf }; assert_eq!(range.len(), 300); assert_eq!(range.as_slice(), &file[150..=449]); tracing::debug!("Reading range from the end..."); let range = { let stream = store.stream_range( "fireburn.ru", &filename, // Note: the `headers` crate parses bounds in a // non-standard way, where unbounded start actually // means getting things from the end... (Bound::Unbounded, Bound::Included(300)) ).await.unwrap(); let mut reader = tokio_util::io::StreamReader::new(stream); let mut buf = Vec::default(); reader.read_to_end(&mut buf).await.unwrap(); buf }; assert_eq!(range.len(), 300); assert_eq!(range.as_slice(), &file[file.len()-300..file.len()]); tracing::debug!("Reading the whole file..."); // try to read range let range = { let stream = store.stream_range( "fireburn.ru", &("/".to_string() + &filename), (Bound::Unbounded, Bound::Unbounded) ).await.unwrap(); let mut reader = tokio_util::io::StreamReader::new(stream); let mut buf = Vec::default(); reader.read_to_end(&mut buf).await.unwrap(); buf }; assert_eq!(range.len(), file.len()); assert_eq!(range.as_slice(), file); } #[tokio::test] #[tracing_test::traced_test] async fn test_streaming_read_write() { let tempdir = tempfile::tempdir().expect("Failed to create tempdir"); let store = FileStore { base: tempdir.path().to_path_buf() }; let file: &[u8] = include_bytes!("./file.rs"); let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i)))); let metadata = Metadata { filename: Some("style.css".to_string()), content_type: Some("text/css".to_string()), length: None, etag: None, }; // write through the interface let filename = store.write_streaming( "fireburn.ru", metadata, stream ).await.unwrap(); println!("{}, {}", filename, tempdir.path() .join("fireburn.ru") .join(&filename) .display()); let content = tokio::fs::read( tempdir.path() .join("fireburn.ru") .join(&filename) ).await.unwrap(); assert_eq!(content, file); // check internal metadata format let meta: Metadata = serde_json::from_slice(&tokio::fs::read( tempdir.path() .join("fireburn.ru") .join(filename.clone() + ".json") ).await.unwrap()).unwrap(); assert_eq!(meta.content_type.as_deref(), Some("text/css")); assert_eq!(meta.filename.as_deref(), Some("style.css")); assert_eq!(meta.length.map(|i| i.get()), Some(file.len())); assert!(meta.etag.is_some()); // read back the data using the interface let (metadata, read_back) = { let (metadata, stream) = store.read_streaming( "fireburn.ru", &filename ).await.unwrap(); let mut reader = tokio_util::io::StreamReader::new(stream); let mut buf = Vec::default(); reader.read_to_end(&mut buf).await.unwrap(); (metadata, buf) }; assert_eq!(read_back, file); assert_eq!(metadata.content_type.as_deref(), Some("text/css")); assert_eq!(meta.filename.as_deref(), Some("style.css")); assert_eq!(meta.length.map(|i| i.get()), Some(file.len())); assert!(meta.etag.is_some()); } }