about summary refs log tree commit diff
path: root/kittybox-rs
diff options
context:
space:
mode:
Diffstat (limited to 'kittybox-rs')
-rw-r--r--kittybox-rs/migrations/webmention/0001_init.sql (renamed from kittybox-rs/migrations/0002_webmention_queue.sql)8
-rw-r--r--kittybox-rs/src/database/postgres/mod.rs8
-rw-r--r--kittybox-rs/src/webmentions/queue.rs20
3 files changed, 21 insertions, 15 deletions
diff --git a/kittybox-rs/migrations/0002_webmention_queue.sql b/kittybox-rs/migrations/webmention/0001_init.sql
index 708933b..9e7a192 100644
--- a/kittybox-rs/migrations/0002_webmention_queue.sql
+++ b/kittybox-rs/migrations/webmention/0001_init.sql
@@ -1,4 +1,6 @@
-CREATE TABLE kittybox.incoming_webmention_queue (
+CREATE SCHEMA IF NOT EXISTS kittybox_webmention;
+
+CREATE TABLE kittybox_webmention.incoming_webmention_queue (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        source TEXT NOT NULL,
        target TEXT NOT NULL,
@@ -6,8 +8,8 @@ CREATE TABLE kittybox.incoming_webmention_queue (
        attempts INTEGER NOT NULL DEFAULT 0
 );
 
-CREATE INDEX webmention_jobs_by_attempts ON kittybox.incoming_webmention_queue (attempts);
+CREATE INDEX webmention_jobs_by_attempts ON kittybox_webmention.incoming_webmention_queue (attempts);
 
 CREATE RULE notify_incoming_webmention AS
-ON INSERT TO kittybox.incoming_webmention_queue
+ON INSERT TO kittybox_webmention.incoming_webmention_queue
 DO ALSO NOTIFY incoming_webmention;
diff --git a/kittybox-rs/src/database/postgres/mod.rs b/kittybox-rs/src/database/postgres/mod.rs
index 286a965..41e4f58 100644
--- a/kittybox-rs/src/database/postgres/mod.rs
+++ b/kittybox-rs/src/database/postgres/mod.rs
@@ -2,8 +2,8 @@
 use std::borrow::Cow;
 use std::str::FromStr;
 
-use kittybox_util::MicropubChannel;
-use sqlx::PgPool;
+use kittybox_util::{MicropubChannel, MentionType};
+use sqlx::{PgPool, Executor};
 use crate::micropub::{MicropubUpdate, MicropubPropertyDeletion};
 
 use super::settings::Setting;
@@ -46,7 +46,8 @@ impl PostgresStorage {
     /// password from it.
     pub async fn new(uri: &str) -> Result<Self> {
         tracing::debug!("Postgres URL: {uri}");
-        let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)?;
+        let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)?
+            .options([("search_path", "kittybox")]);
         if let Ok(password_file) = std::env::var("PGPASS_FILE") {
             let password = tokio::fs::read_to_string(password_file).await.unwrap();
             options = options.password(&password);
@@ -65,6 +66,7 @@ impl PostgresStorage {
     /// Construct a [`PostgresStorage`] from a [`sqlx::PgPool`],
     /// running appropriate migrations.
     pub async fn from_pool(db: sqlx::PgPool) -> Result<Self> {
+        db.execute(sqlx::query("CREATE SCHEMA IF NOT EXISTS kittybox")).await?;
         MIGRATOR.run(&db).await?;
         Ok(Self { db })
     }
diff --git a/kittybox-rs/src/webmentions/queue.rs b/kittybox-rs/src/webmentions/queue.rs
index 3ced831..dc7d8f9 100644
--- a/kittybox-rs/src/webmentions/queue.rs
+++ b/kittybox-rs/src/webmentions/queue.rs
@@ -1,12 +1,12 @@
 use std::{pin::Pin, str::FromStr};
 
 use futures_util::{Stream, StreamExt};
-use sqlx::postgres::PgListener;
+use sqlx::{postgres::PgListener, Executor};
 use uuid::Uuid;
 
 use super::Webmention;
 
-static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!();
+static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations/webmention");
 
 pub use kittybox_util::queue::{JobQueue, JobItem, Job};
 
@@ -92,7 +92,7 @@ impl Job<Webmention, PostgresJobQueue<Webmention>> for PostgresJob<Webmention> {
     }
     async fn done(mut self) -> Result<(), <PostgresJobQueue<Webmention> as JobQueue<Webmention>>::Error> {
         tracing::debug!("Deleting {} from the job queue", self.id);
-        sqlx::query("DELETE FROM kittybox.incoming_webmention_queue WHERE id = $1")
+        sqlx::query("DELETE FROM kittybox_webmention.incoming_webmention_queue WHERE id = $1")
             .bind(self.id)
             .execute(self.txn.as_deref_mut().unwrap())
             .await?;
@@ -116,23 +116,25 @@ impl<T> Clone for PostgresJobQueue<T> {
 
 impl PostgresJobQueue<Webmention> {
     pub async fn new(uri: &str) -> Result<Self, sqlx::Error> {
-        let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)?;
+        let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)?
+            .options([("search_path", "kittybox_webmention")]);
         if let Ok(password_file) = std::env::var("PGPASS_FILE") {
             let password = tokio::fs::read_to_string(password_file).await.unwrap();
             options = options.password(&password);
         } else if let Ok(password) = std::env::var("PGPASS") {
             options = options.password(&password)
         }
-        Ok(Self::from_pool(
+        Self::from_pool(
             sqlx::postgres::PgPoolOptions::new()
                 .max_connections(50)
                 .connect_with(options)
                 .await?
-        ).await?)
+        ).await
 
     }
 
-    pub(crate) async fn from_pool(db: sqlx::PgPool) -> Result<Self, sqlx::migrate::MigrateError> {
+    pub(crate) async fn from_pool(db: sqlx::PgPool) -> Result<Self, sqlx::Error> {
+        db.execute(sqlx::query("CREATE SCHEMA IF NOT EXISTS kittybox_webmention")).await?;
         MIGRATOR.run(&db).await?;
         Ok(Self { db, _phantom: std::marker::PhantomData })
     }
@@ -147,7 +149,7 @@ impl JobQueue<Webmention> for PostgresJobQueue<Webmention> {
         let mut txn = self.db.begin().await?;
 
         match sqlx::query_as::<_, PostgresJobRow<Webmention>>(
-            "SELECT id, source, target FROM kittybox.incoming_webmention_queue WHERE attempts < 5 FOR UPDATE SKIP LOCKED LIMIT 1"
+            "SELECT id, source, target FROM kittybox_webmention.incoming_webmention_queue WHERE attempts < 5 FOR UPDATE SKIP LOCKED LIMIT 1"
         )
             .fetch_optional(&mut *txn)
             .await?
@@ -165,7 +167,7 @@ impl JobQueue<Webmention> for PostgresJobQueue<Webmention> {
     }
 
     async fn put(&self, item: &Webmention) -> Result<Uuid, Self::Error> {
-        sqlx::query_scalar::<_, Uuid>("INSERT INTO kittybox.incoming_webmention_queue (source, target) VALUES ($1, $2) RETURNING id")
+        sqlx::query_scalar::<_, Uuid>("INSERT INTO kittybox_webmention.incoming_webmention_queue (source, target) VALUES ($1, $2) RETURNING id")
             .bind(item.source.as_str())
             .bind(item.target.as_str())
             .fetch_one(&self.db)