diff options
Diffstat (limited to 'kittybox-rs/src/database')
-rw-r--r-- | kittybox-rs/src/database/file/mod.rs | 108 | ||||
-rw-r--r-- | kittybox-rs/src/database/memory.rs | 138 | ||||
-rw-r--r-- | kittybox-rs/src/database/mod.rs | 102 |
3 files changed, 235 insertions, 113 deletions
diff --git a/kittybox-rs/src/database/file/mod.rs b/kittybox-rs/src/database/file/mod.rs index 1e7aa96..fb18dc4 100644 --- a/kittybox-rs/src/database/file/mod.rs +++ b/kittybox-rs/src/database/file/mod.rs @@ -1,15 +1,15 @@ //#![warn(clippy::unwrap_used)] -use crate::database::{filter_post, ErrorKind, Result, Storage, StorageError, Settings}; -use std::io::ErrorKind as IOErrorKind; -use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::task::spawn_blocking; +use crate::database::{filter_post, ErrorKind, Result, Settings, Storage, StorageError}; use async_trait::async_trait; use futures::{stream, StreamExt, TryStreamExt}; -use log::debug; use serde_json::json; use std::collections::HashMap; +use std::io::ErrorKind as IOErrorKind; use std::path::{Path, PathBuf}; +use tokio::fs::{File, OpenOptions}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::task::spawn_blocking; +use tracing::debug; impl From<std::io::Error> for StorageError { fn from(source: std::io::Error) -> Self { @@ -30,7 +30,7 @@ impl From<tokio::time::error::Elapsed> for StorageError { Self::with_source( ErrorKind::Backend, "timeout on I/O operation", - Box::new(source) + Box::new(source), ) } } @@ -107,7 +107,7 @@ fn url_to_path(root: &Path, url: &str) -> PathBuf { } fn url_to_relative_path(url: &str) -> relative_path::RelativePathBuf { - let url = warp::http::Uri::try_from(url).expect("Couldn't parse a URL"); + let url = axum::http::Uri::try_from(url).expect("Couldn't parse a URL"); let mut path = relative_path::RelativePathBuf::new(); path.push(url.authority().unwrap().to_string() + url.path() + ".json"); @@ -160,7 +160,10 @@ fn modify_post(post: &serde_json::Value, update: &serde_json::Value) -> Result<s if let Some(v) = v.as_array() { add_keys.insert(k.to_string(), v.clone()); } else { - return Err(StorageError::new(ErrorKind::BadRequest, "Malformed update object")); + return Err(StorageError::new( + ErrorKind::BadRequest, + "Malformed update object", + )); } } } @@ -194,9 +197,7 @@ fn modify_post(post: &serde_json::Value, update: &serde_json::Value) -> Result<s let k = &k; if let Some(prop) = props[k].as_array_mut() { if k == "children" { - v.into_iter() - .rev() - .for_each(|v| prop.insert(0, v)); + v.into_iter().rev().for_each(|v| prop.insert(0, v)); } else { prop.extend(v.into_iter()); } @@ -262,7 +263,7 @@ async fn hydrate_author<S: Storage>( if let Some(props) = feed["properties"].as_object_mut() { props["author"] = json!(author_list); } else { - feed["properties"] = json!({"author": author_list}); + feed["properties"] = json!({ "author": author_list }); } } } @@ -270,6 +271,7 @@ async fn hydrate_author<S: Storage>( #[async_trait] impl Storage for FileStorage { + #[tracing::instrument] async fn post_exists(&self, url: &str) -> Result<bool> { let path = url_to_path(&self.root_dir, url); debug!("Checking if {:?} exists...", path); @@ -289,6 +291,7 @@ impl Storage for FileStorage { Ok(spawn_blocking(move || path.is_file()).await.unwrap()) } + #[tracing::instrument] async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> { let path = url_to_path(&self.root_dir, url); // TODO: check that the path actually belongs to the dir of user who requested it @@ -302,9 +305,13 @@ impl Storage for FileStorage { // Typechecks because OS magic acts on references // to FDs as if they were behind a mutex AsyncReadExt::read_to_string(&mut file, &mut content).await?; - debug!("Read {} bytes successfully from {:?}", content.as_bytes().len(), &path); + debug!( + "Read {} bytes successfully from {:?}", + content.as_bytes().len(), + &path + ); Ok(Some(serde_json::from_str(&content)?)) - }, + } Err(err) => { if err.kind() == IOErrorKind::NotFound { Ok(None) @@ -315,6 +322,7 @@ impl Storage for FileStorage { } } + #[tracing::instrument] async fn put_post(&self, post: &'_ serde_json::Value, user: &'_ str) -> Result<()> { let key = post["properties"]["uid"][0] .as_str() @@ -323,7 +331,10 @@ impl Storage for FileStorage { let tempfile = (&path).with_extension("tmp"); debug!("Creating {:?}", path); - let parent = path.parent().expect("Parent for this directory should always exist").to_owned(); + let parent = path + .parent() + .expect("Parent for this directory should always exist") + .to_owned(); if !parent.is_dir() { tokio::fs::create_dir_all(parent).await?; } @@ -331,7 +342,8 @@ impl Storage for FileStorage { let mut file = tokio::fs::OpenOptions::new() .write(true) .create_new(true) - .open(&tempfile).await?; + .open(&tempfile) + .await?; file.write_all(post.to_string().as_bytes()).await?; file.flush().await?; @@ -339,10 +351,7 @@ impl Storage for FileStorage { tokio::fs::rename(&tempfile, &path).await?; if let Some(urls) = post["properties"]["url"].as_array() { - for url in urls - .iter() - .map(|i| i.as_str().unwrap()) - { + for url in urls.iter().map(|i| i.as_str().unwrap()) { if url != key && url.starts_with(user) { let link = url_to_path(&self.root_dir, url); debug!("Creating a symlink at {:?}", link); @@ -370,7 +379,13 @@ impl Storage for FileStorage { println!("Adding to channel list..."); // Add the h-feed to the channel list let mut path = relative_path::RelativePathBuf::new(); - path.push(warp::http::Uri::try_from(user.to_string()).unwrap().authority().unwrap().to_string()); + path.push( + axum::http::Uri::try_from(user.to_string()) + .unwrap() + .authority() + .unwrap() + .to_string(), + ); path.push("channels"); let path = path.to_path(&self.root_dir); @@ -384,13 +399,15 @@ impl Storage for FileStorage { let mut tempfile = OpenOptions::new() .write(true) .create_new(true) - .open(&tempfilename).await?; + .open(&tempfilename) + .await?; let mut file = OpenOptions::new() .read(true) .write(true) .truncate(false) .create(true) - .open(&path).await?; + .open(&path) + .await?; let mut content = String::new(); file.read_to_string(&mut content).await?; @@ -406,7 +423,9 @@ impl Storage for FileStorage { name: channel_name, }); - tempfile.write_all(serde_json::to_string(&channels)?.as_bytes()).await?; + tempfile + .write_all(serde_json::to_string(&channels)?.as_bytes()) + .await?; tempfile.flush().await?; drop(tempfile); tokio::fs::rename(tempfilename, path).await?; @@ -414,6 +433,7 @@ impl Storage for FileStorage { Ok(()) } + #[tracing::instrument] async fn update_post(&self, url: &'_ str, update: serde_json::Value) -> Result<()> { let path = url_to_path(&self.root_dir, url); let tempfilename = path.with_extension("tmp"); @@ -424,10 +444,7 @@ impl Storage for FileStorage { .create_new(true) .open(&tempfilename) .await?; - let mut file = OpenOptions::new() - .read(true) - .open(&path) - .await?; + let mut file = OpenOptions::new().read(true).open(&path).await?; let mut content = String::new(); file.read_to_string(&mut content).await?; @@ -447,9 +464,16 @@ impl Storage for FileStorage { Ok(()) } + #[tracing::instrument] async fn get_channels(&self, user: &'_ str) -> Result<Vec<super::MicropubChannel>> { let mut path = relative_path::RelativePathBuf::new(); - path.push(warp::http::Uri::try_from(user.to_string()).unwrap().authority().unwrap().to_string()); + path.push( + axum::http::Uri::try_from(user.to_string()) + .unwrap() + .authority() + .unwrap() + .to_string(), + ); path.push("channels"); let path = path.to_path(&self.root_dir); @@ -474,6 +498,7 @@ impl Storage for FileStorage { } } + #[tracing::instrument] async fn read_feed_with_limit( &self, url: &'_ str, @@ -498,7 +523,7 @@ impl Storage for FileStorage { if let Some(after) = after { for s in posts_iter.by_ref() { if &s == after { - break + break; } } }; @@ -539,6 +564,7 @@ impl Storage for FileStorage { } } + #[tracing::instrument] async fn delete_post(&self, url: &'_ str) -> Result<()> { let path = url_to_path(&self.root_dir, url); if let Err(e) = tokio::fs::remove_file(path).await { @@ -549,9 +575,10 @@ impl Storage for FileStorage { } } + #[tracing::instrument] async fn get_setting(&self, setting: Settings, user: &'_ str) -> Result<String> { log::debug!("User for getting settings: {}", user); - let url = warp::http::Uri::try_from(user).expect("Couldn't parse a URL"); + let url = axum::http::Uri::try_from(user).expect("Couldn't parse a URL"); let mut path = relative_path::RelativePathBuf::new(); path.push(url.authority().unwrap().to_string()); path.push("settings"); @@ -572,8 +599,9 @@ impl Storage for FileStorage { .ok_or_else(|| StorageError::new(ErrorKind::Backend, "Setting not set")) } + #[tracing::instrument] async fn set_setting(&self, setting: Settings, user: &'_ str, value: &'_ str) -> Result<()> { - let url = warp::http::Uri::try_from(user).expect("Couldn't parse a URL"); + let url = axum::http::Uri::try_from(user).expect("Couldn't parse a URL"); let mut path = relative_path::RelativePathBuf::new(); path.push(url.authority().unwrap().to_string()); path.push("settings"); @@ -604,14 +632,18 @@ impl Storage for FileStorage { serde_json::from_str(&content)? } } - Err(err) => if err.kind() == IOErrorKind::NotFound { - HashMap::default() - } else { - return Err(err.into()) + Err(err) => { + if err.kind() == IOErrorKind::NotFound { + HashMap::default() + } else { + return Err(err.into()); + } } }; settings.insert(setting, value); - tempfile.write_all(serde_json::to_string(&settings)?.as_bytes()).await?; + tempfile + .write_all(serde_json::to_string(&settings)?.as_bytes()) + .await?; drop(tempfile); tokio::fs::rename(temppath, path).await?; Ok(()) diff --git a/kittybox-rs/src/database/memory.rs b/kittybox-rs/src/database/memory.rs index 786466c..c8cc125 100644 --- a/kittybox-rs/src/database/memory.rs +++ b/kittybox-rs/src/database/memory.rs @@ -1,26 +1,26 @@ #![allow(clippy::todo)] use async_trait::async_trait; +use futures_util::FutureExt; +use serde_json::json; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; -use futures_util::FutureExt; -use serde_json::json; -use crate::database::{Storage, Result, StorageError, ErrorKind, MicropubChannel, Settings}; +use crate::database::{ErrorKind, MicropubChannel, Result, Settings, Storage, StorageError}; #[derive(Clone, Debug)] pub struct MemoryStorage { pub mapping: Arc<RwLock<HashMap<String, serde_json::Value>>>, - pub channels: Arc<RwLock<HashMap<String, Vec<String>>>> + pub channels: Arc<RwLock<HashMap<String, Vec<String>>>>, } #[async_trait] impl Storage for MemoryStorage { async fn post_exists(&self, url: &str) -> Result<bool> { - return Ok(self.mapping.read().await.contains_key(url)) + return Ok(self.mapping.read().await.contains_key(url)); } - async fn get_post(&self, url: &str) ->Result<Option<serde_json::Value>> { + async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> { let mapping = self.mapping.read().await; match mapping.get(url) { Some(val) => { @@ -36,8 +36,8 @@ impl Storage for MemoryStorage { } else { Ok(Some(val.clone())) } - }, - _ => Ok(None) + } + _ => Ok(None), } } @@ -45,20 +45,45 @@ impl Storage for MemoryStorage { let mapping = &mut self.mapping.write().await; let key: &str = match post["properties"]["uid"][0].as_str() { Some(uid) => uid, - None => return Err(StorageError::new(ErrorKind::Other, "post doesn't have a UID")) + None => { + return Err(StorageError::new( + ErrorKind::Other, + "post doesn't have a UID", + )) + } }; mapping.insert(key.to_string(), post.clone()); if post["properties"]["url"].is_array() { - for url in post["properties"]["url"].as_array().unwrap().iter().map(|i| i.as_str().unwrap().to_string()) { + for url in post["properties"]["url"] + .as_array() + .unwrap() + .iter() + .map(|i| i.as_str().unwrap().to_string()) + { if url != key { - mapping.insert(url, json!({"see_other": key})); + mapping.insert(url, json!({ "see_other": key })); } } } - if post["type"].as_array().unwrap().iter().any(|i| i == "h-feed") { + if post["type"] + .as_array() + .unwrap() + .iter() + .any(|i| i == "h-feed") + { // This is a feed. Add it to the channels array if it's not already there. println!("{:#}", post); - self.channels.write().await.entry(post["properties"]["author"][0].as_str().unwrap().to_string()).or_insert_with(Vec::new).push(key.to_string()) + self.channels + .write() + .await + .entry( + post["properties"]["author"][0] + .as_str() + .unwrap() + .to_string(), + ) + .or_insert_with(Vec::new) + .push(key.to_string()) } Ok(()) } @@ -69,13 +94,24 @@ impl Storage for MemoryStorage { let mut remove_values: HashMap<String, Vec<serde_json::Value>> = HashMap::new(); if let Some(delete) = update["delete"].as_array() { - remove_keys.extend(delete.iter().filter_map(|v| v.as_str()).map(|v| v.to_string())); + remove_keys.extend( + delete + .iter() + .filter_map(|v| v.as_str()) + .map(|v| v.to_string()), + ); } else if let Some(delete) = update["delete"].as_object() { for (k, v) in delete { if let Some(v) = v.as_array() { - remove_values.entry(k.to_string()).or_default().extend(v.clone()); + remove_values + .entry(k.to_string()) + .or_default() + .extend(v.clone()); } else { - return Err(StorageError::new(ErrorKind::BadRequest, "Malformed update object")); + return Err(StorageError::new( + ErrorKind::BadRequest, + "Malformed update object", + )); } } } @@ -84,7 +120,10 @@ impl Storage for MemoryStorage { if v.is_array() { add_keys.insert(k.to_string(), v.clone()); } else { - return Err(StorageError::new(ErrorKind::BadRequest, "Malformed update object")); + return Err(StorageError::new( + ErrorKind::BadRequest, + "Malformed update object", + )); } } } @@ -100,7 +139,10 @@ impl Storage for MemoryStorage { if let Some(new_post) = mapping.get(url) { post = new_post } else { - return Err(StorageError::new(ErrorKind::NotFound, "The post you have requested is not found in the database.")); + return Err(StorageError::new( + ErrorKind::NotFound, + "The post you have requested is not found in the database.", + )); } } let mut post = post.clone(); @@ -131,7 +173,12 @@ impl Storage for MemoryStorage { let k = &k; if let Some(prop) = props[k].as_array_mut() { if k == "children" { - v.as_array().unwrap().iter().cloned().rev().for_each(|v| prop.insert(0, v)); + v.as_array() + .unwrap() + .iter() + .cloned() + .rev() + .for_each(|v| prop.insert(0, v)); } else { prop.extend(v.as_array().unwrap().iter().cloned()); } @@ -139,32 +186,55 @@ impl Storage for MemoryStorage { post["properties"][k] = v } } - mapping.insert(post["properties"]["uid"][0].as_str().unwrap().to_string(), post); + mapping.insert( + post["properties"]["uid"][0].as_str().unwrap().to_string(), + post, + ); } else { - return Err(StorageError::new(ErrorKind::NotFound, "The designated post wasn't found in the database.")); + return Err(StorageError::new( + ErrorKind::NotFound, + "The designated post wasn't found in the database.", + )); } Ok(()) } async fn get_channels(&self, user: &'_ str) -> Result<Vec<MicropubChannel>> { match self.channels.read().await.get(user) { - Some(channels) => Ok(futures_util::future::join_all(channels.iter() - .map(|channel| self.get_post(channel) - .map(|result| result.unwrap()) - .map(|post: Option<serde_json::Value>| { - post.map(|post| MicropubChannel { - uid: post["properties"]["uid"][0].as_str().unwrap().to_string(), - name: post["properties"]["name"][0].as_str().unwrap().to_string() - }) + Some(channels) => Ok(futures_util::future::join_all( + channels + .iter() + .map(|channel| { + self.get_post(channel).map(|result| result.unwrap()).map( + |post: Option<serde_json::Value>| { + post.map(|post| MicropubChannel { + uid: post["properties"]["uid"][0].as_str().unwrap().to_string(), + name: post["properties"]["name"][0] + .as_str() + .unwrap() + .to_string(), + }) + }, + ) }) - ).collect::<Vec<_>>()).await.into_iter().flatten().collect::<Vec<_>>()), - None => Ok(vec![]) + .collect::<Vec<_>>(), + ) + .await + .into_iter() + .flatten() + .collect::<Vec<_>>()), + None => Ok(vec![]), } - } #[allow(unused_variables)] - async fn read_feed_with_limit(&self, url: &'_ str, after: &'_ Option<String>, limit: usize, user: &'_ Option<String>) -> Result<Option<serde_json::Value>> { + async fn read_feed_with_limit( + &self, + url: &'_ str, + after: &'_ Option<String>, + limit: usize, + user: &'_ Option<String>, + ) -> Result<Option<serde_json::Value>> { todo!() } @@ -194,7 +264,7 @@ impl MemoryStorage { pub fn new() -> Self { Self { mapping: Arc::new(RwLock::new(HashMap::new())), - channels: Arc::new(RwLock::new(HashMap::new())) + channels: Arc::new(RwLock::new(HashMap::new())), } } } diff --git a/kittybox-rs/src/database/mod.rs b/kittybox-rs/src/database/mod.rs index 6bf5409..bd25d8d 100644 --- a/kittybox-rs/src/database/mod.rs +++ b/kittybox-rs/src/database/mod.rs @@ -55,8 +55,6 @@ pub struct StorageError { kind: ErrorKind, } -impl warp::reject::Reject for StorageError {} - impl std::error::Error for StorageError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { self.source @@ -75,18 +73,20 @@ impl From<serde_json::Error> for StorageError { } impl std::fmt::Display for StorageError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match match self.kind { - ErrorKind::Backend => write!(f, "backend error: "), - ErrorKind::JsonParsing => write!(f, "error while parsing JSON: "), - ErrorKind::PermissionDenied => write!(f, "permission denied: "), - ErrorKind::NotFound => write!(f, "not found: "), - ErrorKind::BadRequest => write!(f, "bad request: "), - ErrorKind::Conflict => write!(f, "conflict with an in-flight request or existing data: "), - ErrorKind::Other => write!(f, "generic storage layer error: "), - } { - Ok(_) => write!(f, "{}", self.msg), - Err(err) => Err(err), - } + write!( + f, + "{}: {}", + match self.kind { + ErrorKind::Backend => "backend error", + ErrorKind::JsonParsing => "JSON parsing error", + ErrorKind::PermissionDenied => "permission denied", + ErrorKind::NotFound => "not found", + ErrorKind::BadRequest => "bad request", + ErrorKind::Conflict => "conflict with an in-flight request or existing data", + ErrorKind::Other => "generic storage layer error", + }, + self.msg + ) } } impl serde::Serialize for StorageError { @@ -377,9 +377,11 @@ mod tests { returned_post["properties"]["category"].as_array().unwrap(), &vec![json!("testing")] ); - }, + } something_else => { - something_else.expect("Shouldn't error").expect("Should have the post"); + something_else + .expect("Shouldn't error") + .expect("Should have the post"); } } } @@ -411,7 +413,11 @@ mod tests { async fn test_settings<Backend: Storage>(backend: Backend) { backend - .set_setting(crate::database::Settings::SiteName, "https://fireburn.ru/", "Vika's Hideout") + .set_setting( + crate::database::Settings::SiteName, + "https://fireburn.ru/", + "Vika's Hideout", + ) .await .unwrap(); assert_eq!( @@ -428,7 +434,9 @@ mod tests { let uid = format!( "https://{domain}/posts/{}-{}-{}", - rand::random::<Word>(), rand::random::<Word>(), rand::random::<Word>() + rand::random::<Word>(), + rand::random::<Word>(), + rand::random::<Word>() ); let post = json!({ @@ -467,12 +475,16 @@ mod tests { .unwrap(); println!("---"); for (i, post) in posts.iter().enumerate() { - backend.put_post(post, "https://fireburn.ru/").await.unwrap(); + backend + .put_post(post, "https://fireburn.ru/") + .await + .unwrap(); println!("posts[{}] = {}", i, post["properties"]["uid"][0]); } println!("---"); let limit: usize = 10; - let result = backend.read_feed_with_limit(key, &None, limit, &None) + let result = backend + .read_feed_with_limit(key, &None, limit, &None) .await .unwrap() .unwrap(); @@ -482,31 +494,38 @@ mod tests { println!("---"); assert_eq!(result["children"].as_array().unwrap()[0..10], posts[0..10]); - let result2 = backend.read_feed_with_limit( - key, - &result["children"] - .as_array() - .unwrap() - .last() - .unwrap() - ["properties"]["uid"][0] - .as_str() - .map(|i| i.to_owned()), - limit, &None - ).await.unwrap().unwrap(); + let result2 = backend + .read_feed_with_limit( + key, + &result["children"].as_array().unwrap().last().unwrap()["properties"]["uid"][0] + .as_str() + .map(|i| i.to_owned()), + limit, + &None, + ) + .await + .unwrap() + .unwrap(); for (i, post) in result2["children"].as_array().unwrap().iter().enumerate() { println!("feed[1][{}] = {}", i, post["properties"]["uid"][0]); } println!("---"); - assert_eq!(result2["children"].as_array().unwrap()[0..10], posts[10..20]); + assert_eq!( + result2["children"].as_array().unwrap()[0..10], + posts[10..20] + ); // Regression test for #4 let nonsense_after = Some("1010101010".to_owned()); let result3 = tokio::time::timeout(tokio::time::Duration::from_secs(10), async move { - backend.read_feed_with_limit( - key, &nonsense_after, limit, &None - ).await.unwrap().unwrap() - }).await.expect("Operation should not hang: see https://gitlab.com/kittybox/kittybox/-/issues/4"); + backend + .read_feed_with_limit(key, &nonsense_after, limit, &None) + .await + .unwrap() + .unwrap() + }) + .await + .expect("Operation should not hang: see https://gitlab.com/kittybox/kittybox/-/issues/4"); assert!(result3["children"].as_array().unwrap().is_empty()); } @@ -520,20 +539,21 @@ mod tests { $func_name!(test_update); $func_name!(test_feed_pagination); } - } + }; } macro_rules! file_test { ($func_name:ident) => { #[tokio::test] - async fn $func_name () { + async fn $func_name() { test_logger::ensure_env_logger_initialized(); let tempdir = tempdir::TempDir::new("file").expect("Failed to create tempdir"); - let backend = super::super::FileStorage::new(tempdir.into_path()).await.unwrap(); + let backend = super::super::FileStorage::new(tempdir.into_path()) + .await + .unwrap(); super::$func_name(backend).await } }; } test_all!(file_test, file); - } |