diff options
author | Vika <vika@fireburn.ru> | 2025-04-09 23:31:02 +0300 |
---|---|---|
committer | Vika <vika@fireburn.ru> | 2025-04-09 23:31:57 +0300 |
commit | 8826d9446e6c492db2243b9921e59ce496027bef (patch) | |
tree | 63738aa9001cb73b11cb0e974e93129bcdf1adbb /src/database | |
parent | 519cadfbb298f50cbf819dde757037ab56e2863e (diff) | |
download | kittybox-8826d9446e6c492db2243b9921e59ce496027bef.tar.zst |
cargo fmt
Change-Id: I80e81ebba3f0cdf8c094451c9fe3ee4126b8c888
Diffstat (limited to 'src/database')
-rw-r--r-- | src/database/file/mod.rs | 159 | ||||
-rw-r--r-- | src/database/memory.rs | 41 | ||||
-rw-r--r-- | src/database/mod.rs | 165 | ||||
-rw-r--r-- | src/database/postgres/mod.rs | 122 |
4 files changed, 303 insertions, 184 deletions
diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs index b9f27b2..5c93beb 100644 --- a/src/database/file/mod.rs +++ b/src/database/file/mod.rs @@ -1,6 +1,6 @@ //#![warn(clippy::unwrap_used)] -use crate::database::{ErrorKind, Result, settings, Storage, StorageError}; -use crate::micropub::{MicropubUpdate, MicropubPropertyDeletion}; +use crate::database::{settings, ErrorKind, Result, Storage, StorageError}; +use crate::micropub::{MicropubPropertyDeletion, MicropubUpdate}; use futures::{stream, StreamExt, TryStreamExt}; use kittybox_util::MentionType; use serde_json::json; @@ -247,7 +247,9 @@ async fn hydrate_author<S: Storage>( impl Storage for FileStorage { async fn new(url: &'_ url::Url) -> Result<Self> { // TODO: sanity check - Ok(Self { root_dir: PathBuf::from(url.path()) }) + Ok(Self { + root_dir: PathBuf::from(url.path()), + }) } #[tracing::instrument(skip(self))] async fn categories(&self, url: &str) -> Result<Vec<String>> { @@ -259,7 +261,7 @@ impl Storage for FileStorage { // perform well. Err(std::io::Error::new( std::io::ErrorKind::Unsupported, - "?q=category queries are not implemented due to resource constraints" + "?q=category queries are not implemented due to resource constraints", ))? } @@ -340,7 +342,10 @@ impl Storage for FileStorage { file.sync_all().await?; drop(file); tokio::fs::rename(&tempfile, &path).await?; - tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; + tokio::fs::File::open(path.parent().unwrap()) + .await? + .sync_all() + .await?; if let Some(urls) = post["properties"]["url"].as_array() { for url in urls.iter().map(|i| i.as_str().unwrap()) { @@ -350,8 +355,8 @@ impl Storage for FileStorage { "{}{}", url.host_str().unwrap(), url.port() - .map(|port| format!(":{}", port)) - .unwrap_or_default() + .map(|port| format!(":{}", port)) + .unwrap_or_default() ) }; if url != key && url_domain == user.authority() { @@ -410,26 +415,24 @@ impl Storage for FileStorage { .create(false) .open(&path) .await - { - Err(err) if err.kind() == std::io::ErrorKind::NotFound => { - Vec::default() - } - Err(err) => { - // Propagate the error upwards - return Err(err.into()); - } - Ok(mut file) => { - let mut content = String::new(); - file.read_to_string(&mut content).await?; - drop(file); - - if !content.is_empty() { - serde_json::from_str(&content)? - } else { - Vec::default() - } - } - } + { + Err(err) if err.kind() == std::io::ErrorKind::NotFound => Vec::default(), + Err(err) => { + // Propagate the error upwards + return Err(err.into()); + } + Ok(mut file) => { + let mut content = String::new(); + file.read_to_string(&mut content).await?; + drop(file); + + if !content.is_empty() { + serde_json::from_str(&content)? + } else { + Vec::default() + } + } + } }; channels.push(super::MicropubChannel { @@ -444,7 +447,10 @@ impl Storage for FileStorage { tempfile.sync_all().await?; drop(tempfile); tokio::fs::rename(tempfilename, &path).await?; - tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; + tokio::fs::File::open(path.parent().unwrap()) + .await? + .sync_all() + .await?; } Ok(()) } @@ -476,7 +482,10 @@ impl Storage for FileStorage { temp.sync_all().await?; drop(temp); tokio::fs::rename(tempfilename, &path).await?; - tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; + tokio::fs::File::open(path.parent().unwrap()) + .await? + .sync_all() + .await?; (json, new_json) }; @@ -486,7 +495,9 @@ impl Storage for FileStorage { #[tracing::instrument(skip(self, f), fields(f = std::any::type_name::<F>()))] async fn update_with<F: FnOnce(&mut serde_json::Value) + Send>( - &self, url: &str, f: F + &self, + url: &str, + f: F, ) -> Result<(serde_json::Value, serde_json::Value)> { todo!("update_with is not yet implemented due to special requirements of the file backend") } @@ -526,25 +537,25 @@ impl Storage for FileStorage { url: &'_ str, cursor: Option<&'_ str>, limit: usize, - user: Option<&url::Url> + user: Option<&url::Url>, ) -> Result<Option<(serde_json::Value, Option<String>)>> { #[allow(deprecated)] - Ok(self.read_feed_with_limit( - url, - cursor, - limit, - user - ).await? + Ok(self + .read_feed_with_limit(url, cursor, limit, user) + .await? .map(|feed| { - tracing::debug!("Feed: {:#}", serde_json::Value::Array( - feed["children"] - .as_array() - .map(|v| v.as_slice()) - .unwrap_or_default() - .iter() - .map(|mf2| mf2["properties"]["uid"][0].clone()) - .collect::<Vec<_>>() - )); + tracing::debug!( + "Feed: {:#}", + serde_json::Value::Array( + feed["children"] + .as_array() + .map(|v| v.as_slice()) + .unwrap_or_default() + .iter() + .map(|mf2| mf2["properties"]["uid"][0].clone()) + .collect::<Vec<_>>() + ) + ); let cursor: Option<String> = feed["children"] .as_array() .map(|v| v.as_slice()) @@ -553,8 +564,7 @@ impl Storage for FileStorage { .map(|v| v["properties"]["uid"][0].as_str().unwrap().to_owned()); tracing::debug!("Extracted the cursor: {:?}", cursor); (feed, cursor) - }) - ) + })) } #[tracing::instrument(skip(self))] @@ -574,9 +584,12 @@ impl Storage for FileStorage { let children: Vec<serde_json::Value> = match feed["children"].take() { serde_json::Value::Array(children) => children, // We've already checked it's an array - _ => unreachable!() + _ => unreachable!(), }; - tracing::debug!("Full children array: {:#}", serde_json::Value::Array(children.clone())); + tracing::debug!( + "Full children array: {:#}", + serde_json::Value::Array(children.clone()) + ); let mut posts_iter = children .into_iter() .map(|s: serde_json::Value| s.as_str().unwrap().to_string()); @@ -589,7 +602,7 @@ impl Storage for FileStorage { // incredibly long feeds. if let Some(after) = after { tokio::task::block_in_place(|| { - for s in posts_iter.by_ref() { + for s in posts_iter.by_ref() { if s == after { break; } @@ -655,12 +668,19 @@ impl Storage for FileStorage { let settings: HashMap<&str, serde_json::Value> = serde_json::from_str(&content)?; match settings.get(S::ID) { Some(value) => Ok(serde_json::from_value::<S>(value.clone())?), - None => Err(StorageError::from_static(ErrorKind::Backend, "Setting not set")) + None => Err(StorageError::from_static( + ErrorKind::Backend, + "Setting not set", + )), } } #[tracing::instrument(skip(self))] - async fn set_setting<S: settings::Setting>(&self, user: &url::Url, value: S::Data) -> Result<()> { + async fn set_setting<S: settings::Setting>( + &self, + user: &url::Url, + value: S::Data, + ) -> Result<()> { let mut path = relative_path::RelativePathBuf::new(); path.push(user.authority()); path.push("settings"); @@ -704,20 +724,28 @@ impl Storage for FileStorage { tempfile.sync_all().await?; drop(tempfile); tokio::fs::rename(temppath, &path).await?; - tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; + tokio::fs::File::open(path.parent().unwrap()) + .await? + .sync_all() + .await?; Ok(()) } #[tracing::instrument(skip(self))] - async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()> { + async fn add_or_update_webmention( + &self, + target: &str, + mention_type: MentionType, + mention: serde_json::Value, + ) -> Result<()> { let path = url_to_path(&self.root_dir, target); let tempfilename = path.with_extension("tmp"); let mut temp = OpenOptions::new() - .write(true) - .create_new(true) - .open(&tempfilename) - .await?; + .write(true) + .create_new(true) + .open(&tempfilename) + .await?; let mut file = OpenOptions::new().read(true).open(&path).await?; let mut post: serde_json::Value = { @@ -752,13 +780,20 @@ impl Storage for FileStorage { temp.sync_all().await?; drop(temp); tokio::fs::rename(tempfilename, &path).await?; - tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; + tokio::fs::File::open(path.parent().unwrap()) + .await? + .sync_all() + .await?; Ok(()) } - async fn all_posts<'this>(&'this self, user: &url::Url) -> Result<impl futures::Stream<Item = serde_json::Value> + Send + 'this> { + async fn all_posts<'this>( + &'this self, + user: &url::Url, + ) -> Result<impl futures::Stream<Item = serde_json::Value> + Send + 'this> { todo!(); - #[allow(unreachable_code)] Ok(futures::stream::empty()) // for type inference + #[allow(unreachable_code)] + Ok(futures::stream::empty()) // for type inference } } diff --git a/src/database/memory.rs b/src/database/memory.rs index c2ceb85..75f04de 100644 --- a/src/database/memory.rs +++ b/src/database/memory.rs @@ -5,7 +5,7 @@ use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; -use crate::database::{ErrorKind, MicropubChannel, Result, settings, Storage, StorageError}; +use crate::database::{settings, ErrorKind, MicropubChannel, Result, Storage, StorageError}; #[derive(Clone, Debug, Default)] /// A simple in-memory store for testing purposes. @@ -90,9 +90,16 @@ impl Storage for MemoryStorage { Ok(()) } - async fn update_post(&self, url: &'_ str, update: crate::micropub::MicropubUpdate) -> Result<()> { + async fn update_post( + &self, + url: &'_ str, + update: crate::micropub::MicropubUpdate, + ) -> Result<()> { let mut guard = self.mapping.write().await; - let mut post = guard.get_mut(url).ok_or(StorageError::from_static(ErrorKind::NotFound, "The specified post wasn't found in the database."))?; + let mut post = guard.get_mut(url).ok_or(StorageError::from_static( + ErrorKind::NotFound, + "The specified post wasn't found in the database.", + ))?; use crate::micropub::MicropubPropertyDeletion; @@ -208,7 +215,7 @@ impl Storage for MemoryStorage { url: &'_ str, cursor: Option<&'_ str>, limit: usize, - user: Option<&url::Url> + user: Option<&url::Url>, ) -> Result<Option<(serde_json::Value, Option<String>)>> { todo!() } @@ -224,25 +231,39 @@ impl Storage for MemoryStorage { } #[allow(unused_variables)] - async fn set_setting<S: settings::Setting>(&self, user: &url::Url, value: S::Data) -> Result<()> { + async fn set_setting<S: settings::Setting>( + &self, + user: &url::Url, + value: S::Data, + ) -> Result<()> { todo!() } #[allow(unused_variables)] - async fn add_or_update_webmention(&self, target: &str, mention_type: kittybox_util::MentionType, mention: serde_json::Value) -> Result<()> { + async fn add_or_update_webmention( + &self, + target: &str, + mention_type: kittybox_util::MentionType, + mention: serde_json::Value, + ) -> Result<()> { todo!() } #[allow(unused_variables)] async fn update_with<F: FnOnce(&mut serde_json::Value) + Send>( - &self, url: &str, f: F + &self, + url: &str, + f: F, ) -> Result<(serde_json::Value, serde_json::Value)> { todo!() } - async fn all_posts<'this>(&'this self, _user: &url::Url) -> Result<impl futures::Stream<Item = serde_json::Value> + Send + 'this> { + async fn all_posts<'this>( + &'this self, + _user: &url::Url, + ) -> Result<impl futures::Stream<Item = serde_json::Value> + Send + 'this> { todo!(); - #[allow(unreachable_code)] Ok(futures::stream::pending()) + #[allow(unreachable_code)] + Ok(futures::stream::pending()) } - } diff --git a/src/database/mod.rs b/src/database/mod.rs index 4390ae7..de51c2c 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -177,7 +177,7 @@ impl StorageError { Self { msg: Cow::Borrowed(msg), source: None, - kind + kind, } } /// Create a StorageError using another arbitrary Error as a source. @@ -219,27 +219,34 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync { fn post_exists(&self, url: &str) -> impl Future<Output = Result<bool>> + Send; /// Load a post from the database in MF2-JSON format, deserialized from JSON. - fn get_post(&self, url: &str) -> impl Future<Output = Result<Option<serde_json::Value>>> + Send; + fn get_post(&self, url: &str) + -> impl Future<Output = Result<Option<serde_json::Value>>> + Send; /// Save a post to the database as an MF2-JSON structure. /// /// Note that the `post` object MUST have `post["properties"]["uid"][0]` defined. - fn put_post(&self, post: &serde_json::Value, user: &url::Url) -> impl Future<Output = Result<()>> + Send; + fn put_post( + &self, + post: &serde_json::Value, + user: &url::Url, + ) -> impl Future<Output = Result<()>> + Send; /// Add post to feed. Some database implementations might have optimized ways to do this. #[tracing::instrument(skip(self))] fn add_to_feed(&self, feed: &str, post: &str) -> impl Future<Output = Result<()>> + Send { tracing::debug!("Inserting {} into {} using `update_post`", post, feed); - self.update_post(feed, serde_json::from_value( - serde_json::json!({"add": {"children": [post]}})).unwrap() + self.update_post( + feed, + serde_json::from_value(serde_json::json!({"add": {"children": [post]}})).unwrap(), ) } /// Remove post from feed. Some database implementations might have optimized ways to do this. #[tracing::instrument(skip(self))] fn remove_from_feed(&self, feed: &str, post: &str) -> impl Future<Output = Result<()>> + Send { tracing::debug!("Removing {} into {} using `update_post`", post, feed); - self.update_post(feed, serde_json::from_value( - serde_json::json!({"delete": {"children": [post]}})).unwrap() + self.update_post( + feed, + serde_json::from_value(serde_json::json!({"delete": {"children": [post]}})).unwrap(), ) } @@ -254,7 +261,11 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync { /// /// Default implementation calls [`Storage::update_with`] and uses /// [`update.apply`][MicropubUpdate::apply] to update the post. - fn update_post(&self, url: &str, update: MicropubUpdate) -> impl Future<Output = Result<()>> + Send { + fn update_post( + &self, + url: &str, + update: MicropubUpdate, + ) -> impl Future<Output = Result<()>> + Send { let fut = self.update_with(url, |post| { update.apply(post); }); @@ -274,12 +285,17 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync { /// /// Returns old post and the new post after editing. fn update_with<F: FnOnce(&mut serde_json::Value) + Send>( - &self, url: &str, f: F + &self, + url: &str, + f: F, ) -> impl Future<Output = Result<(serde_json::Value, serde_json::Value)>> + Send; /// Get a list of channels available for the user represented by /// the `user` domain to write to. - fn get_channels(&self, user: &url::Url) -> impl Future<Output = Result<Vec<MicropubChannel>>> + Send; + fn get_channels( + &self, + user: &url::Url, + ) -> impl Future<Output = Result<Vec<MicropubChannel>>> + Send; /// Fetch a feed at `url` and return an h-feed object containing /// `limit` posts after a post by url `after`, filtering the content @@ -329,7 +345,7 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync { url: &'_ str, cursor: Option<&'_ str>, limit: usize, - user: Option<&url::Url> + user: Option<&url::Url>, ) -> impl Future<Output = Result<Option<(serde_json::Value, Option<String>)>>> + Send; /// Deletes a post from the database irreversibly. Must be idempotent. @@ -339,7 +355,11 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync { fn get_setting<S: Setting>(&self, user: &url::Url) -> impl Future<Output = Result<S>> + Send; /// Commits a setting to the setting store. - fn set_setting<S: Setting>(&self, user: &url::Url, value: S::Data) -> impl Future<Output = Result<()>> + Send; + fn set_setting<S: Setting>( + &self, + user: &url::Url, + value: S::Data, + ) -> impl Future<Output = Result<()>> + Send; /// Add (or update) a webmention on a certian post. /// @@ -355,11 +375,19 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync { /// /// Besides, it may even allow for nice tricks like storing the /// webmentions separately and rehydrating them on feed reads. - fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> impl Future<Output = Result<()>> + Send; + fn add_or_update_webmention( + &self, + target: &str, + mention_type: MentionType, + mention: serde_json::Value, + ) -> impl Future<Output = Result<()>> + Send; /// Return a stream of all posts ever made by a certain user, in /// reverse-chronological order. - fn all_posts<'this>(&'this self, user: &url::Url) -> impl Future<Output = Result<impl futures::Stream<Item = serde_json::Value> + Send + 'this>> + Send; + fn all_posts<'this>( + &'this self, + user: &url::Url, + ) -> impl Future<Output = Result<impl futures::Stream<Item = serde_json::Value> + Send + 'this>> + Send; } #[cfg(test)] @@ -464,7 +492,8 @@ mod tests { "replace": { "content": ["Different test content"] } - })).unwrap(), + })) + .unwrap(), ) .await .unwrap(); @@ -511,7 +540,10 @@ mod tests { .put_post(&feed, &"https://fireburn.ru/".parse().unwrap()) .await .unwrap(); - let chans = backend.get_channels(&"https://fireburn.ru/".parse().unwrap()).await.unwrap(); + let chans = backend + .get_channels(&"https://fireburn.ru/".parse().unwrap()) + .await + .unwrap(); assert_eq!(chans.len(), 1); assert_eq!( chans[0], @@ -526,16 +558,16 @@ mod tests { backend .set_setting::<settings::SiteName>( &"https://fireburn.ru/".parse().unwrap(), - "Vika's Hideout".to_owned() + "Vika's Hideout".to_owned(), ) .await .unwrap(); assert_eq!( backend - .get_setting::<settings::SiteName>(&"https://fireburn.ru/".parse().unwrap()) - .await - .unwrap() - .as_ref(), + .get_setting::<settings::SiteName>(&"https://fireburn.ru/".parse().unwrap()) + .await + .unwrap() + .as_ref(), "Vika's Hideout" ); } @@ -597,11 +629,9 @@ mod tests { async fn test_feed_pagination<Backend: Storage>(backend: Backend) { let posts = { - let mut posts = std::iter::from_fn( - || Some(gen_random_post("fireburn.ru")) - ) - .take(40) - .collect::<Vec<serde_json::Value>>(); + let mut posts = std::iter::from_fn(|| Some(gen_random_post("fireburn.ru"))) + .take(40) + .collect::<Vec<serde_json::Value>>(); // Reverse the array so it's in reverse-chronological order posts.reverse(); @@ -629,7 +659,10 @@ mod tests { .put_post(post, &"https://fireburn.ru/".parse().unwrap()) .await .unwrap(); - backend.add_to_feed(key, post["properties"]["uid"][0].as_str().unwrap()).await.unwrap(); + backend + .add_to_feed(key, post["properties"]["uid"][0].as_str().unwrap()) + .await + .unwrap(); } let limit: usize = 10; @@ -648,23 +681,16 @@ mod tests { .unwrap() .iter() .map(|post| post["properties"]["uid"][0].as_str().unwrap()) - .collect::<Vec<_>>() - [0..10], + .collect::<Vec<_>>()[0..10], posts .iter() .map(|post| post["properties"]["uid"][0].as_str().unwrap()) - .collect::<Vec<_>>() - [0..10] + .collect::<Vec<_>>()[0..10] ); tracing::debug!("Continuing with cursor: {:?}", cursor); let (result2, cursor2) = backend - .read_feed_with_cursor( - key, - cursor.as_deref(), - limit, - None, - ) + .read_feed_with_cursor(key, cursor.as_deref(), limit, None) .await .unwrap() .unwrap(); @@ -676,12 +702,7 @@ mod tests { tracing::debug!("Continuing with cursor: {:?}", cursor); let (result3, cursor3) = backend - .read_feed_with_cursor( - key, - cursor2.as_deref(), - limit, - None, - ) + .read_feed_with_cursor(key, cursor2.as_deref(), limit, None) .await .unwrap() .unwrap(); @@ -693,12 +714,7 @@ mod tests { tracing::debug!("Continuing with cursor: {:?}", cursor); let (result4, _) = backend - .read_feed_with_cursor( - key, - cursor3.as_deref(), - limit, - None, - ) + .read_feed_with_cursor(key, cursor3.as_deref(), limit, None) .await .unwrap() .unwrap(); @@ -725,24 +741,43 @@ mod tests { async fn test_webmention_addition<Backend: Storage>(db: Backend) { let post = gen_random_post("fireburn.ru"); - db.put_post(&post, &"https://fireburn.ru/".parse().unwrap()).await.unwrap(); + db.put_post(&post, &"https://fireburn.ru/".parse().unwrap()) + .await + .unwrap(); const TYPE: MentionType = MentionType::Reply; let target = post["properties"]["uid"][0].as_str().unwrap(); let mut reply = gen_random_mention("aaronparecki.com", TYPE, target); - let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap(); + let (read_post, _) = db + .read_feed_with_cursor(target, None, 20, None) + .await + .unwrap() + .unwrap(); assert_eq!(post, read_post); - db.add_or_update_webmention(target, TYPE, reply.clone()).await.unwrap(); + db.add_or_update_webmention(target, TYPE, reply.clone()) + .await + .unwrap(); - let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap(); + let (read_post, _) = db + .read_feed_with_cursor(target, None, 20, None) + .await + .unwrap() + .unwrap(); assert_eq!(read_post["properties"]["comment"][0], reply); - reply["properties"]["content"][0] = json!(rand::random::<faker_rand::lorem::Paragraphs>().to_string()); + reply["properties"]["content"][0] = + json!(rand::random::<faker_rand::lorem::Paragraphs>().to_string()); - db.add_or_update_webmention(target, TYPE, reply.clone()).await.unwrap(); - let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap(); + db.add_or_update_webmention(target, TYPE, reply.clone()) + .await + .unwrap(); + let (read_post, _) = db + .read_feed_with_cursor(target, None, 20, None) + .await + .unwrap() + .unwrap(); assert_eq!(read_post["properties"]["comment"][0], reply); } @@ -752,16 +787,20 @@ mod tests { let post = { let mut post = gen_random_post("fireburn.ru"); let urls = post["properties"]["url"].as_array_mut().unwrap(); - urls.push(serde_json::Value::String( - PERMALINK.to_owned() - )); + urls.push(serde_json::Value::String(PERMALINK.to_owned())); post }; - db.put_post(&post, &"https://fireburn.ru/".parse().unwrap()).await.unwrap(); + db.put_post(&post, &"https://fireburn.ru/".parse().unwrap()) + .await + .unwrap(); for i in post["properties"]["url"].as_array().unwrap() { - let (read_post, _) = db.read_feed_with_cursor(i.as_str().unwrap(), None, 20, None).await.unwrap().unwrap(); + let (read_post, _) = db + .read_feed_with_cursor(i.as_str().unwrap(), None, 20, None) + .await + .unwrap() + .unwrap(); assert_eq!(read_post, post); } } @@ -786,7 +825,7 @@ mod tests { async fn $func_name() { let tempdir = tempfile::tempdir().expect("Failed to create tempdir"); let backend = super::super::FileStorage { - root_dir: tempdir.path().to_path_buf() + root_dir: tempdir.path().to_path_buf(), }; super::$func_name(backend).await } @@ -800,7 +839,7 @@ mod tests { #[tracing_test::traced_test] async fn $func_name( pool_opts: sqlx::postgres::PgPoolOptions, - connect_opts: sqlx::postgres::PgConnectOptions + connect_opts: sqlx::postgres::PgConnectOptions, ) -> Result<(), sqlx::Error> { let db = { //use sqlx::ConnectOptions; diff --git a/src/database/postgres/mod.rs b/src/database/postgres/mod.rs index af19fea..ec67efa 100644 --- a/src/database/postgres/mod.rs +++ b/src/database/postgres/mod.rs @@ -5,7 +5,7 @@ use kittybox_util::{micropub::Channel as MicropubChannel, MentionType}; use sqlx::{ConnectOptions, Executor, PgPool}; use super::settings::Setting; -use super::{Storage, Result, StorageError, ErrorKind}; +use super::{ErrorKind, Result, Storage, StorageError}; static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!(); @@ -14,7 +14,7 @@ impl From<sqlx::Error> for StorageError { Self::with_source( super::ErrorKind::Backend, Cow::Owned(format!("sqlx error: {}", &value)), - Box::new(value) + Box::new(value), ) } } @@ -24,7 +24,7 @@ impl From<sqlx::migrate::MigrateError> for StorageError { Self::with_source( super::ErrorKind::Backend, Cow::Owned(format!("sqlx migration error: {}", &value)), - Box::new(value) + Box::new(value), ) } } @@ -32,14 +32,15 @@ impl From<sqlx::migrate::MigrateError> for StorageError { /// Micropub storage that uses a PostgreSQL database. #[derive(Debug, Clone)] pub struct PostgresStorage { - db: PgPool + db: PgPool, } impl PostgresStorage { /// Construct a [`PostgresStorage`] from a [`sqlx::PgPool`], /// running appropriate migrations. pub(crate) async fn from_pool(db: sqlx::PgPool) -> Result<Self> { - db.execute(sqlx::query("CREATE SCHEMA IF NOT EXISTS kittybox")).await?; + db.execute(sqlx::query("CREATE SCHEMA IF NOT EXISTS kittybox")) + .await?; MIGRATOR.run(&db).await?; Ok(Self { db }) } @@ -50,19 +51,22 @@ impl Storage for PostgresStorage { /// migrations on the database. async fn new(url: &'_ url::Url) -> Result<Self> { tracing::debug!("Postgres URL: {url}"); - let options = sqlx::postgres::PgConnectOptions::from_url(url)? - .options([("search_path", "kittybox")]); + let options = + sqlx::postgres::PgConnectOptions::from_url(url)?.options([("search_path", "kittybox")]); Self::from_pool( sqlx::postgres::PgPoolOptions::new() .max_connections(50) .connect_with(options) - .await? - ).await - + .await?, + ) + .await } - async fn all_posts<'this>(&'this self, user: &url::Url) -> Result<impl Stream<Item = serde_json::Value> + Send + 'this> { + async fn all_posts<'this>( + &'this self, + user: &url::Url, + ) -> Result<impl Stream<Item = serde_json::Value> + Send + 'this> { let authority = user.authority().to_owned(); Ok( sqlx::query_scalar::<_, serde_json::Value>("SELECT mf2 FROM kittybox.mf2_json WHERE owner = $1 ORDER BY (mf2_json.mf2 #>> '{properties,published,0}') DESC") @@ -74,18 +78,20 @@ impl Storage for PostgresStorage { #[tracing::instrument(skip(self))] async fn categories(&self, url: &str) -> Result<Vec<String>> { - sqlx::query_scalar::<_, String>(" + sqlx::query_scalar::<_, String>( + " SELECT jsonb_array_elements(mf2['properties']['category']) AS category FROM kittybox.mf2_json WHERE jsonb_typeof(mf2['properties']['category']) = 'array' AND uid LIKE ($1 + '%') GROUP BY category ORDER BY count(*) DESC -") - .bind(url) - .fetch_all(&self.db) - .await - .map_err(|err| err.into()) +", + ) + .bind(url) + .fetch_all(&self.db) + .await + .map_err(|err| err.into()) } #[tracing::instrument(skip(self))] async fn post_exists(&self, url: &str) -> Result<bool> { @@ -98,13 +104,14 @@ WHERE #[tracing::instrument(skip(self))] async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> { - sqlx::query_as::<_, (serde_json::Value,)>("SELECT mf2 FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1") - .bind(url) - .fetch_optional(&self.db) - .await - .map(|v| v.map(|v| v.0)) - .map_err(|err| err.into()) - + sqlx::query_as::<_, (serde_json::Value,)>( + "SELECT mf2 FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1", + ) + .bind(url) + .fetch_optional(&self.db) + .await + .map(|v| v.map(|v| v.0)) + .map_err(|err| err.into()) } #[tracing::instrument(skip(self))] @@ -122,13 +129,15 @@ WHERE #[tracing::instrument(skip(self))] async fn add_to_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> { tracing::debug!("Inserting {} into {}", post, feed); - sqlx::query("INSERT INTO kittybox.children (parent, child) VALUES ($1, $2) ON CONFLICT DO NOTHING") - .bind(feed) - .bind(post) - .execute(&self.db) - .await - .map(|_| ()) - .map_err(Into::into) + sqlx::query( + "INSERT INTO kittybox.children (parent, child) VALUES ($1, $2) ON CONFLICT DO NOTHING", + ) + .bind(feed) + .bind(post) + .execute(&self.db) + .await + .map(|_| ()) + .map_err(Into::into) } #[tracing::instrument(skip(self))] @@ -143,7 +152,12 @@ WHERE } #[tracing::instrument(skip(self))] - async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()> { + async fn add_or_update_webmention( + &self, + target: &str, + mention_type: MentionType, + mention: serde_json::Value, + ) -> Result<()> { let mut txn = self.db.begin().await?; let (uid, mut post) = sqlx::query_as::<_, (String, serde_json::Value)>("SELECT uid, mf2 FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1 FOR UPDATE") @@ -190,7 +204,9 @@ WHERE #[tracing::instrument(skip(self), fields(f = std::any::type_name::<F>()))] async fn update_with<F: FnOnce(&mut serde_json::Value) + Send>( - &self, url: &str, f: F + &self, + url: &str, + f: F, ) -> Result<(serde_json::Value, serde_json::Value)> { tracing::debug!("Updating post {}", url); let mut txn = self.db.begin().await?; @@ -250,12 +266,12 @@ WHERE url: &'_ str, cursor: Option<&'_ str>, limit: usize, - user: Option<&url::Url> + user: Option<&url::Url>, ) -> Result<Option<(serde_json::Value, Option<String>)>> { let mut txn = self.db.begin().await?; sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ, READ ONLY") - .execute(&mut *txn) - .await?; + .execute(&mut *txn) + .await?; tracing::debug!("Started txn: {:?}", txn); let mut feed = match sqlx::query_scalar::<_, serde_json::Value>(" SELECT kittybox.hydrate_author(mf2) FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1 @@ -273,11 +289,17 @@ SELECT kittybox.hydrate_author(mf2) FROM kittybox.mf2_json WHERE uid = $1 OR mf2 // The second query is very long and will probably be extremely // expensive. It's best to skip it on types where it doesn't make sense // (Kittybox doesn't support rendering children on non-feeds) - if !feed["type"].as_array().unwrap().iter().any(|t| *t == serde_json::json!("h-feed")) { + if !feed["type"] + .as_array() + .unwrap() + .iter() + .any(|t| *t == serde_json::json!("h-feed")) + { return Ok(Some((feed, None))); } - feed["children"] = sqlx::query_scalar::<_, serde_json::Value>(" + feed["children"] = sqlx::query_scalar::<_, serde_json::Value>( + " SELECT kittybox.hydrate_author(mf2) FROM kittybox.mf2_json INNER JOIN kittybox.children ON mf2_json.uid = children.child @@ -302,17 +324,19 @@ WHERE ) AND ($4 IS NULL OR ((mf2_json.mf2 #>> '{properties,published,0}') < $4)) ORDER BY (mf2_json.mf2 #>> '{properties,published,0}') DESC -LIMIT $2" +LIMIT $2", ) - .bind(url) - .bind(limit as i64) - .bind(user.map(url::Url::as_str)) - .bind(cursor) - .fetch_all(&mut *txn) - .await - .map(serde_json::Value::Array)?; - - let new_cursor = feed["children"].as_array().unwrap() + .bind(url) + .bind(limit as i64) + .bind(user.map(url::Url::as_str)) + .bind(cursor) + .fetch_all(&mut *txn) + .await + .map(serde_json::Value::Array)?; + + let new_cursor = feed["children"] + .as_array() + .unwrap() .last() .map(|v| v["properties"]["published"][0].as_str().unwrap().to_owned()); @@ -335,7 +359,7 @@ LIMIT $2" .await { Ok((value,)) => Ok(serde_json::from_value(value)?), - Err(err) => Err(err.into()) + Err(err) => Err(err.into()), } } |