From 7f23ec84bc05c236c1bf40c2f0d72412af711516 Mon Sep 17 00:00:00 2001 From: Vika Date: Thu, 7 Jul 2022 00:32:33 +0300 Subject: treewide: rewrite using Axum Axum has streaming bodies and allows to write simpler code. It also helps enforce stronger types and looks much more neat. This allows me to progress on the media endpoint and add streaming reads and writes to the MediaStore trait. Metrics are temporarily not implemented. Everything else was preserved, and the tests still pass, after adjusting for new calling conventions. TODO: create method routers for protocol endpoints --- kittybox-rs/src/database/file/mod.rs | 108 +++++++++++++++++++++++------------ 1 file changed, 70 insertions(+), 38 deletions(-) (limited to 'kittybox-rs/src/database/file') diff --git a/kittybox-rs/src/database/file/mod.rs b/kittybox-rs/src/database/file/mod.rs index 1e7aa96..fb18dc4 100644 --- a/kittybox-rs/src/database/file/mod.rs +++ b/kittybox-rs/src/database/file/mod.rs @@ -1,15 +1,15 @@ //#![warn(clippy::unwrap_used)] -use crate::database::{filter_post, ErrorKind, Result, Storage, StorageError, Settings}; -use std::io::ErrorKind as IOErrorKind; -use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::task::spawn_blocking; +use crate::database::{filter_post, ErrorKind, Result, Settings, Storage, StorageError}; use async_trait::async_trait; use futures::{stream, StreamExt, TryStreamExt}; -use log::debug; use serde_json::json; use std::collections::HashMap; +use std::io::ErrorKind as IOErrorKind; use std::path::{Path, PathBuf}; +use tokio::fs::{File, OpenOptions}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::task::spawn_blocking; +use tracing::debug; impl From for StorageError { fn from(source: std::io::Error) -> Self { @@ -30,7 +30,7 @@ impl From for StorageError { Self::with_source( ErrorKind::Backend, "timeout on I/O operation", - Box::new(source) + Box::new(source), ) } } @@ -107,7 +107,7 @@ fn url_to_path(root: &Path, url: &str) -> PathBuf { } fn url_to_relative_path(url: &str) -> relative_path::RelativePathBuf { - let url = warp::http::Uri::try_from(url).expect("Couldn't parse a URL"); + let url = axum::http::Uri::try_from(url).expect("Couldn't parse a URL"); let mut path = relative_path::RelativePathBuf::new(); path.push(url.authority().unwrap().to_string() + url.path() + ".json"); @@ -160,7 +160,10 @@ fn modify_post(post: &serde_json::Value, update: &serde_json::Value) -> Result Result( if let Some(props) = feed["properties"].as_object_mut() { props["author"] = json!(author_list); } else { - feed["properties"] = json!({"author": author_list}); + feed["properties"] = json!({ "author": author_list }); } } } @@ -270,6 +271,7 @@ async fn hydrate_author( #[async_trait] impl Storage for FileStorage { + #[tracing::instrument] async fn post_exists(&self, url: &str) -> Result { let path = url_to_path(&self.root_dir, url); debug!("Checking if {:?} exists...", path); @@ -289,6 +291,7 @@ impl Storage for FileStorage { Ok(spawn_blocking(move || path.is_file()).await.unwrap()) } + #[tracing::instrument] async fn get_post(&self, url: &str) -> Result> { let path = url_to_path(&self.root_dir, url); // TODO: check that the path actually belongs to the dir of user who requested it @@ -302,9 +305,13 @@ impl Storage for FileStorage { // Typechecks because OS magic acts on references // to FDs as if they were behind a mutex AsyncReadExt::read_to_string(&mut file, &mut content).await?; - debug!("Read {} bytes successfully from {:?}", content.as_bytes().len(), &path); + debug!( + "Read {} bytes successfully from {:?}", + content.as_bytes().len(), + &path + ); Ok(Some(serde_json::from_str(&content)?)) - }, + } Err(err) => { if err.kind() == IOErrorKind::NotFound { Ok(None) @@ -315,6 +322,7 @@ impl Storage for FileStorage { } } + #[tracing::instrument] async fn put_post(&self, post: &'_ serde_json::Value, user: &'_ str) -> Result<()> { let key = post["properties"]["uid"][0] .as_str() @@ -323,7 +331,10 @@ impl Storage for FileStorage { let tempfile = (&path).with_extension("tmp"); debug!("Creating {:?}", path); - let parent = path.parent().expect("Parent for this directory should always exist").to_owned(); + let parent = path + .parent() + .expect("Parent for this directory should always exist") + .to_owned(); if !parent.is_dir() { tokio::fs::create_dir_all(parent).await?; } @@ -331,7 +342,8 @@ impl Storage for FileStorage { let mut file = tokio::fs::OpenOptions::new() .write(true) .create_new(true) - .open(&tempfile).await?; + .open(&tempfile) + .await?; file.write_all(post.to_string().as_bytes()).await?; file.flush().await?; @@ -339,10 +351,7 @@ impl Storage for FileStorage { tokio::fs::rename(&tempfile, &path).await?; if let Some(urls) = post["properties"]["url"].as_array() { - for url in urls - .iter() - .map(|i| i.as_str().unwrap()) - { + for url in urls.iter().map(|i| i.as_str().unwrap()) { if url != key && url.starts_with(user) { let link = url_to_path(&self.root_dir, url); debug!("Creating a symlink at {:?}", link); @@ -370,7 +379,13 @@ impl Storage for FileStorage { println!("Adding to channel list..."); // Add the h-feed to the channel list let mut path = relative_path::RelativePathBuf::new(); - path.push(warp::http::Uri::try_from(user.to_string()).unwrap().authority().unwrap().to_string()); + path.push( + axum::http::Uri::try_from(user.to_string()) + .unwrap() + .authority() + .unwrap() + .to_string(), + ); path.push("channels"); let path = path.to_path(&self.root_dir); @@ -384,13 +399,15 @@ impl Storage for FileStorage { let mut tempfile = OpenOptions::new() .write(true) .create_new(true) - .open(&tempfilename).await?; + .open(&tempfilename) + .await?; let mut file = OpenOptions::new() .read(true) .write(true) .truncate(false) .create(true) - .open(&path).await?; + .open(&path) + .await?; let mut content = String::new(); file.read_to_string(&mut content).await?; @@ -406,7 +423,9 @@ impl Storage for FileStorage { name: channel_name, }); - tempfile.write_all(serde_json::to_string(&channels)?.as_bytes()).await?; + tempfile + .write_all(serde_json::to_string(&channels)?.as_bytes()) + .await?; tempfile.flush().await?; drop(tempfile); tokio::fs::rename(tempfilename, path).await?; @@ -414,6 +433,7 @@ impl Storage for FileStorage { Ok(()) } + #[tracing::instrument] async fn update_post(&self, url: &'_ str, update: serde_json::Value) -> Result<()> { let path = url_to_path(&self.root_dir, url); let tempfilename = path.with_extension("tmp"); @@ -424,10 +444,7 @@ impl Storage for FileStorage { .create_new(true) .open(&tempfilename) .await?; - let mut file = OpenOptions::new() - .read(true) - .open(&path) - .await?; + let mut file = OpenOptions::new().read(true).open(&path).await?; let mut content = String::new(); file.read_to_string(&mut content).await?; @@ -447,9 +464,16 @@ impl Storage for FileStorage { Ok(()) } + #[tracing::instrument] async fn get_channels(&self, user: &'_ str) -> Result> { let mut path = relative_path::RelativePathBuf::new(); - path.push(warp::http::Uri::try_from(user.to_string()).unwrap().authority().unwrap().to_string()); + path.push( + axum::http::Uri::try_from(user.to_string()) + .unwrap() + .authority() + .unwrap() + .to_string(), + ); path.push("channels"); let path = path.to_path(&self.root_dir); @@ -474,6 +498,7 @@ impl Storage for FileStorage { } } + #[tracing::instrument] async fn read_feed_with_limit( &self, url: &'_ str, @@ -498,7 +523,7 @@ impl Storage for FileStorage { if let Some(after) = after { for s in posts_iter.by_ref() { if &s == after { - break + break; } } }; @@ -539,6 +564,7 @@ impl Storage for FileStorage { } } + #[tracing::instrument] async fn delete_post(&self, url: &'_ str) -> Result<()> { let path = url_to_path(&self.root_dir, url); if let Err(e) = tokio::fs::remove_file(path).await { @@ -549,9 +575,10 @@ impl Storage for FileStorage { } } + #[tracing::instrument] async fn get_setting(&self, setting: Settings, user: &'_ str) -> Result { log::debug!("User for getting settings: {}", user); - let url = warp::http::Uri::try_from(user).expect("Couldn't parse a URL"); + let url = axum::http::Uri::try_from(user).expect("Couldn't parse a URL"); let mut path = relative_path::RelativePathBuf::new(); path.push(url.authority().unwrap().to_string()); path.push("settings"); @@ -572,8 +599,9 @@ impl Storage for FileStorage { .ok_or_else(|| StorageError::new(ErrorKind::Backend, "Setting not set")) } + #[tracing::instrument] async fn set_setting(&self, setting: Settings, user: &'_ str, value: &'_ str) -> Result<()> { - let url = warp::http::Uri::try_from(user).expect("Couldn't parse a URL"); + let url = axum::http::Uri::try_from(user).expect("Couldn't parse a URL"); let mut path = relative_path::RelativePathBuf::new(); path.push(url.authority().unwrap().to_string()); path.push("settings"); @@ -604,14 +632,18 @@ impl Storage for FileStorage { serde_json::from_str(&content)? } } - Err(err) => if err.kind() == IOErrorKind::NotFound { - HashMap::default() - } else { - return Err(err.into()) + Err(err) => { + if err.kind() == IOErrorKind::NotFound { + HashMap::default() + } else { + return Err(err.into()); + } } }; settings.insert(setting, value); - tempfile.write_all(serde_json::to_string(&settings)?.as_bytes()).await?; + tempfile + .write_all(serde_json::to_string(&settings)?.as_bytes()) + .await?; drop(tempfile); tokio::fs::rename(temppath, path).await?; Ok(()) -- cgit 1.4.1