diff options
author | Vika <vika@fireburn.ru> | 2023-07-29 21:59:56 +0300 |
---|---|---|
committer | Vika <vika@fireburn.ru> | 2023-07-29 21:59:56 +0300 |
commit | 0617663b249f9ca488e5de652108b17d67fbaf45 (patch) | |
tree | 11564b6c8fa37bf9203a0a4cc1c4e9cc088cb1a5 /kittybox-rs/src/media/storage | |
parent | 26c2b79f6a6380ae3224e9309b9f3352f5717bd7 (diff) | |
download | kittybox-0617663b249f9ca488e5de652108b17d67fbaf45.tar.zst |
Moved the entire Kittybox tree into the root
Diffstat (limited to 'kittybox-rs/src/media/storage')
-rw-r--r-- | kittybox-rs/src/media/storage/file.rs | 434 | ||||
-rw-r--r-- | kittybox-rs/src/media/storage/mod.rs | 177 |
2 files changed, 0 insertions, 611 deletions
diff --git a/kittybox-rs/src/media/storage/file.rs b/kittybox-rs/src/media/storage/file.rs deleted file mode 100644 index 0aaaa3b..0000000 --- a/kittybox-rs/src/media/storage/file.rs +++ /dev/null @@ -1,434 +0,0 @@ -use super::{Metadata, ErrorKind, MediaStore, MediaStoreError, Result}; -use async_trait::async_trait; -use std::{path::PathBuf, fmt::Debug}; -use tokio::fs::OpenOptions; -use tokio::io::{BufReader, BufWriter, AsyncWriteExt, AsyncSeekExt}; -use futures::{StreamExt, TryStreamExt}; -use std::ops::{Bound, RangeBounds, Neg}; -use std::pin::Pin; -use sha2::Digest; -use futures::FutureExt; -use tracing::{debug, error}; - -const BUF_CAPACITY: usize = 16 * 1024; - -#[derive(Clone)] -pub struct FileStore { - base: PathBuf, -} - -impl From<tokio::io::Error> for MediaStoreError { - fn from(source: tokio::io::Error) -> Self { - Self { - msg: format!("file I/O error: {}", source), - kind: match source.kind() { - std::io::ErrorKind::NotFound => ErrorKind::NotFound, - _ => ErrorKind::Backend - }, - source: Some(Box::new(source)), - } - } -} - -impl FileStore { - pub fn new<T: Into<PathBuf>>(base: T) -> Self { - Self { base: base.into() } - } - - async fn mktemp(&self) -> Result<(PathBuf, BufWriter<tokio::fs::File>)> { - kittybox_util::fs::mktemp(&self.base, "temp", 16) - .await - .map(|(name, file)| (name, BufWriter::new(file))) - .map_err(Into::into) - } -} - -#[async_trait] -impl MediaStore for FileStore { - - #[tracing::instrument(skip(self, content))] - async fn write_streaming<T>( - &self, - domain: &str, - mut metadata: Metadata, - mut content: T, - ) -> Result<String> - where - T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug - { - let (tempfilepath, mut tempfile) = self.mktemp().await?; - debug!("Temporary file opened for storing pending upload: {}", tempfilepath.display()); - let mut hasher = sha2::Sha256::new(); - let mut length: usize = 0; - - while let Some(chunk) = content.next().await { - let chunk = chunk.map_err(|err| MediaStoreError { - kind: ErrorKind::Backend, - source: Some(Box::new(err)), - msg: "Failed to read a data chunk".to_owned() - })?; - debug!("Read {} bytes from the stream", chunk.len()); - length += chunk.len(); - let (write_result, _hasher) = tokio::join!( - { - let chunk = chunk.clone(); - let tempfile = &mut tempfile; - async move { - tempfile.write_all(&*chunk).await - } - }, - { - let chunk = chunk.clone(); - tokio::task::spawn_blocking(move || { - hasher.update(&*chunk); - - hasher - }).map(|r| r.unwrap()) - } - ); - if let Err(err) = write_result { - error!("Error while writing pending upload: {}", err); - drop(tempfile); - // this is just cleanup, nothing fails if it fails - // though temporary files might take up space on the hard drive - // We'll clean them when maintenance time comes - #[allow(unused_must_use)] - { tokio::fs::remove_file(tempfilepath).await; } - return Err(err.into()); - } - hasher = _hasher; - } - // Manually flush the buffer and drop the handle to close the file - tempfile.flush().await?; - tempfile.into_inner().sync_all().await?; - - let hash = hasher.finalize(); - debug!("Pending upload hash: {}", hex::encode(&hash)); - let filename = format!( - "{}/{}/{}/{}/{}", - hex::encode([hash[0]]), - hex::encode([hash[1]]), - hex::encode([hash[2]]), - hex::encode([hash[3]]), - hex::encode(&hash[4..32]) - ); - let domain_str = domain.to_string(); - let filepath = self.base.join(domain_str.as_str()).join(&filename); - let metafilename = filename.clone() + ".json"; - let metapath = self.base.join(domain_str.as_str()).join(&metafilename); - let metatemppath = self.base.join(domain_str.as_str()).join(metafilename + ".tmp"); - metadata.length = std::num::NonZeroUsize::new(length); - metadata.etag = Some(hex::encode(&hash)); - debug!("File path: {}, metadata: {}", filepath.display(), metapath.display()); - { - let parent = filepath.parent().unwrap(); - tokio::fs::create_dir_all(parent).await?; - } - let mut meta = OpenOptions::new() - .create_new(true) - .write(true) - .open(&metatemppath) - .await?; - meta.write_all(&serde_json::to_vec(&metadata).unwrap()).await?; - tokio::fs::rename(tempfilepath, filepath).await?; - tokio::fs::rename(metatemppath, metapath).await?; - Ok(filename) - } - - #[tracing::instrument(skip(self))] - async fn read_streaming( - &self, - domain: &str, - filename: &str, - ) -> Result<(Metadata, Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>)> { - debug!("Domain: {}, filename: {}", domain, filename); - let path = self.base.join(domain).join(filename); - debug!("Path: {}", path.display()); - - let file = OpenOptions::new() - .read(true) - .open(path) - .await?; - let meta = self.metadata(domain, filename).await?; - - Ok((meta, Box::pin( - tokio_util::io::ReaderStream::new( - // TODO: determine if BufReader provides benefit here - // From the logs it looks like we're reading 4KiB at a time - // Buffering file contents seems to double download speed - // How to benchmark this? - BufReader::with_capacity(BUF_CAPACITY, file) - ) - // Sprinkle some salt in form of protective log wrapping - .inspect_ok(|chunk| debug!("Read {} bytes from file", chunk.len())) - ))) - } - - #[tracing::instrument(skip(self))] - async fn metadata(&self, domain: &str, filename: &str) -> Result<Metadata> { - let metapath = self.base.join(domain).join(format!("{}.json", filename)); - debug!("Metadata path: {}", metapath.display()); - - let meta = serde_json::from_slice(&tokio::fs::read(metapath).await?) - .map_err(|err| MediaStoreError { - kind: ErrorKind::Json, - msg: format!("{}", err), - source: Some(Box::new(err)) - })?; - - Ok(meta) - } - - #[tracing::instrument(skip(self))] - async fn stream_range( - &self, - domain: &str, - filename: &str, - range: (Bound<u64>, Bound<u64>) - ) -> Result<Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>> { - let path = self.base.join(format!("{}/{}", domain, filename)); - let metapath = self.base.join(format!("{}/{}.json", domain, filename)); - debug!("Path: {}, metadata: {}", path.display(), metapath.display()); - - let mut file = OpenOptions::new() - .read(true) - .open(path) - .await?; - - let start = match range { - (Bound::Included(bound), _) => { - debug!("Seeking {} bytes forward...", bound); - file.seek(std::io::SeekFrom::Start(bound)).await? - } - (Bound::Excluded(_), _) => unreachable!(), - (Bound::Unbounded, Bound::Included(bound)) => { - // Seek to the end minus the bounded bytes - debug!("Seeking {} bytes back from the end...", bound); - file.seek(std::io::SeekFrom::End(i64::try_from(bound).unwrap().neg())).await? - }, - (Bound::Unbounded, Bound::Unbounded) => 0, - (_, Bound::Excluded(_)) => unreachable!() - }; - - let stream = Box::pin(tokio_util::io::ReaderStream::new(BufReader::with_capacity(BUF_CAPACITY, file))) - .map_ok({ - let mut bytes_read = 0usize; - let len = match range { - (_, Bound::Unbounded) => None, - (Bound::Unbounded, Bound::Included(bound)) => Some(bound), - (_, Bound::Included(bound)) => Some(bound + 1 - start), - (_, Bound::Excluded(_)) => unreachable!() - }; - move |chunk| { - debug!("Read {} bytes from file, {} in this chunk", bytes_read, chunk.len()); - bytes_read += chunk.len(); - if let Some(len) = len.map(|len| len.try_into().unwrap()) { - if bytes_read > len { - if bytes_read - len > chunk.len() { - return None - } - debug!("Truncating last {} bytes", bytes_read - len); - return Some(chunk.slice(..chunk.len() - (bytes_read - len))) - } - } - - Some(chunk) - } - }) - .try_take_while(|x| std::future::ready(Ok(x.is_some()))) - // Will never panic, because the moment the stream yields - // a None, it is considered exhausted. - .map_ok(|x| x.unwrap()); - - return Ok(Box::pin(stream)) - } - - - async fn delete(&self, domain: &str, filename: &str) -> Result<()> { - let path = self.base.join(format!("{}/{}", domain, filename)); - - Ok(tokio::fs::remove_file(path).await?) - } -} - -#[cfg(test)] -mod tests { - use super::{Metadata, FileStore, MediaStore}; - use std::ops::Bound; - use tokio::io::AsyncReadExt; - - #[tokio::test] - #[tracing_test::traced_test] - async fn test_ranges() { - let tempdir = tempfile::tempdir().expect("Failed to create tempdir"); - let store = FileStore::new(tempdir.path()); - - let file: &[u8] = include_bytes!("./file.rs"); - let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i)))); - let metadata = Metadata { - filename: Some("file.rs".to_string()), - content_type: Some("text/plain".to_string()), - length: None, - etag: None, - }; - - // write through the interface - let filename = store.write_streaming( - "fireburn.ru", - metadata, stream - ).await.unwrap(); - - tracing::debug!("Writing complete."); - - // Ensure the file is there - let content = tokio::fs::read( - tempdir.path() - .join("fireburn.ru") - .join(&filename) - ).await.unwrap(); - assert_eq!(content, file); - - tracing::debug!("Reading range from the start..."); - // try to read range - let range = { - let stream = store.stream_range( - "fireburn.ru", &filename, - (Bound::Included(0), Bound::Included(299)) - ).await.unwrap(); - - let mut reader = tokio_util::io::StreamReader::new(stream); - - let mut buf = Vec::default(); - reader.read_to_end(&mut buf).await.unwrap(); - - buf - }; - - assert_eq!(range.len(), 300); - assert_eq!(range.as_slice(), &file[..=299]); - - tracing::debug!("Reading range from the middle..."); - - let range = { - let stream = store.stream_range( - "fireburn.ru", &filename, - (Bound::Included(150), Bound::Included(449)) - ).await.unwrap(); - - let mut reader = tokio_util::io::StreamReader::new(stream); - - let mut buf = Vec::default(); - reader.read_to_end(&mut buf).await.unwrap(); - - buf - }; - - assert_eq!(range.len(), 300); - assert_eq!(range.as_slice(), &file[150..=449]); - - tracing::debug!("Reading range from the end..."); - let range = { - let stream = store.stream_range( - "fireburn.ru", &filename, - // Note: the `headers` crate parses bounds in a - // non-standard way, where unbounded start actually - // means getting things from the end... - (Bound::Unbounded, Bound::Included(300)) - ).await.unwrap(); - - let mut reader = tokio_util::io::StreamReader::new(stream); - - let mut buf = Vec::default(); - reader.read_to_end(&mut buf).await.unwrap(); - - buf - }; - - assert_eq!(range.len(), 300); - assert_eq!(range.as_slice(), &file[file.len()-300..file.len()]); - - tracing::debug!("Reading the whole file..."); - // try to read range - let range = { - let stream = store.stream_range( - "fireburn.ru", &("/".to_string() + &filename), - (Bound::Unbounded, Bound::Unbounded) - ).await.unwrap(); - - let mut reader = tokio_util::io::StreamReader::new(stream); - - let mut buf = Vec::default(); - reader.read_to_end(&mut buf).await.unwrap(); - - buf - }; - - assert_eq!(range.len(), file.len()); - assert_eq!(range.as_slice(), file); - } - - - #[tokio::test] - #[tracing_test::traced_test] - async fn test_streaming_read_write() { - let tempdir = tempfile::tempdir().expect("Failed to create tempdir"); - let store = FileStore::new(tempdir.path()); - - let file: &[u8] = include_bytes!("./file.rs"); - let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i)))); - let metadata = Metadata { - filename: Some("style.css".to_string()), - content_type: Some("text/css".to_string()), - length: None, - etag: None, - }; - - // write through the interface - let filename = store.write_streaming( - "fireburn.ru", - metadata, stream - ).await.unwrap(); - println!("{}, {}", filename, tempdir.path() - .join("fireburn.ru") - .join(&filename) - .display()); - let content = tokio::fs::read( - tempdir.path() - .join("fireburn.ru") - .join(&filename) - ).await.unwrap(); - assert_eq!(content, file); - - // check internal metadata format - let meta: Metadata = serde_json::from_slice(&tokio::fs::read( - tempdir.path() - .join("fireburn.ru") - .join(filename.clone() + ".json") - ).await.unwrap()).unwrap(); - assert_eq!(meta.content_type.as_deref(), Some("text/css")); - assert_eq!(meta.filename.as_deref(), Some("style.css")); - assert_eq!(meta.length.map(|i| i.get()), Some(file.len())); - assert!(meta.etag.is_some()); - - // read back the data using the interface - let (metadata, read_back) = { - let (metadata, stream) = store.read_streaming( - "fireburn.ru", - &filename - ).await.unwrap(); - let mut reader = tokio_util::io::StreamReader::new(stream); - - let mut buf = Vec::default(); - reader.read_to_end(&mut buf).await.unwrap(); - - (metadata, buf) - }; - - assert_eq!(read_back, file); - assert_eq!(metadata.content_type.as_deref(), Some("text/css")); - assert_eq!(meta.filename.as_deref(), Some("style.css")); - assert_eq!(meta.length.map(|i| i.get()), Some(file.len())); - assert!(meta.etag.is_some()); - - } -} diff --git a/kittybox-rs/src/media/storage/mod.rs b/kittybox-rs/src/media/storage/mod.rs deleted file mode 100644 index 020999c..0000000 --- a/kittybox-rs/src/media/storage/mod.rs +++ /dev/null @@ -1,177 +0,0 @@ -use async_trait::async_trait; -use axum::extract::multipart::Field; -use tokio_stream::Stream; -use bytes::Bytes; -use serde::{Deserialize, Serialize}; -use std::ops::Bound; -use std::pin::Pin; -use std::fmt::Debug; -use std::num::NonZeroUsize; - -pub mod file; - -#[derive(Debug, Deserialize, Serialize)] -pub struct Metadata { - /// Content type of the file. If None, the content-type is considered undefined. - pub content_type: Option<String>, - /// The original filename that was passed. - pub filename: Option<String>, - /// The recorded length of the file. - pub length: Option<NonZeroUsize>, - /// The e-tag of a file. Note: it must be a strong e-tag, for example, a hash. - pub etag: Option<String>, -} -impl From<&Field<'_>> for Metadata { - fn from(field: &Field<'_>) -> Self { - Self { - content_type: field.content_type() - .map(|i| i.to_owned()), - filename: field.file_name() - .map(|i| i.to_owned()), - length: None, - etag: None, - } - } -} - - -#[derive(Debug, Clone, Copy)] -pub enum ErrorKind { - Backend, - Permission, - Json, - NotFound, - Other, -} - -#[derive(Debug)] -pub struct MediaStoreError { - kind: ErrorKind, - source: Option<Box<dyn std::error::Error + Send + Sync>>, - msg: String, -} - -impl MediaStoreError { - pub fn kind(&self) -> ErrorKind { - self.kind - } -} - -impl std::error::Error for MediaStoreError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - self.source - .as_ref() - .map(|i| i.as_ref() as &dyn std::error::Error) - } -} - -impl std::fmt::Display for MediaStoreError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{}: {}", - match self.kind { - ErrorKind::Backend => "media storage backend error", - ErrorKind::Permission => "permission denied", - ErrorKind::Json => "failed to parse json", - ErrorKind::NotFound => "blob not found", - ErrorKind::Other => "unknown media storage error", - }, - self.msg - ) - } -} - -pub type Result<T> = std::result::Result<T, MediaStoreError>; - -#[async_trait] -pub trait MediaStore: 'static + Send + Sync + Clone { - async fn write_streaming<T>( - &self, - domain: &str, - metadata: Metadata, - content: T, - ) -> Result<String> - where - T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug; - - async fn read_streaming( - &self, - domain: &str, - filename: &str, - ) -> Result<(Metadata, Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>)>; - - async fn stream_range( - &self, - domain: &str, - filename: &str, - range: (Bound<u64>, Bound<u64>) - ) -> Result<Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>> { - use futures::stream::TryStreamExt; - use tracing::debug; - let (metadata, mut stream) = self.read_streaming(domain, filename).await?; - let length = metadata.length.unwrap().get(); - - use Bound::*; - let (start, end): (usize, usize) = match range { - (Unbounded, Unbounded) => return Ok(stream), - (Included(start), Unbounded) => (start.try_into().unwrap(), length - 1), - (Unbounded, Included(end)) => (length - usize::try_from(end).unwrap(), length - 1), - (Included(start), Included(end)) => (start.try_into().unwrap(), end.try_into().unwrap()), - (_, _) => unreachable!() - }; - - stream = Box::pin( - stream.map_ok({ - let mut bytes_skipped = 0usize; - let mut bytes_read = 0usize; - - move |chunk| { - debug!("Skipped {}/{} bytes, chunk len {}", bytes_skipped, start, chunk.len()); - let chunk = if bytes_skipped < start { - let need_to_skip = start - bytes_skipped; - if chunk.len() < need_to_skip { - return None - } - debug!("Skipping {} bytes", need_to_skip); - bytes_skipped += need_to_skip; - - chunk.slice(need_to_skip..) - } else { - chunk - }; - - debug!("Read {} bytes from file, {} in this chunk", bytes_read, chunk.len()); - bytes_read += chunk.len(); - - if bytes_read > length { - if bytes_read - length > chunk.len() { - return None - } - debug!("Truncating last {} bytes", bytes_read - length); - return Some(chunk.slice(..chunk.len() - (bytes_read - length))) - } - - Some(chunk) - } - }) - .try_skip_while(|x| std::future::ready(Ok(x.is_none()))) - .try_take_while(|x| std::future::ready(Ok(x.is_some()))) - .map_ok(|x| x.unwrap()) - ); - - return Ok(stream); - } - - /// Read metadata for a file. - /// - /// The default implementation uses the `read_streaming` method - /// and drops the stream containing file content. - async fn metadata(&self, domain: &str, filename: &str) -> Result<Metadata> { - self.read_streaming(domain, filename) - .await - .map(|(meta, stream)| meta) - } - - async fn delete(&self, domain: &str, filename: &str) -> Result<()>; -} |