about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--kittybox-rs/migrations/0002_webmention_queue.sql9
-rw-r--r--kittybox-rs/src/webmentions/mod.rs4
-rw-r--r--kittybox-rs/src/webmentions/queue.rs76
3 files changed, 69 insertions, 20 deletions
diff --git a/kittybox-rs/migrations/0002_webmention_queue.sql b/kittybox-rs/migrations/0002_webmention_queue.sql
index 0b95771..708933b 100644
--- a/kittybox-rs/migrations/0002_webmention_queue.sql
+++ b/kittybox-rs/migrations/0002_webmention_queue.sql
@@ -1,10 +1,13 @@
-CREATE TABLE incoming_webmention_queue (
+CREATE TABLE kittybox.incoming_webmention_queue (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        source TEXT NOT NULL,
        target TEXT NOT NULL,
-       recv_timestamp TIMESTAMPTZ NOT NULL DEFAULT now()
+       recv_timestamp TIMESTAMPTZ NOT NULL DEFAULT now(),
+       attempts INTEGER NOT NULL DEFAULT 0
 );
 
+CREATE INDEX webmention_jobs_by_attempts ON kittybox.incoming_webmention_queue (attempts);
+
 CREATE RULE notify_incoming_webmention AS
-ON INSERT TO incoming_webmention_queue
+ON INSERT TO kittybox.incoming_webmention_queue
 DO ALSO NOTIFY incoming_webmention;
diff --git a/kittybox-rs/src/webmentions/mod.rs b/kittybox-rs/src/webmentions/mod.rs
index 1c4886e..d798c50 100644
--- a/kittybox-rs/src/webmentions/mod.rs
+++ b/kittybox-rs/src/webmentions/mod.rs
@@ -11,6 +11,10 @@ pub struct Webmention {
     target: String,
 }
 
+impl queue::JobItem for Webmention {
+    const DATABASE_NAME: &'static str = "kittybox.incoming_webmention_queue";
+}
+
 async fn accept_webmention<Q: JobQueue<Webmention>>(
     Form(webmention): Form<Webmention>,
     Extension(queue): Extension<Q>
diff --git a/kittybox-rs/src/webmentions/queue.rs b/kittybox-rs/src/webmentions/queue.rs
index 77ad4ea..b585f58 100644
--- a/kittybox-rs/src/webmentions/queue.rs
+++ b/kittybox-rs/src/webmentions/queue.rs
@@ -9,7 +9,7 @@ use super::Webmention;
 static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!();
 
 #[async_trait::async_trait]
-pub trait JobQueue<T: Send + Sync + Sized>: Send + Sync + Sized + Clone + 'static {
+pub trait JobQueue<T: JobItem>: Send + Sync + Sized + Clone + 'static {
     type Job: Job<T, Self>;
     type Error: std::error::Error + Send + Sync + Sized;
 
@@ -20,31 +20,70 @@ pub trait JobQueue<T: Send + Sync + Sized>: Send + Sync + Sized + Clone + 'stati
 }
 
 #[async_trait::async_trait]
-pub trait Job<T: Send + Sync + Sized, Q: JobQueue<T>>: Send + Sync + Sized {
+pub trait Job<T: JobItem, Q: JobQueue<T>>: Send + Sync + Sized {
     fn job(&self) -> &T;
     async fn done(self) -> Result<(), Q::Error>;
 }
 
+pub trait JobItem: Send + Sync + Sized + std::fmt::Debug {
+    const DATABASE_NAME: &'static str;
+}
+
 #[derive(Debug)]
-pub struct PostgresJobItem<'c, T> {
+pub struct PostgresJobItem<T: JobItem> {
     id: Uuid,
     job: T,
-    txn: sqlx::Transaction<'c, sqlx::Postgres>
+    // This will normally always be Some, except on drop
+    txn: Option<sqlx::Transaction<'static, sqlx::Postgres>>,
+    runtime_handle: tokio::runtime::Handle,
+}
+
+
+impl<T: JobItem> Drop for PostgresJobItem<T> {
+    // This is an emulation of "async drop" — the struct retains a
+    // runtime handle, which it uses to block on a future that does
+    // the actual cleanup.
+    //
+    // Of course, this is not portable between runtimes, but I don't
+    // care about that, since Kittybox is designed to work within the
+    // Tokio ecosystem.
+    fn drop(&mut self) {
+        tracing::error!("Job {:?} failed, incrementing attempts...", &self);
+        if let Some(mut txn) = self.txn.take() {
+            let id = self.id;
+            self.runtime_handle.spawn(async move {
+                tracing::debug!("Constructing query to increment attempts for job {}...", id);
+                // UPDATE "T::DATABASE_NAME" WHERE id = $1 SET attempts = attempts + 1
+                sqlx::query_builder::QueryBuilder::new("UPDATE ")
+                    // This is safe from a SQL injection standpoint, since it is a constant.
+                    .push(T::DATABASE_NAME)
+                    .push(" SET attempts = attempts + 1")
+                    .push(" WHERE id = ")
+                    .push_bind(id)
+                    .build()
+                    .execute(&mut txn)
+                    .await
+                    .unwrap();
+
+                txn.commit().await.unwrap();
+            });
+        }
+    }
 }
 
 #[async_trait::async_trait]
-impl Job<Webmention, PostgresJobQueue<Webmention>> for PostgresJobItem<'_, Webmention> {
+impl Job<Webmention, PostgresJobQueue<Webmention>> for PostgresJobItem<Webmention> {
     fn job(&self) -> &Webmention {
         &self.job
     }
     async fn done(mut self) -> Result<(), <PostgresJobQueue<Webmention> as JobQueue<Webmention>>::Error> {
-        println!("Deleting {} from the job queue", self.id);
-        sqlx::query("DELETE FROM incoming_webmention_queue WHERE id = $1")
+        tracing::debug!("Deleting {} from the job queue", self.id);
+        sqlx::query("DELETE FROM kittybox.incoming_webmention_queue WHERE id = $1")
             .bind(self.id)
-            .execute(&mut self.txn)
+            .execute(self.txn.as_mut().unwrap())
             .await?;
 
-        self.txn.commit().await
+        self.txn.take().unwrap().commit().await
     }
 }
 
@@ -87,29 +126,32 @@ impl PostgresJobQueue<Webmention> {
 
 #[async_trait::async_trait]
 impl JobQueue<Webmention> for PostgresJobQueue<Webmention> {
-    type Job = PostgresJobItem<'static, Webmention>;
+    type Job = PostgresJobItem<Webmention>;
     type Error = sqlx::Error;
 
     async fn get_one(&self) -> Result<Option<Self::Job>, Self::Error> {
         let mut txn = self.db.begin().await?;
 
         match sqlx::query_as::<_, (Uuid, String, String)>(
-            "SELECT id, source, target FROM incoming_webmention_queue FOR UPDATE SKIP LOCKED LIMIT 1"
+            "SELECT id, source, target FROM kittybox.incoming_webmention_queue WHERE attempts < 5 FOR UPDATE SKIP LOCKED LIMIT 1"
         )
             .fetch_optional(&mut txn)
             .await?
             .map(|(id, source, target)| (id, Webmention { source, target })) {
-                Some((id, webmention)) => Ok(Some(Self::Job {
-                    id,
-                    job: webmention,
-                    txn
-                })),
+                Some((id, webmention)) => {
+                    return Ok(Some(Self::Job {
+                        id,
+                        job: webmention,
+                        txn: Some(txn),
+                        runtime_handle: tokio::runtime::Handle::current(),
+                    }))
+                },
                 None => Ok(None)
             }
     }
 
     async fn put(&self, item: &Webmention) -> Result<Uuid, Self::Error> {
-        sqlx::query_scalar::<_, Uuid>("INSERT INTO incoming_webmention_queue (source, target) VALUES ($1, $2) RETURNING id")
+        sqlx::query_scalar::<_, Uuid>("INSERT INTO kittybox.incoming_webmention_queue (source, target) VALUES ($1, $2) RETURNING id")
             .bind(item.source.as_str())
             .bind(item.target.as_str())
             .fetch_one(&self.db)