From 1031d495d5b78d9b19dcdc414b6d7b0daf313bb2 Mon Sep 17 00:00:00 2001 From: Vika Date: Sun, 10 Jul 2022 00:55:20 +0300 Subject: media: media endpoint PoC Supported features: - Streaming upload - Content-addressed storage - Metadata - MIME type (taken from Content-Type) - Length (I could use stat() for this one tho) - filename (for Content-Disposition: attachment, WIP) --- kittybox-rs/src/main.rs | 94 +++------------ kittybox-rs/src/media/mod.rs | 126 +++++++++++++++----- kittybox-rs/src/media/storage/file.rs | 215 +++++++++++++++++++++++++++++----- kittybox-rs/src/media/storage/mod.rs | 60 ++++++++-- kittybox-rs/src/micropub/mod.rs | 6 +- 5 files changed, 347 insertions(+), 154 deletions(-) (limited to 'kittybox-rs/src') diff --git a/kittybox-rs/src/main.rs b/kittybox-rs/src/main.rs index ece31df..6cc8c1d 100644 --- a/kittybox-rs/src/main.rs +++ b/kittybox-rs/src/main.rs @@ -100,84 +100,16 @@ async fn main() { } }; - let endpoints = kittybox::frontend::IndiewebEndpoints { - authorization_endpoint: authorization_endpoint.to_string(), - token_endpoint: token_endpoint.to_string(), - webmention: None, - microsub: None, + let blobstore = { + let variable = std::env::var("BLOBSTORE_URI") + .unwrap(); + let folder = variable + .strip_prefix("file://") + .unwrap(); + let path = std::path::PathBuf::from(folder); + kittybox::media::storage::file::FileStore::new(path) }; - /*let micropub = warp::path("micropub") - .and(warp::path::end() - .and(kittybox::micropub::micropub( - database.clone(), - token_endpoint.to_string(), - http.clone() - )) - .or(warp::get() - .and(warp::path("client")) - .and(warp::path::end()) - .map(|| warp::reply::html(kittybox::MICROPUB_CLIENT)))); - - let static_files = warp::path("static") - .and(kittybox::frontend::static_files()); - - let media = warp::path("media") - .and(warp::path::end() - .and(kittybox::media::media()) - .or(kittybox::util::require_host() - .and(warp::path::param()) - .map(|_host: Authority, path: String| format!("media file {}", path)))); - - let technical = warp::path(".kittybox") - .and(warp::path("onboarding") - .and(warp::path::end()) - .and(kittybox::frontend::onboarding( - database.clone(), - endpoints.clone(), - http.clone() - )) - .or(warp::path("health") - .and(warp::path::end()) - .and(warp::get()) - // TODO make healthcheck report on database status too - .map(|| "OK")) - .or(warp::path("metrics") - .and(warp::path::end()) - .and(warp::get()) - .map(kittybox::metrics::gather)) - .or(micropub) - .or(media) - .or(static_files) - .or(warp::path("login") - .and(warp::path("callback") - .map(|| "callback!") - // TODO form on GET and handler on POST - .or(warp::path::end() - .map(|| "login page!")) - ) - ) - ); - - // TODO prettier error response - let coffee = warp::path("coffee") - .map(|| warp::reply::with_status("I'm a teapot!", warp::http::StatusCode::IM_A_TEAPOT)); - - et catchall = ; - - let app = homepage - .or(technical) - .or(coffee) - .or(catchall) - .with(warp::log("kittybox")) - .with(kittybox::metrics::metrics(vec![ - ".kittybox".to_string() - ])) - ; - - let svc = warp::service(app); - */ - let svc = axum::Router::new() .route( "/", @@ -231,9 +163,14 @@ async fn main() { axum::Router::new() .route( "/", - axum::routing::get(|| async { todo!() }).post(|| async { todo!() }), + axum::routing::get(|| async { todo!() }) + .post( + kittybox::media::upload:: + ), ) - .route("/:filename", axum::routing::get(|| async { todo!() })), + .route("/uploads/*file", axum::routing::get( + kittybox::media::serve:: + )), ) .route( "/.kittybox/static/:path", @@ -247,6 +184,7 @@ async fn main() { .layer(axum::Extension(kittybox::indieauth::TokenEndpoint( token_endpoint, ))) + .layer(axum::Extension(blobstore)) .layer( tower::ServiceBuilder::new() .layer(tower_http::trace::TraceLayer::new_for_http()) diff --git a/kittybox-rs/src/media/mod.rs b/kittybox-rs/src/media/mod.rs index 0d26f92..1bf3958 100644 --- a/kittybox-rs/src/media/mod.rs +++ b/kittybox-rs/src/media/mod.rs @@ -1,42 +1,104 @@ use axum::{ - extract::{Extension, Host, Multipart}, - response::{IntoResponse, Json, Response}, + extract::{Extension, Host, multipart::{Multipart, MultipartError}, Path}, + response::{IntoResponse, Response}, headers::HeaderValue, }; -use bytes::buf::Buf; -use futures_util::StreamExt; +use crate::{micropub::{MicropubError, ErrorType}, indieauth::User}; pub mod storage; -use storage::{MediaStore, MediaStoreError}; +use storage::{MediaStore, MediaStoreError, Metadata, ErrorKind}; +pub use storage::file::FileStore; -/*pub fn upload() -> impl Filter + Clone { - warp::post() - .and(crate::util::require_host()) - .and(warp::multipart::form().max_length(1024 * 1024 * 150 /*mb*/)) - .and_then(|host, mut form: FormData| async move { - // TODO get rid of the double unwrap() here - let file: Part = form.next().await.unwrap().unwrap(); - log::debug!( - "Uploaded: {:?}, type: {:?}", - file.filename(), - file.content_type() - ); +impl From for MicropubError { + fn from(err: MultipartError) -> Self { + Self { + error: ErrorType::InvalidRequest, + error_description: format!("multipart/form-data error: {}", err) + } + } +} +impl From for MicropubError { + fn from(err: MediaStoreError) -> Self { + Self { + error: ErrorType::InternalServerError, + error_description: format!("{}", err) + } + } +} - let mut data = file.stream(); - while let Some(buf) = data.next().await { - // TODO save it into a file - log::debug!("buffer length: {:?}", buf.map(|b| b.remaining())); - } - Ok::<_, warp::Rejection>(warp::reply::with_header( - warp::reply::with_status("", warp::http::StatusCode::CREATED), - "Location", - "./awoo.png", - )) - }) -}*/ +#[tracing::instrument(skip(blobstore))] pub async fn upload( + mut upload: Multipart, + Extension(blobstore): Extension, + user: User +) -> Response { + if !user.check_scope("media") { + return MicropubError { + error: ErrorType::NotAuthorized, + error_description: "Interacting with the media storage requires the \"media\" scope.".to_owned() + }.into_response(); + } + let host = user.me.host().unwrap().to_string() + &user.me.port().map(|i| format!(":{}", i)).unwrap_or_default(); + let field = match upload.next_field().await { + Ok(Some(field)) => field, + Ok(None) => { + return MicropubError { + error: ErrorType::InvalidRequest, + error_description: "Send multipart/form-data with one field named file".to_owned() + }.into_response(); + }, + Err(err) => { + return MicropubError::from(err).into_response(); + }, + }; + let metadata: Metadata = (&field).into(); + match blobstore.write_streaming(&host, metadata, field).await { + Ok(filename) => IntoResponse::into_response(( + axum::http::StatusCode::CREATED, + [ + ("Location", user.me.join( + &format!(".kittybox/media/uploads/{}", filename) + ).unwrap().as_str()) + ] + )), + Err(err) => MicropubError::from(err).into_response() + } +} + +#[tracing::instrument(skip(blobstore))] +pub async fn serve( Host(host): Host, - upload: Multipart, - Extension(db): Extension, + Path(path): Path, + Extension(blobstore): Extension ) -> Response { - todo!() + use axum::http::StatusCode; + tracing::debug!("Searching for file..."); + match blobstore.read_streaming(&host, path.as_str()).await { + Ok((metadata, stream)) => { + tracing::debug!("Metadata: {:?}", metadata); + let mut r = Response::builder(); + { + let headers = r.headers_mut().unwrap(); + headers.insert( + "Content-Type", + HeaderValue::from_str(&metadata.content_type).unwrap() + ); + if let Some(length) = metadata.length { + headers.insert( + "Content-Length", + HeaderValue::from_str(&length.to_string()).unwrap() + ); + } + } + r.body(axum::body::StreamBody::new(stream)).unwrap().into_response() + }, + Err(err) => match err.kind() { + ErrorKind::NotFound => { + IntoResponse::into_response(StatusCode::NOT_FOUND) + }, + _ => { + tracing::error!("{}", err); + IntoResponse::into_response(StatusCode::INTERNAL_SERVER_ERROR) + } + } + } } diff --git a/kittybox-rs/src/media/storage/file.rs b/kittybox-rs/src/media/storage/file.rs index ea1f010..176c2f4 100644 --- a/kittybox-rs/src/media/storage/file.rs +++ b/kittybox-rs/src/media/storage/file.rs @@ -1,8 +1,12 @@ -use super::{ErrorKind, MediaStore, MediaStoreError, Result}; +use super::{Metadata, ErrorKind, MediaStore, MediaStoreError, Result}; use async_trait::async_trait; use std::path::PathBuf; use tokio::fs::OpenOptions; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::AsyncWriteExt; +use futures::StreamExt; +use std::pin::Pin; +use sha2::Digest; +use futures::FutureExt; #[derive(Clone)] pub struct FileStore { @@ -12,56 +16,209 @@ pub struct FileStore { impl From for MediaStoreError { fn from(source: tokio::io::Error) -> Self { Self { + msg: format!("file I/O error: {}", source), + kind: match source.kind() { + std::io::ErrorKind::NotFound => ErrorKind::NotFound, + _ => ErrorKind::Backend + }, source: Some(Box::new(source)), - msg: "file I/O error".to_owned(), - kind: ErrorKind::Backend, + } + } +} + + +impl FileStore { + pub fn new>(base: T) -> Self { + Self { base: base.into() } + } + + async fn mktemp(&self) -> Result<(PathBuf, tokio::fs::File)> { + use rand::{Rng, distributions::Alphanumeric}; + tokio::fs::create_dir_all(self.base.as_path()).await?; + loop { + let filename = self.base.join(format!("temp.{}", { + let string = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(16) + .collect::>(); + String::from_utf8(string).unwrap() + })); + + match OpenOptions::new() + .create_new(true) + .write(true) + .open(&filename) + .await + { + Ok(file) => return Ok((filename, file)), + Err(err) => match err.kind() { + std::io::ErrorKind::AlreadyExists => continue, + _ => return Err(err.into()) + } + } } } } #[async_trait] impl MediaStore for FileStore { - async fn write_streaming( + async fn write_streaming( &self, - domain: url::Host, - filename: &str, - content: axum::extract::multipart::Field<'_>, - ) -> Result<()> { - todo!() + domain: &str, + mut metadata: Metadata, + mut content: T, + ) -> Result + where + T: tokio_stream::Stream> + Unpin + Send + { + let (tempfilepath, mut tempfile) = self.mktemp().await?; + let mut hasher = sha2::Sha256::new(); + let mut length: usize = 0; + + while let Some(chunk) = content.next().await { + let chunk = std::sync::Arc::new(chunk.map_err(|err| MediaStoreError { + kind: ErrorKind::Backend, + source: Some(Box::new(err)), + msg: "Failed to read a data chunk".to_owned() + })?); + length += chunk.len(); + let (write_result, _hasher) = tokio::join!( + tempfile.write_all(&*chunk), + { + let chunk = chunk.clone(); + tokio::task::spawn_blocking(move || { + hasher.update(&*chunk); + + hasher + }).map(|r| r.unwrap()) + } + ); + if let Err(err) = write_result { + drop(tempfile); + // this is just cleanup, nothing fails if it fails + // though temporary files might take up space on the hard drive + // We'll clean them when maintenance time comes + #[allow(unused_must_use)] + { tokio::fs::remove_file(tempfilepath).await; } + return Err(err.into()); + } + hasher = _hasher; + } + + let hash = hasher.finalize(); + let filename = format!( + "{}/{}/{}/{}/{}", + hex::encode([hash[0]]), + hex::encode([hash[1]]), + hex::encode([hash[2]]), + hex::encode([hash[3]]), + hex::encode(&hash[4..32]) + ); + metadata.length = Some(length); + let domain_str = domain.to_string(); + let filepath = self.base.join(domain_str.as_str()).join(&filename); + let metafilename = filename.clone() + ".json"; + let metapath = self.base.join(domain_str.as_str()).join(metafilename); + { + let parent = filepath.parent().unwrap(); + tokio::fs::create_dir_all(parent).await?; + } + let mut meta = OpenOptions::new() + .create_new(true) + .write(true) + .open(&metapath) + .await?; + meta.write_all(&serde_json::to_vec(&metadata).unwrap()).await?; + tokio::fs::rename(tempfilepath, filepath).await?; + Ok(filename) } async fn read_streaming( &self, - domain: url::Host, + domain: &str, filename: &str, - ) -> Result>> { - todo!() - } - - async fn write(&self, domain: url::Host, filename: &str, content: &[u8]) -> Result<()> { - let path = self.base.join(format!("{}/{}", domain, filename)); + ) -> Result<(Metadata, Pin> + Send>>)> { + let path = self.base.join(format!("{}{}", domain, filename)); + let metapath = self.base.join(format!("{}{}.json", domain, filename)); + tracing::debug!("Path: {}, metadata: {}", path.display(), metapath.display()); - let mut file = OpenOptions::new() - .create_new(true) - .write(true) + let file = OpenOptions::new() + .read(true) .open(path) .await?; + let meta = serde_json::from_slice(&tokio::fs::read(metapath).await?) + .map_err(|err| MediaStoreError { + kind: ErrorKind::Json, + msg: format!("{}", err), + source: Some(Box::new(err)) + })?; - Ok(file.write_all(content).await?) + Ok((meta, Box::pin(tokio_util::io::ReaderStream::new(file)))) } - async fn read(&self, domain: url::Host, filename: &str) -> Result> { + async fn delete(&self, domain: &str, filename: &str) -> Result<()> { let path = self.base.join(format!("{}/{}", domain, filename)); - let mut file = OpenOptions::new().read(true).open(path).await?; + Ok(tokio::fs::remove_file(path).await?) + } +} - let mut buf: Vec = Vec::default(); - file.read_to_end(&mut buf).await?; +#[cfg(test)] +mod tests { + use super::{Metadata, FileStore, MediaStore}; + use tokio::io::AsyncReadExt; - Ok(buf) - } + #[tokio::test] + async fn test_streaming_read_write() { + let tempdir = tempdir::TempDir::new("file").expect("Failed to create tempdir"); + let store = FileStore::new(tempdir.path()); + + let file: &[u8] = include_bytes!("../../../../README.md"); + let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i)))); + let metadata = Metadata { + filename: Some("README.md".to_string()), + content_type: "text/markdown".to_string(), + length: None + }; + + let filename = store.write_streaming( + "fireburn.ru", + metadata, stream + ).await.unwrap(); + + let content = tokio::fs::read( + tempdir.path() + .join("fireburn.ru") + .join(&filename) + ).await.unwrap(); + assert_eq!(content, file); + + let meta: Metadata = serde_json::from_slice(&tokio::fs::read( + tempdir.path() + .join("fireburn.ru") + .join(filename.clone() + ".json") + ).await.unwrap()).unwrap(); + assert_eq!(&meta.content_type, "text/markdown"); + assert_eq!(meta.filename.as_deref(), Some("README.md")); + assert_eq!(meta.length, Some(file.len())); + + let (metadata, read_back) = { + let (metadata, stream) = store.read_streaming( + "fireburn.ru", + &filename + ).await.unwrap(); + let mut reader = tokio_util::io::StreamReader::new(stream); + + let mut buf = Vec::default(); + reader.read_to_end(&mut buf).await.unwrap(); + + (metadata, buf) + }; + + assert_eq!(read_back, file); + assert_eq!(&metadata.content_type, "text/markdown"); + assert_eq!(meta.filename.as_deref(), Some("README.md")); + assert_eq!(meta.length, Some(file.len())); - async fn delete(&self, domain: url::Host, filename: &str) -> Result<()> { - todo!() } } diff --git a/kittybox-rs/src/media/storage/mod.rs b/kittybox-rs/src/media/storage/mod.rs index ba880ab..cb8b38f 100644 --- a/kittybox-rs/src/media/storage/mod.rs +++ b/kittybox-rs/src/media/storage/mod.rs @@ -1,12 +1,39 @@ use async_trait::async_trait; +use axum::extract::multipart::Field; +use tokio_stream::Stream; +use bytes::Bytes; +use serde::{Deserialize, Serialize}; +use std::pin::Pin; pub mod file; +#[derive(Debug, Deserialize, Serialize)] +pub struct Metadata { + pub(super) content_type: String, + pub(super) filename: Option, + pub(super) length: Option +} +impl From<&Field<'_>> for Metadata { + fn from(field: &Field<'_>) -> Self { + Self { + content_type: field.content_type() + .map(|i| i.to_owned()) + .or_else(|| Some("application/octet-stream".to_owned())) + .unwrap(), + filename: field.file_name() + .map(|i| i.to_owned()), + length: None + } + } +} + + #[derive(Debug, Clone, Copy)] pub enum ErrorKind { Backend, Permission, - Conflict, + Json, + NotFound, Other, } @@ -17,6 +44,12 @@ pub struct MediaStoreError { msg: String, } +impl MediaStoreError { + pub fn kind(&self) -> ErrorKind { + self.kind + } +} + impl std::error::Error for MediaStoreError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { self.source @@ -33,7 +66,8 @@ impl std::fmt::Display for MediaStoreError { match self.kind { ErrorKind::Backend => "media storage backend error", ErrorKind::Permission => "permission denied", - ErrorKind::Conflict => "conflict with existing data", + ErrorKind::Json => "failed to parse json", + ErrorKind::NotFound => "blob not found", ErrorKind::Other => "unknown media storage error", }, self.msg @@ -45,18 +79,20 @@ pub type Result = std::result::Result; #[async_trait] pub trait MediaStore: 'static + Send + Sync + Clone { - async fn write_streaming( + async fn write_streaming( &self, - domain: url::Host, - filename: &str, - content: axum::extract::multipart::Field<'_>, - ) -> Result<()>; - async fn write(&self, domain: url::Host, filename: &str, content: &[u8]) -> Result<()>; + domain: &str, + metadata: Metadata, + content: T, + ) -> Result + where + T: tokio_stream::Stream> + Unpin + Send; + async fn read_streaming( &self, - domain: url::Host, + domain: &str, filename: &str, - ) -> Result>>; - async fn read(&self, domain: url::Host, filename: &str) -> Result>; - async fn delete(&self, domain: url::Host, filename: &str) -> Result<()>; + ) -> Result<(Metadata, Pin> + Send>>)>; + + async fn delete(&self, domain: &str, filename: &str) -> Result<()>; } diff --git a/kittybox-rs/src/micropub/mod.rs b/kittybox-rs/src/micropub/mod.rs index d8a84e6..8550849 100644 --- a/kittybox-rs/src/micropub/mod.rs +++ b/kittybox-rs/src/micropub/mod.rs @@ -28,7 +28,7 @@ pub struct MicropubQuery { #[derive(Serialize, Deserialize, PartialEq, Debug)] #[serde(rename_all = "snake_case")] -enum ErrorType { +pub(crate) enum ErrorType { AlreadyExists, Forbidden, InternalServerError, @@ -41,8 +41,8 @@ enum ErrorType { #[derive(Serialize, Deserialize, Debug)] pub(crate) struct MicropubError { - error: ErrorType, - error_description: String, + pub(crate) error: ErrorType, + pub(crate) error_description: String, } impl From for MicropubError { -- cgit 1.4.1