diff options
-rw-r--r-- | kittybox-rs/migrations/0002_webmention_queue.sql | 9 | ||||
-rw-r--r-- | kittybox-rs/src/webmentions/mod.rs | 4 | ||||
-rw-r--r-- | kittybox-rs/src/webmentions/queue.rs | 76 |
3 files changed, 69 insertions, 20 deletions
diff --git a/kittybox-rs/migrations/0002_webmention_queue.sql b/kittybox-rs/migrations/0002_webmention_queue.sql index 0b95771..708933b 100644 --- a/kittybox-rs/migrations/0002_webmention_queue.sql +++ b/kittybox-rs/migrations/0002_webmention_queue.sql @@ -1,10 +1,13 @@ -CREATE TABLE incoming_webmention_queue ( +CREATE TABLE kittybox.incoming_webmention_queue ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), source TEXT NOT NULL, target TEXT NOT NULL, - recv_timestamp TIMESTAMPTZ NOT NULL DEFAULT now() + recv_timestamp TIMESTAMPTZ NOT NULL DEFAULT now(), + attempts INTEGER NOT NULL DEFAULT 0 ); +CREATE INDEX webmention_jobs_by_attempts ON kittybox.incoming_webmention_queue (attempts); + CREATE RULE notify_incoming_webmention AS -ON INSERT TO incoming_webmention_queue +ON INSERT TO kittybox.incoming_webmention_queue DO ALSO NOTIFY incoming_webmention; diff --git a/kittybox-rs/src/webmentions/mod.rs b/kittybox-rs/src/webmentions/mod.rs index 1c4886e..d798c50 100644 --- a/kittybox-rs/src/webmentions/mod.rs +++ b/kittybox-rs/src/webmentions/mod.rs @@ -11,6 +11,10 @@ pub struct Webmention { target: String, } +impl queue::JobItem for Webmention { + const DATABASE_NAME: &'static str = "kittybox.incoming_webmention_queue"; +} + async fn accept_webmention<Q: JobQueue<Webmention>>( Form(webmention): Form<Webmention>, Extension(queue): Extension<Q> diff --git a/kittybox-rs/src/webmentions/queue.rs b/kittybox-rs/src/webmentions/queue.rs index 77ad4ea..b585f58 100644 --- a/kittybox-rs/src/webmentions/queue.rs +++ b/kittybox-rs/src/webmentions/queue.rs @@ -9,7 +9,7 @@ use super::Webmention; static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!(); #[async_trait::async_trait] -pub trait JobQueue<T: Send + Sync + Sized>: Send + Sync + Sized + Clone + 'static { +pub trait JobQueue<T: JobItem>: Send + Sync + Sized + Clone + 'static { type Job: Job<T, Self>; type Error: std::error::Error + Send + Sync + Sized; @@ -20,31 +20,70 @@ pub trait JobQueue<T: Send + Sync + Sized>: Send + Sync + Sized + Clone + 'stati } #[async_trait::async_trait] -pub trait Job<T: Send + Sync + Sized, Q: JobQueue<T>>: Send + Sync + Sized { +pub trait Job<T: JobItem, Q: JobQueue<T>>: Send + Sync + Sized { fn job(&self) -> &T; async fn done(self) -> Result<(), Q::Error>; } +pub trait JobItem: Send + Sync + Sized + std::fmt::Debug { + const DATABASE_NAME: &'static str; +} + #[derive(Debug)] -pub struct PostgresJobItem<'c, T> { +pub struct PostgresJobItem<T: JobItem> { id: Uuid, job: T, - txn: sqlx::Transaction<'c, sqlx::Postgres> + // This will normally always be Some, except on drop + txn: Option<sqlx::Transaction<'static, sqlx::Postgres>>, + runtime_handle: tokio::runtime::Handle, +} + + +impl<T: JobItem> Drop for PostgresJobItem<T> { + // This is an emulation of "async drop" — the struct retains a + // runtime handle, which it uses to block on a future that does + // the actual cleanup. + // + // Of course, this is not portable between runtimes, but I don't + // care about that, since Kittybox is designed to work within the + // Tokio ecosystem. + fn drop(&mut self) { + tracing::error!("Job {:?} failed, incrementing attempts...", &self); + if let Some(mut txn) = self.txn.take() { + let id = self.id; + self.runtime_handle.spawn(async move { + tracing::debug!("Constructing query to increment attempts for job {}...", id); + // UPDATE "T::DATABASE_NAME" WHERE id = $1 SET attempts = attempts + 1 + sqlx::query_builder::QueryBuilder::new("UPDATE ") + // This is safe from a SQL injection standpoint, since it is a constant. + .push(T::DATABASE_NAME) + .push(" SET attempts = attempts + 1") + .push(" WHERE id = ") + .push_bind(id) + .build() + .execute(&mut txn) + .await + .unwrap(); + + txn.commit().await.unwrap(); + }); + } + } } #[async_trait::async_trait] -impl Job<Webmention, PostgresJobQueue<Webmention>> for PostgresJobItem<'_, Webmention> { +impl Job<Webmention, PostgresJobQueue<Webmention>> for PostgresJobItem<Webmention> { fn job(&self) -> &Webmention { &self.job } async fn done(mut self) -> Result<(), <PostgresJobQueue<Webmention> as JobQueue<Webmention>>::Error> { - println!("Deleting {} from the job queue", self.id); - sqlx::query("DELETE FROM incoming_webmention_queue WHERE id = $1") + tracing::debug!("Deleting {} from the job queue", self.id); + sqlx::query("DELETE FROM kittybox.incoming_webmention_queue WHERE id = $1") .bind(self.id) - .execute(&mut self.txn) + .execute(self.txn.as_mut().unwrap()) .await?; - self.txn.commit().await + self.txn.take().unwrap().commit().await } } @@ -87,29 +126,32 @@ impl PostgresJobQueue<Webmention> { #[async_trait::async_trait] impl JobQueue<Webmention> for PostgresJobQueue<Webmention> { - type Job = PostgresJobItem<'static, Webmention>; + type Job = PostgresJobItem<Webmention>; type Error = sqlx::Error; async fn get_one(&self) -> Result<Option<Self::Job>, Self::Error> { let mut txn = self.db.begin().await?; match sqlx::query_as::<_, (Uuid, String, String)>( - "SELECT id, source, target FROM incoming_webmention_queue FOR UPDATE SKIP LOCKED LIMIT 1" + "SELECT id, source, target FROM kittybox.incoming_webmention_queue WHERE attempts < 5 FOR UPDATE SKIP LOCKED LIMIT 1" ) .fetch_optional(&mut txn) .await? .map(|(id, source, target)| (id, Webmention { source, target })) { - Some((id, webmention)) => Ok(Some(Self::Job { - id, - job: webmention, - txn - })), + Some((id, webmention)) => { + return Ok(Some(Self::Job { + id, + job: webmention, + txn: Some(txn), + runtime_handle: tokio::runtime::Handle::current(), + })) + }, None => Ok(None) } } async fn put(&self, item: &Webmention) -> Result<Uuid, Self::Error> { - sqlx::query_scalar::<_, Uuid>("INSERT INTO incoming_webmention_queue (source, target) VALUES ($1, $2) RETURNING id") + sqlx::query_scalar::<_, Uuid>("INSERT INTO kittybox.incoming_webmention_queue (source, target) VALUES ($1, $2) RETURNING id") .bind(item.source.as_str()) .bind(item.target.as_str()) .fetch_one(&self.db) |