From 39ddd3689aa4ef38580ea90087e1e204b55fcfc7 Mon Sep 17 00:00:00 2001 From: Vika Date: Sat, 22 Jul 2023 12:24:08 +0300 Subject: database: add "add_or_update_webmention" operation This is an operation that atomically adds or updates a webmention cite attached to a post. This is used so a database backend can optimize for it (for example, using a transaction or shifting the JSON modification operation to the database) --- kittybox-rs/src/database/file/mod.rs | 50 +++++++++++++++++++++ kittybox-rs/src/database/memory.rs | 5 +++ kittybox-rs/src/database/mod.rs | 74 ++++++++++++++++++++++++++++++++ kittybox-rs/src/database/postgres/mod.rs | 39 +++++++++++++++++ 4 files changed, 168 insertions(+) diff --git a/kittybox-rs/src/database/file/mod.rs b/kittybox-rs/src/database/file/mod.rs index fe53ea5..ca8e2ac 100644 --- a/kittybox-rs/src/database/file/mod.rs +++ b/kittybox-rs/src/database/file/mod.rs @@ -3,6 +3,7 @@ use crate::database::{ErrorKind, Result, settings, Storage, StorageError}; use crate::micropub::{MicropubUpdate, MicropubPropertyDeletion}; use async_trait::async_trait; use futures::{stream, StreamExt, TryStreamExt}; +use kittybox_util::MentionType; use serde_json::json; use std::borrow::Cow; use std::collections::HashMap; @@ -680,4 +681,53 @@ impl Storage for FileStorage { tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; Ok(()) } + + #[tracing::instrument(skip(self))] + async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()> { + let path = url_to_path(&self.root_dir, target); + let tempfilename = path.with_extension("tmp"); + + let mut temp = OpenOptions::new() + .write(true) + .create_new(true) + .open(&tempfilename) + .await?; + let mut file = OpenOptions::new().read(true).open(&path).await?; + + let mut post: serde_json::Value = { + let mut content = String::new(); + file.read_to_string(&mut content).await?; + drop(file); + + serde_json::from_str(&content)? + }; + + let key: &'static str = match mention_type { + MentionType::Reply => "reply", + MentionType::Like => "like", + MentionType::Repost => "repost", + MentionType::Bookmark => "bookmark", + MentionType::Mention => "mention", + }; + let mention_uid = mention["properties"]["uid"][0].clone(); + if let Some(values) = post["properties"][key].as_array_mut() { + for value in values.iter_mut() { + if value["properties"]["uid"][0] == mention_uid { + *value = mention; + break; + } + } + } else { + post["properties"][key] = serde_json::Value::Array(vec![mention]); + } + + temp.write_all(post.to_string().as_bytes()).await?; + temp.flush().await?; + temp.sync_all().await?; + drop(temp); + tokio::fs::rename(tempfilename, &path).await?; + tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; + + Ok(()) + } } diff --git a/kittybox-rs/src/database/memory.rs b/kittybox-rs/src/database/memory.rs index 26d3095..6339e7a 100644 --- a/kittybox-rs/src/database/memory.rs +++ b/kittybox-rs/src/database/memory.rs @@ -226,6 +226,11 @@ impl Storage for MemoryStorage { todo!() } + #[allow(unused_variables)] + async fn add_or_update_webmention(&self, target: &str, mention_type: kittybox_util::MentionType, mention: serde_json::Value) -> Result<()> { + todo!() + } + } impl Default for MemoryStorage { diff --git a/kittybox-rs/src/database/mod.rs b/kittybox-rs/src/database/mod.rs index 231fd26..1d1cf15 100644 --- a/kittybox-rs/src/database/mod.rs +++ b/kittybox-rs/src/database/mod.rs @@ -2,6 +2,7 @@ use std::borrow::Cow; use async_trait::async_trait; +use kittybox_util::MentionType; mod file; pub use crate::database::file::FileStorage; @@ -296,6 +297,22 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync { /// Commits a setting to the setting store. async fn set_setting + 'a, 'a>(&self, user: &'a str, value: S::Data) -> Result<()>; + + /// Add (or update) a webmention on a certian post. + /// + /// The MF2 object describing the webmention content will always + /// be of type `h-cite`, and the `uid` property on the object will + /// always be set. + /// + /// The rationale for this function is as follows: webmentions + /// might be duplicated, and we need to deduplicate them first. As + /// we lack support for transactions and locking posts on the + /// database, the only way is to implement the operation on the + /// database itself. + /// + /// Besides, it may even allow for nice tricks like storing the + /// webmentions separately and rehydrating them on feed reads. + async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()>; } #[cfg(test)] @@ -303,6 +320,7 @@ mod tests { use super::settings; use super::{MicropubChannel, Storage}; + use kittybox_util::MentionType; use serde_json::json; async fn test_basic_operations(backend: Backend) { @@ -499,6 +517,37 @@ mod tests { post } + fn gen_random_mention(domain: &str, mention_type: MentionType, url: &str) -> serde_json::Value { + use faker_rand::lorem::{Paragraphs, Word}; + + let uid = format!( + "https://{domain}/posts/{}-{}-{}", + rand::random::(), + rand::random::(), + rand::random::() + ); + + let time = chrono::Local::now().to_rfc3339(); + let post = json!({ + "type": ["h-cite"], + "properties": { + "content": [rand::random::().to_string()], + "uid": [&uid], + "url": [&uid], + "published": [&time], + (match mention_type { + MentionType::Reply => "in-reply-to", + MentionType::Like => "like-of", + MentionType::Repost => "repost-of", + MentionType::Bookmark => "bookmark-of", + MentionType::Mention => unimplemented!(), + }): [url] + } + }); + + post + } + async fn test_feed_pagination(backend: Backend) { let posts = { let mut posts = std::iter::from_fn( @@ -626,6 +675,30 @@ mod tests { .expect("Operation should not hang: see https://gitlab.com/kittybox/kittybox/-/issues/4"); } + async fn test_webmention_addition(db: Backend) { + let post = gen_random_post("fireburn.ru"); + + db.put_post(&post, "fireburn.ru").await.unwrap(); + const TYPE: MentionType = MentionType::Reply; + + let target = post["properties"]["uid"][0].as_str().unwrap(); + let mut reply = gen_random_mention("aaronparecki.com", TYPE, target); + + let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap(); + assert_eq!(post, read_post); + + db.add_or_update_webmention(target, TYPE, reply.clone()).await.unwrap(); + + let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap(); + assert_eq!(read_post["properties"]["reply"][0], reply); + + reply["properties"]["content"][0] = json!(rand::random::().to_string()); + + db.add_or_update_webmention(target, TYPE, reply.clone()).await.unwrap(); + let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap(); + assert_eq!(read_post["properties"]["reply"][0], reply); + } + /// Automatically generates a test suite for macro_rules! test_all { ($func_name:ident, $mod_name:ident) => { @@ -635,6 +708,7 @@ mod tests { $func_name!(test_settings); $func_name!(test_update); $func_name!(test_feed_pagination); + $func_name!(test_webmention_addition); } }; } diff --git a/kittybox-rs/src/database/postgres/mod.rs b/kittybox-rs/src/database/postgres/mod.rs index b9a21c3..b1a03b1 100644 --- a/kittybox-rs/src/database/postgres/mod.rs +++ b/kittybox-rs/src/database/postgres/mod.rs @@ -130,6 +130,45 @@ impl Storage for PostgresStorage { .map(|_| ()) } + async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()> { + let mut txn = self.db.begin().await?; + + let (uid, mut post) = sqlx::query_as::<_, (String, serde_json::Value)>("SELECT uid, mf2 FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1 FOR UPDATE") + .bind(target) + .fetch_optional(&mut *txn) + .await? + .ok_or(StorageError::from_static( + ErrorKind::NotFound, + "The specified post wasn't found in the database." + ))?; + + let key: &'static str = match mention_type { + MentionType::Reply => "reply", + MentionType::Like => "like", + MentionType::Repost => "repost", + MentionType::Bookmark => "bookmark", + MentionType::Mention => "mention", + }; + let mention_uid = mention["properties"]["uid"][0].clone(); + if let Some(values) = post["properties"][key].as_array_mut() { + for value in values.iter_mut() { + if value["properties"]["uid"][0] == mention_uid { + *value = mention; + break; + } + } + } else { + post["properties"][key] = serde_json::Value::Array(vec![mention]); + } + + sqlx::query("UPDATE kittybox.mf2_json SET mf2 = $2 WHERE uid = $1") + .bind(uid) + .bind(post) + .execute(&mut *txn) + .await?; + + txn.commit().await.map_err(Into::into) + } #[tracing::instrument(skip(self))] async fn update_post(&self, url: &'_ str, update: MicropubUpdate) -> Result<()> { tracing::debug!("Updating post {}", url); -- cgit 1.4.1