From a23a0438847a8ae8409edf6445ea4f43990e249c Mon Sep 17 00:00:00 2001
From: Vika
Date: Sat, 1 Jul 2023 20:32:42 +0300
Subject: Create a job queue based on Postgres

It's generic enough to be used for anything, but for now it's only gonna
be used for webmentions.
---
 kittybox-rs/Cargo.lock                           |   1 +
 kittybox-rs/Cargo.toml                           |   1 +
 kittybox-rs/migrations/0002_webmention_queue.sql |  10 ++
 kittybox-rs/src/webmentions/queue.rs             | 179 +++++++++++++++++++++++
 4 files changed, 191 insertions(+)
 create mode 100644 kittybox-rs/migrations/0002_webmention_queue.sql
 create mode 100644 kittybox-rs/src/webmentions/queue.rs

diff --git a/kittybox-rs/Cargo.lock b/kittybox-rs/Cargo.lock
index 36a785a..f3a0bfb 100644
--- a/kittybox-rs/Cargo.lock
+++ b/kittybox-rs/Cargo.lock
@@ -1503,6 +1503,7 @@ dependencies = [
  "tracing-test",
  "tracing-tree",
  "url",
+ "uuid 1.3.3",
  "webauthn-rs",
  "wiremock",
 ]

diff --git a/kittybox-rs/Cargo.toml b/kittybox-rs/Cargo.toml
index 4c7670d..8f9dff6 100644
--- a/kittybox-rs/Cargo.toml
+++ b/kittybox-rs/Cargo.toml
@@ -81,6 +81,7 @@ serde_urlencoded = "^0.7.0" # `x-www-form-urlencoded` meets Serde
 serde_variant = "^0.1.1" # Retrieve serde provided variant names for enum objects
 relative-path = "^1.5.0" # Portable relative paths for Rust
 sha2 = "^0.9.8" # SHA-2 series of algorithms for Rust
+uuid = "^1.3.3"
 tracing = { version = "0.1.34", features = [] }
 tracing-tree = "0.2.1"
 tracing-log = "0.1.3"

diff --git a/kittybox-rs/migrations/0002_webmention_queue.sql b/kittybox-rs/migrations/0002_webmention_queue.sql
new file mode 100644
index 0000000..0b95771
--- /dev/null
+++ b/kittybox-rs/migrations/0002_webmention_queue.sql
@@ -0,0 +1,10 @@
+CREATE TABLE incoming_webmention_queue (
+    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+    source TEXT NOT NULL,
+    target TEXT NOT NULL,
+    recv_timestamp TIMESTAMPTZ NOT NULL DEFAULT now()
+);
+
+CREATE RULE notify_incoming_webmention AS
+ON INSERT TO incoming_webmention_queue
+DO ALSO NOTIFY incoming_webmention;

diff --git a/kittybox-rs/src/webmentions/queue.rs b/kittybox-rs/src/webmentions/queue.rs
new file mode 100644
index 0000000..77ad4ea
--- /dev/null
+++ b/kittybox-rs/src/webmentions/queue.rs
@@ -0,0 +1,179 @@
+use std::{pin::Pin, str::FromStr};
+
+use futures_util::{Stream, StreamExt};
+use sqlx::postgres::PgListener;
+use uuid::Uuid;
+
+use super::Webmention;
+
+static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!();
+
+#[async_trait::async_trait]
+pub trait JobQueue<T>: Send + Sync + Sized + Clone + 'static {
+    type Job: Job<T, Self>;
+    type Error: std::error::Error + Send + Sync + Sized;
+
+    async fn get_one(&self) -> Result<Option<Self::Job>, Self::Error>;
+    async fn put(&self, item: &T) -> Result<Uuid, Self::Error>;
+
+    async fn into_stream(self) -> Result<Pin<Box<dyn Stream<Item = Result<Self::Job, Self::Error>> + Send>>, Self::Error>;
+}
+
+#[async_trait::async_trait]
+pub trait Job<T, Q: JobQueue<T>>: Send + Sync + Sized {
+    fn job(&self) -> &T;
+    async fn done(self) -> Result<(), Q::Error>;
+}
+
+#[derive(Debug)]
+pub struct PostgresJobItem<'c, T> {
+    id: Uuid,
+    job: T,
+    txn: sqlx::Transaction<'c, sqlx::Postgres>
+}
+
+#[async_trait::async_trait]
+impl Job<Webmention, PostgresJobQueue<Webmention>> for PostgresJobItem<'_, Webmention> {
+    fn job(&self) -> &Webmention {
+        &self.job
+    }
+    async fn done(mut self) -> Result<(), <PostgresJobQueue<Webmention> as JobQueue<Webmention>>::Error> {
+        println!("Deleting {} from the job queue", self.id);
+        sqlx::query("DELETE FROM incoming_webmention_queue WHERE id = $1")
+            .bind(self.id)
+            .execute(&mut self.txn)
+            .await?;
+
+        self.txn.commit().await
+    }
+}
+
+pub struct PostgresJobQueue<T> {
+    db: sqlx::PgPool,
+    _phantom: std::marker::PhantomData<T>
+}
+impl<T> Clone for PostgresJobQueue<T> {
+    fn clone(&self) -> Self {
+        Self {
+            db: self.db.clone(),
+            _phantom: std::marker::PhantomData
+        }
+    }
+}
+
+impl PostgresJobQueue<Webmention> {
+    pub async fn new(uri: &str) -> Result<Self, sqlx::Error> {
+        let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)?;
+        if let Ok(password_file) = std::env::var("PGPASS_FILE") {
+            let password = tokio::fs::read_to_string(password_file).await.unwrap();
+            options = options.password(&password);
+        } else if let Ok(password) = std::env::var("PGPASS") {
+            options = options.password(&password)
+        }
+        Ok(Self::from_pool(
+            sqlx::postgres::PgPoolOptions::new()
+                .max_connections(50)
+                .connect_with(options)
+                .await?
+        ).await?)
+
+    }
+
+    pub(crate) async fn from_pool(db: sqlx::PgPool) -> Result<Self, sqlx::Error> {
+        MIGRATOR.run(&db).await?;
+        Ok(Self { db, _phantom: std::marker::PhantomData })
+    }
+}
+
+#[async_trait::async_trait]
+impl JobQueue<Webmention> for PostgresJobQueue<Webmention> {
+    type Job = PostgresJobItem<'static, Webmention>;
+    type Error = sqlx::Error;
+
+    async fn get_one(&self) -> Result<Option<Self::Job>, Self::Error> {
+        let mut txn = self.db.begin().await?;
+
+        match sqlx::query_as::<_, (Uuid, String, String)>(
+            "SELECT id, source, target FROM incoming_webmention_queue FOR UPDATE SKIP LOCKED LIMIT 1"
+        )
+            .fetch_optional(&mut txn)
+            .await?
+            .map(|(id, source, target)| (id, Webmention { source, target })) {
+                Some((id, webmention)) => Ok(Some(Self::Job {
+                    id,
+                    job: webmention,
+                    txn
+                })),
+                None => Ok(None)
+            }
+    }
+
+    async fn put(&self, item: &Webmention) -> Result<Uuid, Self::Error> {
+        sqlx::query_scalar::<_, Uuid>("INSERT INTO incoming_webmention_queue (source, target) VALUES ($1, $2) RETURNING id")
+            .bind(item.source.as_str())
+            .bind(item.target.as_str())
+            .fetch_one(&self.db)
+            .await
+    }
+
+    async fn into_stream(self) -> Result<Pin<Box<dyn Stream<Item = Result<Self::Job, Self::Error>> + Send>>, Self::Error> {
+        let mut listener = PgListener::connect_with(&self.db).await?;
+        listener.listen("incoming_webmention").await?;
+
+        let stream: Pin<Box<dyn Stream<Item = Result<Self::Job, Self::Error>> + Send>> = futures_util::stream::try_unfold((), {
+            let listener = std::sync::Arc::new(tokio::sync::Mutex::new(listener));
+            move |_| {
+                let queue = self.clone();
+                let listener = listener.clone();
+                async move {
+                    loop {
+                        match queue.get_one().await? {
+                            Some(item) => return Ok(Some((item, ()))),
+                            None => {
+                                listener.lock().await.recv().await?;
+                                continue
+                            }
+                        }
+                    }
+                }
+            }
+        }).boxed();
+
+        Ok(stream)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{Webmention, PostgresJobQueue, Job, JobQueue};
+    use futures_util::StreamExt;
+
+    #[sqlx::test]
+    async fn test_webmention_queue(pool: sqlx::PgPool) -> Result<(), sqlx::Error> {
+        let test_webmention = Webmention {
+            source: "https://fireburn.ru/posts/lorem-ipsum".to_owned(),
+            target: "https://aaronparecki.com/posts/dolor-sit-amet".to_owned()
+        };
+
+        let queue = PostgresJobQueue::<Webmention>::from_pool(pool).await?;
+        println!("Putting webmention into queue");
+        queue.put(&test_webmention).await?;
+        assert_eq!(queue.get_one().await?.as_ref().map(|j| j.job()), Some(&test_webmention));
+
+        println!("Creating a stream");
+        let mut stream = queue.clone().into_stream().await?;
+
+        let future = stream.next();
+        let guard = future.await.transpose()?.unwrap();
+        assert_eq!(guard.job(), &test_webmention);
+        if let Some(item) = queue.get_one().await? {
+            panic!("Unexpected item {:?} returned from job queue!", item)
+        };
+
+        drop(guard);
+        let guard = stream.next().await.transpose()?.unwrap();
+        assert_eq!(guard.job(), &test_webmention);
+        guard.done().await?;
+
+        match queue.get_one().await? {
+            Some(item) => panic!("Unexpected item {:?} returned from job queue!", item),
+            None => Ok(())
+        }
+    }
+}
-- 
cgit 1.4.1
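
A minimal usage sketch, not part of the patch above: it shows how a consumer of this queue might be wired up, assuming the items from queue.rs and the Webmention type are in scope. The names process_webmention and supervise_webmentions are hypothetical placeholders, not functions introduced by this commit. The point illustrated is the guard pattern the test exercises: the stream hands out a PostgresJobItem whose row stays locked (FOR UPDATE SKIP LOCKED) inside an open transaction, done() deletes the row and commits, and dropping the guard without calling done() rolls back so the job becomes visible to workers again.

// Sketch only: assumes Webmention, PostgresJobQueue, JobQueue and Job
// (from queue.rs above) are already in scope.
use futures_util::TryStreamExt;

// Hypothetical handler; the real webmention processing lives elsewhere in
// the crate and is not part of this patch.
async fn process_webmention(mention: &Webmention) -> Result<(), sqlx::Error> {
    println!("webmention from {} to {}", mention.source, mention.target);
    Ok(())
}

// Hypothetical worker loop driving the queue introduced above.
async fn supervise_webmentions(queue: PostgresJobQueue<Webmention>) -> Result<(), sqlx::Error> {
    // The stream wakes up on the `incoming_webmention` NOTIFY fired by the
    // INSERT rule from the migration, so no polling is needed here.
    let mut jobs = queue.into_stream().await?;
    while let Some(guard) = jobs.try_next().await? {
        // The row backing `guard` is held under FOR UPDATE SKIP LOCKED by an
        // open transaction, so concurrent workers will skip it.
        process_webmention(guard.job()).await?;
        // Deletes the row and commits. Dropping `guard` without calling
        // `done()` would roll back and return the job to the queue.
        guard.done().await?;
    }
    Ok(())
}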