From f34c2a2c95559bcb2d068abd611fff5cc677f159 Mon Sep 17 00:00:00 2001 From: Vika Date: Fri, 21 Jul 2023 17:43:01 +0300 Subject: Split Postgres schemas into two Now the postgres schemas are completely independent of each other. This took a while to figure out! --- kittybox-rs/src/database/postgres/mod.rs | 8 +++++--- kittybox-rs/src/webmentions/queue.rs | 20 +++++++++++--------- 2 files changed, 16 insertions(+), 12 deletions(-) (limited to 'kittybox-rs/src') diff --git a/kittybox-rs/src/database/postgres/mod.rs b/kittybox-rs/src/database/postgres/mod.rs index 286a965..41e4f58 100644 --- a/kittybox-rs/src/database/postgres/mod.rs +++ b/kittybox-rs/src/database/postgres/mod.rs @@ -2,8 +2,8 @@ use std::borrow::Cow; use std::str::FromStr; -use kittybox_util::MicropubChannel; -use sqlx::PgPool; +use kittybox_util::{MicropubChannel, MentionType}; +use sqlx::{PgPool, Executor}; use crate::micropub::{MicropubUpdate, MicropubPropertyDeletion}; use super::settings::Setting; @@ -46,7 +46,8 @@ impl PostgresStorage { /// password from it. pub async fn new(uri: &str) -> Result { tracing::debug!("Postgres URL: {uri}"); - let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)?; + let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)? + .options([("search_path", "kittybox")]); if let Ok(password_file) = std::env::var("PGPASS_FILE") { let password = tokio::fs::read_to_string(password_file).await.unwrap(); options = options.password(&password); @@ -65,6 +66,7 @@ impl PostgresStorage { /// Construct a [`PostgresStorage`] from a [`sqlx::PgPool`], /// running appropriate migrations. pub async fn from_pool(db: sqlx::PgPool) -> Result { + db.execute(sqlx::query("CREATE SCHEMA IF NOT EXISTS kittybox")).await?; MIGRATOR.run(&db).await?; Ok(Self { db }) } diff --git a/kittybox-rs/src/webmentions/queue.rs b/kittybox-rs/src/webmentions/queue.rs index 3ced831..dc7d8f9 100644 --- a/kittybox-rs/src/webmentions/queue.rs +++ b/kittybox-rs/src/webmentions/queue.rs @@ -1,12 +1,12 @@ use std::{pin::Pin, str::FromStr}; use futures_util::{Stream, StreamExt}; -use sqlx::postgres::PgListener; +use sqlx::{postgres::PgListener, Executor}; use uuid::Uuid; use super::Webmention; -static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!(); +static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations/webmention"); pub use kittybox_util::queue::{JobQueue, JobItem, Job}; @@ -92,7 +92,7 @@ impl Job> for PostgresJob { } async fn done(mut self) -> Result<(), as JobQueue>::Error> { tracing::debug!("Deleting {} from the job queue", self.id); - sqlx::query("DELETE FROM kittybox.incoming_webmention_queue WHERE id = $1") + sqlx::query("DELETE FROM kittybox_webmention.incoming_webmention_queue WHERE id = $1") .bind(self.id) .execute(self.txn.as_deref_mut().unwrap()) .await?; @@ -116,23 +116,25 @@ impl Clone for PostgresJobQueue { impl PostgresJobQueue { pub async fn new(uri: &str) -> Result { - let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)?; + let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)? + .options([("search_path", "kittybox_webmention")]); if let Ok(password_file) = std::env::var("PGPASS_FILE") { let password = tokio::fs::read_to_string(password_file).await.unwrap(); options = options.password(&password); } else if let Ok(password) = std::env::var("PGPASS") { options = options.password(&password) } - Ok(Self::from_pool( + Self::from_pool( sqlx::postgres::PgPoolOptions::new() .max_connections(50) .connect_with(options) .await? - ).await?) + ).await } - pub(crate) async fn from_pool(db: sqlx::PgPool) -> Result { + pub(crate) async fn from_pool(db: sqlx::PgPool) -> Result { + db.execute(sqlx::query("CREATE SCHEMA IF NOT EXISTS kittybox_webmention")).await?; MIGRATOR.run(&db).await?; Ok(Self { db, _phantom: std::marker::PhantomData }) } @@ -147,7 +149,7 @@ impl JobQueue for PostgresJobQueue { let mut txn = self.db.begin().await?; match sqlx::query_as::<_, PostgresJobRow>( - "SELECT id, source, target FROM kittybox.incoming_webmention_queue WHERE attempts < 5 FOR UPDATE SKIP LOCKED LIMIT 1" + "SELECT id, source, target FROM kittybox_webmention.incoming_webmention_queue WHERE attempts < 5 FOR UPDATE SKIP LOCKED LIMIT 1" ) .fetch_optional(&mut *txn) .await? @@ -165,7 +167,7 @@ impl JobQueue for PostgresJobQueue { } async fn put(&self, item: &Webmention) -> Result { - sqlx::query_scalar::<_, Uuid>("INSERT INTO kittybox.incoming_webmention_queue (source, target) VALUES ($1, $2) RETURNING id") + sqlx::query_scalar::<_, Uuid>("INSERT INTO kittybox_webmention.incoming_webmention_queue (source, target) VALUES ($1, $2) RETURNING id") .bind(item.source.as_str()) .bind(item.target.as_str()) .fetch_one(&self.db) -- cgit 1.4.1