diff options
author | Vika <vika@fireburn.ru> | 2023-07-21 17:43:01 +0300 |
---|---|---|
committer | Vika <vika@fireburn.ru> | 2023-07-21 17:43:01 +0300 |
commit | f34c2a2c95559bcb2d068abd611fff5cc677f159 (patch) | |
tree | 84aad5a16c950d9c1f55ac2252952b5e5bddb05a /kittybox-rs | |
parent | f13c60b70e1d9435b5f2803fc48c44eed7be761c (diff) | |
download | kittybox-f34c2a2c95559bcb2d068abd611fff5cc677f159.tar.zst |
Split Postgres schemas into two
Now the postgres schemas are completely independent of each other. This took a while to figure out!
Diffstat (limited to 'kittybox-rs')
-rw-r--r-- | kittybox-rs/migrations/webmention/0001_init.sql (renamed from kittybox-rs/migrations/0002_webmention_queue.sql) | 8 | ||||
-rw-r--r-- | kittybox-rs/src/database/postgres/mod.rs | 8 | ||||
-rw-r--r-- | kittybox-rs/src/webmentions/queue.rs | 20 |
3 files changed, 21 insertions, 15 deletions
diff --git a/kittybox-rs/migrations/0002_webmention_queue.sql b/kittybox-rs/migrations/webmention/0001_init.sql index 708933b..9e7a192 100644 --- a/kittybox-rs/migrations/0002_webmention_queue.sql +++ b/kittybox-rs/migrations/webmention/0001_init.sql @@ -1,4 +1,6 @@ -CREATE TABLE kittybox.incoming_webmention_queue ( +CREATE SCHEMA IF NOT EXISTS kittybox_webmention; + +CREATE TABLE kittybox_webmention.incoming_webmention_queue ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), source TEXT NOT NULL, target TEXT NOT NULL, @@ -6,8 +8,8 @@ CREATE TABLE kittybox.incoming_webmention_queue ( attempts INTEGER NOT NULL DEFAULT 0 ); -CREATE INDEX webmention_jobs_by_attempts ON kittybox.incoming_webmention_queue (attempts); +CREATE INDEX webmention_jobs_by_attempts ON kittybox_webmention.incoming_webmention_queue (attempts); CREATE RULE notify_incoming_webmention AS -ON INSERT TO kittybox.incoming_webmention_queue +ON INSERT TO kittybox_webmention.incoming_webmention_queue DO ALSO NOTIFY incoming_webmention; diff --git a/kittybox-rs/src/database/postgres/mod.rs b/kittybox-rs/src/database/postgres/mod.rs index 286a965..41e4f58 100644 --- a/kittybox-rs/src/database/postgres/mod.rs +++ b/kittybox-rs/src/database/postgres/mod.rs @@ -2,8 +2,8 @@ use std::borrow::Cow; use std::str::FromStr; -use kittybox_util::MicropubChannel; -use sqlx::PgPool; +use kittybox_util::{MicropubChannel, MentionType}; +use sqlx::{PgPool, Executor}; use crate::micropub::{MicropubUpdate, MicropubPropertyDeletion}; use super::settings::Setting; @@ -46,7 +46,8 @@ impl PostgresStorage { /// password from it. pub async fn new(uri: &str) -> Result<Self> { tracing::debug!("Postgres URL: {uri}"); - let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)?; + let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)? + .options([("search_path", "kittybox")]); if let Ok(password_file) = std::env::var("PGPASS_FILE") { let password = tokio::fs::read_to_string(password_file).await.unwrap(); options = options.password(&password); @@ -65,6 +66,7 @@ impl PostgresStorage { /// Construct a [`PostgresStorage`] from a [`sqlx::PgPool`], /// running appropriate migrations. pub async fn from_pool(db: sqlx::PgPool) -> Result<Self> { + db.execute(sqlx::query("CREATE SCHEMA IF NOT EXISTS kittybox")).await?; MIGRATOR.run(&db).await?; Ok(Self { db }) } diff --git a/kittybox-rs/src/webmentions/queue.rs b/kittybox-rs/src/webmentions/queue.rs index 3ced831..dc7d8f9 100644 --- a/kittybox-rs/src/webmentions/queue.rs +++ b/kittybox-rs/src/webmentions/queue.rs @@ -1,12 +1,12 @@ use std::{pin::Pin, str::FromStr}; use futures_util::{Stream, StreamExt}; -use sqlx::postgres::PgListener; +use sqlx::{postgres::PgListener, Executor}; use uuid::Uuid; use super::Webmention; -static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!(); +static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations/webmention"); pub use kittybox_util::queue::{JobQueue, JobItem, Job}; @@ -92,7 +92,7 @@ impl Job<Webmention, PostgresJobQueue<Webmention>> for PostgresJob<Webmention> { } async fn done(mut self) -> Result<(), <PostgresJobQueue<Webmention> as JobQueue<Webmention>>::Error> { tracing::debug!("Deleting {} from the job queue", self.id); - sqlx::query("DELETE FROM kittybox.incoming_webmention_queue WHERE id = $1") + sqlx::query("DELETE FROM kittybox_webmention.incoming_webmention_queue WHERE id = $1") .bind(self.id) .execute(self.txn.as_deref_mut().unwrap()) .await?; @@ -116,23 +116,25 @@ impl<T> Clone for PostgresJobQueue<T> { impl PostgresJobQueue<Webmention> { pub async fn new(uri: &str) -> Result<Self, sqlx::Error> { - let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)?; + let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)? + .options([("search_path", "kittybox_webmention")]); if let Ok(password_file) = std::env::var("PGPASS_FILE") { let password = tokio::fs::read_to_string(password_file).await.unwrap(); options = options.password(&password); } else if let Ok(password) = std::env::var("PGPASS") { options = options.password(&password) } - Ok(Self::from_pool( + Self::from_pool( sqlx::postgres::PgPoolOptions::new() .max_connections(50) .connect_with(options) .await? - ).await?) + ).await } - pub(crate) async fn from_pool(db: sqlx::PgPool) -> Result<Self, sqlx::migrate::MigrateError> { + pub(crate) async fn from_pool(db: sqlx::PgPool) -> Result<Self, sqlx::Error> { + db.execute(sqlx::query("CREATE SCHEMA IF NOT EXISTS kittybox_webmention")).await?; MIGRATOR.run(&db).await?; Ok(Self { db, _phantom: std::marker::PhantomData }) } @@ -147,7 +149,7 @@ impl JobQueue<Webmention> for PostgresJobQueue<Webmention> { let mut txn = self.db.begin().await?; match sqlx::query_as::<_, PostgresJobRow<Webmention>>( - "SELECT id, source, target FROM kittybox.incoming_webmention_queue WHERE attempts < 5 FOR UPDATE SKIP LOCKED LIMIT 1" + "SELECT id, source, target FROM kittybox_webmention.incoming_webmention_queue WHERE attempts < 5 FOR UPDATE SKIP LOCKED LIMIT 1" ) .fetch_optional(&mut *txn) .await? @@ -165,7 +167,7 @@ impl JobQueue<Webmention> for PostgresJobQueue<Webmention> { } async fn put(&self, item: &Webmention) -> Result<Uuid, Self::Error> { - sqlx::query_scalar::<_, Uuid>("INSERT INTO kittybox.incoming_webmention_queue (source, target) VALUES ($1, $2) RETURNING id") + sqlx::query_scalar::<_, Uuid>("INSERT INTO kittybox_webmention.incoming_webmention_queue (source, target) VALUES ($1, $2) RETURNING id") .bind(item.source.as_str()) .bind(item.target.as_str()) .fetch_one(&self.db) |