diff options
author | Vika <vika@fireburn.ru> | 2023-07-29 21:59:56 +0300 |
---|---|---|
committer | Vika <vika@fireburn.ru> | 2023-07-29 21:59:56 +0300 |
commit | 0617663b249f9ca488e5de652108b17d67fbaf45 (patch) | |
tree | 11564b6c8fa37bf9203a0a4cc1c4e9cc088cb1a5 /src/database/mod.rs | |
parent | 26c2b79f6a6380ae3224e9309b9f3352f5717bd7 (diff) | |
download | kittybox-0617663b249f9ca488e5de652108b17d67fbaf45.tar.zst |
Moved the entire Kittybox tree into the root
Diffstat (limited to 'src/database/mod.rs')
-rw-r--r-- | src/database/mod.rs | 793 |
1 files changed, 793 insertions, 0 deletions
diff --git a/src/database/mod.rs b/src/database/mod.rs new file mode 100644 index 0000000..b4b70b2 --- /dev/null +++ b/src/database/mod.rs @@ -0,0 +1,793 @@ +#![warn(missing_docs)] +use std::borrow::Cow; + +use async_trait::async_trait; +use kittybox_util::MentionType; + +mod file; +pub use crate::database::file::FileStorage; +use crate::micropub::MicropubUpdate; +#[cfg(feature = "postgres")] +mod postgres; +#[cfg(feature = "postgres")] +pub use postgres::PostgresStorage; + +#[cfg(test)] +mod memory; +#[cfg(test)] +pub use crate::database::memory::MemoryStorage; + +pub use kittybox_util::MicropubChannel; + +use self::settings::Setting; + +/// Enum representing different errors that might occur during the database query. +#[derive(Debug, Clone, Copy)] +pub enum ErrorKind { + /// Backend error (e.g. database connection error) + Backend, + /// Error due to insufficient contextual permissions for the query + PermissionDenied, + /// Error due to the database being unable to parse JSON returned from the backing storage. + /// Usually indicative of someone fiddling with the database manually instead of using proper tools. + JsonParsing, + /// - ErrorKind::NotFound - equivalent to a 404 error. Note, some requests return an Option, + /// in which case None is also equivalent to a 404. + NotFound, + /// The user's query or request to the database was malformed. Used whenever the database processes + /// the user's query directly, such as when editing posts inside of the database (e.g. Redis backend) + BadRequest, + /// the user's query collided with an in-flight request and needs to be retried + Conflict, + /// - ErrorKind::Other - when something so weird happens that it becomes undescribable. + Other, +} + +/// Settings that can be stored in the database. +pub mod settings { + mod private { + pub trait Sealed {} + } + + /// A trait for various settings that should be contained here. + /// + /// **Note**: this trait is sealed to prevent external + /// implementations, as it wouldn't make sense to add new settings + /// that aren't used by Kittybox itself. + pub trait Setting<'de>: private::Sealed + std::fmt::Debug + Default + Clone + serde::Serialize + serde::de::DeserializeOwned + /*From<Settings> +*/ Send + Sync { + type Data: std::fmt::Debug + Send + Sync; + const ID: &'static str; + + /// Unwrap the setting type, returning owned data contained within. + fn into_inner(self) -> Self::Data; + /// Create a new instance of this type containing certain data. + fn new(data: Self::Data) -> Self; + } + + /// A website's title, shown in the header. + #[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)] + pub struct SiteName(String); + impl Default for SiteName { + fn default() -> Self { + Self("Kittybox".to_string()) + } + } + impl AsRef<str> for SiteName { + fn as_ref(&self) -> &str { + self.0.as_str() + } + } + impl private::Sealed for SiteName {} + impl Setting<'_> for SiteName { + type Data = String; + const ID: &'static str = "site_name"; + + fn into_inner(self) -> String { + self.0 + } + fn new(data: Self::Data) -> Self { + Self(data) + } + } + impl SiteName { + fn from_str(data: &str) -> Self { + Self(data.to_owned()) + } + } + + /// Participation status in the IndieWeb Webring: https://πΈπ.ws/dashboard + #[derive(Debug, Default, serde::Deserialize, serde::Serialize, Clone, Copy, PartialEq, Eq)] + pub struct Webring(bool); + impl private::Sealed for Webring {} + impl Setting<'_> for Webring { + type Data = bool; + const ID: &'static str = "webring"; + + fn into_inner(self) -> Self::Data { + self.0 + } + + fn new(data: Self::Data) -> Self { + Self(data) + } + } +} + +/// Error signalled from the database. +#[derive(Debug)] +pub struct StorageError { + msg: std::borrow::Cow<'static, str>, + source: Option<Box<dyn std::error::Error + Send + Sync>>, + kind: ErrorKind, +} + +impl std::error::Error for StorageError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + self.source + .as_ref() + .map(|e| e.as_ref() as &dyn std::error::Error) + } +} +impl From<serde_json::Error> for StorageError { + fn from(err: serde_json::Error) -> Self { + Self { + msg: std::borrow::Cow::Owned(format!("{}", err)), + source: Some(Box::new(err)), + kind: ErrorKind::JsonParsing, + } + } +} +impl std::fmt::Display for StorageError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}: {}", + match self.kind { + ErrorKind::Backend => "backend error", + ErrorKind::JsonParsing => "JSON parsing error", + ErrorKind::PermissionDenied => "permission denied", + ErrorKind::NotFound => "not found", + ErrorKind::BadRequest => "bad request", + ErrorKind::Conflict => "conflict with an in-flight request or existing data", + ErrorKind::Other => "generic storage layer error", + }, + self.msg + ) + } +} +impl serde::Serialize for StorageError { + fn serialize<S: serde::Serializer>( + &self, + serializer: S, + ) -> std::result::Result<S::Ok, S::Error> { + serializer.serialize_str(&self.to_string()) + } +} +impl StorageError { + /// Create a new StorageError of an ErrorKind with a message. + pub fn new(kind: ErrorKind, msg: String) -> Self { + Self { + msg: Cow::Owned(msg), + source: None, + kind, + } + } + /// Create a new StorageError of an ErrorKind with a message from + /// a static string. + /// + /// This saves an allocation for a new string and is the preferred + /// way in case the error message doesn't change. + pub fn from_static(kind: ErrorKind, msg: &'static str) -> Self { + Self { + msg: Cow::Borrowed(msg), + source: None, + kind + } + } + /// Create a StorageError using another arbitrary Error as a source. + pub fn with_source( + kind: ErrorKind, + msg: std::borrow::Cow<'static, str>, + source: Box<dyn std::error::Error + Send + Sync>, + ) -> Self { + Self { + msg, + source: Some(source), + kind, + } + } + /// Get the kind of an error. + pub fn kind(&self) -> ErrorKind { + self.kind + } + /// Get the message as a string slice. + pub fn msg(&self) -> &str { + &self.msg + } +} + +/// A special Result type for the Micropub backing storage. +pub type Result<T> = std::result::Result<T, StorageError>; + +/// A storage backend for the Micropub server. +/// +/// Implementations should note that all methods listed on this trait MUST be fully atomic +/// or lock the database so that write conflicts or reading half-written data should not occur. +#[async_trait] +pub trait Storage: std::fmt::Debug + Clone + Send + Sync { + /// Check if a post exists in the database. + async fn post_exists(&self, url: &str) -> Result<bool>; + + /// Load a post from the database in MF2-JSON format, deserialized from JSON. + async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>>; + + /// Save a post to the database as an MF2-JSON structure. + /// + /// Note that the `post` object MUST have `post["properties"]["uid"][0]` defined. + async fn put_post(&self, post: &'_ serde_json::Value, user: &'_ str) -> Result<()>; + + /// Add post to feed. Some database implementations might have optimized ways to do this. + #[tracing::instrument(skip(self))] + async fn add_to_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> { + tracing::debug!("Inserting {} into {} using `update_post`", post, feed); + self.update_post(feed, serde_json::from_value( + serde_json::json!({"add": {"children": [post]}})).unwrap() + ).await + } + /// Remove post from feed. Some database implementations might have optimized ways to do this. + #[tracing::instrument(skip(self))] + async fn remove_from_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> { + tracing::debug!("Removing {} into {} using `update_post`", post, feed); + self.update_post(feed, serde_json::from_value( + serde_json::json!({"delete": {"children": [post]}})).unwrap() + ).await + } + + /// Modify a post using an update object as defined in the + /// Micropub spec. + /// + /// Note to implementors: the update operation MUST be atomic and + /// SHOULD lock the database to prevent two clients overwriting + /// each other's changes or simply corrupting something. Rejecting + /// is allowed in case of concurrent updates if waiting for a lock + /// cannot be done. + async fn update_post(&self, url: &str, update: MicropubUpdate) -> Result<()>; + + /// Get a list of channels available for the user represented by + /// the `user` domain to write to. + async fn get_channels(&self, user: &'_ str) -> Result<Vec<MicropubChannel>>; + + /// Fetch a feed at `url` and return an h-feed object containing + /// `limit` posts after a post by url `after`, filtering the content + /// in context of a user specified by `user` (or an anonymous user). + /// + /// This method MUST hydrate the `author` property with an h-card + /// from the database by replacing URLs with corresponding h-cards. + /// + /// When encountering posts which the `user` is not authorized to + /// access, this method MUST elide such posts (as an optimization + /// for the frontend) and not return them, but still return up to + /// `limit` posts (to not reveal the hidden posts' presence). + /// + /// Note for implementors: if you use streams to fetch posts in + /// parallel from the database, preferably make this method use a + /// connection pool to reduce overhead of creating a database + /// connection per post for parallel fetching. + async fn read_feed_with_limit( + &self, + url: &'_ str, + after: &'_ Option<String>, + limit: usize, + user: &'_ Option<String>, + ) -> Result<Option<serde_json::Value>>; + + /// Fetch a feed at `url` and return an h-feed object containing + /// `limit` posts after a `cursor` (filtering the content in + /// context of a user specified by `user`, or an anonymous user), + /// as well as a new cursor to paginate with. + /// + /// This method MUST hydrate the `author` property with an h-card + /// from the database by replacing URLs with corresponding h-cards. + /// + /// When encountering posts which the `user` is not authorized to + /// access, this method MUST elide such posts (as an optimization + /// for the frontend) and not return them, but still return an + /// amount of posts as close to `limit` as possible (to avoid + /// revealing the existence of the hidden post). + /// + /// Note for implementors: if you use streams to fetch posts in + /// parallel from the database, preferably make this method use a + /// connection pool to reduce overhead of creating a database + /// connection per post for parallel fetching. + async fn read_feed_with_cursor( + &self, + url: &'_ str, + cursor: Option<&'_ str>, + limit: usize, + user: Option<&'_ str> + ) -> Result<Option<(serde_json::Value, Option<String>)>>; + + /// Deletes a post from the database irreversibly. Must be idempotent. + async fn delete_post(&self, url: &'_ str) -> Result<()>; + + /// Gets a setting from the setting store and passes the result. + async fn get_setting<S: Setting<'a>, 'a>(&'_ self, user: &'_ str) -> Result<S>; + + /// Commits a setting to the setting store. + async fn set_setting<S: Setting<'a> + 'a, 'a>(&self, user: &'a str, value: S::Data) -> Result<()>; + + /// Add (or update) a webmention on a certian post. + /// + /// The MF2 object describing the webmention content will always + /// be of type `h-cite`, and the `uid` property on the object will + /// always be set. + /// + /// The rationale for this function is as follows: webmentions + /// might be duplicated, and we need to deduplicate them first. As + /// we lack support for transactions and locking posts on the + /// database, the only way is to implement the operation on the + /// database itself. + /// + /// Besides, it may even allow for nice tricks like storing the + /// webmentions separately and rehydrating them on feed reads. + async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()>; +} + +#[cfg(test)] +mod tests { + use super::settings; + + use super::{MicropubChannel, Storage}; + use kittybox_util::MentionType; + use serde_json::json; + + async fn test_basic_operations<Backend: Storage>(backend: Backend) { + let post: serde_json::Value = json!({ + "type": ["h-entry"], + "properties": { + "content": ["Test content"], + "author": ["https://fireburn.ru/"], + "uid": ["https://fireburn.ru/posts/hello"], + "url": ["https://fireburn.ru/posts/hello", "https://fireburn.ru/posts/test"] + } + }); + let key = post["properties"]["uid"][0].as_str().unwrap().to_string(); + let alt_url = post["properties"]["url"][1].as_str().unwrap().to_string(); + + // Reading and writing + backend + .put_post(&post, "fireburn.ru") + .await + .unwrap(); + if let Some(returned_post) = backend.get_post(&key).await.unwrap() { + assert!(returned_post.is_object()); + assert_eq!( + returned_post["type"].as_array().unwrap().len(), + post["type"].as_array().unwrap().len() + ); + assert_eq!( + returned_post["type"].as_array().unwrap(), + post["type"].as_array().unwrap() + ); + let props: &serde_json::Map<String, serde_json::Value> = + post["properties"].as_object().unwrap(); + for key in props.keys() { + assert_eq!( + returned_post["properties"][key].as_array().unwrap(), + post["properties"][key].as_array().unwrap() + ) + } + } else { + panic!("For some reason the backend did not return the post.") + } + // Check the alternative URL - it should return the same post + if let Ok(Some(returned_post)) = backend.get_post(&alt_url).await { + assert!(returned_post.is_object()); + assert_eq!( + returned_post["type"].as_array().unwrap().len(), + post["type"].as_array().unwrap().len() + ); + assert_eq!( + returned_post["type"].as_array().unwrap(), + post["type"].as_array().unwrap() + ); + let props: &serde_json::Map<String, serde_json::Value> = + post["properties"].as_object().unwrap(); + for key in props.keys() { + assert_eq!( + returned_post["properties"][key].as_array().unwrap(), + post["properties"][key].as_array().unwrap() + ) + } + } else { + panic!("For some reason the backend did not return the post.") + } + } + + /// Note: this is merely a smoke check and is in no way comprehensive. + // TODO updates for feeds must update children using special logic + async fn test_update<Backend: Storage>(backend: Backend) { + let post: serde_json::Value = json!({ + "type": ["h-entry"], + "properties": { + "content": ["Test content"], + "author": ["https://fireburn.ru/"], + "uid": ["https://fireburn.ru/posts/hello"], + "url": ["https://fireburn.ru/posts/hello", "https://fireburn.ru/posts/test"] + } + }); + let key = post["properties"]["uid"][0].as_str().unwrap().to_string(); + + // Reading and writing + backend + .put_post(&post, "fireburn.ru") + .await + .unwrap(); + + backend + .update_post( + &key, + serde_json::from_value(json!({ + "url": &key, + "add": { + "category": ["testing"], + }, + "replace": { + "content": ["Different test content"] + } + })).unwrap(), + ) + .await + .unwrap(); + + match backend.get_post(&key).await { + Ok(Some(returned_post)) => { + assert!(returned_post.is_object()); + assert_eq!( + returned_post["type"].as_array().unwrap().len(), + post["type"].as_array().unwrap().len() + ); + assert_eq!( + returned_post["type"].as_array().unwrap(), + post["type"].as_array().unwrap() + ); + assert_eq!( + returned_post["properties"]["content"][0].as_str().unwrap(), + "Different test content" + ); + assert_eq!( + returned_post["properties"]["category"].as_array().unwrap(), + &vec![json!("testing")] + ); + } + something_else => { + something_else + .expect("Shouldn't error") + .expect("Should have the post"); + } + } + } + + async fn test_get_channel_list<Backend: Storage>(backend: Backend) { + let feed = json!({ + "type": ["h-feed"], + "properties": { + "name": ["Main Page"], + "author": ["https://fireburn.ru/"], + "uid": ["https://fireburn.ru/feeds/main"] + }, + "children": [] + }); + backend + .put_post(&feed, "fireburn.ru") + .await + .unwrap(); + let chans = backend.get_channels("fireburn.ru").await.unwrap(); + assert_eq!(chans.len(), 1); + assert_eq!( + chans[0], + MicropubChannel { + uid: "https://fireburn.ru/feeds/main".to_string(), + name: "Main Page".to_string() + } + ); + } + + async fn test_settings<Backend: Storage>(backend: Backend) { + backend + .set_setting::<settings::SiteName>( + "https://fireburn.ru/", + "Vika's Hideout".to_owned() + ) + .await + .unwrap(); + assert_eq!( + backend + .get_setting::<settings::SiteName>("https://fireburn.ru/") + .await + .unwrap() + .as_ref(), + "Vika's Hideout" + ); + } + + fn gen_random_post(domain: &str) -> serde_json::Value { + use faker_rand::lorem::{Paragraphs, Word}; + + let uid = format!( + "https://{domain}/posts/{}-{}-{}", + rand::random::<Word>(), + rand::random::<Word>(), + rand::random::<Word>() + ); + + let time = chrono::Local::now().to_rfc3339(); + let post = json!({ + "type": ["h-entry"], + "properties": { + "content": [rand::random::<Paragraphs>().to_string()], + "uid": [&uid], + "url": [&uid], + "published": [&time] + } + }); + + post + } + + fn gen_random_mention(domain: &str, mention_type: MentionType, url: &str) -> serde_json::Value { + use faker_rand::lorem::{Paragraphs, Word}; + + let uid = format!( + "https://{domain}/posts/{}-{}-{}", + rand::random::<Word>(), + rand::random::<Word>(), + rand::random::<Word>() + ); + + let time = chrono::Local::now().to_rfc3339(); + let post = json!({ + "type": ["h-cite"], + "properties": { + "content": [rand::random::<Paragraphs>().to_string()], + "uid": [&uid], + "url": [&uid], + "published": [&time], + (match mention_type { + MentionType::Reply => "in-reply-to", + MentionType::Like => "like-of", + MentionType::Repost => "repost-of", + MentionType::Bookmark => "bookmark-of", + MentionType::Mention => unimplemented!(), + }): [url] + } + }); + + post + } + + async fn test_feed_pagination<Backend: Storage>(backend: Backend) { + let posts = { + let mut posts = std::iter::from_fn( + || Some(gen_random_post("fireburn.ru")) + ) + .take(40) + .collect::<Vec<serde_json::Value>>(); + + // Reverse the array so it's in reverse-chronological order + posts.reverse(); + + posts + }; + + let feed = json!({ + "type": ["h-feed"], + "properties": { + "name": ["Main Page"], + "author": ["https://fireburn.ru/"], + "uid": ["https://fireburn.ru/feeds/main"] + }, + }); + let key = feed["properties"]["uid"][0].as_str().unwrap(); + + backend + .put_post(&feed, "fireburn.ru") + .await + .unwrap(); + + for (i, post) in posts.iter().rev().enumerate() { + backend + .put_post(post, "fireburn.ru") + .await + .unwrap(); + backend.add_to_feed(key, post["properties"]["uid"][0].as_str().unwrap()).await.unwrap(); + } + + let limit: usize = 10; + + tracing::debug!("Starting feed reading..."); + let (result, cursor) = backend + .read_feed_with_cursor(key, None, limit, None) + .await + .unwrap() + .unwrap(); + + assert_eq!(result["children"].as_array().unwrap().len(), limit); + assert_eq!( + result["children"] + .as_array() + .unwrap() + .iter() + .map(|post| post["properties"]["uid"][0].as_str().unwrap()) + .collect::<Vec<_>>() + [0..10], + posts + .iter() + .map(|post| post["properties"]["uid"][0].as_str().unwrap()) + .collect::<Vec<_>>() + [0..10] + ); + + tracing::debug!("Continuing with cursor: {:?}", cursor); + let (result2, cursor2) = backend + .read_feed_with_cursor( + key, + cursor.as_deref(), + limit, + None, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!( + result2["children"].as_array().unwrap()[0..10], + posts[10..20] + ); + + tracing::debug!("Continuing with cursor: {:?}", cursor); + let (result3, cursor3) = backend + .read_feed_with_cursor( + key, + cursor2.as_deref(), + limit, + None, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!( + result3["children"].as_array().unwrap()[0..10], + posts[20..30] + ); + + tracing::debug!("Continuing with cursor: {:?}", cursor); + let (result4, _) = backend + .read_feed_with_cursor( + key, + cursor3.as_deref(), + limit, + None, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!( + result4["children"].as_array().unwrap()[0..10], + posts[30..40] + ); + + // Regression test for #4 + // + // Results for a bogus cursor are undefined, so we aren't + // checking them. But the function at least shouldn't hang. + let nonsense_after = Some("1010101010"); + let _ = tokio::time::timeout(tokio::time::Duration::from_secs(10), async move { + backend + .read_feed_with_cursor(key, nonsense_after, limit, None) + .await + }) + .await + .expect("Operation should not hang: see https://gitlab.com/kittybox/kittybox/-/issues/4"); + } + + async fn test_webmention_addition<Backend: Storage>(db: Backend) { + let post = gen_random_post("fireburn.ru"); + + db.put_post(&post, "fireburn.ru").await.unwrap(); + const TYPE: MentionType = MentionType::Reply; + + let target = post["properties"]["uid"][0].as_str().unwrap(); + let mut reply = gen_random_mention("aaronparecki.com", TYPE, target); + + let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap(); + assert_eq!(post, read_post); + + db.add_or_update_webmention(target, TYPE, reply.clone()).await.unwrap(); + + let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap(); + assert_eq!(read_post["properties"]["comment"][0], reply); + + reply["properties"]["content"][0] = json!(rand::random::<faker_rand::lorem::Paragraphs>().to_string()); + + db.add_or_update_webmention(target, TYPE, reply.clone()).await.unwrap(); + let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap(); + assert_eq!(read_post["properties"]["comment"][0], reply); + } + + async fn test_pretty_permalinks<Backend: Storage>(db: Backend) { + const PERMALINK: &str = "https://fireburn.ru/posts/pretty-permalink"; + + let post = { + let mut post = gen_random_post("fireburn.ru"); + let urls = post["properties"]["url"].as_array_mut().unwrap(); + urls.push(serde_json::Value::String( + PERMALINK.to_owned() + )); + + post + }; + db.put_post(&post, "fireburn.ru").await.unwrap(); + + for i in post["properties"]["url"].as_array().unwrap() { + let (read_post, _) = db.read_feed_with_cursor(i.as_str().unwrap(), None, 20, None).await.unwrap().unwrap(); + assert_eq!(read_post, post); + } + } + /// Automatically generates a test suite for + macro_rules! test_all { + ($func_name:ident, $mod_name:ident) => { + mod $mod_name { + $func_name!(test_basic_operations); + $func_name!(test_get_channel_list); + $func_name!(test_settings); + $func_name!(test_update); + $func_name!(test_feed_pagination); + $func_name!(test_webmention_addition); + $func_name!(test_pretty_permalinks); + } + }; + } + macro_rules! file_test { + ($func_name:ident) => { + #[tokio::test] + #[tracing_test::traced_test] + async fn $func_name() { + let tempdir = tempfile::tempdir().expect("Failed to create tempdir"); + let backend = super::super::FileStorage::new( + tempdir.path().to_path_buf() + ) + .await + .unwrap(); + super::$func_name(backend).await + } + }; + } + + macro_rules! postgres_test { + ($func_name:ident) => { + #[cfg(feature = "sqlx")] + #[sqlx::test] + #[tracing_test::traced_test] + async fn $func_name( + pool_opts: sqlx::postgres::PgPoolOptions, + connect_opts: sqlx::postgres::PgConnectOptions + ) -> Result<(), sqlx::Error> { + let db = { + //use sqlx::ConnectOptions; + //connect_opts.log_statements(log::LevelFilter::Debug); + + pool_opts.connect_with(connect_opts).await? + }; + let backend = super::super::PostgresStorage::from_pool(db).await.unwrap(); + + Ok(super::$func_name(backend).await) + } + }; + } + + test_all!(file_test, file); + test_all!(postgres_test, postgres); +} |