From f34c2a2c95559bcb2d068abd611fff5cc677f159 Mon Sep 17 00:00:00 2001 From: Vika Date: Fri, 21 Jul 2023 17:43:01 +0300 Subject: Split Postgres schemas into two Now the postgres schemas are completely independent of each other. This took a while to figure out! --- kittybox-rs/migrations/0002_webmention_queue.sql | 13 ------------- kittybox-rs/migrations/webmention/0001_init.sql | 15 +++++++++++++++ kittybox-rs/src/database/postgres/mod.rs | 8 +++++--- kittybox-rs/src/webmentions/queue.rs | 20 +++++++++++--------- 4 files changed, 31 insertions(+), 25 deletions(-) delete mode 100644 kittybox-rs/migrations/0002_webmention_queue.sql create mode 100644 kittybox-rs/migrations/webmention/0001_init.sql (limited to 'kittybox-rs') diff --git a/kittybox-rs/migrations/0002_webmention_queue.sql b/kittybox-rs/migrations/0002_webmention_queue.sql deleted file mode 100644 index 708933b..0000000 --- a/kittybox-rs/migrations/0002_webmention_queue.sql +++ /dev/null @@ -1,13 +0,0 @@ -CREATE TABLE kittybox.incoming_webmention_queue ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - source TEXT NOT NULL, - target TEXT NOT NULL, - recv_timestamp TIMESTAMPTZ NOT NULL DEFAULT now(), - attempts INTEGER NOT NULL DEFAULT 0 -); - -CREATE INDEX webmention_jobs_by_attempts ON kittybox.incoming_webmention_queue (attempts); - -CREATE RULE notify_incoming_webmention AS -ON INSERT TO kittybox.incoming_webmention_queue -DO ALSO NOTIFY incoming_webmention; diff --git a/kittybox-rs/migrations/webmention/0001_init.sql b/kittybox-rs/migrations/webmention/0001_init.sql new file mode 100644 index 0000000..9e7a192 --- /dev/null +++ b/kittybox-rs/migrations/webmention/0001_init.sql @@ -0,0 +1,15 @@ +CREATE SCHEMA IF NOT EXISTS kittybox_webmention; + +CREATE TABLE kittybox_webmention.incoming_webmention_queue ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + source TEXT NOT NULL, + target TEXT NOT NULL, + recv_timestamp TIMESTAMPTZ NOT NULL DEFAULT now(), + attempts INTEGER NOT NULL DEFAULT 0 +); + +CREATE INDEX webmention_jobs_by_attempts ON kittybox_webmention.incoming_webmention_queue (attempts); + +CREATE RULE notify_incoming_webmention AS +ON INSERT TO kittybox_webmention.incoming_webmention_queue +DO ALSO NOTIFY incoming_webmention; diff --git a/kittybox-rs/src/database/postgres/mod.rs b/kittybox-rs/src/database/postgres/mod.rs index 286a965..41e4f58 100644 --- a/kittybox-rs/src/database/postgres/mod.rs +++ b/kittybox-rs/src/database/postgres/mod.rs @@ -2,8 +2,8 @@ use std::borrow::Cow; use std::str::FromStr; -use kittybox_util::MicropubChannel; -use sqlx::PgPool; +use kittybox_util::{MicropubChannel, MentionType}; +use sqlx::{PgPool, Executor}; use crate::micropub::{MicropubUpdate, MicropubPropertyDeletion}; use super::settings::Setting; @@ -46,7 +46,8 @@ impl PostgresStorage { /// password from it. pub async fn new(uri: &str) -> Result { tracing::debug!("Postgres URL: {uri}"); - let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)?; + let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)? + .options([("search_path", "kittybox")]); if let Ok(password_file) = std::env::var("PGPASS_FILE") { let password = tokio::fs::read_to_string(password_file).await.unwrap(); options = options.password(&password); @@ -65,6 +66,7 @@ impl PostgresStorage { /// Construct a [`PostgresStorage`] from a [`sqlx::PgPool`], /// running appropriate migrations. pub async fn from_pool(db: sqlx::PgPool) -> Result { + db.execute(sqlx::query("CREATE SCHEMA IF NOT EXISTS kittybox")).await?; MIGRATOR.run(&db).await?; Ok(Self { db }) } diff --git a/kittybox-rs/src/webmentions/queue.rs b/kittybox-rs/src/webmentions/queue.rs index 3ced831..dc7d8f9 100644 --- a/kittybox-rs/src/webmentions/queue.rs +++ b/kittybox-rs/src/webmentions/queue.rs @@ -1,12 +1,12 @@ use std::{pin::Pin, str::FromStr}; use futures_util::{Stream, StreamExt}; -use sqlx::postgres::PgListener; +use sqlx::{postgres::PgListener, Executor}; use uuid::Uuid; use super::Webmention; -static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!(); +static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations/webmention"); pub use kittybox_util::queue::{JobQueue, JobItem, Job}; @@ -92,7 +92,7 @@ impl Job> for PostgresJob { } async fn done(mut self) -> Result<(), as JobQueue>::Error> { tracing::debug!("Deleting {} from the job queue", self.id); - sqlx::query("DELETE FROM kittybox.incoming_webmention_queue WHERE id = $1") + sqlx::query("DELETE FROM kittybox_webmention.incoming_webmention_queue WHERE id = $1") .bind(self.id) .execute(self.txn.as_deref_mut().unwrap()) .await?; @@ -116,23 +116,25 @@ impl Clone for PostgresJobQueue { impl PostgresJobQueue { pub async fn new(uri: &str) -> Result { - let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)?; + let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)? + .options([("search_path", "kittybox_webmention")]); if let Ok(password_file) = std::env::var("PGPASS_FILE") { let password = tokio::fs::read_to_string(password_file).await.unwrap(); options = options.password(&password); } else if let Ok(password) = std::env::var("PGPASS") { options = options.password(&password) } - Ok(Self::from_pool( + Self::from_pool( sqlx::postgres::PgPoolOptions::new() .max_connections(50) .connect_with(options) .await? - ).await?) + ).await } - pub(crate) async fn from_pool(db: sqlx::PgPool) -> Result { + pub(crate) async fn from_pool(db: sqlx::PgPool) -> Result { + db.execute(sqlx::query("CREATE SCHEMA IF NOT EXISTS kittybox_webmention")).await?; MIGRATOR.run(&db).await?; Ok(Self { db, _phantom: std::marker::PhantomData }) } @@ -147,7 +149,7 @@ impl JobQueue for PostgresJobQueue { let mut txn = self.db.begin().await?; match sqlx::query_as::<_, PostgresJobRow>( - "SELECT id, source, target FROM kittybox.incoming_webmention_queue WHERE attempts < 5 FOR UPDATE SKIP LOCKED LIMIT 1" + "SELECT id, source, target FROM kittybox_webmention.incoming_webmention_queue WHERE attempts < 5 FOR UPDATE SKIP LOCKED LIMIT 1" ) .fetch_optional(&mut *txn) .await? @@ -165,7 +167,7 @@ impl JobQueue for PostgresJobQueue { } async fn put(&self, item: &Webmention) -> Result { - sqlx::query_scalar::<_, Uuid>("INSERT INTO kittybox.incoming_webmention_queue (source, target) VALUES ($1, $2) RETURNING id") + sqlx::query_scalar::<_, Uuid>("INSERT INTO kittybox_webmention.incoming_webmention_queue (source, target) VALUES ($1, $2) RETURNING id") .bind(item.source.as_str()) .bind(item.target.as_str()) .fetch_one(&self.db) -- cgit 1.4.1