From 1031d495d5b78d9b19dcdc414b6d7b0daf313bb2 Mon Sep 17 00:00:00 2001 From: Vika Date: Sun, 10 Jul 2022 00:55:20 +0300 Subject: media: media endpoint PoC Supported features: - Streaming upload - Content-addressed storage - Metadata - MIME type (taken from Content-Type) - Length (I could use stat() for this one tho) - filename (for Content-Disposition: attachment, WIP) --- kittybox-rs/src/media/storage/file.rs | 215 +++++++++++++++++++++++++++++----- kittybox-rs/src/media/storage/mod.rs | 60 ++++++++-- 2 files changed, 234 insertions(+), 41 deletions(-) (limited to 'kittybox-rs/src/media/storage') diff --git a/kittybox-rs/src/media/storage/file.rs b/kittybox-rs/src/media/storage/file.rs index ea1f010..176c2f4 100644 --- a/kittybox-rs/src/media/storage/file.rs +++ b/kittybox-rs/src/media/storage/file.rs @@ -1,8 +1,12 @@ -use super::{ErrorKind, MediaStore, MediaStoreError, Result}; +use super::{Metadata, ErrorKind, MediaStore, MediaStoreError, Result}; use async_trait::async_trait; use std::path::PathBuf; use tokio::fs::OpenOptions; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::AsyncWriteExt; +use futures::StreamExt; +use std::pin::Pin; +use sha2::Digest; +use futures::FutureExt; #[derive(Clone)] pub struct FileStore { @@ -12,56 +16,209 @@ pub struct FileStore { impl From<tokio::io::Error> for MediaStoreError { fn from(source: tokio::io::Error) -> Self { Self { + msg: format!("file I/O error: {}", source), + kind: match source.kind() { + std::io::ErrorKind::NotFound => ErrorKind::NotFound, + _ => ErrorKind::Backend + }, source: Some(Box::new(source)), - msg: "file I/O error".to_owned(), - kind: ErrorKind::Backend, + } + } +} + + +impl FileStore { + pub fn new<T: Into<PathBuf>>(base: T) -> Self { + Self { base: base.into() } + } + + async fn mktemp(&self) -> Result<(PathBuf, tokio::fs::File)> { + use rand::{Rng, distributions::Alphanumeric}; + tokio::fs::create_dir_all(self.base.as_path()).await?; + loop { + let filename = self.base.join(format!("temp.{}", { + let string = 
rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(16) + .collect::<Vec<u8>>(); + String::from_utf8(string).unwrap() + })); + + match OpenOptions::new() + .create_new(true) + .write(true) + .open(&filename) + .await + { + Ok(file) => return Ok((filename, file)), + Err(err) => match err.kind() { + std::io::ErrorKind::AlreadyExists => continue, + _ => return Err(err.into()) + } + } } } } #[async_trait] impl MediaStore for FileStore { - async fn write_streaming( + async fn write_streaming<T>( &self, - domain: url::Host, - filename: &str, - content: axum::extract::multipart::Field<'_>, - ) -> Result<()> { - todo!() + domain: &str, + mut metadata: Metadata, + mut content: T, + ) -> Result<String> + where + T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + { + let (tempfilepath, mut tempfile) = self.mktemp().await?; + let mut hasher = sha2::Sha256::new(); + let mut length: usize = 0; + + while let Some(chunk) = content.next().await { + let chunk = std::sync::Arc::new(chunk.map_err(|err| MediaStoreError { + kind: ErrorKind::Backend, + source: Some(Box::new(err)), + msg: "Failed to read a data chunk".to_owned() + })?); + length += chunk.len(); + let (write_result, _hasher) = tokio::join!( + tempfile.write_all(&*chunk), + { + let chunk = chunk.clone(); + tokio::task::spawn_blocking(move || { + hasher.update(&*chunk); + + hasher + }).map(|r| r.unwrap()) + } + ); + if let Err(err) = write_result { + drop(tempfile); + // this is just cleanup, nothing fails if it fails + // though temporary files might take up space on the hard drive + // We'll clean them when maintenance time comes + #[allow(unused_must_use)] + { tokio::fs::remove_file(tempfilepath).await; } + return Err(err.into()); + } + hasher = _hasher; + } + + let hash = hasher.finalize(); + let filename = format!( + "{}/{}/{}/{}/{}", + hex::encode([hash[0]]), + hex::encode([hash[1]]), + hex::encode([hash[2]]), + hex::encode([hash[3]]), + hex::encode(&hash[4..32]) + ); + metadata.length = Some(length); + let domain_str = domain.to_string(); + 
let filepath = self.base.join(domain_str.as_str()).join(&filename); + let metafilename = filename.clone() + ".json"; + let metapath = self.base.join(domain_str.as_str()).join(metafilename); + { + let parent = filepath.parent().unwrap(); + tokio::fs::create_dir_all(parent).await?; + } + let mut meta = OpenOptions::new() + .create_new(true) + .write(true) + .open(&metapath) + .await?; + meta.write_all(&serde_json::to_vec(&metadata).unwrap()).await?; + tokio::fs::rename(tempfilepath, filepath).await?; + Ok(filename) } async fn read_streaming( &self, - domain: url::Host, + domain: &str, filename: &str, - ) -> Result>> { - todo!() - } - - async fn write(&self, domain: url::Host, filename: &str, content: &[u8]) -> Result<()> { - let path = self.base.join(format!("{}/{}", domain, filename)); + ) -> Result<(Metadata, Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>)> { + let path = self.base.join(format!("{}{}", domain, filename)); + let metapath = self.base.join(format!("{}{}.json", domain, filename)); + tracing::debug!("Path: {}, metadata: {}", path.display(), metapath.display()); - let mut file = OpenOptions::new() - .create_new(true) - .write(true) + let file = OpenOptions::new() + .read(true) .open(path) .await?; + let meta = serde_json::from_slice(&tokio::fs::read(metapath).await?) + .map_err(|err| MediaStoreError { + kind: ErrorKind::Json, + msg: format!("{}", err), + source: Some(Box::new(err)) + })?; - Ok(file.write_all(content).await?) + Ok((meta, Box::pin(tokio_util::io::ReaderStream::new(file)))) } - async fn read(&self, domain: url::Host, filename: &str) -> Result<Vec<u8>> { + async fn delete(&self, domain: &str, filename: &str) -> Result<()> { let path = self.base.join(format!("{}/{}", domain, filename)); - let mut file = OpenOptions::new().read(true).open(path).await?; + Ok(tokio::fs::remove_file(path).await?) 
+ } +} - let mut buf: Vec = Vec::default(); - file.read_to_end(&mut buf).await?; +#[cfg(test)] +mod tests { + use super::{Metadata, FileStore, MediaStore}; + use tokio::io::AsyncReadExt; - Ok(buf) - } + #[tokio::test] + async fn test_streaming_read_write() { + let tempdir = tempdir::TempDir::new("file").expect("Failed to create tempdir"); + let store = FileStore::new(tempdir.path()); + + let file: &[u8] = include_bytes!("../../../../README.md"); + let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i)))); + let metadata = Metadata { + filename: Some("README.md".to_string()), + content_type: "text/markdown".to_string(), + length: None + }; + + let filename = store.write_streaming( + "fireburn.ru", + metadata, stream + ).await.unwrap(); + + let content = tokio::fs::read( + tempdir.path() + .join("fireburn.ru") + .join(&filename) + ).await.unwrap(); + assert_eq!(content, file); + + let meta: Metadata = serde_json::from_slice(&tokio::fs::read( + tempdir.path() + .join("fireburn.ru") + .join(filename.clone() + ".json") + ).await.unwrap()).unwrap(); + assert_eq!(&meta.content_type, "text/markdown"); + assert_eq!(meta.filename.as_deref(), Some("README.md")); + assert_eq!(meta.length, Some(file.len())); + + let (metadata, read_back) = { + let (metadata, stream) = store.read_streaming( + "fireburn.ru", + &filename + ).await.unwrap(); + let mut reader = tokio_util::io::StreamReader::new(stream); + + let mut buf = Vec::default(); + reader.read_to_end(&mut buf).await.unwrap(); + + (metadata, buf) + }; + + assert_eq!(read_back, file); + assert_eq!(&metadata.content_type, "text/markdown"); + assert_eq!(meta.filename.as_deref(), Some("README.md")); + assert_eq!(meta.length, Some(file.len())); - async fn delete(&self, domain: url::Host, filename: &str) -> Result<()> { - todo!() } } diff --git a/kittybox-rs/src/media/storage/mod.rs b/kittybox-rs/src/media/storage/mod.rs index ba880ab..cb8b38f 100644 --- a/kittybox-rs/src/media/storage/mod.rs 
+++ b/kittybox-rs/src/media/storage/mod.rs @@ -1,12 +1,39 @@ use async_trait::async_trait; +use axum::extract::multipart::Field; +use tokio_stream::Stream; +use bytes::Bytes; +use serde::{Deserialize, Serialize}; +use std::pin::Pin; pub mod file; +#[derive(Debug, Deserialize, Serialize)] +pub struct Metadata { + pub(super) content_type: String, + pub(super) filename: Option<String>, + pub(super) length: Option<usize> +} +impl From<&Field<'_>> for Metadata { + fn from(field: &Field<'_>) -> Self { + Self { + content_type: field.content_type() + .map(|i| i.to_owned()) + .or_else(|| Some("application/octet-stream".to_owned())) + .unwrap(), + filename: field.file_name() + .map(|i| i.to_owned()), + length: None + } + } +} + + #[derive(Debug, Clone, Copy)] pub enum ErrorKind { Backend, Permission, - Conflict, + Json, + NotFound, Other, } @@ -17,6 +44,12 @@ pub struct MediaStoreError { msg: String, } +impl MediaStoreError { + pub fn kind(&self) -> ErrorKind { + self.kind + } +} + impl std::error::Error for MediaStoreError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { self.source @@ -33,7 +66,8 @@ impl std::fmt::Display for MediaStoreError { match self.kind { ErrorKind::Backend => "media storage backend error", ErrorKind::Permission => "permission denied", - ErrorKind::Conflict => "conflict with existing data", + ErrorKind::Json => "failed to parse json", + ErrorKind::NotFound => "blob not found", ErrorKind::Other => "unknown media storage error", }, self.msg @@ -45,18 +79,20 @@ pub type Result<T> = std::result::Result<T, MediaStoreError>; #[async_trait] pub trait MediaStore: 'static + Send + Sync + Clone { - async fn write_streaming( + async fn write_streaming<T>( &self, - domain: url::Host, - filename: &str, - content: axum::extract::multipart::Field<'_>, - ) -> Result<()>; - async fn write(&self, domain: url::Host, filename: &str, content: &[u8]) -> Result<()>; + domain: &str, + metadata: Metadata, + content: T, + ) -> Result<String> + where + T: tokio_stream::Stream<Item = std::result::Result<Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send; + async fn 
read_streaming( &self, - domain: url::Host, + domain: &str, filename: &str, - ) -> Result>>; - async fn read(&self, domain: url::Host, filename: &str) -> Result<Vec<u8>>; - async fn delete(&self, domain: url::Host, filename: &str) -> Result<()>; + ) -> Result<(Metadata, Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>)>; + + async fn delete(&self, domain: &str, filename: &str) -> Result<()>; } -- cgit 1.4.1