From b38b508366a80a2f1c163ae3623c79e883323201 Mon Sep 17 00:00:00 2001 From: Vika Date: Sun, 9 Jul 2023 21:06:55 +0300 Subject: webmentions/queue: move JobQueue trait into kittybox-util The trait itself seems basic enough that it could be reused elsewhere. Better to keep it in a separate crate. `-util` is a dumping ground for various things anyway. --- kittybox-rs/src/webmentions/mod.rs | 4 +- kittybox-rs/src/webmentions/queue.rs | 171 +++++++++++++++++++++++++---------- 2 files changed, 128 insertions(+), 47 deletions(-) (limited to 'kittybox-rs/src') diff --git a/kittybox-rs/src/webmentions/mod.rs b/kittybox-rs/src/webmentions/mod.rs index cffd064..630a1a6 100644 --- a/kittybox-rs/src/webmentions/mod.rs +++ b/kittybox-rs/src/webmentions/mod.rs @@ -11,8 +11,10 @@ pub struct Webmention { target: String, } -impl queue::JobItem for Webmention { +impl queue::JobItem for Webmention {} +impl queue::PostgresJobItem for Webmention { const DATABASE_NAME: &'static str = "kittybox.incoming_webmention_queue"; + const NOTIFICATION_CHANNEL: &'static str = "incoming_webmention"; } async fn accept_webmention>( diff --git a/kittybox-rs/src/webmentions/queue.rs b/kittybox-rs/src/webmentions/queue.rs index b585f58..0b11a4e 100644 --- a/kittybox-rs/src/webmentions/queue.rs +++ b/kittybox-rs/src/webmentions/queue.rs @@ -8,29 +8,22 @@ use super::Webmention; static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!(); -#[async_trait::async_trait] -pub trait JobQueue: Send + Sync + Sized + Clone + 'static { - type Job: Job; - type Error: std::error::Error + Send + Sync + Sized; - - async fn get_one(&self) -> Result, Self::Error>; - async fn put(&self, item: &T) -> Result; - - async fn into_stream(self) -> Result> + Send>>, Self::Error>; -} +pub use kittybox_util::queue::{JobQueue, JobItem, Job}; -#[async_trait::async_trait] -pub trait Job>: Send + Sync + Sized { - fn job(&self) -> &T; - async fn done(self) -> Result<(), Q::Error>; +pub trait PostgresJobItem: JobItem + sqlx::FromRow<'static, sqlx::postgres::PgRow> { + const DATABASE_NAME: &'static str; + const NOTIFICATION_CHANNEL: &'static str; } -pub trait JobItem: Send + Sync + Sized + std::fmt::Debug { - const DATABASE_NAME: &'static str; +#[derive(sqlx::FromRow)] +struct PostgresJobRow { + id: Uuid, + #[sqlx(flatten)] + job: T } #[derive(Debug)] -pub struct PostgresJobItem { +pub struct PostgresJob { id: Uuid, job: T, // This will normally always be Some, except on drop @@ -39,7 +32,7 @@ pub struct PostgresJobItem { } -impl Drop for PostgresJobItem { +impl Drop for PostgresJob { // This is an emulation of "async drop" — the struct retains a // runtime handle, which it uses to block on a future that does // the actual cleanup. @@ -64,15 +57,36 @@ impl Drop for PostgresJobItem { .execute(&mut txn) .await .unwrap(); - + sqlx::query_builder::QueryBuilder::new("NOTIFY ") + .push(T::NOTIFICATION_CHANNEL) + .build() + .execute(&mut txn) + .await + .unwrap(); txn.commit().await.unwrap(); }); } } } +#[cfg(test)] +impl PostgresJob { + async fn attempts(&mut self) -> Result { + sqlx::query_builder::QueryBuilder::new("SELECT attempts FROM ") + .push(T::DATABASE_NAME) + .push(" WHERE id = ") + .push_bind(self.id) + .build_query_as::<(i32,)>() + // It's safe to unwrap here, because we "take" the txn only on drop or commit, + // where it's passed by value, not by reference. + .fetch_one(self.txn.as_mut().unwrap()) + .await + .map(|(i,)| i as usize) + } +} + #[async_trait::async_trait] -impl Job> for PostgresJobItem { +impl Job> for PostgresJob { fn job(&self) -> &Webmention { &self.job } @@ -126,28 +140,28 @@ impl PostgresJobQueue { #[async_trait::async_trait] impl JobQueue for PostgresJobQueue { - type Job = PostgresJobItem; + type Job = PostgresJob; type Error = sqlx::Error; async fn get_one(&self) -> Result, Self::Error> { let mut txn = self.db.begin().await?; - match sqlx::query_as::<_, (Uuid, String, String)>( + match sqlx::query_as::<_, PostgresJobRow>( "SELECT id, source, target FROM kittybox.incoming_webmention_queue WHERE attempts < 5 FOR UPDATE SKIP LOCKED LIMIT 1" ) .fetch_optional(&mut txn) .await? - .map(|(id, source, target)| (id, Webmention { source, target })) { - Some((id, webmention)) => { - return Ok(Some(Self::Job { - id, - job: webmention, - txn: Some(txn), - runtime_handle: tokio::runtime::Handle::current(), - })) - }, - None => Ok(None) - } + { + Some(job_row) => { + return Ok(Some(Self::Job { + id: job_row.id, + job: job_row.job, + txn: Some(txn), + runtime_handle: tokio::runtime::Handle::current(), + })) + }, + None => Ok(None) + } } async fn put(&self, item: &Webmention) -> Result { @@ -187,9 +201,12 @@ impl JobQueue for PostgresJobQueue { #[cfg(test)] mod tests { + use std::sync::Arc; + use super::{Webmention, PostgresJobQueue, Job, JobQueue}; use futures_util::StreamExt; #[sqlx::test] + #[tracing_test::traced_test] async fn test_webmention_queue(pool: sqlx::PgPool) -> Result<(), sqlx::Error> { let test_webmention = Webmention { source: "https://fireburn.ru/posts/lorem-ipsum".to_owned(), @@ -197,25 +214,87 @@ mod tests { }; let queue = PostgresJobQueue::::from_pool(pool).await?; - println!("Putting webmention into queue"); + tracing::debug!("Putting webmention into queue"); queue.put(&test_webmention).await?; - assert_eq!(queue.get_one().await?.as_ref().map(|j| j.job()), Some(&test_webmention)); - println!("Creating a stream"); + { + let mut job_description = queue.get_one().await?.unwrap(); + assert_eq!(job_description.job(), &test_webmention); + assert_eq!(job_description.attempts().await?, 0); + } + tracing::debug!("Creating a stream"); let mut stream = queue.clone().into_stream().await?; - let future = stream.next(); - let guard = future.await.transpose()?.unwrap(); - assert_eq!(guard.job(), &test_webmention); - if let Some(item) = queue.get_one().await? { - panic!("Unexpected item {:?} returned from job queue!", item) - }; - drop(guard); - let guard = stream.next().await.transpose()?.unwrap(); - assert_eq!(guard.job(), &test_webmention); - guard.done().await?; + { + let mut guard = stream.next().await.transpose()?.unwrap(); + assert_eq!(guard.job(), &test_webmention); + assert_eq!(guard.attempts().await?, 1); + if let Some(item) = queue.get_one().await? { + panic!("Unexpected item {:?} returned from job queue!", item) + }; + } + + { + let mut guard = stream.next().await.transpose()?.unwrap(); + assert_eq!(guard.job(), &test_webmention); + assert_eq!(guard.attempts().await?, 2); + guard.done().await?; + } + match queue.get_one().await? { Some(item) => panic!("Unexpected item {:?} returned from job queue!", item), None => Ok(()) } } + + #[sqlx::test] + #[tracing_test::traced_test] + async fn test_no_hangups_in_queue(pool: sqlx::PgPool) -> Result<(), sqlx::Error> { + let test_webmention = Webmention { + source: "https://fireburn.ru/posts/lorem-ipsum".to_owned(), + target: "https://aaronparecki.com/posts/dolor-sit-amet".to_owned() + }; + + let queue = PostgresJobQueue::::from_pool(pool.clone()).await?; + tracing::debug!("Putting webmention into queue"); + queue.put(&test_webmention).await?; + tracing::debug!("Creating a stream"); + let mut stream = queue.clone().into_stream().await?; + + // Synchronisation barrier that will be useful later + let barrier = Arc::new(tokio::sync::Barrier::new(2)); + { + // Get one job guard from a queue + let mut guard = stream.next().await.transpose()?.unwrap(); + assert_eq!(guard.job(), &test_webmention); + assert_eq!(guard.attempts().await?, 0); + + tokio::task::spawn({ + let barrier = barrier.clone(); + async move { + // Wait for the signal to drop the guard! + barrier.wait().await; + + drop(guard) + } + }); + } + tokio::time::timeout(std::time::Duration::from_secs(1), stream.next()).await.unwrap_err(); + + let future = tokio::task::spawn( + tokio::time::timeout( + std::time::Duration::from_secs(10), async move { + stream.next().await.unwrap().unwrap() + } + ) + ); + // Let the other task drop the guard it is holding + barrier.wait().await; + let mut guard = future.await + .expect("Timeout on fetching item") + .expect("Job queue error"); + assert_eq!(guard.job(), &test_webmention); + assert_eq!(guard.attempts().await?, 1); + + Ok(()) + } } -- cgit 1.4.1