about summary refs log tree commit diff
path: root/src/database/postgres
diff options
context:
space:
mode:
Diffstat (limited to 'src/database/postgres')
-rw-r--r--src/database/postgres/mod.rs122
1 files changed, 73 insertions, 49 deletions
diff --git a/src/database/postgres/mod.rs b/src/database/postgres/mod.rs
index af19fea..ec67efa 100644
--- a/src/database/postgres/mod.rs
+++ b/src/database/postgres/mod.rs
@@ -5,7 +5,7 @@ use kittybox_util::{micropub::Channel as MicropubChannel, MentionType};
 use sqlx::{ConnectOptions, Executor, PgPool};
 
 use super::settings::Setting;
-use super::{Storage, Result, StorageError, ErrorKind};
+use super::{ErrorKind, Result, Storage, StorageError};
 
 static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!();
 
@@ -14,7 +14,7 @@ impl From<sqlx::Error> for StorageError {
         Self::with_source(
             super::ErrorKind::Backend,
             Cow::Owned(format!("sqlx error: {}", &value)),
-            Box::new(value)
+            Box::new(value),
         )
     }
 }
@@ -24,7 +24,7 @@ impl From<sqlx::migrate::MigrateError> for StorageError {
         Self::with_source(
             super::ErrorKind::Backend,
             Cow::Owned(format!("sqlx migration error: {}", &value)),
-            Box::new(value)
+            Box::new(value),
         )
     }
 }
@@ -32,14 +32,15 @@ impl From<sqlx::migrate::MigrateError> for StorageError {
 /// Micropub storage that uses a PostgreSQL database.
 #[derive(Debug, Clone)]
 pub struct PostgresStorage {
-    db: PgPool
+    db: PgPool,
 }
 
 impl PostgresStorage {
     /// Construct a [`PostgresStorage`] from a [`sqlx::PgPool`],
     /// running appropriate migrations.
     pub(crate) async fn from_pool(db: sqlx::PgPool) -> Result<Self> {
-        db.execute(sqlx::query("CREATE SCHEMA IF NOT EXISTS kittybox")).await?;
+        db.execute(sqlx::query("CREATE SCHEMA IF NOT EXISTS kittybox"))
+            .await?;
         MIGRATOR.run(&db).await?;
         Ok(Self { db })
     }
@@ -50,19 +51,22 @@ impl Storage for PostgresStorage {
     /// migrations on the database.
     async fn new(url: &'_ url::Url) -> Result<Self> {
         tracing::debug!("Postgres URL: {url}");
-        let options = sqlx::postgres::PgConnectOptions::from_url(url)?
-            .options([("search_path", "kittybox")]);
+        let options =
+            sqlx::postgres::PgConnectOptions::from_url(url)?.options([("search_path", "kittybox")]);
 
         Self::from_pool(
             sqlx::postgres::PgPoolOptions::new()
                 .max_connections(50)
                 .connect_with(options)
-                .await?
-        ).await
-
+                .await?,
+        )
+        .await
     }
 
-    async fn all_posts<'this>(&'this self, user: &url::Url) -> Result<impl Stream<Item = serde_json::Value> + Send + 'this> {
+    async fn all_posts<'this>(
+        &'this self,
+        user: &url::Url,
+    ) -> Result<impl Stream<Item = serde_json::Value> + Send + 'this> {
         let authority = user.authority().to_owned();
         Ok(
             sqlx::query_scalar::<_, serde_json::Value>("SELECT mf2 FROM kittybox.mf2_json WHERE owner = $1 ORDER BY (mf2_json.mf2 #>> '{properties,published,0}') DESC")
@@ -74,18 +78,20 @@ impl Storage for PostgresStorage {
 
     #[tracing::instrument(skip(self))]
     async fn categories(&self, url: &str) -> Result<Vec<String>> {
-        sqlx::query_scalar::<_, String>("
+        sqlx::query_scalar::<_, String>(
+            "
 SELECT jsonb_array_elements(mf2['properties']['category']) AS category
 FROM kittybox.mf2_json
 WHERE
     jsonb_typeof(mf2['properties']['category']) = 'array'
     AND uid LIKE ($1 + '%')
     GROUP BY category ORDER BY count(*) DESC
-")
-            .bind(url)
-            .fetch_all(&self.db)
-            .await
-            .map_err(|err| err.into())
+",
+        )
+        .bind(url)
+        .fetch_all(&self.db)
+        .await
+        .map_err(|err| err.into())
     }
     #[tracing::instrument(skip(self))]
     async fn post_exists(&self, url: &str) -> Result<bool> {
@@ -98,13 +104,14 @@ WHERE
 
     #[tracing::instrument(skip(self))]
     async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> {
-        sqlx::query_as::<_, (serde_json::Value,)>("SELECT mf2 FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1")
-            .bind(url)
-            .fetch_optional(&self.db)
-            .await
-            .map(|v| v.map(|v| v.0))
-            .map_err(|err| err.into())
-
+        sqlx::query_as::<_, (serde_json::Value,)>(
+            "SELECT mf2 FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1",
+        )
+        .bind(url)
+        .fetch_optional(&self.db)
+        .await
+        .map(|v| v.map(|v| v.0))
+        .map_err(|err| err.into())
     }
 
     #[tracing::instrument(skip(self))]
@@ -122,13 +129,15 @@ WHERE
     #[tracing::instrument(skip(self))]
     async fn add_to_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> {
         tracing::debug!("Inserting {} into {}", post, feed);
-        sqlx::query("INSERT INTO kittybox.children (parent, child) VALUES ($1, $2) ON CONFLICT DO NOTHING")
-            .bind(feed)
-            .bind(post)
-            .execute(&self.db)
-            .await
-            .map(|_| ())
-            .map_err(Into::into)
+        sqlx::query(
+            "INSERT INTO kittybox.children (parent, child) VALUES ($1, $2) ON CONFLICT DO NOTHING",
+        )
+        .bind(feed)
+        .bind(post)
+        .execute(&self.db)
+        .await
+        .map(|_| ())
+        .map_err(Into::into)
     }
 
     #[tracing::instrument(skip(self))]
@@ -143,7 +152,12 @@ WHERE
     }
 
     #[tracing::instrument(skip(self))]
-    async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()> {
+    async fn add_or_update_webmention(
+        &self,
+        target: &str,
+        mention_type: MentionType,
+        mention: serde_json::Value,
+    ) -> Result<()> {
         let mut txn = self.db.begin().await?;
 
         let (uid, mut post) = sqlx::query_as::<_, (String, serde_json::Value)>("SELECT uid, mf2 FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1 FOR UPDATE")
@@ -190,7 +204,9 @@ WHERE
 
     #[tracing::instrument(skip(self), fields(f = std::any::type_name::<F>()))]
     async fn update_with<F: FnOnce(&mut serde_json::Value) + Send>(
-        &self, url: &str, f: F
+        &self,
+        url: &str,
+        f: F,
     ) -> Result<(serde_json::Value, serde_json::Value)> {
         tracing::debug!("Updating post {}", url);
         let mut txn = self.db.begin().await?;
@@ -250,12 +266,12 @@ WHERE
         url: &'_ str,
         cursor: Option<&'_ str>,
         limit: usize,
-        user: Option<&url::Url>
+        user: Option<&url::Url>,
     ) -> Result<Option<(serde_json::Value, Option<String>)>> {
         let mut txn = self.db.begin().await?;
         sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ, READ ONLY")
-			.execute(&mut *txn)
-			.await?;
+            .execute(&mut *txn)
+            .await?;
         tracing::debug!("Started txn: {:?}", txn);
         let mut feed = match sqlx::query_scalar::<_, serde_json::Value>("
 SELECT kittybox.hydrate_author(mf2) FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1
@@ -273,11 +289,17 @@ SELECT kittybox.hydrate_author(mf2) FROM kittybox.mf2_json WHERE uid = $1 OR mf2
         // The second query is very long and will probably be extremely
         // expensive. It's best to skip it on types where it doesn't make sense
         // (Kittybox doesn't support rendering children on non-feeds)
-        if !feed["type"].as_array().unwrap().iter().any(|t| *t == serde_json::json!("h-feed")) {
+        if !feed["type"]
+            .as_array()
+            .unwrap()
+            .iter()
+            .any(|t| *t == serde_json::json!("h-feed"))
+        {
             return Ok(Some((feed, None)));
         }
 
-        feed["children"] = sqlx::query_scalar::<_, serde_json::Value>("
+        feed["children"] = sqlx::query_scalar::<_, serde_json::Value>(
+            "
 SELECT kittybox.hydrate_author(mf2) FROM kittybox.mf2_json
 INNER JOIN kittybox.children
 ON mf2_json.uid = children.child
@@ -302,17 +324,19 @@ WHERE
     )
     AND ($4 IS NULL OR ((mf2_json.mf2 #>> '{properties,published,0}') < $4))
 ORDER BY (mf2_json.mf2 #>> '{properties,published,0}') DESC
-LIMIT $2"
+LIMIT $2",
         )
-            .bind(url)
-            .bind(limit as i64)
-            .bind(user.map(url::Url::as_str))
-            .bind(cursor)
-            .fetch_all(&mut *txn)
-            .await
-            .map(serde_json::Value::Array)?;
-
-        let new_cursor = feed["children"].as_array().unwrap()
+        .bind(url)
+        .bind(limit as i64)
+        .bind(user.map(url::Url::as_str))
+        .bind(cursor)
+        .fetch_all(&mut *txn)
+        .await
+        .map(serde_json::Value::Array)?;
+
+        let new_cursor = feed["children"]
+            .as_array()
+            .unwrap()
             .last()
             .map(|v| v["properties"]["published"][0].as_str().unwrap().to_owned());
 
@@ -335,7 +359,7 @@ LIMIT $2"
             .await
         {
             Ok((value,)) => Ok(serde_json::from_value(value)?),
-            Err(err) => Err(err.into())
+            Err(err) => Err(err.into()),
         }
     }