From 0617663b249f9ca488e5de652108b17d67fbaf45 Mon Sep 17 00:00:00 2001 From: Vika Date: Sat, 29 Jul 2023 21:59:56 +0300 Subject: Moved the entire Kittybox tree into the root --- src/media/mod.rs | 141 +++++++++++++++ src/media/storage/file.rs | 434 ++++++++++++++++++++++++++++++++++++++++++++++ src/media/storage/mod.rs | 177 +++++++++++++++++++ 3 files changed, 752 insertions(+) create mode 100644 src/media/mod.rs create mode 100644 src/media/storage/file.rs create mode 100644 src/media/storage/mod.rs (limited to 'src/media') diff --git a/src/media/mod.rs b/src/media/mod.rs new file mode 100644 index 0000000..71f875e --- /dev/null +++ b/src/media/mod.rs @@ -0,0 +1,141 @@ +use std::convert::TryFrom; + +use axum::{ + extract::{Extension, Host, multipart::Multipart, Path}, + response::{IntoResponse, Response}, + headers::{Header, HeaderValue, IfNoneMatch, HeaderMapExt}, + TypedHeader, +}; +use kittybox_util::error::{MicropubError, ErrorType}; +use kittybox_indieauth::Scope; +use crate::indieauth::{User, backend::AuthBackend}; + +pub mod storage; +use storage::{MediaStore, MediaStoreError, Metadata, ErrorKind}; +pub use storage::file::FileStore; + +impl From for MicropubError { + fn from(err: MediaStoreError) -> Self { + Self { + error: ErrorType::InternalServerError, + error_description: format!("{}", err) + } + } +} + +#[tracing::instrument(skip(blobstore))] +pub(crate) async fn upload( + Extension(blobstore): Extension, + user: User, + mut upload: Multipart +) -> Response { + if !user.check_scope(&Scope::Media) { + return MicropubError { + error: ErrorType::NotAuthorized, + error_description: "Interacting with the media storage requires the \"media\" scope.".to_owned() + }.into_response(); + } + let host = user.me.host().unwrap().to_string() + &user.me.port().map(|i| format!(":{}", i)).unwrap_or_default(); + let field = match upload.next_field().await { + Ok(Some(field)) => field, + Ok(None) => { + return MicropubError { + error: ErrorType::InvalidRequest, + error_description: "Send multipart/form-data with one field named file".to_owned() + }.into_response(); + }, + Err(err) => { + return MicropubError { + error: ErrorType::InternalServerError, + error_description: format!("Error while parsing multipart/form-data: {}", err) + }.into_response(); + }, + }; + let metadata: Metadata = (&field).into(); + match blobstore.write_streaming(&host, metadata, field).await { + Ok(filename) => IntoResponse::into_response(( + axum::http::StatusCode::CREATED, + [ + ("Location", user.me.join( + &format!(".kittybox/media/uploads/{}", filename) + ).unwrap().as_str()) + ] + )), + Err(err) => MicropubError::from(err).into_response() + } +} + +#[tracing::instrument(skip(blobstore))] +pub(crate) async fn serve( + Host(host): Host, + Path(path): Path, + if_none_match: Option>, + Extension(blobstore): Extension +) -> Response { + use axum::http::StatusCode; + tracing::debug!("Searching for file..."); + match blobstore.read_streaming(&host, path.as_str()).await { + Ok((metadata, stream)) => { + tracing::debug!("Metadata: {:?}", metadata); + + let etag = if let Some(etag) = metadata.etag { + let etag = format!("\"{}\"", etag).parse::().unwrap(); + + if let Some(TypedHeader(if_none_match)) = if_none_match { + tracing::debug!("If-None-Match: {:?}", if_none_match); + // If-None-Match is a negative precondition that + // returns 304 when it doesn't match because it + // only matches when file is different + if !if_none_match.precondition_passes(&etag) { + return StatusCode::NOT_MODIFIED.into_response() + } + } + + Some(etag) + } else { None }; + + let mut r = Response::builder(); + { + let headers = r.headers_mut().unwrap(); + headers.insert( + "Content-Type", + HeaderValue::from_str( + metadata.content_type + .as_deref() + .unwrap_or("application/octet-stream") + ).unwrap() + ); + if let Some(length) = metadata.length { + headers.insert( + "Content-Length", + HeaderValue::from_str(&length.to_string()).unwrap() + ); + } + if let Some(etag) = etag { + headers.typed_insert(etag); + } + } + r.body(axum::body::StreamBody::new(stream)) + .unwrap() + .into_response() + }, + Err(err) => match err.kind() { + ErrorKind::NotFound => { + IntoResponse::into_response(StatusCode::NOT_FOUND) + }, + _ => { + tracing::error!("{}", err); + IntoResponse::into_response(StatusCode::INTERNAL_SERVER_ERROR) + } + } + } +} + +#[must_use] +pub fn router(blobstore: S, auth: A) -> axum::Router { + axum::Router::new() + .route("/", axum::routing::post(upload::)) + .route("/uploads/*file", axum::routing::get(serve::)) + .layer(axum::Extension(blobstore)) + .layer(axum::Extension(auth)) +} diff --git a/src/media/storage/file.rs b/src/media/storage/file.rs new file mode 100644 index 0000000..0aaaa3b --- /dev/null +++ b/src/media/storage/file.rs @@ -0,0 +1,434 @@ +use super::{Metadata, ErrorKind, MediaStore, MediaStoreError, Result}; +use async_trait::async_trait; +use std::{path::PathBuf, fmt::Debug}; +use tokio::fs::OpenOptions; +use tokio::io::{BufReader, BufWriter, AsyncWriteExt, AsyncSeekExt}; +use futures::{StreamExt, TryStreamExt}; +use std::ops::{Bound, RangeBounds, Neg}; +use std::pin::Pin; +use sha2::Digest; +use futures::FutureExt; +use tracing::{debug, error}; + +const BUF_CAPACITY: usize = 16 * 1024; + +#[derive(Clone)] +pub struct FileStore { + base: PathBuf, +} + +impl From for MediaStoreError { + fn from(source: tokio::io::Error) -> Self { + Self { + msg: format!("file I/O error: {}", source), + kind: match source.kind() { + std::io::ErrorKind::NotFound => ErrorKind::NotFound, + _ => ErrorKind::Backend + }, + source: Some(Box::new(source)), + } + } +} + +impl FileStore { + pub fn new>(base: T) -> Self { + Self { base: base.into() } + } + + async fn mktemp(&self) -> Result<(PathBuf, BufWriter)> { + kittybox_util::fs::mktemp(&self.base, "temp", 16) + .await + .map(|(name, file)| (name, BufWriter::new(file))) + .map_err(Into::into) + } +} + +#[async_trait] +impl MediaStore for FileStore { + + #[tracing::instrument(skip(self, content))] + async fn write_streaming( + &self, + domain: &str, + mut metadata: Metadata, + mut content: T, + ) -> Result + where + T: tokio_stream::Stream> + Unpin + Send + Debug + { + let (tempfilepath, mut tempfile) = self.mktemp().await?; + debug!("Temporary file opened for storing pending upload: {}", tempfilepath.display()); + let mut hasher = sha2::Sha256::new(); + let mut length: usize = 0; + + while let Some(chunk) = content.next().await { + let chunk = chunk.map_err(|err| MediaStoreError { + kind: ErrorKind::Backend, + source: Some(Box::new(err)), + msg: "Failed to read a data chunk".to_owned() + })?; + debug!("Read {} bytes from the stream", chunk.len()); + length += chunk.len(); + let (write_result, _hasher) = tokio::join!( + { + let chunk = chunk.clone(); + let tempfile = &mut tempfile; + async move { + tempfile.write_all(&*chunk).await + } + }, + { + let chunk = chunk.clone(); + tokio::task::spawn_blocking(move || { + hasher.update(&*chunk); + + hasher + }).map(|r| r.unwrap()) + } + ); + if let Err(err) = write_result { + error!("Error while writing pending upload: {}", err); + drop(tempfile); + // this is just cleanup, nothing fails if it fails + // though temporary files might take up space on the hard drive + // We'll clean them when maintenance time comes + #[allow(unused_must_use)] + { tokio::fs::remove_file(tempfilepath).await; } + return Err(err.into()); + } + hasher = _hasher; + } + // Manually flush the buffer and drop the handle to close the file + tempfile.flush().await?; + tempfile.into_inner().sync_all().await?; + + let hash = hasher.finalize(); + debug!("Pending upload hash: {}", hex::encode(&hash)); + let filename = format!( + "{}/{}/{}/{}/{}", + hex::encode([hash[0]]), + hex::encode([hash[1]]), + hex::encode([hash[2]]), + hex::encode([hash[3]]), + hex::encode(&hash[4..32]) + ); + let domain_str = domain.to_string(); + let filepath = self.base.join(domain_str.as_str()).join(&filename); + let metafilename = filename.clone() + ".json"; + let metapath = self.base.join(domain_str.as_str()).join(&metafilename); + let metatemppath = self.base.join(domain_str.as_str()).join(metafilename + ".tmp"); + metadata.length = std::num::NonZeroUsize::new(length); + metadata.etag = Some(hex::encode(&hash)); + debug!("File path: {}, metadata: {}", filepath.display(), metapath.display()); + { + let parent = filepath.parent().unwrap(); + tokio::fs::create_dir_all(parent).await?; + } + let mut meta = OpenOptions::new() + .create_new(true) + .write(true) + .open(&metatemppath) + .await?; + meta.write_all(&serde_json::to_vec(&metadata).unwrap()).await?; + tokio::fs::rename(tempfilepath, filepath).await?; + tokio::fs::rename(metatemppath, metapath).await?; + Ok(filename) + } + + #[tracing::instrument(skip(self))] + async fn read_streaming( + &self, + domain: &str, + filename: &str, + ) -> Result<(Metadata, Pin> + Send>>)> { + debug!("Domain: {}, filename: {}", domain, filename); + let path = self.base.join(domain).join(filename); + debug!("Path: {}", path.display()); + + let file = OpenOptions::new() + .read(true) + .open(path) + .await?; + let meta = self.metadata(domain, filename).await?; + + Ok((meta, Box::pin( + tokio_util::io::ReaderStream::new( + // TODO: determine if BufReader provides benefit here + // From the logs it looks like we're reading 4KiB at a time + // Buffering file contents seems to double download speed + // How to benchmark this? + BufReader::with_capacity(BUF_CAPACITY, file) + ) + // Sprinkle some salt in form of protective log wrapping + .inspect_ok(|chunk| debug!("Read {} bytes from file", chunk.len())) + ))) + } + + #[tracing::instrument(skip(self))] + async fn metadata(&self, domain: &str, filename: &str) -> Result { + let metapath = self.base.join(domain).join(format!("{}.json", filename)); + debug!("Metadata path: {}", metapath.display()); + + let meta = serde_json::from_slice(&tokio::fs::read(metapath).await?) + .map_err(|err| MediaStoreError { + kind: ErrorKind::Json, + msg: format!("{}", err), + source: Some(Box::new(err)) + })?; + + Ok(meta) + } + + #[tracing::instrument(skip(self))] + async fn stream_range( + &self, + domain: &str, + filename: &str, + range: (Bound, Bound) + ) -> Result> + Send>>> { + let path = self.base.join(format!("{}/{}", domain, filename)); + let metapath = self.base.join(format!("{}/{}.json", domain, filename)); + debug!("Path: {}, metadata: {}", path.display(), metapath.display()); + + let mut file = OpenOptions::new() + .read(true) + .open(path) + .await?; + + let start = match range { + (Bound::Included(bound), _) => { + debug!("Seeking {} bytes forward...", bound); + file.seek(std::io::SeekFrom::Start(bound)).await? + } + (Bound::Excluded(_), _) => unreachable!(), + (Bound::Unbounded, Bound::Included(bound)) => { + // Seek to the end minus the bounded bytes + debug!("Seeking {} bytes back from the end...", bound); + file.seek(std::io::SeekFrom::End(i64::try_from(bound).unwrap().neg())).await? + }, + (Bound::Unbounded, Bound::Unbounded) => 0, + (_, Bound::Excluded(_)) => unreachable!() + }; + + let stream = Box::pin(tokio_util::io::ReaderStream::new(BufReader::with_capacity(BUF_CAPACITY, file))) + .map_ok({ + let mut bytes_read = 0usize; + let len = match range { + (_, Bound::Unbounded) => None, + (Bound::Unbounded, Bound::Included(bound)) => Some(bound), + (_, Bound::Included(bound)) => Some(bound + 1 - start), + (_, Bound::Excluded(_)) => unreachable!() + }; + move |chunk| { + debug!("Read {} bytes from file, {} in this chunk", bytes_read, chunk.len()); + bytes_read += chunk.len(); + if let Some(len) = len.map(|len| len.try_into().unwrap()) { + if bytes_read > len { + if bytes_read - len > chunk.len() { + return None + } + debug!("Truncating last {} bytes", bytes_read - len); + return Some(chunk.slice(..chunk.len() - (bytes_read - len))) + } + } + + Some(chunk) + } + }) + .try_take_while(|x| std::future::ready(Ok(x.is_some()))) + // Will never panic, because the moment the stream yields + // a None, it is considered exhausted. + .map_ok(|x| x.unwrap()); + + return Ok(Box::pin(stream)) + } + + + async fn delete(&self, domain: &str, filename: &str) -> Result<()> { + let path = self.base.join(format!("{}/{}", domain, filename)); + + Ok(tokio::fs::remove_file(path).await?) + } +} + +#[cfg(test)] +mod tests { + use super::{Metadata, FileStore, MediaStore}; + use std::ops::Bound; + use tokio::io::AsyncReadExt; + + #[tokio::test] + #[tracing_test::traced_test] + async fn test_ranges() { + let tempdir = tempfile::tempdir().expect("Failed to create tempdir"); + let store = FileStore::new(tempdir.path()); + + let file: &[u8] = include_bytes!("./file.rs"); + let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i)))); + let metadata = Metadata { + filename: Some("file.rs".to_string()), + content_type: Some("text/plain".to_string()), + length: None, + etag: None, + }; + + // write through the interface + let filename = store.write_streaming( + "fireburn.ru", + metadata, stream + ).await.unwrap(); + + tracing::debug!("Writing complete."); + + // Ensure the file is there + let content = tokio::fs::read( + tempdir.path() + .join("fireburn.ru") + .join(&filename) + ).await.unwrap(); + assert_eq!(content, file); + + tracing::debug!("Reading range from the start..."); + // try to read range + let range = { + let stream = store.stream_range( + "fireburn.ru", &filename, + (Bound::Included(0), Bound::Included(299)) + ).await.unwrap(); + + let mut reader = tokio_util::io::StreamReader::new(stream); + + let mut buf = Vec::default(); + reader.read_to_end(&mut buf).await.unwrap(); + + buf + }; + + assert_eq!(range.len(), 300); + assert_eq!(range.as_slice(), &file[..=299]); + + tracing::debug!("Reading range from the middle..."); + + let range = { + let stream = store.stream_range( + "fireburn.ru", &filename, + (Bound::Included(150), Bound::Included(449)) + ).await.unwrap(); + + let mut reader = tokio_util::io::StreamReader::new(stream); + + let mut buf = Vec::default(); + reader.read_to_end(&mut buf).await.unwrap(); + + buf + }; + + assert_eq!(range.len(), 300); + assert_eq!(range.as_slice(), &file[150..=449]); + + tracing::debug!("Reading range from the end..."); + let range = { + let stream = store.stream_range( + "fireburn.ru", &filename, + // Note: the `headers` crate parses bounds in a + // non-standard way, where unbounded start actually + // means getting things from the end... + (Bound::Unbounded, Bound::Included(300)) + ).await.unwrap(); + + let mut reader = tokio_util::io::StreamReader::new(stream); + + let mut buf = Vec::default(); + reader.read_to_end(&mut buf).await.unwrap(); + + buf + }; + + assert_eq!(range.len(), 300); + assert_eq!(range.as_slice(), &file[file.len()-300..file.len()]); + + tracing::debug!("Reading the whole file..."); + // try to read range + let range = { + let stream = store.stream_range( + "fireburn.ru", &("/".to_string() + &filename), + (Bound::Unbounded, Bound::Unbounded) + ).await.unwrap(); + + let mut reader = tokio_util::io::StreamReader::new(stream); + + let mut buf = Vec::default(); + reader.read_to_end(&mut buf).await.unwrap(); + + buf + }; + + assert_eq!(range.len(), file.len()); + assert_eq!(range.as_slice(), file); + } + + + #[tokio::test] + #[tracing_test::traced_test] + async fn test_streaming_read_write() { + let tempdir = tempfile::tempdir().expect("Failed to create tempdir"); + let store = FileStore::new(tempdir.path()); + + let file: &[u8] = include_bytes!("./file.rs"); + let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i)))); + let metadata = Metadata { + filename: Some("style.css".to_string()), + content_type: Some("text/css".to_string()), + length: None, + etag: None, + }; + + // write through the interface + let filename = store.write_streaming( + "fireburn.ru", + metadata, stream + ).await.unwrap(); + println!("{}, {}", filename, tempdir.path() + .join("fireburn.ru") + .join(&filename) + .display()); + let content = tokio::fs::read( + tempdir.path() + .join("fireburn.ru") + .join(&filename) + ).await.unwrap(); + assert_eq!(content, file); + + // check internal metadata format + let meta: Metadata = serde_json::from_slice(&tokio::fs::read( + tempdir.path() + .join("fireburn.ru") + .join(filename.clone() + ".json") + ).await.unwrap()).unwrap(); + assert_eq!(meta.content_type.as_deref(), Some("text/css")); + assert_eq!(meta.filename.as_deref(), Some("style.css")); + assert_eq!(meta.length.map(|i| i.get()), Some(file.len())); + assert!(meta.etag.is_some()); + + // read back the data using the interface + let (metadata, read_back) = { + let (metadata, stream) = store.read_streaming( + "fireburn.ru", + &filename + ).await.unwrap(); + let mut reader = tokio_util::io::StreamReader::new(stream); + + let mut buf = Vec::default(); + reader.read_to_end(&mut buf).await.unwrap(); + + (metadata, buf) + }; + + assert_eq!(read_back, file); + assert_eq!(metadata.content_type.as_deref(), Some("text/css")); + assert_eq!(meta.filename.as_deref(), Some("style.css")); + assert_eq!(meta.length.map(|i| i.get()), Some(file.len())); + assert!(meta.etag.is_some()); + + } +} diff --git a/src/media/storage/mod.rs b/src/media/storage/mod.rs new file mode 100644 index 0000000..020999c --- /dev/null +++ b/src/media/storage/mod.rs @@ -0,0 +1,177 @@ +use async_trait::async_trait; +use axum::extract::multipart::Field; +use tokio_stream::Stream; +use bytes::Bytes; +use serde::{Deserialize, Serialize}; +use std::ops::Bound; +use std::pin::Pin; +use std::fmt::Debug; +use std::num::NonZeroUsize; + +pub mod file; + +#[derive(Debug, Deserialize, Serialize)] +pub struct Metadata { + /// Content type of the file. If None, the content-type is considered undefined. + pub content_type: Option, + /// The original filename that was passed. + pub filename: Option, + /// The recorded length of the file. + pub length: Option, + /// The e-tag of a file. Note: it must be a strong e-tag, for example, a hash. + pub etag: Option, +} +impl From<&Field<'_>> for Metadata { + fn from(field: &Field<'_>) -> Self { + Self { + content_type: field.content_type() + .map(|i| i.to_owned()), + filename: field.file_name() + .map(|i| i.to_owned()), + length: None, + etag: None, + } + } +} + + +#[derive(Debug, Clone, Copy)] +pub enum ErrorKind { + Backend, + Permission, + Json, + NotFound, + Other, +} + +#[derive(Debug)] +pub struct MediaStoreError { + kind: ErrorKind, + source: Option>, + msg: String, +} + +impl MediaStoreError { + pub fn kind(&self) -> ErrorKind { + self.kind + } +} + +impl std::error::Error for MediaStoreError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + self.source + .as_ref() + .map(|i| i.as_ref() as &dyn std::error::Error) + } +} + +impl std::fmt::Display for MediaStoreError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}: {}", + match self.kind { + ErrorKind::Backend => "media storage backend error", + ErrorKind::Permission => "permission denied", + ErrorKind::Json => "failed to parse json", + ErrorKind::NotFound => "blob not found", + ErrorKind::Other => "unknown media storage error", + }, + self.msg + ) + } +} + +pub type Result = std::result::Result; + +#[async_trait] +pub trait MediaStore: 'static + Send + Sync + Clone { + async fn write_streaming( + &self, + domain: &str, + metadata: Metadata, + content: T, + ) -> Result + where + T: tokio_stream::Stream> + Unpin + Send + Debug; + + async fn read_streaming( + &self, + domain: &str, + filename: &str, + ) -> Result<(Metadata, Pin> + Send>>)>; + + async fn stream_range( + &self, + domain: &str, + filename: &str, + range: (Bound, Bound) + ) -> Result> + Send>>> { + use futures::stream::TryStreamExt; + use tracing::debug; + let (metadata, mut stream) = self.read_streaming(domain, filename).await?; + let length = metadata.length.unwrap().get(); + + use Bound::*; + let (start, end): (usize, usize) = match range { + (Unbounded, Unbounded) => return Ok(stream), + (Included(start), Unbounded) => (start.try_into().unwrap(), length - 1), + (Unbounded, Included(end)) => (length - usize::try_from(end).unwrap(), length - 1), + (Included(start), Included(end)) => (start.try_into().unwrap(), end.try_into().unwrap()), + (_, _) => unreachable!() + }; + + stream = Box::pin( + stream.map_ok({ + let mut bytes_skipped = 0usize; + let mut bytes_read = 0usize; + + move |chunk| { + debug!("Skipped {}/{} bytes, chunk len {}", bytes_skipped, start, chunk.len()); + let chunk = if bytes_skipped < start { + let need_to_skip = start - bytes_skipped; + if chunk.len() < need_to_skip { + return None + } + debug!("Skipping {} bytes", need_to_skip); + bytes_skipped += need_to_skip; + + chunk.slice(need_to_skip..) + } else { + chunk + }; + + debug!("Read {} bytes from file, {} in this chunk", bytes_read, chunk.len()); + bytes_read += chunk.len(); + + if bytes_read > length { + if bytes_read - length > chunk.len() { + return None + } + debug!("Truncating last {} bytes", bytes_read - length); + return Some(chunk.slice(..chunk.len() - (bytes_read - length))) + } + + Some(chunk) + } + }) + .try_skip_while(|x| std::future::ready(Ok(x.is_none()))) + .try_take_while(|x| std::future::ready(Ok(x.is_some()))) + .map_ok(|x| x.unwrap()) + ); + + return Ok(stream); + } + + /// Read metadata for a file. + /// + /// The default implementation uses the `read_streaming` method + /// and drops the stream containing file content. + async fn metadata(&self, domain: &str, filename: &str) -> Result { + self.read_streaming(domain, filename) + .await + .map(|(meta, stream)| meta) + } + + async fn delete(&self, domain: &str, filename: &str) -> Result<()>; +} -- cgit 1.4.1