#![warn(missing_docs)]
//! Storage abstraction for the Micropub server: the error types, the
//! per-user settings and the `Storage` trait implemented by the backends.
use std::borrow::Cow;

use async_trait::async_trait;
use kittybox_util::MentionType;

mod file;
pub use crate::database::file::FileStorage;
use crate::micropub::MicropubUpdate;
#[cfg(feature = "postgres")]
mod postgres;
#[cfg(feature = "postgres")]
pub use postgres::PostgresStorage;

#[cfg(test)]
mod memory;
#[cfg(test)]
pub use crate::database::memory::MemoryStorage;

pub use kittybox_util::MicropubChannel;

use self::settings::Setting;

/// Enum representing different errors that might occur during the database query.
#[derive(Debug, Clone, Copy)]
pub enum ErrorKind {
    /// Backend error (e.g. database connection error)
    Backend,
    /// Error due to insufficient contextual permissions for the query
    PermissionDenied,
    /// Error due to the database being unable to parse JSON returned from the backing storage.
    /// Usually indicative of someone fiddling with the database manually instead of using proper tools.
    JsonParsing,
    /// The requested object was not found; equivalent to a 404 error. Note that some
    /// requests return an `Option`, in which case `None` is also equivalent to a 404.
    NotFound,
    /// The user's query or request to the database was malformed. Used whenever the database processes
    /// the user's query directly, such as when editing posts inside of the database (e.g. Redis backend)
    BadRequest,
    /// The user's query collided with an in-flight request and needs to be retried.
    Conflict,
    /// Catch-all for when something so weird happens that it becomes undescribable.
    Other,
}

/// Settings that can be stored in the database.
pub mod settings {
    // Private module hosting the marker trait used to seal `Setting`:
    // code outside of `settings` cannot name `Sealed`, hence cannot
    // implement `Setting` for its own types.
    mod private {
        pub trait Sealed {}
    }

    /// A trait for various settings that should be contained here.
    ///
    /// **Note**: this trait is sealed to prevent external
    /// implementations, as it wouldn't make sense to add new settings
    /// that aren't used by Kittybox itself.
    pub trait Setting<'de>:
        private::Sealed
        + std::fmt::Debug
        + Default
        + Clone
        + serde::Serialize
        + serde::de::DeserializeOwned
        + /*From<Settings> +*/ Send
        + Sync
    {
        /// The type of the data wrapped by this setting.
        type Data: std::fmt::Debug + Send + Sync;
        /// String identifier of this setting.
        ///
        // NOTE(review): presumably the key under which backends store the
        // setting — confirm against the backend implementations.
        const ID: &'static str;

        /// Unwrap the setting type, returning owned data contained within.
        fn into_inner(self) -> Self::Data;
        /// Create a new instance of this type containing certain data.
        fn new(data: Self::Data) -> Self;
    }

    /// A website's title, shown in the header.
    ///
    /// Defaults to `"Kittybox"`.
    #[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)]
    pub struct SiteName(String);
    impl Default for SiteName {
        fn default() -> Self {
            Self("Kittybox".to_string())
        }
    }
    impl AsRef<str> for SiteName {
        fn as_ref(&self) -> &str {
            self.0.as_str()
        }
    }
    impl private::Sealed for SiteName {}
    impl Setting<'_> for SiteName {
        type Data = String;
        const ID: &'static str = "site_name";

        fn into_inner(self) -> String {
            self.0
        }
        fn new(data: Self::Data) -> Self {
            Self(data)
        }
    }
    impl SiteName {
        // Build a site name by copying the given string slice.
        fn from_str(data: &str) -> Self {
            Self(data.to_owned())
        }
    }

    /// Participation status in the IndieWeb Webring:
    /// <https://🕸💍.ws/dashboard> (punycode: `xn--sr8hvo.ws`).
    ///
    /// Defaults to `false` (derived `Default` of the inner `bool`).
    #[derive(Debug, Default, serde::Deserialize, serde::Serialize, Clone, Copy, PartialEq, Eq)]
    pub struct Webring(bool);
    impl private::Sealed for Webring {}
    impl Setting<'_> for Webring {
        type Data = bool;
        const ID: &'static str = "webring";

        fn into_inner(self) -> Self::Data {
            self.0
        }
        fn new(data: Self::Data) -> Self {
            Self(data)
        }
    }
}

/// Error signalled from the database.
#[derive(Debug)] pub struct StorageError { msg: std::borrow::Cow<'static, str>, source: Option<Box<dyn std::error::Error + Send + Sync>>, kind: ErrorKind, } impl std::error::Error for StorageError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { self.source .as_ref() .map(|e| e.as_ref() as &dyn std::error::Error) } } impl From<serde_json::Error> for StorageError { fn from(err: serde_json::Error) -> Self { Self { msg: std::borrow::Cow::Owned(format!("{}", err)), source: Some(Box::new(err)), kind: ErrorKind::JsonParsing, } } } impl std::fmt::Display for StorageError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, "{}: {}", match self.kind { ErrorKind::Backend => "backend error", ErrorKind::JsonParsing => "JSON parsing error", ErrorKind::PermissionDenied => "permission denied", ErrorKind::NotFound => "not found", ErrorKind::BadRequest => "bad request", ErrorKind::Conflict => "conflict with an in-flight request or existing data", ErrorKind::Other => "generic storage layer error", }, self.msg ) } } impl serde::Serialize for StorageError { fn serialize<S: serde::Serializer>( &self, serializer: S, ) -> std::result::Result<S::Ok, S::Error> { serializer.serialize_str(&self.to_string()) } } impl StorageError { /// Create a new StorageError of an ErrorKind with a message. pub fn new(kind: ErrorKind, msg: String) -> Self { Self { msg: Cow::Owned(msg), source: None, kind, } } /// Create a new StorageError of an ErrorKind with a message from /// a static string. /// /// This saves an allocation for a new string and is the preferred /// way in case the error message doesn't change. pub fn from_static(kind: ErrorKind, msg: &'static str) -> Self { Self { msg: Cow::Borrowed(msg), source: None, kind } } /// Create a StorageError using another arbitrary Error as a source. 
pub fn with_source( kind: ErrorKind, msg: std::borrow::Cow<'static, str>, source: Box<dyn std::error::Error + Send + Sync>, ) -> Self { Self { msg, source: Some(source), kind, } } /// Get the kind of an error. pub fn kind(&self) -> ErrorKind { self.kind } /// Get the message as a string slice. pub fn msg(&self) -> &str { &self.msg } } /// A special Result type for the Micropub backing storage. pub type Result<T> = std::result::Result<T, StorageError>; /// A storage backend for the Micropub server. /// /// Implementations should note that all methods listed on this trait MUST be fully atomic /// or lock the database so that write conflicts or reading half-written data should not occur. #[async_trait] pub trait Storage: std::fmt::Debug + Clone + Send + Sync { /// Return the list of categories used in blog posts of a specified blog. async fn categories(&self, url: &str) -> Result<Vec<String>>; /// Check if a post exists in the database. async fn post_exists(&self, url: &str) -> Result<bool>; /// Load a post from the database in MF2-JSON format, deserialized from JSON. async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>>; /// Save a post to the database as an MF2-JSON structure. /// /// Note that the `post` object MUST have `post["properties"]["uid"][0]` defined. async fn put_post(&self, post: &'_ serde_json::Value, user: &url::Url) -> Result<()>; /// Add post to feed. Some database implementations might have optimized ways to do this. #[tracing::instrument(skip(self))] async fn add_to_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> { tracing::debug!("Inserting {} into {} using `update_post`", post, feed); self.update_post(feed, serde_json::from_value( serde_json::json!({"add": {"children": [post]}})).unwrap() ).await } /// Remove post from feed. Some database implementations might have optimized ways to do this. 
#[tracing::instrument(skip(self))] async fn remove_from_feed(&self, feed: &str, post: &str) -> Result<()> { tracing::debug!("Removing {} into {} using `update_post`", post, feed); self.update_post(feed, serde_json::from_value( serde_json::json!({"delete": {"children": [post]}})).unwrap() ).await } /// Modify a post using an update object as defined in the /// Micropub spec. /// /// Note to implementors: the update operation MUST be atomic and /// SHOULD lock the database to prevent two clients overwriting /// each other's changes or simply corrupting something. Rejecting /// is allowed in case of concurrent updates if waiting for a lock /// cannot be done. async fn update_post(&self, url: &str, update: MicropubUpdate) -> Result<()>; /// Get a list of channels available for the user represented by /// the `user` domain to write to. async fn get_channels(&self, user: &url::Url) -> Result<Vec<MicropubChannel>>; /// Fetch a feed at `url` and return an h-feed object containing /// `limit` posts after a post by url `after`, filtering the content /// in context of a user specified by `user` (or an anonymous user). /// /// This method MUST hydrate the `author` property with an h-card /// from the database by replacing URLs with corresponding h-cards. /// /// When encountering posts which the `user` is not authorized to /// access, this method MUST elide such posts (as an optimization /// for the frontend) and not return them, but still return up to /// `limit` posts (to not reveal the hidden posts' presence). /// /// Note for implementors: if you use streams to fetch posts in /// parallel from the database, preferably make this method use a /// connection pool to reduce overhead of creating a database /// connection per post for parallel fetching. 
async fn read_feed_with_limit( &self, url: &'_ str, after: Option<&str>, limit: usize, user: Option<&url::Url>, ) -> Result<Option<serde_json::Value>>; /// Fetch a feed at `url` and return an h-feed object containing /// `limit` posts after a `cursor` (filtering the content in /// context of a user specified by `user`, or an anonymous user), /// as well as a new cursor to paginate with. /// /// This method MUST hydrate the `author` property with an h-card /// from the database by replacing URLs with corresponding h-cards. /// /// When encountering posts which the `user` is not authorized to /// access, this method MUST elide such posts (as an optimization /// for the frontend) and not return them, but still return an /// amount of posts as close to `limit` as possible (to avoid /// revealing the existence of the hidden post). /// /// Note for implementors: if you use streams to fetch posts in /// parallel from the database, preferably make this method use a /// connection pool to reduce overhead of creating a database /// connection per post for parallel fetching. async fn read_feed_with_cursor( &self, url: &'_ str, cursor: Option<&'_ str>, limit: usize, user: Option<&url::Url> ) -> Result<Option<(serde_json::Value, Option<String>)>>; /// Deletes a post from the database irreversibly. Must be idempotent. async fn delete_post(&self, url: &'_ str) -> Result<()>; /// Gets a setting from the setting store and passes the result. async fn get_setting<S: Setting<'a>, 'a>(&'_ self, user: &url::Url) -> Result<S>; /// Commits a setting to the setting store. async fn set_setting<S: Setting<'a> + 'a, 'a>(&self, user: &'a url::Url, value: S::Data) -> Result<()>; /// Add (or update) a webmention on a certian post. /// /// The MF2 object describing the webmention content will always /// be of type `h-cite`, and the `uid` property on the object will /// always be set. 
/// /// The rationale for this function is as follows: webmentions /// might be duplicated, and we need to deduplicate them first. As /// we lack support for transactions and locking posts on the /// database, the only way is to implement the operation on the /// database itself. /// /// Besides, it may even allow for nice tricks like storing the /// webmentions separately and rehydrating them on feed reads. async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()>; } #[cfg(test)] mod tests { use super::settings; use super::{MicropubChannel, Storage}; use kittybox_util::MentionType; use serde_json::json; async fn test_basic_operations<Backend: Storage>(backend: Backend) { let post: serde_json::Value = json!({ "type": ["h-entry"], "properties": { "content": ["Test content"], "author": ["https://fireburn.ru/"], "uid": ["https://fireburn.ru/posts/hello"], "url": ["https://fireburn.ru/posts/hello", "https://fireburn.ru/posts/test"] } }); let key = post["properties"]["uid"][0].as_str().unwrap().to_string(); let alt_url = post["properties"]["url"][1].as_str().unwrap().to_string(); // Reading and writing backend .put_post(&post, &"https://fireburn.ru/".parse().unwrap()) .await .unwrap(); if let Some(returned_post) = backend.get_post(&key).await.unwrap() { assert!(returned_post.is_object()); assert_eq!( returned_post["type"].as_array().unwrap().len(), post["type"].as_array().unwrap().len() ); assert_eq!( returned_post["type"].as_array().unwrap(), post["type"].as_array().unwrap() ); let props: &serde_json::Map<String, serde_json::Value> = post["properties"].as_object().unwrap(); for key in props.keys() { assert_eq!( returned_post["properties"][key].as_array().unwrap(), post["properties"][key].as_array().unwrap() ) } } else { panic!("For some reason the backend did not return the post.") } // Check the alternative URL - it should return the same post if let Ok(Some(returned_post)) = 
backend.get_post(&alt_url).await { assert!(returned_post.is_object()); assert_eq!( returned_post["type"].as_array().unwrap().len(), post["type"].as_array().unwrap().len() ); assert_eq!( returned_post["type"].as_array().unwrap(), post["type"].as_array().unwrap() ); let props: &serde_json::Map<String, serde_json::Value> = post["properties"].as_object().unwrap(); for key in props.keys() { assert_eq!( returned_post["properties"][key].as_array().unwrap(), post["properties"][key].as_array().unwrap() ) } } else { panic!("For some reason the backend did not return the post.") } } /// Note: this is merely a smoke check and is in no way comprehensive. // TODO updates for feeds must update children using special logic async fn test_update<Backend: Storage>(backend: Backend) { let post: serde_json::Value = json!({ "type": ["h-entry"], "properties": { "content": ["Test content"], "author": ["https://fireburn.ru/"], "uid": ["https://fireburn.ru/posts/hello"], "url": ["https://fireburn.ru/posts/hello", "https://fireburn.ru/posts/test"] } }); let key = post["properties"]["uid"][0].as_str().unwrap().to_string(); // Reading and writing backend .put_post(&post, &"https://fireburn.ru/".parse().unwrap()) .await .unwrap(); backend .update_post( &key, serde_json::from_value(json!({ "url": &key, "add": { "category": ["testing"], }, "replace": { "content": ["Different test content"] } })).unwrap(), ) .await .unwrap(); match backend.get_post(&key).await { Ok(Some(returned_post)) => { assert!(returned_post.is_object()); assert_eq!( returned_post["type"].as_array().unwrap().len(), post["type"].as_array().unwrap().len() ); assert_eq!( returned_post["type"].as_array().unwrap(), post["type"].as_array().unwrap() ); assert_eq!( returned_post["properties"]["content"][0].as_str().unwrap(), "Different test content" ); assert_eq!( returned_post["properties"]["category"].as_array().unwrap(), &vec![json!("testing")] ); } something_else => { something_else .expect("Shouldn't error") .expect("Should have 
the post"); } } } async fn test_get_channel_list<Backend: Storage>(backend: Backend) { let feed = json!({ "type": ["h-feed"], "properties": { "name": ["Main Page"], "author": ["https://fireburn.ru/"], "uid": ["https://fireburn.ru/feeds/main"] }, "children": [] }); backend .put_post(&feed, &"https://fireburn.ru/".parse().unwrap()) .await .unwrap(); let chans = backend.get_channels(&"https://fireburn.ru/".parse().unwrap()).await.unwrap(); assert_eq!(chans.len(), 1); assert_eq!( chans[0], MicropubChannel { uid: "https://fireburn.ru/feeds/main".to_string(), name: "Main Page".to_string() } ); } async fn test_settings<Backend: Storage>(backend: Backend) { backend .set_setting::<settings::SiteName>( &"https://fireburn.ru/".parse().unwrap(), "Vika's Hideout".to_owned() ) .await .unwrap(); assert_eq!( backend .get_setting::<settings::SiteName>(&"https://fireburn.ru/".parse().unwrap()) .await .unwrap() .as_ref(), "Vika's Hideout" ); } fn gen_random_post(domain: &str) -> serde_json::Value { use faker_rand::lorem::{Paragraphs, Word}; let uid = format!( "https://{domain}/posts/{}-{}-{}", rand::random::<Word>(), rand::random::<Word>(), rand::random::<Word>() ); let time = chrono::Local::now().to_rfc3339(); let post = json!({ "type": ["h-entry"], "properties": { "content": [rand::random::<Paragraphs>().to_string()], "uid": [&uid], "url": [&uid], "published": [&time] } }); post } fn gen_random_mention(domain: &str, mention_type: MentionType, url: &str) -> serde_json::Value { use faker_rand::lorem::{Paragraphs, Word}; let uid = format!( "https://{domain}/posts/{}-{}-{}", rand::random::<Word>(), rand::random::<Word>(), rand::random::<Word>() ); let time = chrono::Local::now().to_rfc3339(); let post = json!({ "type": ["h-cite"], "properties": { "content": [rand::random::<Paragraphs>().to_string()], "uid": [&uid], "url": [&uid], "published": [&time], (match mention_type { MentionType::Reply => "in-reply-to", MentionType::Like => "like-of", MentionType::Repost => "repost-of", 
MentionType::Bookmark => "bookmark-of", MentionType::Mention => unimplemented!(), }): [url] } }); post } async fn test_feed_pagination<Backend: Storage>(backend: Backend) { let posts = { let mut posts = std::iter::from_fn( || Some(gen_random_post("fireburn.ru")) ) .take(40) .collect::<Vec<serde_json::Value>>(); // Reverse the array so it's in reverse-chronological order posts.reverse(); posts }; let feed = json!({ "type": ["h-feed"], "properties": { "name": ["Main Page"], "author": ["https://fireburn.ru/"], "uid": ["https://fireburn.ru/feeds/main"] }, }); let key = feed["properties"]["uid"][0].as_str().unwrap(); backend .put_post(&feed, &"https://fireburn.ru/".parse().unwrap()) .await .unwrap(); for (i, post) in posts.iter().rev().enumerate() { backend .put_post(post, &"https://fireburn.ru/".parse().unwrap()) .await .unwrap(); backend.add_to_feed(key, post["properties"]["uid"][0].as_str().unwrap()).await.unwrap(); } let limit: usize = 10; tracing::debug!("Starting feed reading..."); let (result, cursor) = backend .read_feed_with_cursor(key, None, limit, None) .await .unwrap() .unwrap(); assert_eq!(result["children"].as_array().unwrap().len(), limit); assert_eq!( result["children"] .as_array() .unwrap() .iter() .map(|post| post["properties"]["uid"][0].as_str().unwrap()) .collect::<Vec<_>>() [0..10], posts .iter() .map(|post| post["properties"]["uid"][0].as_str().unwrap()) .collect::<Vec<_>>() [0..10] ); tracing::debug!("Continuing with cursor: {:?}", cursor); let (result2, cursor2) = backend .read_feed_with_cursor( key, cursor.as_deref(), limit, None, ) .await .unwrap() .unwrap(); assert_eq!( result2["children"].as_array().unwrap()[0..10], posts[10..20] ); tracing::debug!("Continuing with cursor: {:?}", cursor); let (result3, cursor3) = backend .read_feed_with_cursor( key, cursor2.as_deref(), limit, None, ) .await .unwrap() .unwrap(); assert_eq!( result3["children"].as_array().unwrap()[0..10], posts[20..30] ); tracing::debug!("Continuing with cursor: {:?}", cursor); 
let (result4, _) = backend .read_feed_with_cursor( key, cursor3.as_deref(), limit, None, ) .await .unwrap() .unwrap(); assert_eq!( result4["children"].as_array().unwrap()[0..10], posts[30..40] ); // Regression test for #4 // // Results for a bogus cursor are undefined, so we aren't // checking them. But the function at least shouldn't hang. let nonsense_after = Some("1010101010"); let _ = tokio::time::timeout(tokio::time::Duration::from_secs(10), async move { backend .read_feed_with_cursor(key, nonsense_after, limit, None) .await }) .await .expect("Operation should not hang: see https://gitlab.com/kittybox/kittybox/-/issues/4"); } async fn test_webmention_addition<Backend: Storage>(db: Backend) { let post = gen_random_post("fireburn.ru"); db.put_post(&post, &"https://fireburn.ru/".parse().unwrap()).await.unwrap(); const TYPE: MentionType = MentionType::Reply; let target = post["properties"]["uid"][0].as_str().unwrap(); let mut reply = gen_random_mention("aaronparecki.com", TYPE, target); let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap(); assert_eq!(post, read_post); db.add_or_update_webmention(target, TYPE, reply.clone()).await.unwrap(); let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap(); assert_eq!(read_post["properties"]["comment"][0], reply); reply["properties"]["content"][0] = json!(rand::random::<faker_rand::lorem::Paragraphs>().to_string()); db.add_or_update_webmention(target, TYPE, reply.clone()).await.unwrap(); let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap(); assert_eq!(read_post["properties"]["comment"][0], reply); } async fn test_pretty_permalinks<Backend: Storage>(db: Backend) { const PERMALINK: &str = "https://fireburn.ru/posts/pretty-permalink"; let post = { let mut post = gen_random_post("fireburn.ru"); let urls = post["properties"]["url"].as_array_mut().unwrap(); urls.push(serde_json::Value::String( PERMALINK.to_owned() 
)); post }; db.put_post(&post, &"https://fireburn.ru/".parse().unwrap()).await.unwrap(); for i in post["properties"]["url"].as_array().unwrap() { let (read_post, _) = db.read_feed_with_cursor(i.as_str().unwrap(), None, 20, None).await.unwrap().unwrap(); assert_eq!(read_post, post); } } /// Automatically generates a test suite for macro_rules! test_all { ($func_name:ident, $mod_name:ident) => { mod $mod_name { $func_name!(test_basic_operations); $func_name!(test_get_channel_list); $func_name!(test_settings); $func_name!(test_update); $func_name!(test_feed_pagination); $func_name!(test_webmention_addition); $func_name!(test_pretty_permalinks); } }; } macro_rules! file_test { ($func_name:ident) => { #[tokio::test] #[tracing_test::traced_test] async fn $func_name() { let tempdir = tempfile::tempdir().expect("Failed to create tempdir"); let backend = super::super::FileStorage::new( tempdir.path().to_path_buf() ) .await .unwrap(); super::$func_name(backend).await } }; } macro_rules! postgres_test { ($func_name:ident) => { #[cfg(feature = "sqlx")] #[sqlx::test] #[tracing_test::traced_test] async fn $func_name( pool_opts: sqlx::postgres::PgPoolOptions, connect_opts: sqlx::postgres::PgConnectOptions ) -> Result<(), sqlx::Error> { let db = { //use sqlx::ConnectOptions; //connect_opts.log_statements(log::LevelFilter::Debug); pool_opts.connect_with(connect_opts).await? }; let backend = super::super::PostgresStorage::from_pool(db).await.unwrap(); Ok(super::$func_name(backend).await) } }; } test_all!(file_test, file); test_all!(postgres_test, postgres); }