about summary refs log tree commit diff
path: root/kittybox-rs/src/webmentions/queue.rs
diff options
context:
space:
mode:
Diffstat (limited to 'kittybox-rs/src/webmentions/queue.rs')
-rw-r--r--kittybox-rs/src/webmentions/queue.rs20
1 files changed, 11 insertions, 9 deletions
diff --git a/kittybox-rs/src/webmentions/queue.rs b/kittybox-rs/src/webmentions/queue.rs
index 3ced831..dc7d8f9 100644
--- a/kittybox-rs/src/webmentions/queue.rs
+++ b/kittybox-rs/src/webmentions/queue.rs
@@ -1,12 +1,12 @@
 use std::{pin::Pin, str::FromStr};
 
 use futures_util::{Stream, StreamExt};
-use sqlx::postgres::PgListener;
+use sqlx::{postgres::PgListener, Executor};
 use uuid::Uuid;
 
 use super::Webmention;
 
-static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!();
+static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations/webmention");
 
 pub use kittybox_util::queue::{JobQueue, JobItem, Job};
 
@@ -92,7 +92,7 @@ impl Job<Webmention, PostgresJobQueue<Webmention>> for PostgresJob<Webmention> {
     }
     async fn done(mut self) -> Result<(), <PostgresJobQueue<Webmention> as JobQueue<Webmention>>::Error> {
         tracing::debug!("Deleting {} from the job queue", self.id);
-        sqlx::query("DELETE FROM kittybox.incoming_webmention_queue WHERE id = $1")
+        sqlx::query("DELETE FROM kittybox_webmention.incoming_webmention_queue WHERE id = $1")
             .bind(self.id)
             .execute(self.txn.as_deref_mut().unwrap())
             .await?;
@@ -116,23 +116,25 @@ impl<T> Clone for PostgresJobQueue<T> {
 
 impl PostgresJobQueue<Webmention> {
     pub async fn new(uri: &str) -> Result<Self, sqlx::Error> {
-        let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)?;
+        let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)?
+            .options([("search_path", "kittybox_webmention")]);
         if let Ok(password_file) = std::env::var("PGPASS_FILE") {
             let password = tokio::fs::read_to_string(password_file).await.unwrap();
             options = options.password(&password);
         } else if let Ok(password) = std::env::var("PGPASS") {
             options = options.password(&password)
         }
-        Ok(Self::from_pool(
+        Self::from_pool(
             sqlx::postgres::PgPoolOptions::new()
                 .max_connections(50)
                 .connect_with(options)
                 .await?
-        ).await?)
+        ).await
 
     }
 
-    pub(crate) async fn from_pool(db: sqlx::PgPool) -> Result<Self, sqlx::migrate::MigrateError> {
+    pub(crate) async fn from_pool(db: sqlx::PgPool) -> Result<Self, sqlx::Error> {
+        db.execute(sqlx::query("CREATE SCHEMA IF NOT EXISTS kittybox_webmention")).await?;
         MIGRATOR.run(&db).await?;
         Ok(Self { db, _phantom: std::marker::PhantomData })
     }
@@ -147,7 +149,7 @@ impl JobQueue<Webmention> for PostgresJobQueue<Webmention> {
         let mut txn = self.db.begin().await?;
 
         match sqlx::query_as::<_, PostgresJobRow<Webmention>>(
-            "SELECT id, source, target FROM kittybox.incoming_webmention_queue WHERE attempts < 5 FOR UPDATE SKIP LOCKED LIMIT 1"
+            "SELECT id, source, target FROM kittybox_webmention.incoming_webmention_queue WHERE attempts < 5 FOR UPDATE SKIP LOCKED LIMIT 1"
         )
             .fetch_optional(&mut *txn)
             .await?
@@ -165,7 +167,7 @@ impl JobQueue<Webmention> for PostgresJobQueue<Webmention> {
     }
 
     async fn put(&self, item: &Webmention) -> Result<Uuid, Self::Error> {
-        sqlx::query_scalar::<_, Uuid>("INSERT INTO kittybox.incoming_webmention_queue (source, target) VALUES ($1, $2) RETURNING id")
+        sqlx::query_scalar::<_, Uuid>("INSERT INTO kittybox_webmention.incoming_webmention_queue (source, target) VALUES ($1, $2) RETURNING id")
             .bind(item.source.as_str())
             .bind(item.target.as_str())
             .fetch_one(&self.db)