about summary refs log tree commit diff
path: root/kittybox-rs/src/media
diff options
context:
space:
mode:
Diffstat (limited to 'kittybox-rs/src/media')
-rw-r--r--kittybox-rs/src/media/mod.rs126
-rw-r--r--kittybox-rs/src/media/storage/file.rs215
-rw-r--r--kittybox-rs/src/media/storage/mod.rs60
3 files changed, 328 insertions, 73 deletions
diff --git a/kittybox-rs/src/media/mod.rs b/kittybox-rs/src/media/mod.rs
index 0d26f92..1bf3958 100644
--- a/kittybox-rs/src/media/mod.rs
+++ b/kittybox-rs/src/media/mod.rs
@@ -1,42 +1,104 @@
 use axum::{
-    extract::{Extension, Host, Multipart},
-    response::{IntoResponse, Json, Response},
+    extract::{Extension, Host, multipart::{Multipart, MultipartError}, Path},
+    response::{IntoResponse, Response}, headers::HeaderValue,
 };
-use bytes::buf::Buf;
-use futures_util::StreamExt;
+use crate::{micropub::{MicropubError, ErrorType}, indieauth::User};
 
 pub mod storage;
-use storage::{MediaStore, MediaStoreError};
+use storage::{MediaStore, MediaStoreError, Metadata, ErrorKind};
+pub use storage::file::FileStore;
 
-/*pub fn upload() -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
-    warp::post()
-        .and(crate::util::require_host())
-        .and(warp::multipart::form().max_length(1024 * 1024 * 150 /*mb*/))
-        .and_then(|host, mut form: FormData| async move {
-            // TODO get rid of the double unwrap() here
-            let file: Part = form.next().await.unwrap().unwrap();
-            log::debug!(
-                "Uploaded: {:?}, type: {:?}",
-                file.filename(),
-                file.content_type()
-            );
+impl From<MultipartError> for MicropubError {
+    fn from(err: MultipartError) -> Self {
+        Self {
+            error: ErrorType::InvalidRequest,
+            error_description: format!("multipart/form-data error: {}", err)
+        }
+    }
+}
+impl From<MediaStoreError> for MicropubError {
+    fn from(err: MediaStoreError) -> Self {
+        Self {
+            error: ErrorType::InternalServerError,
+            error_description: format!("{}", err)
+        }
+    }
+}
 
-            let mut data = file.stream();
-            while let Some(buf) = data.next().await {
-                // TODO save it into a file
-                log::debug!("buffer length: {:?}", buf.map(|b| b.remaining()));
-            }
-            Ok::<_, warp::Rejection>(warp::reply::with_header(
-                warp::reply::with_status("", warp::http::StatusCode::CREATED),
-                "Location",
-                "./awoo.png",
-            ))
-        })
-}*/
+#[tracing::instrument(skip(blobstore))]
 pub async fn upload<S: MediaStore>(
+    mut upload: Multipart,
+    Extension(blobstore): Extension<S>,
+    user: User
+) -> Response {
+    if !user.check_scope("media") {
+        return MicropubError {
+            error: ErrorType::NotAuthorized,
+            error_description: "Interacting with the media storage requires the \"media\" scope.".to_owned()
+        }.into_response();
+    }
+    let host = user.me.host().unwrap().to_string() + &user.me.port().map(|i| format!(":{}", i)).unwrap_or_default();
+    let field = match upload.next_field().await {
+        Ok(Some(field)) => field,
+        Ok(None) => {
+            return MicropubError {
+                error: ErrorType::InvalidRequest,
+                error_description: "Send multipart/form-data with one field named file".to_owned()
+            }.into_response();
+        },
+        Err(err) => {
+            return MicropubError::from(err).into_response();
+        },
+    };
+    let metadata: Metadata = (&field).into();
+    match blobstore.write_streaming(&host, metadata, field).await {
+        Ok(filename) => IntoResponse::into_response((
+            axum::http::StatusCode::CREATED,
+            [
+                ("Location", user.me.join(
+                    &format!(".kittybox/media/uploads/{}", filename)
+                ).unwrap().as_str())
+            ]
+        )),
+        Err(err) => MicropubError::from(err).into_response()
+    }
+}
+
+#[tracing::instrument(skip(blobstore))]
+pub async fn serve<S: MediaStore>(
     Host(host): Host,
-    upload: Multipart,
-    Extension(db): Extension<S>,
+    Path(path): Path<String>,
+    Extension(blobstore): Extension<S>
 ) -> Response {
-    todo!()
+    use axum::http::StatusCode;
+    tracing::debug!("Searching for file...");
+    match blobstore.read_streaming(&host, path.as_str()).await {
+        Ok((metadata, stream)) => {
+            tracing::debug!("Metadata: {:?}", metadata);
+            let mut r = Response::builder();
+            {
+                let headers = r.headers_mut().unwrap();
+                headers.insert(
+                    "Content-Type",
+                    HeaderValue::from_str(&metadata.content_type).unwrap()
+                );
+                if let Some(length) = metadata.length {
+                    headers.insert(
+                        "Content-Length",
+                        HeaderValue::from_str(&length.to_string()).unwrap()
+                    );
+                }
+            }
+            r.body(axum::body::StreamBody::new(stream)).unwrap().into_response()
+        },
+        Err(err) => match err.kind() {
+            ErrorKind::NotFound => {
+                IntoResponse::into_response(StatusCode::NOT_FOUND)
+            },
+            _ => {
+                tracing::error!("{}", err);
+                IntoResponse::into_response(StatusCode::INTERNAL_SERVER_ERROR)
+            }
+        }
+    }
 }
diff --git a/kittybox-rs/src/media/storage/file.rs b/kittybox-rs/src/media/storage/file.rs
index ea1f010..176c2f4 100644
--- a/kittybox-rs/src/media/storage/file.rs
+++ b/kittybox-rs/src/media/storage/file.rs
@@ -1,8 +1,12 @@
-use super::{ErrorKind, MediaStore, MediaStoreError, Result};
+use super::{Metadata, ErrorKind, MediaStore, MediaStoreError, Result};
 use async_trait::async_trait;
 use std::path::PathBuf;
 use tokio::fs::OpenOptions;
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::io::AsyncWriteExt;
+use futures::StreamExt;
+use std::pin::Pin;
+use sha2::Digest;
+use futures::FutureExt;
 
 #[derive(Clone)]
 pub struct FileStore {
@@ -12,56 +16,209 @@ pub struct FileStore {
 impl From<tokio::io::Error> for MediaStoreError {
     fn from(source: tokio::io::Error) -> Self {
         Self {
+            msg: format!("file I/O error: {}", source),
+            kind: match source.kind() {
+                std::io::ErrorKind::NotFound => ErrorKind::NotFound,
+                _ => ErrorKind::Backend
+            },
             source: Some(Box::new(source)),
-            msg: "file I/O error".to_owned(),
-            kind: ErrorKind::Backend,
+        }
+    }
+}
+
+
+impl FileStore {
+    pub fn new<T: Into<PathBuf>>(base: T) -> Self {
+        Self { base: base.into() }
+    }
+
+    async fn mktemp(&self) -> Result<(PathBuf, tokio::fs::File)> {
+        use rand::{Rng, distributions::Alphanumeric};
+        tokio::fs::create_dir_all(self.base.as_path()).await?;
+        loop {
+            let filename = self.base.join(format!("temp.{}", {
+                let string = rand::thread_rng()
+                    .sample_iter(&Alphanumeric)
+                    .take(16)
+                    .collect::<Vec<u8>>();
+                String::from_utf8(string).unwrap()
+            }));
+
+            match OpenOptions::new()
+                .create_new(true)
+                .write(true)
+                .open(&filename)
+                .await
+            {
+                Ok(file) => return Ok((filename, file)),
+                Err(err) => match err.kind() {
+                    std::io::ErrorKind::AlreadyExists => continue,
+                    _ => return Err(err.into())
+                }
+            }
         }
     }
 }
 
 #[async_trait]
 impl MediaStore for FileStore {
-    async fn write_streaming(
+    async fn write_streaming<T>(
         &self,
-        domain: url::Host,
-        filename: &str,
-        content: axum::extract::multipart::Field<'_>,
-    ) -> Result<()> {
-        todo!()
+        domain: &str,
+        mut metadata: Metadata,
+        mut content: T,
+    ) -> Result<String>
+    where
+        T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send
+    {
+        let (tempfilepath, mut tempfile) = self.mktemp().await?;
+        let mut hasher = sha2::Sha256::new();
+        let mut length: usize = 0;
+
+        while let Some(chunk) = content.next().await {
+            let chunk = std::sync::Arc::new(chunk.map_err(|err| MediaStoreError {
+                kind: ErrorKind::Backend,
+                source: Some(Box::new(err)),
+                msg: "Failed to read a data chunk".to_owned()
+            })?);
+            length += chunk.len();
+            let (write_result, _hasher) = tokio::join!(
+                tempfile.write_all(&*chunk),
+                {
+                    let chunk = chunk.clone();
+                    tokio::task::spawn_blocking(move || {
+                        hasher.update(&*chunk);
+
+                        hasher
+                    }).map(|r| r.unwrap())
+                }
+            );
+            if let Err(err) = write_result {
+                drop(tempfile);
+                // this is just cleanup, nothing fails if it fails
+                // though temporary files might take up space on the hard drive
+                // We'll clean them when maintenance time comes
+                #[allow(unused_must_use)]
+                { tokio::fs::remove_file(tempfilepath).await; }
+                return Err(err.into());
+            }
+            hasher = _hasher;
+        }
+
+        let hash = hasher.finalize();
+        let filename = format!(
+            "{}/{}/{}/{}/{}",
+            hex::encode([hash[0]]),
+            hex::encode([hash[1]]),
+            hex::encode([hash[2]]),
+            hex::encode([hash[3]]),
+            hex::encode(&hash[4..32])
+        );
+        metadata.length = Some(length);
+        let domain_str = domain.to_string();
+        let filepath = self.base.join(domain_str.as_str()).join(&filename);
+        let metafilename = filename.clone() + ".json";
+        let metapath = self.base.join(domain_str.as_str()).join(metafilename);
+        {
+            let parent = filepath.parent().unwrap();
+            tokio::fs::create_dir_all(parent).await?;            
+        }
+        let mut meta = OpenOptions::new()
+            .create_new(true)
+            .write(true)
+            .open(&metapath)
+            .await?;
+        meta.write_all(&serde_json::to_vec(&metadata).unwrap()).await?;
+        tokio::fs::rename(tempfilepath, filepath).await?;
+        Ok(filename)
     }
 
     async fn read_streaming(
         &self,
-        domain: url::Host,
+        domain: &str,
         filename: &str,
-    ) -> Result<tokio_util::io::ReaderStream<Box<dyn tokio::io::AsyncRead>>> {
-        todo!()
-    }
-
-    async fn write(&self, domain: url::Host, filename: &str, content: &[u8]) -> Result<()> {
-        let path = self.base.join(format!("{}/{}", domain, filename));
+    ) -> Result<(Metadata, Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>)> {
+        let path = self.base.join(format!("{}{}", domain, filename));
+        let metapath = self.base.join(format!("{}{}.json", domain, filename));
+        tracing::debug!("Path: {}, metadata: {}", path.display(), metapath.display());
 
-        let mut file = OpenOptions::new()
-            .create_new(true)
-            .write(true)
+        let file = OpenOptions::new()
+            .read(true)
             .open(path)
             .await?;
+        let meta = serde_json::from_slice(&tokio::fs::read(metapath).await?)
+            .map_err(|err| MediaStoreError {
+                kind: ErrorKind::Json,
+                msg: format!("{}", err),
+                source: Some(Box::new(err))
+            })?;
 
-        Ok(file.write_all(content).await?)
+        Ok((meta, Box::pin(tokio_util::io::ReaderStream::new(file))))
     }
 
-    async fn read(&self, domain: url::Host, filename: &str) -> Result<Vec<u8>> {
+    async fn delete(&self, domain: &str, filename: &str) -> Result<()> {
         let path = self.base.join(format!("{}/{}", domain, filename));
 
-        let mut file = OpenOptions::new().read(true).open(path).await?;
+        Ok(tokio::fs::remove_file(path).await?)
+    }
+}
 
-        let mut buf: Vec<u8> = Vec::default();
-        file.read_to_end(&mut buf).await?;
+#[cfg(test)]
+mod tests {
+    use super::{Metadata, FileStore, MediaStore};
+    use tokio::io::AsyncReadExt;
 
-        Ok(buf)
-    }
+    #[tokio::test]
+    async fn test_streaming_read_write() {
+        let tempdir = tempdir::TempDir::new("file").expect("Failed to create tempdir");
+        let store = FileStore::new(tempdir.path());
+
+        let file: &[u8] = include_bytes!("../../../../README.md");
+        let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i))));
+        let metadata = Metadata {
+            filename: Some("README.md".to_string()),
+            content_type: "text/markdown".to_string(),
+            length: None
+        };
+
+        let filename = store.write_streaming(
+            "fireburn.ru",
+            metadata, stream
+        ).await.unwrap();
+
+        let content = tokio::fs::read(
+            tempdir.path()
+                .join("fireburn.ru")
+                .join(&filename)
+        ).await.unwrap();
+        assert_eq!(content, file);
+
+        let meta: Metadata = serde_json::from_slice(&tokio::fs::read(
+            tempdir.path()
+                .join("fireburn.ru")
+                .join(filename.clone() + ".json")
+        ).await.unwrap()).unwrap();
+        assert_eq!(&meta.content_type, "text/markdown");
+        assert_eq!(meta.filename.as_deref(), Some("README.md"));
+        assert_eq!(meta.length, Some(file.len()));
+
+        let (metadata, read_back) = {
+            let (metadata, stream) = store.read_streaming(
+                "fireburn.ru",
+                &filename
+            ).await.unwrap();
+            let mut reader = tokio_util::io::StreamReader::new(stream);
+
+            let mut buf = Vec::default();
+            reader.read_to_end(&mut buf).await.unwrap();
+
+            (metadata, buf)
+        };
+
+        assert_eq!(read_back, file);
+        assert_eq!(&metadata.content_type, "text/markdown");
+        assert_eq!(meta.filename.as_deref(), Some("README.md"));
+        assert_eq!(meta.length, Some(file.len()));
 
-    async fn delete(&self, domain: url::Host, filename: &str) -> Result<()> {
-        todo!()
     }
 }
diff --git a/kittybox-rs/src/media/storage/mod.rs b/kittybox-rs/src/media/storage/mod.rs
index ba880ab..cb8b38f 100644
--- a/kittybox-rs/src/media/storage/mod.rs
+++ b/kittybox-rs/src/media/storage/mod.rs
@@ -1,12 +1,39 @@
 use async_trait::async_trait;
+use axum::extract::multipart::Field;
+use tokio_stream::Stream;
+use bytes::Bytes;
+use serde::{Deserialize, Serialize};
+use std::pin::Pin;
 
 pub mod file;
 
+#[derive(Debug, Deserialize, Serialize)]
+pub struct Metadata {
+    pub(super) content_type: String,
+    pub(super) filename: Option<String>,
+    pub(super) length: Option<usize>
+}
+impl From<&Field<'_>> for Metadata {
+    fn from(field: &Field<'_>) -> Self {
+        Self {
+            content_type: field.content_type()
+                .map(|i| i.to_owned())
+                .or_else(|| Some("application/octet-stream".to_owned()))
+                .unwrap(),
+            filename: field.file_name()
+                .map(|i| i.to_owned()),
+            length: None
+        }
+    }
+}
+
+
 #[derive(Debug, Clone, Copy)]
 pub enum ErrorKind {
     Backend,
     Permission,
-    Conflict,
+    Json,
+    NotFound,
     Other,
 }
 
@@ -17,6 +44,12 @@ pub struct MediaStoreError {
     msg: String,
 }
 
+impl MediaStoreError {
+    pub fn kind(&self) -> ErrorKind {
+        self.kind
+    }
+}
+
 impl std::error::Error for MediaStoreError {
     fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
         self.source
@@ -33,7 +66,8 @@ impl std::fmt::Display for MediaStoreError {
             match self.kind {
                 ErrorKind::Backend => "media storage backend error",
                 ErrorKind::Permission => "permission denied",
-                ErrorKind::Conflict => "conflict with existing data",
+                ErrorKind::Json => "failed to parse json",
+                ErrorKind::NotFound => "blob not found",
                 ErrorKind::Other => "unknown media storage error",
             },
             self.msg
@@ -45,18 +79,20 @@ pub type Result<T> = std::result::Result<T, MediaStoreError>;
 
 #[async_trait]
 pub trait MediaStore: 'static + Send + Sync + Clone {
-    async fn write_streaming(
+    async fn write_streaming<T>(
         &self,
-        domain: url::Host,
-        filename: &str,
-        content: axum::extract::multipart::Field<'_>,
-    ) -> Result<()>;
-    async fn write(&self, domain: url::Host, filename: &str, content: &[u8]) -> Result<()>;
+        domain: &str,
+        metadata: Metadata,
+        content: T,
+    ) -> Result<String>
+    where
+        T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send;
+
     async fn read_streaming(
         &self,
-        domain: url::Host,
+        domain: &str,
         filename: &str,
-    ) -> Result<tokio_util::io::ReaderStream<Box<dyn tokio::io::AsyncRead>>>;
-    async fn read(&self, domain: url::Host, filename: &str) -> Result<Vec<u8>>;
-    async fn delete(&self, domain: url::Host, filename: &str) -> Result<()>;
+    ) -> Result<(Metadata, Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>)>;
+
+    async fn delete(&self, domain: &str, filename: &str) -> Result<()>;
 }