From ca422132c8f739f3ee873046a3f59d80f2462152 Mon Sep 17 00:00:00 2001 From: Vika Date: Sun, 9 Jul 2023 18:25:58 +0300 Subject: webmentions/queue: give up on an item after 5 attempts This also involves a crude "async drop" implementation that fires a future incrementing an attempt if a Job has been dropped without marking it as done. --- kittybox-rs/migrations/0002_webmention_queue.sql | 9 ++- kittybox-rs/src/webmentions/mod.rs | 4 ++ kittybox-rs/src/webmentions/queue.rs | 76 ++++++++++++++++++------ 3 files changed, 69 insertions(+), 20 deletions(-) (limited to 'kittybox-rs') diff --git a/kittybox-rs/migrations/0002_webmention_queue.sql b/kittybox-rs/migrations/0002_webmention_queue.sql index 0b95771..708933b 100644 --- a/kittybox-rs/migrations/0002_webmention_queue.sql +++ b/kittybox-rs/migrations/0002_webmention_queue.sql @@ -1,10 +1,13 @@ -CREATE TABLE incoming_webmention_queue ( +CREATE TABLE kittybox.incoming_webmention_queue ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), source TEXT NOT NULL, target TEXT NOT NULL, - recv_timestamp TIMESTAMPTZ NOT NULL DEFAULT now() + recv_timestamp TIMESTAMPTZ NOT NULL DEFAULT now(), + attempts INTEGER NOT NULL DEFAULT 0 ); +CREATE INDEX webmention_jobs_by_attempts ON kittybox.incoming_webmention_queue (attempts); + CREATE RULE notify_incoming_webmention AS -ON INSERT TO incoming_webmention_queue +ON INSERT TO kittybox.incoming_webmention_queue DO ALSO NOTIFY incoming_webmention; diff --git a/kittybox-rs/src/webmentions/mod.rs b/kittybox-rs/src/webmentions/mod.rs index 1c4886e..d798c50 100644 --- a/kittybox-rs/src/webmentions/mod.rs +++ b/kittybox-rs/src/webmentions/mod.rs @@ -11,6 +11,10 @@ pub struct Webmention { target: String, } +impl queue::JobItem for Webmention { + const DATABASE_NAME: &'static str = "kittybox.incoming_webmention_queue"; +} + async fn accept_webmention>( Form(webmention): Form, Extension(queue): Extension diff --git a/kittybox-rs/src/webmentions/queue.rs b/kittybox-rs/src/webmentions/queue.rs index 77ad4ea..b585f58 100644 --- a/kittybox-rs/src/webmentions/queue.rs +++ b/kittybox-rs/src/webmentions/queue.rs @@ -9,7 +9,7 @@ use super::Webmention; static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!(); #[async_trait::async_trait] -pub trait JobQueue: Send + Sync + Sized + Clone + 'static { +pub trait JobQueue: Send + Sync + Sized + Clone + 'static { type Job: Job; type Error: std::error::Error + Send + Sync + Sized; @@ -20,31 +20,70 @@ pub trait JobQueue: Send + Sync + Sized + Clone + 'stati } #[async_trait::async_trait] -pub trait Job>: Send + Sync + Sized { +pub trait Job>: Send + Sync + Sized { fn job(&self) -> &T; async fn done(self) -> Result<(), Q::Error>; } +pub trait JobItem: Send + Sync + Sized + std::fmt::Debug { + const DATABASE_NAME: &'static str; +} + #[derive(Debug)] -pub struct PostgresJobItem<'c, T> { +pub struct PostgresJobItem { id: Uuid, job: T, - txn: sqlx::Transaction<'c, sqlx::Postgres> + // This will normally always be Some, except on drop + txn: Option>, + runtime_handle: tokio::runtime::Handle, +} + + +impl Drop for PostgresJobItem { + // This is an emulation of "async drop" — the struct retains a + // runtime handle, which it uses to block on a future that does + // the actual cleanup. + // + // Of course, this is not portable between runtimes, but I don't + // care about that, since Kittybox is designed to work within the + // Tokio ecosystem. + fn drop(&mut self) { + tracing::error!("Job {:?} failed, incrementing attempts...", &self); + if let Some(mut txn) = self.txn.take() { + let id = self.id; + self.runtime_handle.spawn(async move { + tracing::debug!("Constructing query to increment attempts for job {}...", id); + // UPDATE "T::DATABASE_NAME" WHERE id = $1 SET attempts = attempts + 1 + sqlx::query_builder::QueryBuilder::new("UPDATE ") + // This is safe from a SQL injection standpoint, since it is a constant. + .push(T::DATABASE_NAME) + .push(" SET attempts = attempts + 1") + .push(" WHERE id = ") + .push_bind(id) + .build() + .execute(&mut txn) + .await + .unwrap(); + + txn.commit().await.unwrap(); + }); + } + } } #[async_trait::async_trait] -impl Job> for PostgresJobItem<'_, Webmention> { +impl Job> for PostgresJobItem { fn job(&self) -> &Webmention { &self.job } async fn done(mut self) -> Result<(), as JobQueue>::Error> { - println!("Deleting {} from the job queue", self.id); - sqlx::query("DELETE FROM incoming_webmention_queue WHERE id = $1") + tracing::debug!("Deleting {} from the job queue", self.id); + sqlx::query("DELETE FROM kittybox.incoming_webmention_queue WHERE id = $1") .bind(self.id) - .execute(&mut self.txn) + .execute(self.txn.as_mut().unwrap()) .await?; - self.txn.commit().await + self.txn.take().unwrap().commit().await } } @@ -87,29 +126,32 @@ impl PostgresJobQueue { #[async_trait::async_trait] impl JobQueue for PostgresJobQueue { - type Job = PostgresJobItem<'static, Webmention>; + type Job = PostgresJobItem; type Error = sqlx::Error; async fn get_one(&self) -> Result, Self::Error> { let mut txn = self.db.begin().await?; match sqlx::query_as::<_, (Uuid, String, String)>( - "SELECT id, source, target FROM incoming_webmention_queue FOR UPDATE SKIP LOCKED LIMIT 1" + "SELECT id, source, target FROM kittybox.incoming_webmention_queue WHERE attempts < 5 FOR UPDATE SKIP LOCKED LIMIT 1" ) .fetch_optional(&mut txn) .await? .map(|(id, source, target)| (id, Webmention { source, target })) { - Some((id, webmention)) => Ok(Some(Self::Job { - id, - job: webmention, - txn - })), + Some((id, webmention)) => { + return Ok(Some(Self::Job { + id, + job: webmention, + txn: Some(txn), + runtime_handle: tokio::runtime::Handle::current(), + })) + }, None => Ok(None) } } async fn put(&self, item: &Webmention) -> Result { - sqlx::query_scalar::<_, Uuid>("INSERT INTO incoming_webmention_queue (source, target) VALUES ($1, $2) RETURNING id") + sqlx::query_scalar::<_, Uuid>("INSERT INTO kittybox.incoming_webmention_queue (source, target) VALUES ($1, $2) RETURNING id") .bind(item.source.as_str()) .bind(item.target.as_str()) .fetch_one(&self.db) -- cgit 1.4.1