diff options
Diffstat (limited to 'kittybox-rs/src/database/mod.rs')
-rw-r--r-- | kittybox-rs/src/database/mod.rs | 793 |
1 files changed, 0 insertions, 793 deletions
diff --git a/kittybox-rs/src/database/mod.rs b/kittybox-rs/src/database/mod.rs deleted file mode 100644 index b4b70b2..0000000 --- a/kittybox-rs/src/database/mod.rs +++ /dev/null @@ -1,793 +0,0 @@ -#![warn(missing_docs)] -use std::borrow::Cow; - -use async_trait::async_trait; -use kittybox_util::MentionType; - -mod file; -pub use crate::database::file::FileStorage; -use crate::micropub::MicropubUpdate; -#[cfg(feature = "postgres")] -mod postgres; -#[cfg(feature = "postgres")] -pub use postgres::PostgresStorage; - -#[cfg(test)] -mod memory; -#[cfg(test)] -pub use crate::database::memory::MemoryStorage; - -pub use kittybox_util::MicropubChannel; - -use self::settings::Setting; - -/// Enum representing different errors that might occur during the database query. -#[derive(Debug, Clone, Copy)] -pub enum ErrorKind { - /// Backend error (e.g. database connection error) - Backend, - /// Error due to insufficient contextual permissions for the query - PermissionDenied, - /// Error due to the database being unable to parse JSON returned from the backing storage. - /// Usually indicative of someone fiddling with the database manually instead of using proper tools. - JsonParsing, - /// - ErrorKind::NotFound - equivalent to a 404 error. Note, some requests return an Option, - /// in which case None is also equivalent to a 404. - NotFound, - /// The user's query or request to the database was malformed. Used whenever the database processes - /// the user's query directly, such as when editing posts inside of the database (e.g. Redis backend) - BadRequest, - /// the user's query collided with an in-flight request and needs to be retried - Conflict, - /// - ErrorKind::Other - when something so weird happens that it becomes undescribable. - Other, -} - -/// Settings that can be stored in the database. -pub mod settings { - mod private { - pub trait Sealed {} - } - - /// A trait for various settings that should be contained here. - /// - /// **Note**: this trait is sealed to prevent external - /// implementations, as it wouldn't make sense to add new settings - /// that aren't used by Kittybox itself. - pub trait Setting<'de>: private::Sealed + std::fmt::Debug + Default + Clone + serde::Serialize + serde::de::DeserializeOwned + /*From<Settings> +*/ Send + Sync { - type Data: std::fmt::Debug + Send + Sync; - const ID: &'static str; - - /// Unwrap the setting type, returning owned data contained within. - fn into_inner(self) -> Self::Data; - /// Create a new instance of this type containing certain data. - fn new(data: Self::Data) -> Self; - } - - /// A website's title, shown in the header. - #[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)] - pub struct SiteName(String); - impl Default for SiteName { - fn default() -> Self { - Self("Kittybox".to_string()) - } - } - impl AsRef<str> for SiteName { - fn as_ref(&self) -> &str { - self.0.as_str() - } - } - impl private::Sealed for SiteName {} - impl Setting<'_> for SiteName { - type Data = String; - const ID: &'static str = "site_name"; - - fn into_inner(self) -> String { - self.0 - } - fn new(data: Self::Data) -> Self { - Self(data) - } - } - impl SiteName { - fn from_str(data: &str) -> Self { - Self(data.to_owned()) - } - } - - /// Participation status in the IndieWeb Webring: https://πΈπ.ws/dashboard - #[derive(Debug, Default, serde::Deserialize, serde::Serialize, Clone, Copy, PartialEq, Eq)] - pub struct Webring(bool); - impl private::Sealed for Webring {} - impl Setting<'_> for Webring { - type Data = bool; - const ID: &'static str = "webring"; - - fn into_inner(self) -> Self::Data { - self.0 - } - - fn new(data: Self::Data) -> Self { - Self(data) - } - } -} - -/// Error signalled from the database. -#[derive(Debug)] -pub struct StorageError { - msg: std::borrow::Cow<'static, str>, - source: Option<Box<dyn std::error::Error + Send + Sync>>, - kind: ErrorKind, -} - -impl std::error::Error for StorageError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - self.source - .as_ref() - .map(|e| e.as_ref() as &dyn std::error::Error) - } -} -impl From<serde_json::Error> for StorageError { - fn from(err: serde_json::Error) -> Self { - Self { - msg: std::borrow::Cow::Owned(format!("{}", err)), - source: Some(Box::new(err)), - kind: ErrorKind::JsonParsing, - } - } -} -impl std::fmt::Display for StorageError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{}: {}", - match self.kind { - ErrorKind::Backend => "backend error", - ErrorKind::JsonParsing => "JSON parsing error", - ErrorKind::PermissionDenied => "permission denied", - ErrorKind::NotFound => "not found", - ErrorKind::BadRequest => "bad request", - ErrorKind::Conflict => "conflict with an in-flight request or existing data", - ErrorKind::Other => "generic storage layer error", - }, - self.msg - ) - } -} -impl serde::Serialize for StorageError { - fn serialize<S: serde::Serializer>( - &self, - serializer: S, - ) -> std::result::Result<S::Ok, S::Error> { - serializer.serialize_str(&self.to_string()) - } -} -impl StorageError { - /// Create a new StorageError of an ErrorKind with a message. - pub fn new(kind: ErrorKind, msg: String) -> Self { - Self { - msg: Cow::Owned(msg), - source: None, - kind, - } - } - /// Create a new StorageError of an ErrorKind with a message from - /// a static string. - /// - /// This saves an allocation for a new string and is the preferred - /// way in case the error message doesn't change. - pub fn from_static(kind: ErrorKind, msg: &'static str) -> Self { - Self { - msg: Cow::Borrowed(msg), - source: None, - kind - } - } - /// Create a StorageError using another arbitrary Error as a source. - pub fn with_source( - kind: ErrorKind, - msg: std::borrow::Cow<'static, str>, - source: Box<dyn std::error::Error + Send + Sync>, - ) -> Self { - Self { - msg, - source: Some(source), - kind, - } - } - /// Get the kind of an error. - pub fn kind(&self) -> ErrorKind { - self.kind - } - /// Get the message as a string slice. - pub fn msg(&self) -> &str { - &self.msg - } -} - -/// A special Result type for the Micropub backing storage. -pub type Result<T> = std::result::Result<T, StorageError>; - -/// A storage backend for the Micropub server. -/// -/// Implementations should note that all methods listed on this trait MUST be fully atomic -/// or lock the database so that write conflicts or reading half-written data should not occur. -#[async_trait] -pub trait Storage: std::fmt::Debug + Clone + Send + Sync { - /// Check if a post exists in the database. - async fn post_exists(&self, url: &str) -> Result<bool>; - - /// Load a post from the database in MF2-JSON format, deserialized from JSON. - async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>>; - - /// Save a post to the database as an MF2-JSON structure. - /// - /// Note that the `post` object MUST have `post["properties"]["uid"][0]` defined. - async fn put_post(&self, post: &'_ serde_json::Value, user: &'_ str) -> Result<()>; - - /// Add post to feed. Some database implementations might have optimized ways to do this. - #[tracing::instrument(skip(self))] - async fn add_to_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> { - tracing::debug!("Inserting {} into {} using `update_post`", post, feed); - self.update_post(feed, serde_json::from_value( - serde_json::json!({"add": {"children": [post]}})).unwrap() - ).await - } - /// Remove post from feed. Some database implementations might have optimized ways to do this. - #[tracing::instrument(skip(self))] - async fn remove_from_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> { - tracing::debug!("Removing {} into {} using `update_post`", post, feed); - self.update_post(feed, serde_json::from_value( - serde_json::json!({"delete": {"children": [post]}})).unwrap() - ).await - } - - /// Modify a post using an update object as defined in the - /// Micropub spec. - /// - /// Note to implementors: the update operation MUST be atomic and - /// SHOULD lock the database to prevent two clients overwriting - /// each other's changes or simply corrupting something. Rejecting - /// is allowed in case of concurrent updates if waiting for a lock - /// cannot be done. - async fn update_post(&self, url: &str, update: MicropubUpdate) -> Result<()>; - - /// Get a list of channels available for the user represented by - /// the `user` domain to write to. - async fn get_channels(&self, user: &'_ str) -> Result<Vec<MicropubChannel>>; - - /// Fetch a feed at `url` and return an h-feed object containing - /// `limit` posts after a post by url `after`, filtering the content - /// in context of a user specified by `user` (or an anonymous user). - /// - /// This method MUST hydrate the `author` property with an h-card - /// from the database by replacing URLs with corresponding h-cards. - /// - /// When encountering posts which the `user` is not authorized to - /// access, this method MUST elide such posts (as an optimization - /// for the frontend) and not return them, but still return up to - /// `limit` posts (to not reveal the hidden posts' presence). - /// - /// Note for implementors: if you use streams to fetch posts in - /// parallel from the database, preferably make this method use a - /// connection pool to reduce overhead of creating a database - /// connection per post for parallel fetching. - async fn read_feed_with_limit( - &self, - url: &'_ str, - after: &'_ Option<String>, - limit: usize, - user: &'_ Option<String>, - ) -> Result<Option<serde_json::Value>>; - - /// Fetch a feed at `url` and return an h-feed object containing - /// `limit` posts after a `cursor` (filtering the content in - /// context of a user specified by `user`, or an anonymous user), - /// as well as a new cursor to paginate with. - /// - /// This method MUST hydrate the `author` property with an h-card - /// from the database by replacing URLs with corresponding h-cards. - /// - /// When encountering posts which the `user` is not authorized to - /// access, this method MUST elide such posts (as an optimization - /// for the frontend) and not return them, but still return an - /// amount of posts as close to `limit` as possible (to avoid - /// revealing the existence of the hidden post). - /// - /// Note for implementors: if you use streams to fetch posts in - /// parallel from the database, preferably make this method use a - /// connection pool to reduce overhead of creating a database - /// connection per post for parallel fetching. - async fn read_feed_with_cursor( - &self, - url: &'_ str, - cursor: Option<&'_ str>, - limit: usize, - user: Option<&'_ str> - ) -> Result<Option<(serde_json::Value, Option<String>)>>; - - /// Deletes a post from the database irreversibly. Must be idempotent. - async fn delete_post(&self, url: &'_ str) -> Result<()>; - - /// Gets a setting from the setting store and passes the result. - async fn get_setting<S: Setting<'a>, 'a>(&'_ self, user: &'_ str) -> Result<S>; - - /// Commits a setting to the setting store. - async fn set_setting<S: Setting<'a> + 'a, 'a>(&self, user: &'a str, value: S::Data) -> Result<()>; - - /// Add (or update) a webmention on a certian post. - /// - /// The MF2 object describing the webmention content will always - /// be of type `h-cite`, and the `uid` property on the object will - /// always be set. - /// - /// The rationale for this function is as follows: webmentions - /// might be duplicated, and we need to deduplicate them first. As - /// we lack support for transactions and locking posts on the - /// database, the only way is to implement the operation on the - /// database itself. - /// - /// Besides, it may even allow for nice tricks like storing the - /// webmentions separately and rehydrating them on feed reads. - async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()>; -} - -#[cfg(test)] -mod tests { - use super::settings; - - use super::{MicropubChannel, Storage}; - use kittybox_util::MentionType; - use serde_json::json; - - async fn test_basic_operations<Backend: Storage>(backend: Backend) { - let post: serde_json::Value = json!({ - "type": ["h-entry"], - "properties": { - "content": ["Test content"], - "author": ["https://fireburn.ru/"], - "uid": ["https://fireburn.ru/posts/hello"], - "url": ["https://fireburn.ru/posts/hello", "https://fireburn.ru/posts/test"] - } - }); - let key = post["properties"]["uid"][0].as_str().unwrap().to_string(); - let alt_url = post["properties"]["url"][1].as_str().unwrap().to_string(); - - // Reading and writing - backend - .put_post(&post, "fireburn.ru") - .await - .unwrap(); - if let Some(returned_post) = backend.get_post(&key).await.unwrap() { - assert!(returned_post.is_object()); - assert_eq!( - returned_post["type"].as_array().unwrap().len(), - post["type"].as_array().unwrap().len() - ); - assert_eq!( - returned_post["type"].as_array().unwrap(), - post["type"].as_array().unwrap() - ); - let props: &serde_json::Map<String, serde_json::Value> = - post["properties"].as_object().unwrap(); - for key in props.keys() { - assert_eq!( - returned_post["properties"][key].as_array().unwrap(), - post["properties"][key].as_array().unwrap() - ) - } - } else { - panic!("For some reason the backend did not return the post.") - } - // Check the alternative URL - it should return the same post - if let Ok(Some(returned_post)) = backend.get_post(&alt_url).await { - assert!(returned_post.is_object()); - assert_eq!( - returned_post["type"].as_array().unwrap().len(), - post["type"].as_array().unwrap().len() - ); - assert_eq!( - returned_post["type"].as_array().unwrap(), - post["type"].as_array().unwrap() - ); - let props: &serde_json::Map<String, serde_json::Value> = - post["properties"].as_object().unwrap(); - for key in props.keys() { - assert_eq!( - returned_post["properties"][key].as_array().unwrap(), - post["properties"][key].as_array().unwrap() - ) - } - } else { - panic!("For some reason the backend did not return the post.") - } - } - - /// Note: this is merely a smoke check and is in no way comprehensive. - // TODO updates for feeds must update children using special logic - async fn test_update<Backend: Storage>(backend: Backend) { - let post: serde_json::Value = json!({ - "type": ["h-entry"], - "properties": { - "content": ["Test content"], - "author": ["https://fireburn.ru/"], - "uid": ["https://fireburn.ru/posts/hello"], - "url": ["https://fireburn.ru/posts/hello", "https://fireburn.ru/posts/test"] - } - }); - let key = post["properties"]["uid"][0].as_str().unwrap().to_string(); - - // Reading and writing - backend - .put_post(&post, "fireburn.ru") - .await - .unwrap(); - - backend - .update_post( - &key, - serde_json::from_value(json!({ - "url": &key, - "add": { - "category": ["testing"], - }, - "replace": { - "content": ["Different test content"] - } - })).unwrap(), - ) - .await - .unwrap(); - - match backend.get_post(&key).await { - Ok(Some(returned_post)) => { - assert!(returned_post.is_object()); - assert_eq!( - returned_post["type"].as_array().unwrap().len(), - post["type"].as_array().unwrap().len() - ); - assert_eq!( - returned_post["type"].as_array().unwrap(), - post["type"].as_array().unwrap() - ); - assert_eq!( - returned_post["properties"]["content"][0].as_str().unwrap(), - "Different test content" - ); - assert_eq!( - returned_post["properties"]["category"].as_array().unwrap(), - &vec![json!("testing")] - ); - } - something_else => { - something_else - .expect("Shouldn't error") - .expect("Should have the post"); - } - } - } - - async fn test_get_channel_list<Backend: Storage>(backend: Backend) { - let feed = json!({ - "type": ["h-feed"], - "properties": { - "name": ["Main Page"], - "author": ["https://fireburn.ru/"], - "uid": ["https://fireburn.ru/feeds/main"] - }, - "children": [] - }); - backend - .put_post(&feed, "fireburn.ru") - .await - .unwrap(); - let chans = backend.get_channels("fireburn.ru").await.unwrap(); - assert_eq!(chans.len(), 1); - assert_eq!( - chans[0], - MicropubChannel { - uid: "https://fireburn.ru/feeds/main".to_string(), - name: "Main Page".to_string() - } - ); - } - - async fn test_settings<Backend: Storage>(backend: Backend) { - backend - .set_setting::<settings::SiteName>( - "https://fireburn.ru/", - "Vika's Hideout".to_owned() - ) - .await - .unwrap(); - assert_eq!( - backend - .get_setting::<settings::SiteName>("https://fireburn.ru/") - .await - .unwrap() - .as_ref(), - "Vika's Hideout" - ); - } - - fn gen_random_post(domain: &str) -> serde_json::Value { - use faker_rand::lorem::{Paragraphs, Word}; - - let uid = format!( - "https://{domain}/posts/{}-{}-{}", - rand::random::<Word>(), - rand::random::<Word>(), - rand::random::<Word>() - ); - - let time = chrono::Local::now().to_rfc3339(); - let post = json!({ - "type": ["h-entry"], - "properties": { - "content": [rand::random::<Paragraphs>().to_string()], - "uid": [&uid], - "url": [&uid], - "published": [&time] - } - }); - - post - } - - fn gen_random_mention(domain: &str, mention_type: MentionType, url: &str) -> serde_json::Value { - use faker_rand::lorem::{Paragraphs, Word}; - - let uid = format!( - "https://{domain}/posts/{}-{}-{}", - rand::random::<Word>(), - rand::random::<Word>(), - rand::random::<Word>() - ); - - let time = chrono::Local::now().to_rfc3339(); - let post = json!({ - "type": ["h-cite"], - "properties": { - "content": [rand::random::<Paragraphs>().to_string()], - "uid": [&uid], - "url": [&uid], - "published": [&time], - (match mention_type { - MentionType::Reply => "in-reply-to", - MentionType::Like => "like-of", - MentionType::Repost => "repost-of", - MentionType::Bookmark => "bookmark-of", - MentionType::Mention => unimplemented!(), - }): [url] - } - }); - - post - } - - async fn test_feed_pagination<Backend: Storage>(backend: Backend) { - let posts = { - let mut posts = std::iter::from_fn( - || Some(gen_random_post("fireburn.ru")) - ) - .take(40) - .collect::<Vec<serde_json::Value>>(); - - // Reverse the array so it's in reverse-chronological order - posts.reverse(); - - posts - }; - - let feed = json!({ - "type": ["h-feed"], - "properties": { - "name": ["Main Page"], - "author": ["https://fireburn.ru/"], - "uid": ["https://fireburn.ru/feeds/main"] - }, - }); - let key = feed["properties"]["uid"][0].as_str().unwrap(); - - backend - .put_post(&feed, "fireburn.ru") - .await - .unwrap(); - - for (i, post) in posts.iter().rev().enumerate() { - backend - .put_post(post, "fireburn.ru") - .await - .unwrap(); - backend.add_to_feed(key, post["properties"]["uid"][0].as_str().unwrap()).await.unwrap(); - } - - let limit: usize = 10; - - tracing::debug!("Starting feed reading..."); - let (result, cursor) = backend - .read_feed_with_cursor(key, None, limit, None) - .await - .unwrap() - .unwrap(); - - assert_eq!(result["children"].as_array().unwrap().len(), limit); - assert_eq!( - result["children"] - .as_array() - .unwrap() - .iter() - .map(|post| post["properties"]["uid"][0].as_str().unwrap()) - .collect::<Vec<_>>() - [0..10], - posts - .iter() - .map(|post| post["properties"]["uid"][0].as_str().unwrap()) - .collect::<Vec<_>>() - [0..10] - ); - - tracing::debug!("Continuing with cursor: {:?}", cursor); - let (result2, cursor2) = backend - .read_feed_with_cursor( - key, - cursor.as_deref(), - limit, - None, - ) - .await - .unwrap() - .unwrap(); - - assert_eq!( - result2["children"].as_array().unwrap()[0..10], - posts[10..20] - ); - - tracing::debug!("Continuing with cursor: {:?}", cursor); - let (result3, cursor3) = backend - .read_feed_with_cursor( - key, - cursor2.as_deref(), - limit, - None, - ) - .await - .unwrap() - .unwrap(); - - assert_eq!( - result3["children"].as_array().unwrap()[0..10], - posts[20..30] - ); - - tracing::debug!("Continuing with cursor: {:?}", cursor); - let (result4, _) = backend - .read_feed_with_cursor( - key, - cursor3.as_deref(), - limit, - None, - ) - .await - .unwrap() - .unwrap(); - - assert_eq!( - result4["children"].as_array().unwrap()[0..10], - posts[30..40] - ); - - // Regression test for #4 - // - // Results for a bogus cursor are undefined, so we aren't - // checking them. But the function at least shouldn't hang. - let nonsense_after = Some("1010101010"); - let _ = tokio::time::timeout(tokio::time::Duration::from_secs(10), async move { - backend - .read_feed_with_cursor(key, nonsense_after, limit, None) - .await - }) - .await - .expect("Operation should not hang: see https://gitlab.com/kittybox/kittybox/-/issues/4"); - } - - async fn test_webmention_addition<Backend: Storage>(db: Backend) { - let post = gen_random_post("fireburn.ru"); - - db.put_post(&post, "fireburn.ru").await.unwrap(); - const TYPE: MentionType = MentionType::Reply; - - let target = post["properties"]["uid"][0].as_str().unwrap(); - let mut reply = gen_random_mention("aaronparecki.com", TYPE, target); - - let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap(); - assert_eq!(post, read_post); - - db.add_or_update_webmention(target, TYPE, reply.clone()).await.unwrap(); - - let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap(); - assert_eq!(read_post["properties"]["comment"][0], reply); - - reply["properties"]["content"][0] = json!(rand::random::<faker_rand::lorem::Paragraphs>().to_string()); - - db.add_or_update_webmention(target, TYPE, reply.clone()).await.unwrap(); - let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap(); - assert_eq!(read_post["properties"]["comment"][0], reply); - } - - async fn test_pretty_permalinks<Backend: Storage>(db: Backend) { - const PERMALINK: &str = "https://fireburn.ru/posts/pretty-permalink"; - - let post = { - let mut post = gen_random_post("fireburn.ru"); - let urls = post["properties"]["url"].as_array_mut().unwrap(); - urls.push(serde_json::Value::String( - PERMALINK.to_owned() - )); - - post - }; - db.put_post(&post, "fireburn.ru").await.unwrap(); - - for i in post["properties"]["url"].as_array().unwrap() { - let (read_post, _) = db.read_feed_with_cursor(i.as_str().unwrap(), None, 20, None).await.unwrap().unwrap(); - assert_eq!(read_post, post); - } - } - /// Automatically generates a test suite for - macro_rules! test_all { - ($func_name:ident, $mod_name:ident) => { - mod $mod_name { - $func_name!(test_basic_operations); - $func_name!(test_get_channel_list); - $func_name!(test_settings); - $func_name!(test_update); - $func_name!(test_feed_pagination); - $func_name!(test_webmention_addition); - $func_name!(test_pretty_permalinks); - } - }; - } - macro_rules! file_test { - ($func_name:ident) => { - #[tokio::test] - #[tracing_test::traced_test] - async fn $func_name() { - let tempdir = tempfile::tempdir().expect("Failed to create tempdir"); - let backend = super::super::FileStorage::new( - tempdir.path().to_path_buf() - ) - .await - .unwrap(); - super::$func_name(backend).await - } - }; - } - - macro_rules! postgres_test { - ($func_name:ident) => { - #[cfg(feature = "sqlx")] - #[sqlx::test] - #[tracing_test::traced_test] - async fn $func_name( - pool_opts: sqlx::postgres::PgPoolOptions, - connect_opts: sqlx::postgres::PgConnectOptions - ) -> Result<(), sqlx::Error> { - let db = { - //use sqlx::ConnectOptions; - //connect_opts.log_statements(log::LevelFilter::Debug); - - pool_opts.connect_with(connect_opts).await? - }; - let backend = super::super::PostgresStorage::from_pool(db).await.unwrap(); - - Ok(super::$func_name(backend).await) - } - }; - } - - test_all!(file_test, file); - test_all!(postgres_test, postgres); -} |