From b38b508366a80a2f1c163ae3623c79e883323201 Mon Sep 17 00:00:00 2001 From: Vika Date: Sun, 9 Jul 2023 21:06:55 +0300 Subject: webmentions/queue: move JobQueue trait into kittybox-util The trait itself seems basic enough that it could be reused elsewhere. Better to keep it in a separate crate. `-util` is a dumping ground for various things anyway. --- kittybox-rs/src/webmentions/mod.rs | 4 +- kittybox-rs/src/webmentions/queue.rs | 171 +++++++++++++++++++++++++---------- kittybox-rs/util/Cargo.toml | 3 + kittybox-rs/util/src/lib.rs | 3 + kittybox-rs/util/src/queue.rs | 66 ++++++++++++++ 5 files changed, 200 insertions(+), 47 deletions(-) create mode 100644 kittybox-rs/util/src/queue.rs diff --git a/kittybox-rs/src/webmentions/mod.rs b/kittybox-rs/src/webmentions/mod.rs index cffd064..630a1a6 100644 --- a/kittybox-rs/src/webmentions/mod.rs +++ b/kittybox-rs/src/webmentions/mod.rs @@ -11,8 +11,10 @@ pub struct Webmention { target: String, } -impl queue::JobItem for Webmention { +impl queue::JobItem for Webmention {} +impl queue::PostgresJobItem for Webmention { const DATABASE_NAME: &'static str = "kittybox.incoming_webmention_queue"; + const NOTIFICATION_CHANNEL: &'static str = "incoming_webmention"; } async fn accept_webmention>( diff --git a/kittybox-rs/src/webmentions/queue.rs b/kittybox-rs/src/webmentions/queue.rs index b585f58..0b11a4e 100644 --- a/kittybox-rs/src/webmentions/queue.rs +++ b/kittybox-rs/src/webmentions/queue.rs @@ -8,29 +8,22 @@ use super::Webmention; static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!(); -#[async_trait::async_trait] -pub trait JobQueue: Send + Sync + Sized + Clone + 'static { - type Job: Job; - type Error: std::error::Error + Send + Sync + Sized; - - async fn get_one(&self) -> Result, Self::Error>; - async fn put(&self, item: &T) -> Result; - - async fn into_stream(self) -> Result> + Send>>, Self::Error>; -} +pub use kittybox_util::queue::{JobQueue, JobItem, Job}; -#[async_trait::async_trait] -pub trait Job>: Send + Sync + Sized { - fn job(&self) -> &T; - async fn done(self) -> Result<(), Q::Error>; +pub trait PostgresJobItem: JobItem + sqlx::FromRow<'static, sqlx::postgres::PgRow> { + const DATABASE_NAME: &'static str; + const NOTIFICATION_CHANNEL: &'static str; } -pub trait JobItem: Send + Sync + Sized + std::fmt::Debug { - const DATABASE_NAME: &'static str; +#[derive(sqlx::FromRow)] +struct PostgresJobRow { + id: Uuid, + #[sqlx(flatten)] + job: T } #[derive(Debug)] -pub struct PostgresJobItem { +pub struct PostgresJob { id: Uuid, job: T, // This will normally always be Some, except on drop @@ -39,7 +32,7 @@ pub struct PostgresJobItem { } -impl Drop for PostgresJobItem { +impl Drop for PostgresJob { // This is an emulation of "async drop" — the struct retains a // runtime handle, which it uses to block on a future that does // the actual cleanup. @@ -64,15 +57,36 @@ impl Drop for PostgresJobItem { .execute(&mut txn) .await .unwrap(); - + sqlx::query_builder::QueryBuilder::new("NOTIFY ") + .push(T::NOTIFICATION_CHANNEL) + .build() + .execute(&mut txn) + .await + .unwrap(); txn.commit().await.unwrap(); }); } } } +#[cfg(test)] +impl PostgresJob { + async fn attempts(&mut self) -> Result { + sqlx::query_builder::QueryBuilder::new("SELECT attempts FROM ") + .push(T::DATABASE_NAME) + .push(" WHERE id = ") + .push_bind(self.id) + .build_query_as::<(i32,)>() + // It's safe to unwrap here, because we "take" the txn only on drop or commit, + // where it's passed by value, not by reference. + .fetch_one(self.txn.as_mut().unwrap()) + .await + .map(|(i,)| i as usize) + } +} + #[async_trait::async_trait] -impl Job> for PostgresJobItem { +impl Job> for PostgresJob { fn job(&self) -> &Webmention { &self.job } @@ -126,28 +140,28 @@ impl PostgresJobQueue { #[async_trait::async_trait] impl JobQueue for PostgresJobQueue { - type Job = PostgresJobItem; + type Job = PostgresJob; type Error = sqlx::Error; async fn get_one(&self) -> Result, Self::Error> { let mut txn = self.db.begin().await?; - match sqlx::query_as::<_, (Uuid, String, String)>( + match sqlx::query_as::<_, PostgresJobRow>( "SELECT id, source, target FROM kittybox.incoming_webmention_queue WHERE attempts < 5 FOR UPDATE SKIP LOCKED LIMIT 1" ) .fetch_optional(&mut txn) .await? - .map(|(id, source, target)| (id, Webmention { source, target })) { - Some((id, webmention)) => { - return Ok(Some(Self::Job { - id, - job: webmention, - txn: Some(txn), - runtime_handle: tokio::runtime::Handle::current(), - })) - }, - None => Ok(None) - } + { + Some(job_row) => { + return Ok(Some(Self::Job { + id: job_row.id, + job: job_row.job, + txn: Some(txn), + runtime_handle: tokio::runtime::Handle::current(), + })) + }, + None => Ok(None) + } } async fn put(&self, item: &Webmention) -> Result { @@ -187,9 +201,12 @@ impl JobQueue for PostgresJobQueue { #[cfg(test)] mod tests { + use std::sync::Arc; + use super::{Webmention, PostgresJobQueue, Job, JobQueue}; use futures_util::StreamExt; #[sqlx::test] + #[tracing_test::traced_test] async fn test_webmention_queue(pool: sqlx::PgPool) -> Result<(), sqlx::Error> { let test_webmention = Webmention { source: "https://fireburn.ru/posts/lorem-ipsum".to_owned(), @@ -197,25 +214,87 @@ mod tests { }; let queue = PostgresJobQueue::::from_pool(pool).await?; - println!("Putting webmention into queue"); + tracing::debug!("Putting webmention into queue"); queue.put(&test_webmention).await?; - assert_eq!(queue.get_one().await?.as_ref().map(|j| j.job()), Some(&test_webmention)); - println!("Creating a stream"); + { + let mut job_description = queue.get_one().await?.unwrap(); + assert_eq!(job_description.job(), &test_webmention); + assert_eq!(job_description.attempts().await?, 0); + } + tracing::debug!("Creating a stream"); let mut stream = queue.clone().into_stream().await?; - let future = stream.next(); - let guard = future.await.transpose()?.unwrap(); - assert_eq!(guard.job(), &test_webmention); - if let Some(item) = queue.get_one().await? { - panic!("Unexpected item {:?} returned from job queue!", item) - }; - drop(guard); - let guard = stream.next().await.transpose()?.unwrap(); - assert_eq!(guard.job(), &test_webmention); - guard.done().await?; + { + let mut guard = stream.next().await.transpose()?.unwrap(); + assert_eq!(guard.job(), &test_webmention); + assert_eq!(guard.attempts().await?, 1); + if let Some(item) = queue.get_one().await? { + panic!("Unexpected item {:?} returned from job queue!", item) + }; + } + + { + let mut guard = stream.next().await.transpose()?.unwrap(); + assert_eq!(guard.job(), &test_webmention); + assert_eq!(guard.attempts().await?, 2); + guard.done().await?; + } + match queue.get_one().await? { Some(item) => panic!("Unexpected item {:?} returned from job queue!", item), None => Ok(()) } } + + #[sqlx::test] + #[tracing_test::traced_test] + async fn test_no_hangups_in_queue(pool: sqlx::PgPool) -> Result<(), sqlx::Error> { + let test_webmention = Webmention { + source: "https://fireburn.ru/posts/lorem-ipsum".to_owned(), + target: "https://aaronparecki.com/posts/dolor-sit-amet".to_owned() + }; + + let queue = PostgresJobQueue::::from_pool(pool.clone()).await?; + tracing::debug!("Putting webmention into queue"); + queue.put(&test_webmention).await?; + tracing::debug!("Creating a stream"); + let mut stream = queue.clone().into_stream().await?; + + // Synchronisation barrier that will be useful later + let barrier = Arc::new(tokio::sync::Barrier::new(2)); + { + // Get one job guard from a queue + let mut guard = stream.next().await.transpose()?.unwrap(); + assert_eq!(guard.job(), &test_webmention); + assert_eq!(guard.attempts().await?, 0); + + tokio::task::spawn({ + let barrier = barrier.clone(); + async move { + // Wait for the signal to drop the guard! + barrier.wait().await; + + drop(guard) + } + }); + } + tokio::time::timeout(std::time::Duration::from_secs(1), stream.next()).await.unwrap_err(); + + let future = tokio::task::spawn( + tokio::time::timeout( + std::time::Duration::from_secs(10), async move { + stream.next().await.unwrap().unwrap() + } + ) + ); + // Let the other task drop the guard it is holding + barrier.wait().await; + let mut guard = future.await + .expect("Timeout on fetching item") + .expect("Job queue error"); + assert_eq!(guard.job(), &test_webmention); + assert_eq!(guard.attempts().await?, 1); + + Ok(()) + } } diff --git a/kittybox-rs/util/Cargo.toml b/kittybox-rs/util/Cargo.toml index b0b725c..f0653db 100644 --- a/kittybox-rs/util/Cargo.toml +++ b/kittybox-rs/util/Cargo.toml @@ -13,6 +13,9 @@ serde = { version = "^1.0.125", features = ["derive"] } serde_json = "^1.0.64" axum-core = "^0.2.6" http = "^0.2.7" +async-trait = "^0.1.50" +futures-util = "^0.3.14" +uuid = "^1.3.3" [dependencies.rand] version = "^0.8.5" optional = true diff --git a/kittybox-rs/util/src/lib.rs b/kittybox-rs/util/src/lib.rs index 1c2f6f7..617ee97 100644 --- a/kittybox-rs/util/src/lib.rs +++ b/kittybox-rs/util/src/lib.rs @@ -41,6 +41,9 @@ pub mod auth { } } +/// A collection of traits for implementing a robust job queue. +pub mod queue; + #[cfg(feature = "fs")] /// Commonly-used operations with the file system in Kittybox's /// underlying storage mechanisms. diff --git a/kittybox-rs/util/src/queue.rs b/kittybox-rs/util/src/queue.rs new file mode 100644 index 0000000..c880597 --- /dev/null +++ b/kittybox-rs/util/src/queue.rs @@ -0,0 +1,66 @@ +use futures_util::Stream; +use std::pin::Pin; +use uuid::Uuid; + +#[async_trait::async_trait] +/// A job queue that can store and return jobs. +pub trait JobQueue: Send + Sync + Sized + Clone + 'static { + /// A type of job object that will be returned by the queue. + type Job: Job; + /// Error type that the queue can produce in its work. + type Error: std::error::Error + Send + Sync + Sized; + + /// Get one item from the job queue, if the job queue has pending + /// items available. + /// + /// # Errors + /// + /// Returns an error if a job queue failed in some way. Having no + /// items is not a failure, in which case `Ok(None)` is returned. + async fn get_one(&self) -> Result, Self::Error>; + /// Put an item into a job queue, returning its UUID. + async fn put(&self, item: &T) -> Result; + + /* + /// Check the amount of pending and stuck items in the job queue. + async fn len(&self) -> Result<(usize, usize), Self::Error>; + /// Returns whether the job queue has some pending items. + async fn is_empty(&self) -> Result { + Ok(self.len().await?.0 == 0) + } + /// Returns whether the job queue has some stuck items that + /// require manual cleanup. + async fn has_stuck(&self) -> Result { + Ok(self.len().await?.1 > 0) + } + */ + + /// Consume the job queue object and return a stream of unstuck + /// items from the job queue. + /// + /// Note that one item may be returned several times if it is not + /// marked as done. + async fn into_stream(self) -> Result> + Send>>, Self::Error>; +} + +#[async_trait::async_trait] +/// A job description yielded from a job queue. +/// +/// # Implementors +/// +/// On [`Drop`], the job should be returned to a job queue. If your +/// job queue tracks attempts, the counter should be incremented by +/// one. +/// +/// Figuring how to do this asynchronously from a synchronous trait +/// is left as an exercise to the reader. +pub trait Job>: Send + Sync + Sized { + /// Get the object describing the task itself. + fn job(&self) -> &T; + /// Mark the job as done and remove it from the job queue. + async fn done(self) -> Result<(), Q::Error>; +} + +/// An object describing the job itself, returned as part of a +/// [`Job`]. +pub trait JobItem: Send + Sync + Sized + std::fmt::Debug {} -- cgit 1.4.1