about summary refs log tree commit diff
diff options
context:
space:
mode:
authorVika <vika@fireburn.ru>2023-07-01 20:32:42 +0300
committerVika <vika@fireburn.ru>2023-07-09 01:42:47 +0300
commita23a0438847a8ae8409edf6445ea4f43990e249c (patch)
tree21963480c48c450acbc74f357f07cda09d1d15ff
parentfacd6f3bf34b5a1142363dc21d8270985020815d (diff)
downloadkittybox-a23a0438847a8ae8409edf6445ea4f43990e249c.tar.zst
Create a job queue based on Postgres
It's generic enough to be used for anything, but for now it's only
gonna be used for webmentions.
-rw-r--r--kittybox-rs/Cargo.lock1
-rw-r--r--kittybox-rs/Cargo.toml1
-rw-r--r--kittybox-rs/migrations/0002_webmention_queue.sql10
-rw-r--r--kittybox-rs/src/webmentions/queue.rs179
4 files changed, 191 insertions, 0 deletions
diff --git a/kittybox-rs/Cargo.lock b/kittybox-rs/Cargo.lock
index 36a785a..f3a0bfb 100644
--- a/kittybox-rs/Cargo.lock
+++ b/kittybox-rs/Cargo.lock
@@ -1503,6 +1503,7 @@ dependencies = [
  "tracing-test",
  "tracing-tree",
  "url",
+ "uuid 1.3.3",
  "webauthn-rs",
  "wiremock",
 ]
diff --git a/kittybox-rs/Cargo.toml b/kittybox-rs/Cargo.toml
index 4c7670d..8f9dff6 100644
--- a/kittybox-rs/Cargo.toml
+++ b/kittybox-rs/Cargo.toml
@@ -81,6 +81,7 @@ serde_urlencoded = "^0.7.0"  # `x-www-form-urlencoded` meets Serde
 serde_variant = "^0.1.1"     # Retrieve serde provided variant names for enum objects
 relative-path = "^1.5.0"     # Portable relative paths for Rust
 sha2 = "^0.9.8"              # SHA-2 series of algorithms for Rust
+uuid = "^1.3.3"
 tracing = { version = "0.1.34", features = [] }
 tracing-tree = "0.2.1"
 tracing-log = "0.1.3"
diff --git a/kittybox-rs/migrations/0002_webmention_queue.sql b/kittybox-rs/migrations/0002_webmention_queue.sql
new file mode 100644
index 0000000..0b95771
--- /dev/null
+++ b/kittybox-rs/migrations/0002_webmention_queue.sql
@@ -0,0 +1,10 @@
+CREATE TABLE incoming_webmention_queue (
+       id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+       source TEXT NOT NULL,
+       target TEXT NOT NULL,
+       recv_timestamp TIMESTAMPTZ NOT NULL DEFAULT now()
+);
+
+CREATE RULE notify_incoming_webmention AS
+ON INSERT TO incoming_webmention_queue
+DO ALSO NOTIFY incoming_webmention;
diff --git a/kittybox-rs/src/webmentions/queue.rs b/kittybox-rs/src/webmentions/queue.rs
new file mode 100644
index 0000000..77ad4ea
--- /dev/null
+++ b/kittybox-rs/src/webmentions/queue.rs
@@ -0,0 +1,179 @@
+use std::{pin::Pin, str::FromStr};
+
+use futures_util::{Stream, StreamExt};
+use sqlx::postgres::PgListener;
+use uuid::Uuid;
+
+use super::Webmention;
+
+static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!();
+
+#[async_trait::async_trait]
+pub trait JobQueue<T: Send + Sync + Sized>: Send + Sync + Sized + Clone + 'static {
+    type Job: Job<T, Self>;
+    type Error: std::error::Error + Send + Sync + Sized;
+
+    async fn get_one(&self) -> Result<Option<Self::Job>, Self::Error>;
+    async fn put(&self, item: &T) -> Result<Uuid, Self::Error>;
+
+    async fn into_stream(self) -> Result<Pin<Box<dyn Stream<Item = Result<Self::Job, Self::Error>> + Send>>, Self::Error>;
+}
+
+#[async_trait::async_trait]
+pub trait Job<T: Send + Sync + Sized, Q: JobQueue<T>>: Send + Sync + Sized {
+    fn job(&self) -> &T;
+    async fn done(self) -> Result<(), Q::Error>;
+}
+
+#[derive(Debug)]
+pub struct PostgresJobItem<'c, T> {
+    id: Uuid,
+    job: T,
+    txn: sqlx::Transaction<'c, sqlx::Postgres>
+}
+
+#[async_trait::async_trait]
+impl Job<Webmention, PostgresJobQueue<Webmention>> for PostgresJobItem<'_, Webmention> {
+    fn job(&self) -> &Webmention {
+        &self.job
+    }
+    async fn done(mut self) -> Result<(), <PostgresJobQueue<Webmention> as JobQueue<Webmention>>::Error> {
+        println!("Deleting {} from the job queue", self.id);
+        sqlx::query("DELETE FROM incoming_webmention_queue WHERE id = $1")
+            .bind(self.id)
+            .execute(&mut self.txn)
+            .await?;
+
+        self.txn.commit().await
+    }
+}
+
+pub struct PostgresJobQueue<T> {
+    db: sqlx::PgPool,
+    _phantom: std::marker::PhantomData<T>
+}
+impl<T> Clone for PostgresJobQueue<T> {
+    fn clone(&self) -> Self {
+        Self {
+            db: self.db.clone(),
+            _phantom: std::marker::PhantomData
+        }
+    }
+}
+
+impl PostgresJobQueue<Webmention> {
+    pub async fn new(uri: &str) -> Result<Self, sqlx::Error> {
+        let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)?;
+        if let Ok(password_file) = std::env::var("PGPASS_FILE") {
+            let password = tokio::fs::read_to_string(password_file).await.unwrap();
+            options = options.password(&password);
+        } else if let Ok(password) = std::env::var("PGPASS") {
+            options = options.password(&password)
+        }
+        Ok(Self::from_pool(
+            sqlx::postgres::PgPoolOptions::new()
+                .max_connections(50)
+                .connect_with(options)
+                .await?
+        ).await?)
+
+    }
+
+    pub(crate) async fn from_pool(db: sqlx::PgPool) -> Result<Self, sqlx::migrate::MigrateError> {
+        MIGRATOR.run(&db).await?;
+        Ok(Self { db, _phantom: std::marker::PhantomData })
+    }
+}
+
+#[async_trait::async_trait]
+impl JobQueue<Webmention> for PostgresJobQueue<Webmention> {
+    type Job = PostgresJobItem<'static, Webmention>;
+    type Error = sqlx::Error;
+
+    async fn get_one(&self) -> Result<Option<Self::Job>, Self::Error> {
+        let mut txn = self.db.begin().await?;
+
+        match sqlx::query_as::<_, (Uuid, String, String)>(
+            "SELECT id, source, target FROM incoming_webmention_queue FOR UPDATE SKIP LOCKED LIMIT 1"
+        )
+            .fetch_optional(&mut txn)
+            .await?
+            .map(|(id, source, target)| (id, Webmention { source, target })) {
+                Some((id, webmention)) => Ok(Some(Self::Job {
+                    id,
+                    job: webmention,
+                    txn
+                })),
+                None => Ok(None)
+            }
+    }
+
+    async fn put(&self, item: &Webmention) -> Result<Uuid, Self::Error> {
+        sqlx::query_scalar::<_, Uuid>("INSERT INTO incoming_webmention_queue (source, target) VALUES ($1, $2) RETURNING id")
+            .bind(item.source.as_str())
+            .bind(item.target.as_str())
+            .fetch_one(&self.db)
+            .await
+    }
+
+    async fn into_stream(self) -> Result<Pin<Box<dyn Stream<Item = Result<Self::Job, Self::Error>> + Send>>, Self::Error> {
+        let mut listener = PgListener::connect_with(&self.db).await?;
+        listener.listen("incoming_webmention").await?;
+
+        let stream: Pin<Box<dyn Stream<Item = Result<Self::Job, Self::Error>> + Send>> = futures_util::stream::try_unfold((), {
+            let listener = std::sync::Arc::new(tokio::sync::Mutex::new(listener));
+            move |_| {
+                let queue = self.clone();
+                let listener = listener.clone();
+                async move {
+                    loop {
+                        match queue.get_one().await? {
+                            Some(item) => return Ok(Some((item, ()))),
+                            None => {
+                                listener.lock().await.recv().await?;
+                                continue
+                            }
+                        }
+                    }
+                }
+            }
+        }).boxed();
+
+        Ok(stream)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{Webmention, PostgresJobQueue, Job, JobQueue};
+    use futures_util::StreamExt;
+    #[sqlx::test]
+    async fn test_webmention_queue(pool: sqlx::PgPool) -> Result<(), sqlx::Error> {
+        let test_webmention = Webmention {
+            source: "https://fireburn.ru/posts/lorem-ipsum".to_owned(),
+            target: "https://aaronparecki.com/posts/dolor-sit-amet".to_owned()
+        };
+
+        let queue = PostgresJobQueue::<Webmention>::from_pool(pool).await?;
+        println!("Putting webmention into queue");
+        queue.put(&test_webmention).await?;
+        assert_eq!(queue.get_one().await?.as_ref().map(|j| j.job()), Some(&test_webmention));
+        println!("Creating a stream");
+        let mut stream = queue.clone().into_stream().await?;
+
+        let future = stream.next();
+        let guard = future.await.transpose()?.unwrap();
+        assert_eq!(guard.job(), &test_webmention);
+        if let Some(item) = queue.get_one().await? {
+            panic!("Unexpected item {:?} returned from job queue!", item)
+        };
+        drop(guard);
+        let guard = stream.next().await.transpose()?.unwrap();
+        assert_eq!(guard.job(), &test_webmention);
+        guard.done().await?;
+        match queue.get_one().await? {
+            Some(item) => panic!("Unexpected item {:?} returned from job queue!", item),
+            None => Ok(())
+        }
+    }
+}