From 7f23ec84bc05c236c1bf40c2f0d72412af711516 Mon Sep 17 00:00:00 2001 From: Vika Date: Thu, 7 Jul 2022 00:32:33 +0300 Subject: treewide: rewrite using Axum Axum has streaming bodies and allows to write simpler code. It also helps enforce stronger types and looks much more neat. This allows me to progress on the media endpoint and add streaming reads and writes to the MediaStore trait. Metrics are temporarily not implemented. Everything else was preserved, and the tests still pass, after adjusting for new calling conventions. TODO: create method routers for protocol endpoints --- kittybox-rs/src/media/mod.rs | 50 +++++++++++++--------------- kittybox-rs/src/media/storage/file.rs | 61 +++++++++++++++++++++++++++++++++++ kittybox-rs/src/media/storage/mod.rs | 53 ++++++++++++++++++++++++++++++ 3 files changed, 137 insertions(+), 27 deletions(-) create mode 100644 kittybox-rs/src/media/storage/file.rs create mode 100644 kittybox-rs/src/media/storage/mod.rs (limited to 'kittybox-rs/src/media') diff --git a/kittybox-rs/src/media/mod.rs b/kittybox-rs/src/media/mod.rs index 0d46e0c..d18cf34 100644 --- a/kittybox-rs/src/media/mod.rs +++ b/kittybox-rs/src/media/mod.rs @@ -1,27 +1,25 @@ -use futures_util::StreamExt; use bytes::buf::Buf; -use warp::{Filter, Rejection, Reply, multipart::{FormData, Part}}; - -pub fn query() -> impl Filter + Clone { - warp::get() - .and(crate::util::require_host()) - .map(|host| "media endpoint query...") -} +use futures_util::StreamExt; +use axum::{ + extract::{Host, Extension, Multipart}, + response::{Response, IntoResponse, Json} +}; -pub fn options() -> impl Filter + Clone { - warp::options() - .map(|| warp::reply::json::>(&None)) - .with(warp::reply::with::header("Allow", "GET, POST")) -} +pub mod storage; +use storage::{MediaStore, MediaStoreError}; -pub fn upload() -> impl Filter + Clone { +/*pub fn upload() -> impl Filter + Clone { warp::post() .and(crate::util::require_host()) - .and(warp::multipart::form().max_length(1024*1024*150/*mb*/)) + .and(warp::multipart::form().max_length(1024 * 1024 * 150 /*mb*/)) .and_then(|host, mut form: FormData| async move { // TODO get rid of the double unwrap() here let file: Part = form.next().await.unwrap().unwrap(); - log::debug!("Uploaded: {:?}, type: {:?}", file.filename(), file.content_type()); + log::debug!( + "Uploaded: {:?}, type: {:?}", + file.filename(), + file.content_type() + ); let mut data = file.stream(); while let Some(buf) = data.next().await { @@ -29,18 +27,16 @@ pub fn upload() -> impl Filter + Clo log::debug!("buffer length: {:?}", buf.map(|b| b.remaining())); } Ok::<_, warp::Rejection>(warp::reply::with_header( - warp::reply::with_status( - "", - warp::http::StatusCode::CREATED - ), + warp::reply::with_status("", warp::http::StatusCode::CREATED), "Location", - "./awoo.png" + "./awoo.png", )) }) -} - -pub fn media() -> impl Filter + Clone { - upload() - .or(query()) - .or(options()) +}*/ +pub async fn upload( + Host(host): Host, + upload: Multipart, + Extension(db): Extension +) -> Response { + todo!() } diff --git a/kittybox-rs/src/media/storage/file.rs b/kittybox-rs/src/media/storage/file.rs new file mode 100644 index 0000000..8c0ddf0 --- /dev/null +++ b/kittybox-rs/src/media/storage/file.rs @@ -0,0 +1,61 @@ +use super::{ErrorKind, MediaStore, MediaStoreError, Result}; +use async_trait::async_trait; +use std::path::PathBuf; +use tokio::fs::OpenOptions; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +#[derive(Clone)] +pub struct FileStore { + base: PathBuf, +} + +impl From for MediaStoreError { + fn from(source: tokio::io::Error) -> Self { + Self { + source: Some(Box::new(source)), + msg: "file I/O error".to_owned(), + kind: ErrorKind::Backend, + } + } +} + +#[async_trait] +impl MediaStore for FileStore { + async fn write_streaming( + &self, domain: url::Host, filename: &str, + content: axum::extract::multipart::Field<'_> + ) -> Result<()> { + todo!() + } + + async fn read_streaming(&self, domain: url::Host, filename: &str) -> Result>> { + todo!() + } + + async fn write(&self, domain: url::Host, filename: &str, content: &[u8]) -> Result<()> { + let path = self.base.join(format!("{}/{}", domain, filename)); + + let mut file = OpenOptions::new() + .create_new(true) + .write(true) + .open(path) + .await?; + + Ok(file.write_all(content).await?) + } + + async fn read(&self, domain: url::Host, filename: &str) -> Result> { + let path = self.base.join(format!("{}/{}", domain, filename)); + + let mut file = OpenOptions::new().read(true).open(path).await?; + + let mut buf: Vec = Vec::default(); + file.read_to_end(&mut buf).await?; + + Ok(buf) + } + + async fn delete(&self, domain: url::Host, filename: &str) -> Result<()> { + todo!() + } +} diff --git a/kittybox-rs/src/media/storage/mod.rs b/kittybox-rs/src/media/storage/mod.rs new file mode 100644 index 0000000..e9b01f9 --- /dev/null +++ b/kittybox-rs/src/media/storage/mod.rs @@ -0,0 +1,53 @@ +use async_trait::async_trait; + +pub mod file; + +#[derive(Debug, Clone, Copy)] +pub enum ErrorKind { + Backend, + Permission, + Conflict, + Other, +} + +#[derive(Debug)] +pub struct MediaStoreError { + kind: ErrorKind, + source: Option>, + msg: String, +} + +impl std::error::Error for MediaStoreError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + self.source + .as_ref() + .map(|i| i.as_ref() as &dyn std::error::Error) + } +} + +impl std::fmt::Display for MediaStoreError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}: {}", + match self.kind { + ErrorKind::Backend => "media storage backend error", + ErrorKind::Permission => "permission denied", + ErrorKind::Conflict => "conflict with existing data", + ErrorKind::Other => "unknown media storage error", + }, + self.msg + ) + } +} + +pub type Result = std::result::Result; + +#[async_trait] +pub trait MediaStore: 'static + Send + Sync + Clone { + async fn write_streaming(&self, domain: url::Host, filename: &str, content: axum::extract::multipart::Field<'_>) -> Result<()>; + async fn write(&self, domain: url::Host, filename: &str, content: &[u8]) -> Result<()>; + async fn read_streaming(&self, domain: url::Host, filename: &str) -> Result>>; + async fn read(&self, domain: url::Host, filename: &str) -> Result>; + async fn delete(&self, domain: url::Host, filename: &str) -> Result<()>; +} -- cgit 1.4.1