about summary refs log tree commit diff
path: root/src/database
diff options
context:
space:
mode:
authorVika <vika@fireburn.ru>2025-04-09 23:31:02 +0300
committerVika <vika@fireburn.ru>2025-04-09 23:31:57 +0300
commit8826d9446e6c492db2243b9921e59ce496027bef (patch)
tree63738aa9001cb73b11cb0e974e93129bcdf1adbb /src/database
parent519cadfbb298f50cbf819dde757037ab56e2863e (diff)
downloadkittybox-8826d9446e6c492db2243b9921e59ce496027bef.tar.zst
cargo fmt
Change-Id: I80e81ebba3f0cdf8c094451c9fe3ee4126b8c888
Diffstat (limited to 'src/database')
-rw-r--r--src/database/file/mod.rs159
-rw-r--r--src/database/memory.rs41
-rw-r--r--src/database/mod.rs165
-rw-r--r--src/database/postgres/mod.rs122
4 files changed, 303 insertions, 184 deletions
diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs
index b9f27b2..5c93beb 100644
--- a/src/database/file/mod.rs
+++ b/src/database/file/mod.rs
@@ -1,6 +1,6 @@
 //#![warn(clippy::unwrap_used)]
-use crate::database::{ErrorKind, Result, settings, Storage, StorageError};
-use crate::micropub::{MicropubUpdate, MicropubPropertyDeletion};
+use crate::database::{settings, ErrorKind, Result, Storage, StorageError};
+use crate::micropub::{MicropubPropertyDeletion, MicropubUpdate};
 use futures::{stream, StreamExt, TryStreamExt};
 use kittybox_util::MentionType;
 use serde_json::json;
@@ -247,7 +247,9 @@ async fn hydrate_author<S: Storage>(
 impl Storage for FileStorage {
     async fn new(url: &'_ url::Url) -> Result<Self> {
         // TODO: sanity check
-        Ok(Self { root_dir: PathBuf::from(url.path()) })
+        Ok(Self {
+            root_dir: PathBuf::from(url.path()),
+        })
     }
     #[tracing::instrument(skip(self))]
     async fn categories(&self, url: &str) -> Result<Vec<String>> {
@@ -259,7 +261,7 @@ impl Storage for FileStorage {
         // perform well.
         Err(std::io::Error::new(
             std::io::ErrorKind::Unsupported,
-            "?q=category queries are not implemented due to resource constraints"
+            "?q=category queries are not implemented due to resource constraints",
         ))?
     }
 
@@ -340,7 +342,10 @@ impl Storage for FileStorage {
         file.sync_all().await?;
         drop(file);
         tokio::fs::rename(&tempfile, &path).await?;
-        tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?;
+        tokio::fs::File::open(path.parent().unwrap())
+            .await?
+            .sync_all()
+            .await?;
 
         if let Some(urls) = post["properties"]["url"].as_array() {
             for url in urls.iter().map(|i| i.as_str().unwrap()) {
@@ -350,8 +355,8 @@ impl Storage for FileStorage {
                         "{}{}",
                         url.host_str().unwrap(),
                         url.port()
-                        .map(|port| format!(":{}", port))
-                        .unwrap_or_default()
+                            .map(|port| format!(":{}", port))
+                            .unwrap_or_default()
                     )
                 };
                 if url != key && url_domain == user.authority() {
@@ -410,26 +415,24 @@ impl Storage for FileStorage {
                     .create(false)
                     .open(&path)
                     .await
-                 {
-                     Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
-                         Vec::default()
-                     }
-                     Err(err) => {
-                         // Propagate the error upwards
-                         return Err(err.into());
-                     }
-                     Ok(mut file) => {
-                         let mut content = String::new();
-                         file.read_to_string(&mut content).await?;
-                         drop(file);
-
-                         if !content.is_empty() {
-                             serde_json::from_str(&content)?
-                         } else {
-                             Vec::default()
-                         }
-                     }
-                 }
+                {
+                    Err(err) if err.kind() == std::io::ErrorKind::NotFound => Vec::default(),
+                    Err(err) => {
+                        // Propagate the error upwards
+                        return Err(err.into());
+                    }
+                    Ok(mut file) => {
+                        let mut content = String::new();
+                        file.read_to_string(&mut content).await?;
+                        drop(file);
+
+                        if !content.is_empty() {
+                            serde_json::from_str(&content)?
+                        } else {
+                            Vec::default()
+                        }
+                    }
+                }
             };
 
             channels.push(super::MicropubChannel {
@@ -444,7 +447,10 @@ impl Storage for FileStorage {
             tempfile.sync_all().await?;
             drop(tempfile);
             tokio::fs::rename(tempfilename, &path).await?;
-            tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?;
+            tokio::fs::File::open(path.parent().unwrap())
+                .await?
+                .sync_all()
+                .await?;
         }
         Ok(())
     }
@@ -476,7 +482,10 @@ impl Storage for FileStorage {
             temp.sync_all().await?;
             drop(temp);
             tokio::fs::rename(tempfilename, &path).await?;
-            tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?;
+            tokio::fs::File::open(path.parent().unwrap())
+                .await?
+                .sync_all()
+                .await?;
 
             (json, new_json)
         };
@@ -486,7 +495,9 @@ impl Storage for FileStorage {
 
     #[tracing::instrument(skip(self, f), fields(f = std::any::type_name::<F>()))]
     async fn update_with<F: FnOnce(&mut serde_json::Value) + Send>(
-        &self, url: &str, f: F
+        &self,
+        url: &str,
+        f: F,
     ) -> Result<(serde_json::Value, serde_json::Value)> {
         todo!("update_with is not yet implemented due to special requirements of the file backend")
     }
@@ -526,25 +537,25 @@ impl Storage for FileStorage {
         url: &'_ str,
         cursor: Option<&'_ str>,
         limit: usize,
-        user: Option<&url::Url>
+        user: Option<&url::Url>,
     ) -> Result<Option<(serde_json::Value, Option<String>)>> {
         #[allow(deprecated)]
-        Ok(self.read_feed_with_limit(
-            url,
-            cursor,
-            limit,
-            user
-        ).await?
+        Ok(self
+            .read_feed_with_limit(url, cursor, limit, user)
+            .await?
             .map(|feed| {
-                tracing::debug!("Feed: {:#}", serde_json::Value::Array(
-                    feed["children"]
-                    .as_array()
-                    .map(|v| v.as_slice())
-                    .unwrap_or_default()
-                    .iter()
-                    .map(|mf2| mf2["properties"]["uid"][0].clone())
-                    .collect::<Vec<_>>()
-                ));
+                tracing::debug!(
+                    "Feed: {:#}",
+                    serde_json::Value::Array(
+                        feed["children"]
+                            .as_array()
+                            .map(|v| v.as_slice())
+                            .unwrap_or_default()
+                            .iter()
+                            .map(|mf2| mf2["properties"]["uid"][0].clone())
+                            .collect::<Vec<_>>()
+                    )
+                );
                 let cursor: Option<String> = feed["children"]
                     .as_array()
                     .map(|v| v.as_slice())
@@ -553,8 +564,7 @@ impl Storage for FileStorage {
                     .map(|v| v["properties"]["uid"][0].as_str().unwrap().to_owned());
                 tracing::debug!("Extracted the cursor: {:?}", cursor);
                 (feed, cursor)
-            })
-        )
+            }))
     }
 
     #[tracing::instrument(skip(self))]
@@ -574,9 +584,12 @@ impl Storage for FileStorage {
                 let children: Vec<serde_json::Value> = match feed["children"].take() {
                     serde_json::Value::Array(children) => children,
                     // We've already checked it's an array
-                    _ => unreachable!()
+                    _ => unreachable!(),
                 };
-                tracing::debug!("Full children array: {:#}", serde_json::Value::Array(children.clone()));
+                tracing::debug!(
+                    "Full children array: {:#}",
+                    serde_json::Value::Array(children.clone())
+                );
                 let mut posts_iter = children
                     .into_iter()
                     .map(|s: serde_json::Value| s.as_str().unwrap().to_string());
@@ -589,7 +602,7 @@ impl Storage for FileStorage {
                 // incredibly long feeds.
                 if let Some(after) = after {
                     tokio::task::block_in_place(|| {
-                       for s in posts_iter.by_ref() {
+                        for s in posts_iter.by_ref() {
                             if s == after {
                                 break;
                             }
@@ -655,12 +668,19 @@ impl Storage for FileStorage {
         let settings: HashMap<&str, serde_json::Value> = serde_json::from_str(&content)?;
         match settings.get(S::ID) {
             Some(value) => Ok(serde_json::from_value::<S>(value.clone())?),
-            None => Err(StorageError::from_static(ErrorKind::Backend, "Setting not set"))
+            None => Err(StorageError::from_static(
+                ErrorKind::Backend,
+                "Setting not set",
+            )),
         }
     }
 
     #[tracing::instrument(skip(self))]
-    async fn set_setting<S: settings::Setting>(&self, user: &url::Url, value: S::Data) -> Result<()> {
+    async fn set_setting<S: settings::Setting>(
+        &self,
+        user: &url::Url,
+        value: S::Data,
+    ) -> Result<()> {
         let mut path = relative_path::RelativePathBuf::new();
         path.push(user.authority());
         path.push("settings");
@@ -704,20 +724,28 @@ impl Storage for FileStorage {
         tempfile.sync_all().await?;
         drop(tempfile);
         tokio::fs::rename(temppath, &path).await?;
-        tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?;
+        tokio::fs::File::open(path.parent().unwrap())
+            .await?
+            .sync_all()
+            .await?;
         Ok(())
     }
 
     #[tracing::instrument(skip(self))]
-    async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()> {
+    async fn add_or_update_webmention(
+        &self,
+        target: &str,
+        mention_type: MentionType,
+        mention: serde_json::Value,
+    ) -> Result<()> {
         let path = url_to_path(&self.root_dir, target);
         let tempfilename = path.with_extension("tmp");
 
         let mut temp = OpenOptions::new()
-                .write(true)
-                .create_new(true)
-                .open(&tempfilename)
-                .await?;
+            .write(true)
+            .create_new(true)
+            .open(&tempfilename)
+            .await?;
         let mut file = OpenOptions::new().read(true).open(&path).await?;
 
         let mut post: serde_json::Value = {
@@ -752,13 +780,20 @@ impl Storage for FileStorage {
         temp.sync_all().await?;
         drop(temp);
         tokio::fs::rename(tempfilename, &path).await?;
-        tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?;
+        tokio::fs::File::open(path.parent().unwrap())
+            .await?
+            .sync_all()
+            .await?;
 
         Ok(())
     }
 
-    async fn all_posts<'this>(&'this self, user: &url::Url) -> Result<impl futures::Stream<Item = serde_json::Value> + Send + 'this> {
+    async fn all_posts<'this>(
+        &'this self,
+        user: &url::Url,
+    ) -> Result<impl futures::Stream<Item = serde_json::Value> + Send + 'this> {
         todo!();
-        #[allow(unreachable_code)] Ok(futures::stream::empty()) // for type inference
+        #[allow(unreachable_code)]
+        Ok(futures::stream::empty()) // for type inference
     }
 }
diff --git a/src/database/memory.rs b/src/database/memory.rs
index c2ceb85..75f04de 100644
--- a/src/database/memory.rs
+++ b/src/database/memory.rs
@@ -5,7 +5,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use tokio::sync::RwLock;
 
-use crate::database::{ErrorKind, MicropubChannel, Result, settings, Storage, StorageError};
+use crate::database::{settings, ErrorKind, MicropubChannel, Result, Storage, StorageError};
 
 #[derive(Clone, Debug, Default)]
 /// A simple in-memory store for testing purposes.
@@ -90,9 +90,16 @@ impl Storage for MemoryStorage {
         Ok(())
     }
 
-    async fn update_post(&self, url: &'_ str, update: crate::micropub::MicropubUpdate) -> Result<()> {
+    async fn update_post(
+        &self,
+        url: &'_ str,
+        update: crate::micropub::MicropubUpdate,
+    ) -> Result<()> {
         let mut guard = self.mapping.write().await;
-        let mut post = guard.get_mut(url).ok_or(StorageError::from_static(ErrorKind::NotFound, "The specified post wasn't found in the database."))?;
+        let mut post = guard.get_mut(url).ok_or(StorageError::from_static(
+            ErrorKind::NotFound,
+            "The specified post wasn't found in the database.",
+        ))?;
 
         use crate::micropub::MicropubPropertyDeletion;
 
@@ -208,7 +215,7 @@ impl Storage for MemoryStorage {
         url: &'_ str,
         cursor: Option<&'_ str>,
         limit: usize,
-        user: Option<&url::Url>
+        user: Option<&url::Url>,
     ) -> Result<Option<(serde_json::Value, Option<String>)>> {
         todo!()
     }
@@ -224,25 +231,39 @@ impl Storage for MemoryStorage {
     }
 
     #[allow(unused_variables)]
-    async fn set_setting<S: settings::Setting>(&self, user: &url::Url, value: S::Data) -> Result<()> {
+    async fn set_setting<S: settings::Setting>(
+        &self,
+        user: &url::Url,
+        value: S::Data,
+    ) -> Result<()> {
         todo!()
     }
 
     #[allow(unused_variables)]
-    async fn add_or_update_webmention(&self, target: &str, mention_type: kittybox_util::MentionType, mention: serde_json::Value) -> Result<()> {
+    async fn add_or_update_webmention(
+        &self,
+        target: &str,
+        mention_type: kittybox_util::MentionType,
+        mention: serde_json::Value,
+    ) -> Result<()> {
         todo!()
     }
 
     #[allow(unused_variables)]
     async fn update_with<F: FnOnce(&mut serde_json::Value) + Send>(
-        &self, url: &str, f: F
+        &self,
+        url: &str,
+        f: F,
     ) -> Result<(serde_json::Value, serde_json::Value)> {
         todo!()
     }
 
-    async fn all_posts<'this>(&'this self, _user: &url::Url) -> Result<impl futures::Stream<Item = serde_json::Value> + Send + 'this> {
+    async fn all_posts<'this>(
+        &'this self,
+        _user: &url::Url,
+    ) -> Result<impl futures::Stream<Item = serde_json::Value> + Send + 'this> {
         todo!();
-        #[allow(unreachable_code)] Ok(futures::stream::pending())
+        #[allow(unreachable_code)]
+        Ok(futures::stream::pending())
     }
-
 }
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 4390ae7..de51c2c 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -177,7 +177,7 @@ impl StorageError {
         Self {
             msg: Cow::Borrowed(msg),
             source: None,
-            kind
+            kind,
         }
     }
     /// Create a StorageError using another arbitrary Error as a source.
@@ -219,27 +219,34 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync {
     fn post_exists(&self, url: &str) -> impl Future<Output = Result<bool>> + Send;
 
     /// Load a post from the database in MF2-JSON format, deserialized from JSON.
-    fn get_post(&self, url: &str) -> impl Future<Output = Result<Option<serde_json::Value>>> + Send;
+    fn get_post(&self, url: &str)
+        -> impl Future<Output = Result<Option<serde_json::Value>>> + Send;
 
     /// Save a post to the database as an MF2-JSON structure.
     ///
     /// Note that the `post` object MUST have `post["properties"]["uid"][0]` defined.
-    fn put_post(&self, post: &serde_json::Value, user: &url::Url) -> impl Future<Output = Result<()>> + Send;
+    fn put_post(
+        &self,
+        post: &serde_json::Value,
+        user: &url::Url,
+    ) -> impl Future<Output = Result<()>> + Send;
 
     /// Add post to feed. Some database implementations might have optimized ways to do this.
     #[tracing::instrument(skip(self))]
     fn add_to_feed(&self, feed: &str, post: &str) -> impl Future<Output = Result<()>> + Send {
         tracing::debug!("Inserting {} into {} using `update_post`", post, feed);
-        self.update_post(feed, serde_json::from_value(
-            serde_json::json!({"add": {"children": [post]}})).unwrap()
+        self.update_post(
+            feed,
+            serde_json::from_value(serde_json::json!({"add": {"children": [post]}})).unwrap(),
         )
     }
     /// Remove post from feed. Some database implementations might have optimized ways to do this.
     #[tracing::instrument(skip(self))]
     fn remove_from_feed(&self, feed: &str, post: &str) -> impl Future<Output = Result<()>> + Send {
         tracing::debug!("Removing {} into {} using `update_post`", post, feed);
-        self.update_post(feed, serde_json::from_value(
-            serde_json::json!({"delete": {"children": [post]}})).unwrap()
+        self.update_post(
+            feed,
+            serde_json::from_value(serde_json::json!({"delete": {"children": [post]}})).unwrap(),
         )
     }
 
@@ -254,7 +261,11 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync {
     ///
     /// Default implementation calls [`Storage::update_with`] and uses
     /// [`update.apply`][MicropubUpdate::apply] to update the post.
-    fn update_post(&self, url: &str, update: MicropubUpdate) -> impl Future<Output = Result<()>> + Send {
+    fn update_post(
+        &self,
+        url: &str,
+        update: MicropubUpdate,
+    ) -> impl Future<Output = Result<()>> + Send {
         let fut = self.update_with(url, |post| {
             update.apply(post);
         });
@@ -274,12 +285,17 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync {
     ///
     /// Returns old post and the new post after editing.
     fn update_with<F: FnOnce(&mut serde_json::Value) + Send>(
-        &self, url: &str, f: F
+        &self,
+        url: &str,
+        f: F,
     ) -> impl Future<Output = Result<(serde_json::Value, serde_json::Value)>> + Send;
 
     /// Get a list of channels available for the user represented by
     /// the `user` domain to write to.
-    fn get_channels(&self, user: &url::Url) -> impl Future<Output = Result<Vec<MicropubChannel>>> + Send;
+    fn get_channels(
+        &self,
+        user: &url::Url,
+    ) -> impl Future<Output = Result<Vec<MicropubChannel>>> + Send;
 
     /// Fetch a feed at `url` and return an h-feed object containing
     /// `limit` posts after a post by url `after`, filtering the content
@@ -329,7 +345,7 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync {
         url: &'_ str,
         cursor: Option<&'_ str>,
         limit: usize,
-        user: Option<&url::Url>
+        user: Option<&url::Url>,
     ) -> impl Future<Output = Result<Option<(serde_json::Value, Option<String>)>>> + Send;
 
     /// Deletes a post from the database irreversibly. Must be idempotent.
@@ -339,7 +355,11 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync {
     fn get_setting<S: Setting>(&self, user: &url::Url) -> impl Future<Output = Result<S>> + Send;
 
     /// Commits a setting to the setting store.
-    fn set_setting<S: Setting>(&self, user: &url::Url, value: S::Data) -> impl Future<Output = Result<()>> + Send;
+    fn set_setting<S: Setting>(
+        &self,
+        user: &url::Url,
+        value: S::Data,
+    ) -> impl Future<Output = Result<()>> + Send;
 
     /// Add (or update) a webmention on a certian post.
     ///
@@ -355,11 +375,19 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync {
     ///
     /// Besides, it may even allow for nice tricks like storing the
     /// webmentions separately and rehydrating them on feed reads.
-    fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> impl Future<Output = Result<()>> + Send;
+    fn add_or_update_webmention(
+        &self,
+        target: &str,
+        mention_type: MentionType,
+        mention: serde_json::Value,
+    ) -> impl Future<Output = Result<()>> + Send;
 
     /// Return a stream of all posts ever made by a certain user, in
     /// reverse-chronological order.
-    fn all_posts<'this>(&'this self, user: &url::Url) -> impl Future<Output = Result<impl futures::Stream<Item = serde_json::Value> + Send + 'this>> + Send;
+    fn all_posts<'this>(
+        &'this self,
+        user: &url::Url,
+    ) -> impl Future<Output = Result<impl futures::Stream<Item = serde_json::Value> + Send + 'this>> + Send;
 }
 
 #[cfg(test)]
@@ -464,7 +492,8 @@ mod tests {
                     "replace": {
                         "content": ["Different test content"]
                     }
-                })).unwrap(),
+                }))
+                .unwrap(),
             )
             .await
             .unwrap();
@@ -511,7 +540,10 @@ mod tests {
             .put_post(&feed, &"https://fireburn.ru/".parse().unwrap())
             .await
             .unwrap();
-        let chans = backend.get_channels(&"https://fireburn.ru/".parse().unwrap()).await.unwrap();
+        let chans = backend
+            .get_channels(&"https://fireburn.ru/".parse().unwrap())
+            .await
+            .unwrap();
         assert_eq!(chans.len(), 1);
         assert_eq!(
             chans[0],
@@ -526,16 +558,16 @@ mod tests {
         backend
             .set_setting::<settings::SiteName>(
                 &"https://fireburn.ru/".parse().unwrap(),
-                "Vika's Hideout".to_owned()
+                "Vika's Hideout".to_owned(),
             )
             .await
             .unwrap();
         assert_eq!(
             backend
-            .get_setting::<settings::SiteName>(&"https://fireburn.ru/".parse().unwrap())
-            .await
-            .unwrap()
-            .as_ref(),
+                .get_setting::<settings::SiteName>(&"https://fireburn.ru/".parse().unwrap())
+                .await
+                .unwrap()
+                .as_ref(),
             "Vika's Hideout"
         );
     }
@@ -597,11 +629,9 @@ mod tests {
 
     async fn test_feed_pagination<Backend: Storage>(backend: Backend) {
         let posts = {
-            let mut posts = std::iter::from_fn(
-                || Some(gen_random_post("fireburn.ru"))
-            )
-            .take(40)
-            .collect::<Vec<serde_json::Value>>();
+            let mut posts = std::iter::from_fn(|| Some(gen_random_post("fireburn.ru")))
+                .take(40)
+                .collect::<Vec<serde_json::Value>>();
 
             // Reverse the array so it's in reverse-chronological order
             posts.reverse();
@@ -629,7 +659,10 @@ mod tests {
                 .put_post(post, &"https://fireburn.ru/".parse().unwrap())
                 .await
                 .unwrap();
-            backend.add_to_feed(key, post["properties"]["uid"][0].as_str().unwrap()).await.unwrap();
+            backend
+                .add_to_feed(key, post["properties"]["uid"][0].as_str().unwrap())
+                .await
+                .unwrap();
         }
 
         let limit: usize = 10;
@@ -648,23 +681,16 @@ mod tests {
                 .unwrap()
                 .iter()
                 .map(|post| post["properties"]["uid"][0].as_str().unwrap())
-                .collect::<Vec<_>>()
-                [0..10],
+                .collect::<Vec<_>>()[0..10],
             posts
                 .iter()
                 .map(|post| post["properties"]["uid"][0].as_str().unwrap())
-                .collect::<Vec<_>>()
-                [0..10]
+                .collect::<Vec<_>>()[0..10]
         );
 
         tracing::debug!("Continuing with cursor: {:?}", cursor);
         let (result2, cursor2) = backend
-            .read_feed_with_cursor(
-                key,
-                cursor.as_deref(),
-                limit,
-                None,
-            )
+            .read_feed_with_cursor(key, cursor.as_deref(), limit, None)
             .await
             .unwrap()
             .unwrap();
@@ -676,12 +702,7 @@ mod tests {
 
         tracing::debug!("Continuing with cursor: {:?}", cursor);
         let (result3, cursor3) = backend
-            .read_feed_with_cursor(
-                key,
-                cursor2.as_deref(),
-                limit,
-                None,
-            )
+            .read_feed_with_cursor(key, cursor2.as_deref(), limit, None)
             .await
             .unwrap()
             .unwrap();
@@ -693,12 +714,7 @@ mod tests {
 
         tracing::debug!("Continuing with cursor: {:?}", cursor);
         let (result4, _) = backend
-            .read_feed_with_cursor(
-                key,
-                cursor3.as_deref(),
-                limit,
-                None,
-            )
+            .read_feed_with_cursor(key, cursor3.as_deref(), limit, None)
             .await
             .unwrap()
             .unwrap();
@@ -725,24 +741,43 @@ mod tests {
     async fn test_webmention_addition<Backend: Storage>(db: Backend) {
         let post = gen_random_post("fireburn.ru");
 
-        db.put_post(&post, &"https://fireburn.ru/".parse().unwrap()).await.unwrap();
+        db.put_post(&post, &"https://fireburn.ru/".parse().unwrap())
+            .await
+            .unwrap();
         const TYPE: MentionType = MentionType::Reply;
 
         let target = post["properties"]["uid"][0].as_str().unwrap();
         let mut reply = gen_random_mention("aaronparecki.com", TYPE, target);
 
-        let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap();
+        let (read_post, _) = db
+            .read_feed_with_cursor(target, None, 20, None)
+            .await
+            .unwrap()
+            .unwrap();
         assert_eq!(post, read_post);
 
-        db.add_or_update_webmention(target, TYPE, reply.clone()).await.unwrap();
+        db.add_or_update_webmention(target, TYPE, reply.clone())
+            .await
+            .unwrap();
 
-        let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap();
+        let (read_post, _) = db
+            .read_feed_with_cursor(target, None, 20, None)
+            .await
+            .unwrap()
+            .unwrap();
         assert_eq!(read_post["properties"]["comment"][0], reply);
 
-        reply["properties"]["content"][0] = json!(rand::random::<faker_rand::lorem::Paragraphs>().to_string());
+        reply["properties"]["content"][0] =
+            json!(rand::random::<faker_rand::lorem::Paragraphs>().to_string());
 
-        db.add_or_update_webmention(target, TYPE, reply.clone()).await.unwrap();
-        let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap();
+        db.add_or_update_webmention(target, TYPE, reply.clone())
+            .await
+            .unwrap();
+        let (read_post, _) = db
+            .read_feed_with_cursor(target, None, 20, None)
+            .await
+            .unwrap()
+            .unwrap();
         assert_eq!(read_post["properties"]["comment"][0], reply);
     }
 
@@ -752,16 +787,20 @@ mod tests {
         let post = {
             let mut post = gen_random_post("fireburn.ru");
             let urls = post["properties"]["url"].as_array_mut().unwrap();
-            urls.push(serde_json::Value::String(
-                PERMALINK.to_owned()
-            ));
+            urls.push(serde_json::Value::String(PERMALINK.to_owned()));
 
             post
         };
-        db.put_post(&post, &"https://fireburn.ru/".parse().unwrap()).await.unwrap();
+        db.put_post(&post, &"https://fireburn.ru/".parse().unwrap())
+            .await
+            .unwrap();
 
         for i in post["properties"]["url"].as_array().unwrap() {
-            let (read_post, _) = db.read_feed_with_cursor(i.as_str().unwrap(), None, 20, None).await.unwrap().unwrap();
+            let (read_post, _) = db
+                .read_feed_with_cursor(i.as_str().unwrap(), None, 20, None)
+                .await
+                .unwrap()
+                .unwrap();
             assert_eq!(read_post, post);
         }
     }
@@ -786,7 +825,7 @@ mod tests {
             async fn $func_name() {
                 let tempdir = tempfile::tempdir().expect("Failed to create tempdir");
                 let backend = super::super::FileStorage {
-                    root_dir: tempdir.path().to_path_buf()
+                    root_dir: tempdir.path().to_path_buf(),
                 };
                 super::$func_name(backend).await
             }
@@ -800,7 +839,7 @@ mod tests {
             #[tracing_test::traced_test]
             async fn $func_name(
                 pool_opts: sqlx::postgres::PgPoolOptions,
-                connect_opts: sqlx::postgres::PgConnectOptions
+                connect_opts: sqlx::postgres::PgConnectOptions,
             ) -> Result<(), sqlx::Error> {
                 let db = {
                     //use sqlx::ConnectOptions;
diff --git a/src/database/postgres/mod.rs b/src/database/postgres/mod.rs
index af19fea..ec67efa 100644
--- a/src/database/postgres/mod.rs
+++ b/src/database/postgres/mod.rs
@@ -5,7 +5,7 @@ use kittybox_util::{micropub::Channel as MicropubChannel, MentionType};
 use sqlx::{ConnectOptions, Executor, PgPool};
 
 use super::settings::Setting;
-use super::{Storage, Result, StorageError, ErrorKind};
+use super::{ErrorKind, Result, Storage, StorageError};
 
 static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!();
 
@@ -14,7 +14,7 @@ impl From<sqlx::Error> for StorageError {
         Self::with_source(
             super::ErrorKind::Backend,
             Cow::Owned(format!("sqlx error: {}", &value)),
-            Box::new(value)
+            Box::new(value),
         )
     }
 }
@@ -24,7 +24,7 @@ impl From<sqlx::migrate::MigrateError> for StorageError {
         Self::with_source(
             super::ErrorKind::Backend,
             Cow::Owned(format!("sqlx migration error: {}", &value)),
-            Box::new(value)
+            Box::new(value),
         )
     }
 }
@@ -32,14 +32,15 @@ impl From<sqlx::migrate::MigrateError> for StorageError {
 /// Micropub storage that uses a PostgreSQL database.
 #[derive(Debug, Clone)]
 pub struct PostgresStorage {
-    db: PgPool
+    db: PgPool,
 }
 
 impl PostgresStorage {
     /// Construct a [`PostgresStorage`] from a [`sqlx::PgPool`],
     /// running appropriate migrations.
     pub(crate) async fn from_pool(db: sqlx::PgPool) -> Result<Self> {
-        db.execute(sqlx::query("CREATE SCHEMA IF NOT EXISTS kittybox")).await?;
+        db.execute(sqlx::query("CREATE SCHEMA IF NOT EXISTS kittybox"))
+            .await?;
         MIGRATOR.run(&db).await?;
         Ok(Self { db })
     }
@@ -50,19 +51,22 @@ impl Storage for PostgresStorage {
     /// migrations on the database.
     async fn new(url: &'_ url::Url) -> Result<Self> {
         tracing::debug!("Postgres URL: {url}");
-        let options = sqlx::postgres::PgConnectOptions::from_url(url)?
-            .options([("search_path", "kittybox")]);
+        let options =
+            sqlx::postgres::PgConnectOptions::from_url(url)?.options([("search_path", "kittybox")]);
 
         Self::from_pool(
             sqlx::postgres::PgPoolOptions::new()
                 .max_connections(50)
                 .connect_with(options)
-                .await?
-        ).await
-
+                .await?,
+        )
+        .await
     }
 
-    async fn all_posts<'this>(&'this self, user: &url::Url) -> Result<impl Stream<Item = serde_json::Value> + Send + 'this> {
+    async fn all_posts<'this>(
+        &'this self,
+        user: &url::Url,
+    ) -> Result<impl Stream<Item = serde_json::Value> + Send + 'this> {
         let authority = user.authority().to_owned();
         Ok(
             sqlx::query_scalar::<_, serde_json::Value>("SELECT mf2 FROM kittybox.mf2_json WHERE owner = $1 ORDER BY (mf2_json.mf2 #>> '{properties,published,0}') DESC")
@@ -74,18 +78,20 @@ impl Storage for PostgresStorage {
 
     #[tracing::instrument(skip(self))]
     async fn categories(&self, url: &str) -> Result<Vec<String>> {
-        sqlx::query_scalar::<_, String>("
+        sqlx::query_scalar::<_, String>(
+            "
 SELECT jsonb_array_elements(mf2['properties']['category']) AS category
 FROM kittybox.mf2_json
 WHERE
     jsonb_typeof(mf2['properties']['category']) = 'array'
     AND uid LIKE ($1 + '%')
     GROUP BY category ORDER BY count(*) DESC
-")
-            .bind(url)
-            .fetch_all(&self.db)
-            .await
-            .map_err(|err| err.into())
+",
+        )
+        .bind(url)
+        .fetch_all(&self.db)
+        .await
+        .map_err(|err| err.into())
     }
     #[tracing::instrument(skip(self))]
     async fn post_exists(&self, url: &str) -> Result<bool> {
@@ -98,13 +104,14 @@ WHERE
 
     #[tracing::instrument(skip(self))]
     async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> {
-        sqlx::query_as::<_, (serde_json::Value,)>("SELECT mf2 FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1")
-            .bind(url)
-            .fetch_optional(&self.db)
-            .await
-            .map(|v| v.map(|v| v.0))
-            .map_err(|err| err.into())
-
+        sqlx::query_as::<_, (serde_json::Value,)>(
+            "SELECT mf2 FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1",
+        )
+        .bind(url)
+        .fetch_optional(&self.db)
+        .await
+        .map(|v| v.map(|v| v.0))
+        .map_err(|err| err.into())
     }
 
     #[tracing::instrument(skip(self))]
@@ -122,13 +129,15 @@ WHERE
     #[tracing::instrument(skip(self))]
     async fn add_to_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> {
         tracing::debug!("Inserting {} into {}", post, feed);
-        sqlx::query("INSERT INTO kittybox.children (parent, child) VALUES ($1, $2) ON CONFLICT DO NOTHING")
-            .bind(feed)
-            .bind(post)
-            .execute(&self.db)
-            .await
-            .map(|_| ())
-            .map_err(Into::into)
+        sqlx::query(
+            "INSERT INTO kittybox.children (parent, child) VALUES ($1, $2) ON CONFLICT DO NOTHING",
+        )
+        .bind(feed)
+        .bind(post)
+        .execute(&self.db)
+        .await
+        .map(|_| ())
+        .map_err(Into::into)
     }
 
     #[tracing::instrument(skip(self))]
@@ -143,7 +152,12 @@ WHERE
     }
 
     #[tracing::instrument(skip(self))]
-    async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()> {
+    async fn add_or_update_webmention(
+        &self,
+        target: &str,
+        mention_type: MentionType,
+        mention: serde_json::Value,
+    ) -> Result<()> {
         let mut txn = self.db.begin().await?;
 
         let (uid, mut post) = sqlx::query_as::<_, (String, serde_json::Value)>("SELECT uid, mf2 FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1 FOR UPDATE")
@@ -190,7 +204,9 @@ WHERE
 
     #[tracing::instrument(skip(self), fields(f = std::any::type_name::<F>()))]
     async fn update_with<F: FnOnce(&mut serde_json::Value) + Send>(
-        &self, url: &str, f: F
+        &self,
+        url: &str,
+        f: F,
     ) -> Result<(serde_json::Value, serde_json::Value)> {
         tracing::debug!("Updating post {}", url);
         let mut txn = self.db.begin().await?;
@@ -250,12 +266,12 @@ WHERE
         url: &'_ str,
         cursor: Option<&'_ str>,
         limit: usize,
-        user: Option<&url::Url>
+        user: Option<&url::Url>,
     ) -> Result<Option<(serde_json::Value, Option<String>)>> {
         let mut txn = self.db.begin().await?;
         sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ, READ ONLY")
-			.execute(&mut *txn)
-			.await?;
+            .execute(&mut *txn)
+            .await?;
         tracing::debug!("Started txn: {:?}", txn);
         let mut feed = match sqlx::query_scalar::<_, serde_json::Value>("
 SELECT kittybox.hydrate_author(mf2) FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1
@@ -273,11 +289,17 @@ SELECT kittybox.hydrate_author(mf2) FROM kittybox.mf2_json WHERE uid = $1 OR mf2
         // The second query is very long and will probably be extremely
         // expensive. It's best to skip it on types where it doesn't make sense
         // (Kittybox doesn't support rendering children on non-feeds)
-        if !feed["type"].as_array().unwrap().iter().any(|t| *t == serde_json::json!("h-feed")) {
+        if !feed["type"]
+            .as_array()
+            .unwrap()
+            .iter()
+            .any(|t| *t == serde_json::json!("h-feed"))
+        {
             return Ok(Some((feed, None)));
         }
 
-        feed["children"] = sqlx::query_scalar::<_, serde_json::Value>("
+        feed["children"] = sqlx::query_scalar::<_, serde_json::Value>(
+            "
 SELECT kittybox.hydrate_author(mf2) FROM kittybox.mf2_json
 INNER JOIN kittybox.children
 ON mf2_json.uid = children.child
@@ -302,17 +324,19 @@ WHERE
     )
     AND ($4 IS NULL OR ((mf2_json.mf2 #>> '{properties,published,0}') < $4))
 ORDER BY (mf2_json.mf2 #>> '{properties,published,0}') DESC
-LIMIT $2"
+LIMIT $2",
         )
-            .bind(url)
-            .bind(limit as i64)
-            .bind(user.map(url::Url::as_str))
-            .bind(cursor)
-            .fetch_all(&mut *txn)
-            .await
-            .map(serde_json::Value::Array)?;
-
-        let new_cursor = feed["children"].as_array().unwrap()
+        .bind(url)
+        .bind(limit as i64)
+        .bind(user.map(url::Url::as_str))
+        .bind(cursor)
+        .fetch_all(&mut *txn)
+        .await
+        .map(serde_json::Value::Array)?;
+
+        let new_cursor = feed["children"]
+            .as_array()
+            .unwrap()
             .last()
             .map(|v| v["properties"]["published"][0].as_str().unwrap().to_owned());
 
@@ -335,7 +359,7 @@ LIMIT $2"
             .await
         {
             Ok((value,)) => Ok(serde_json::from_value(value)?),
-            Err(err) => Err(err.into())
+            Err(err) => Err(err.into()),
         }
     }