diff options
Diffstat (limited to 'kittybox-rs/src/media/storage/file.rs')
-rw-r--r-- | kittybox-rs/src/media/storage/file.rs | 215 |
1 files changed, 186 insertions, 29 deletions
diff --git a/kittybox-rs/src/media/storage/file.rs b/kittybox-rs/src/media/storage/file.rs index ea1f010..176c2f4 100644 --- a/kittybox-rs/src/media/storage/file.rs +++ b/kittybox-rs/src/media/storage/file.rs @@ -1,8 +1,12 @@ -use super::{ErrorKind, MediaStore, MediaStoreError, Result}; +use super::{Metadata, ErrorKind, MediaStore, MediaStoreError, Result}; use async_trait::async_trait; use std::path::PathBuf; use tokio::fs::OpenOptions; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::AsyncWriteExt; +use futures::StreamExt; +use std::pin::Pin; +use sha2::Digest; +use futures::FutureExt; #[derive(Clone)] pub struct FileStore { @@ -12,56 +16,209 @@ pub struct FileStore { impl From<tokio::io::Error> for MediaStoreError { fn from(source: tokio::io::Error) -> Self { Self { + msg: format!("file I/O error: {}", source), + kind: match source.kind() { + std::io::ErrorKind::NotFound => ErrorKind::NotFound, + _ => ErrorKind::Backend + }, source: Some(Box::new(source)), - msg: "file I/O error".to_owned(), - kind: ErrorKind::Backend, + } + } +} + + +impl FileStore { + pub fn new<T: Into<PathBuf>>(base: T) -> Self { + Self { base: base.into() } + } + + async fn mktemp(&self) -> Result<(PathBuf, tokio::fs::File)> { + use rand::{Rng, distributions::Alphanumeric}; + tokio::fs::create_dir_all(self.base.as_path()).await?; + loop { + let filename = self.base.join(format!("temp.{}", { + let string = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(16) + .collect::<Vec<u8>>(); + String::from_utf8(string).unwrap() + })); + + match OpenOptions::new() + .create_new(true) + .write(true) + .open(&filename) + .await + { + Ok(file) => return Ok((filename, file)), + Err(err) => match err.kind() { + std::io::ErrorKind::AlreadyExists => continue, + _ => return Err(err.into()) + } + } } } } #[async_trait] impl MediaStore for FileStore { - async fn write_streaming( + async fn write_streaming<T>( &self, - domain: url::Host, - filename: &str, - content: axum::extract::multipart::Field<'_>, - ) -> Result<()> { - todo!() + domain: &str, + mut metadata: Metadata, + mut content: T, + ) -> Result<String> + where + T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + { + let (tempfilepath, mut tempfile) = self.mktemp().await?; + let mut hasher = sha2::Sha256::new(); + let mut length: usize = 0; + + while let Some(chunk) = content.next().await { + let chunk = std::sync::Arc::new(chunk.map_err(|err| MediaStoreError { + kind: ErrorKind::Backend, + source: Some(Box::new(err)), + msg: "Failed to read a data chunk".to_owned() + })?); + length += chunk.len(); + let (write_result, _hasher) = tokio::join!( + tempfile.write_all(&*chunk), + { + let chunk = chunk.clone(); + tokio::task::spawn_blocking(move || { + hasher.update(&*chunk); + + hasher + }).map(|r| r.unwrap()) + } + ); + if let Err(err) = write_result { + drop(tempfile); + // this is just cleanup, nothing fails if it fails + // though temporary files might take up space on the hard drive + // We'll clean them when maintenance time comes + #[allow(unused_must_use)] + { tokio::fs::remove_file(tempfilepath).await; } + return Err(err.into()); + } + hasher = _hasher; + } + + let hash = hasher.finalize(); + let filename = format!( + "{}/{}/{}/{}/{}", + hex::encode([hash[0]]), + hex::encode([hash[1]]), + hex::encode([hash[2]]), + hex::encode([hash[3]]), + hex::encode(&hash[4..32]) + ); + metadata.length = Some(length); + let domain_str = domain.to_string(); + let filepath = self.base.join(domain_str.as_str()).join(&filename); + let metafilename = filename.clone() + ".json"; + let metapath = self.base.join(domain_str.as_str()).join(metafilename); + { + let parent = filepath.parent().unwrap(); + tokio::fs::create_dir_all(parent).await?; + } + let mut meta = OpenOptions::new() + .create_new(true) + .write(true) + .open(&metapath) + .await?; + meta.write_all(&serde_json::to_vec(&metadata).unwrap()).await?; + tokio::fs::rename(tempfilepath, filepath).await?; + Ok(filename) } async fn read_streaming( &self, - domain: url::Host, + domain: &str, filename: &str, - ) -> Result<tokio_util::io::ReaderStream<Box<dyn tokio::io::AsyncRead>>> { - todo!() - } - - async fn write(&self, domain: url::Host, filename: &str, content: &[u8]) -> Result<()> { - let path = self.base.join(format!("{}/{}", domain, filename)); + ) -> Result<(Metadata, Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>)> { + let path = self.base.join(format!("{}{}", domain, filename)); + let metapath = self.base.join(format!("{}{}.json", domain, filename)); + tracing::debug!("Path: {}, metadata: {}", path.display(), metapath.display()); - let mut file = OpenOptions::new() - .create_new(true) - .write(true) + let file = OpenOptions::new() + .read(true) .open(path) .await?; + let meta = serde_json::from_slice(&tokio::fs::read(metapath).await?) + .map_err(|err| MediaStoreError { + kind: ErrorKind::Json, + msg: format!("{}", err), + source: Some(Box::new(err)) + })?; - Ok(file.write_all(content).await?) + Ok((meta, Box::pin(tokio_util::io::ReaderStream::new(file)))) } - async fn read(&self, domain: url::Host, filename: &str) -> Result<Vec<u8>> { + async fn delete(&self, domain: &str, filename: &str) -> Result<()> { let path = self.base.join(format!("{}/{}", domain, filename)); - let mut file = OpenOptions::new().read(true).open(path).await?; + Ok(tokio::fs::remove_file(path).await?) + } +} - let mut buf: Vec<u8> = Vec::default(); - file.read_to_end(&mut buf).await?; +#[cfg(test)] +mod tests { + use super::{Metadata, FileStore, MediaStore}; + use tokio::io::AsyncReadExt; - Ok(buf) - } + #[tokio::test] + async fn test_streaming_read_write() { + let tempdir = tempdir::TempDir::new("file").expect("Failed to create tempdir"); + let store = FileStore::new(tempdir.path()); + + let file: &[u8] = include_bytes!("../../../../README.md"); + let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i)))); + let metadata = Metadata { + filename: Some("README.md".to_string()), + content_type: "text/markdown".to_string(), + length: None + }; + + let filename = store.write_streaming( + "fireburn.ru", + metadata, stream + ).await.unwrap(); + + let content = tokio::fs::read( + tempdir.path() + .join("fireburn.ru") + .join(&filename) + ).await.unwrap(); + assert_eq!(content, file); + + let meta: Metadata = serde_json::from_slice(&tokio::fs::read( + tempdir.path() + .join("fireburn.ru") + .join(filename.clone() + ".json") + ).await.unwrap()).unwrap(); + assert_eq!(&meta.content_type, "text/markdown"); + assert_eq!(meta.filename.as_deref(), Some("README.md")); + assert_eq!(meta.length, Some(file.len())); + + let (metadata, read_back) = { + let (metadata, stream) = store.read_streaming( + "fireburn.ru", + &filename + ).await.unwrap(); + let mut reader = tokio_util::io::StreamReader::new(stream); + + let mut buf = Vec::default(); + reader.read_to_end(&mut buf).await.unwrap(); + + (metadata, buf) + }; + + assert_eq!(read_back, file); + assert_eq!(&metadata.content_type, "text/markdown"); + assert_eq!(meta.filename.as_deref(), Some("README.md")); + assert_eq!(meta.length, Some(file.len())); - async fn delete(&self, domain: url::Host, filename: &str) -> Result<()> { - todo!() } } |