about summary refs log tree commit diff
path: root/kittybox-rs
diff options
context:
space:
mode:
Diffstat (limited to 'kittybox-rs')
-rw-r--r--kittybox-rs/src/webmentions/mod.rs4
-rw-r--r--kittybox-rs/src/webmentions/queue.rs171
-rw-r--r--kittybox-rs/util/Cargo.toml3
-rw-r--r--kittybox-rs/util/src/lib.rs3
-rw-r--r--kittybox-rs/util/src/queue.rs66
5 files changed, 200 insertions, 47 deletions
diff --git a/kittybox-rs/src/webmentions/mod.rs b/kittybox-rs/src/webmentions/mod.rs
index cffd064..630a1a6 100644
--- a/kittybox-rs/src/webmentions/mod.rs
+++ b/kittybox-rs/src/webmentions/mod.rs
@@ -11,8 +11,10 @@ pub struct Webmention {
     target: String,
 }
 
-impl queue::JobItem for Webmention {
+impl queue::JobItem for Webmention {}
+impl queue::PostgresJobItem for Webmention {
     const DATABASE_NAME: &'static str = "kittybox.incoming_webmention_queue";
+    const NOTIFICATION_CHANNEL: &'static str = "incoming_webmention";
 }
 
 async fn accept_webmention<Q: JobQueue<Webmention>>(
diff --git a/kittybox-rs/src/webmentions/queue.rs b/kittybox-rs/src/webmentions/queue.rs
index b585f58..0b11a4e 100644
--- a/kittybox-rs/src/webmentions/queue.rs
+++ b/kittybox-rs/src/webmentions/queue.rs
@@ -8,29 +8,22 @@ use super::Webmention;
 
 static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!();
 
-#[async_trait::async_trait]
-pub trait JobQueue<T: JobItem>: Send + Sync + Sized + Clone + 'static {
-    type Job: Job<T, Self>;
-    type Error: std::error::Error + Send + Sync + Sized;
-
-    async fn get_one(&self) -> Result<Option<Self::Job>, Self::Error>;
-    async fn put(&self, item: &T) -> Result<Uuid, Self::Error>;
-
-    async fn into_stream(self) -> Result<Pin<Box<dyn Stream<Item = Result<Self::Job, Self::Error>> + Send>>, Self::Error>;
-}
+pub use kittybox_util::queue::{JobQueue, JobItem, Job};
 
-#[async_trait::async_trait]
-pub trait Job<T: JobItem, Q: JobQueue<T>>: Send + Sync + Sized {
-    fn job(&self) -> &T;
-    async fn done(self) -> Result<(), Q::Error>;
+pub trait PostgresJobItem: JobItem + sqlx::FromRow<'static, sqlx::postgres::PgRow> {
+    const DATABASE_NAME: &'static str;
+    const NOTIFICATION_CHANNEL: &'static str;
 }
 
-pub trait JobItem: Send + Sync + Sized + std::fmt::Debug {
-    const DATABASE_NAME: &'static str;
+#[derive(sqlx::FromRow)]
+struct PostgresJobRow<T: PostgresJobItem> {
+    id: Uuid,
+    #[sqlx(flatten)]
+    job: T
 }
 
 #[derive(Debug)]
-pub struct PostgresJobItem<T: JobItem> {
+pub struct PostgresJob<T: PostgresJobItem> {
     id: Uuid,
     job: T,
     // This will normally always be Some, except on drop
@@ -39,7 +32,7 @@ pub struct PostgresJobItem<T: JobItem> {
 }
 
 
-impl<T: JobItem> Drop for PostgresJobItem<T> {
+impl<T: PostgresJobItem> Drop for PostgresJob<T> {
     // This is an emulation of "async drop" — the struct retains a
     // runtime handle, which it uses to block on a future that does
     // the actual cleanup.
@@ -64,15 +57,36 @@ impl<T: JobItem> Drop for PostgresJobItem<T> {
                     .execute(&mut txn)
                     .await
                     .unwrap();
-
+                sqlx::query_builder::QueryBuilder::new("NOTIFY ")
+                    .push(T::NOTIFICATION_CHANNEL)
+                    .build()
+                    .execute(&mut txn)
+                    .await
+                    .unwrap();
                 txn.commit().await.unwrap();
             });
         }
     }
 }
 
+#[cfg(test)]
+impl<T: PostgresJobItem> PostgresJob<T> {
+    async fn attempts(&mut self) -> Result<usize, sqlx::Error> {
+        sqlx::query_builder::QueryBuilder::new("SELECT attempts FROM ")
+            .push(T::DATABASE_NAME)
+            .push(" WHERE id = ")
+            .push_bind(self.id)
+            .build_query_as::<(i32,)>()
+            // It's safe to unwrap here, because we "take" the txn only on drop or commit,
+            // where it's passed by value, not by reference.
+            .fetch_one(self.txn.as_mut().unwrap())
+            .await
+            .map(|(i,)| i as usize)
+    }
+}
+
 #[async_trait::async_trait]
-impl Job<Webmention, PostgresJobQueue<Webmention>> for PostgresJobItem<Webmention> {
+impl Job<Webmention, PostgresJobQueue<Webmention>> for PostgresJob<Webmention> {
     fn job(&self) -> &Webmention {
         &self.job
     }
@@ -126,28 +140,28 @@ impl PostgresJobQueue<Webmention> {
 
 #[async_trait::async_trait]
 impl JobQueue<Webmention> for PostgresJobQueue<Webmention> {
-    type Job = PostgresJobItem<Webmention>;
+    type Job = PostgresJob<Webmention>;
     type Error = sqlx::Error;
 
     async fn get_one(&self) -> Result<Option<Self::Job>, Self::Error> {
         let mut txn = self.db.begin().await?;
 
-        match sqlx::query_as::<_, (Uuid, String, String)>(
+        match sqlx::query_as::<_, PostgresJobRow<Webmention>>(
             "SELECT id, source, target FROM kittybox.incoming_webmention_queue WHERE attempts < 5 FOR UPDATE SKIP LOCKED LIMIT 1"
         )
             .fetch_optional(&mut txn)
             .await?
-            .map(|(id, source, target)| (id, Webmention { source, target })) {
-                Some((id, webmention)) => {
-                    return Ok(Some(Self::Job {
-                        id,
-                        job: webmention,
-                        txn: Some(txn),
-                        runtime_handle: tokio::runtime::Handle::current(),
-                    }))
-                },
-                None => Ok(None)
-            }
+        {
+            Some(job_row) => {
+                return Ok(Some(Self::Job {
+                    id: job_row.id,
+                    job: job_row.job,
+                    txn: Some(txn),
+                    runtime_handle: tokio::runtime::Handle::current(),
+                }))
+            },
+            None => Ok(None)
+        }
     }
 
     async fn put(&self, item: &Webmention) -> Result<Uuid, Self::Error> {
@@ -187,9 +201,12 @@ impl JobQueue<Webmention> for PostgresJobQueue<Webmention> {
 
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
     use super::{Webmention, PostgresJobQueue, Job, JobQueue};
     use futures_util::StreamExt;
     #[sqlx::test]
+    #[tracing_test::traced_test]
     async fn test_webmention_queue(pool: sqlx::PgPool) -> Result<(), sqlx::Error> {
         let test_webmention = Webmention {
             source: "https://fireburn.ru/posts/lorem-ipsum".to_owned(),
@@ -197,25 +214,87 @@ mod tests {
         };
 
         let queue = PostgresJobQueue::<Webmention>::from_pool(pool).await?;
-        println!("Putting webmention into queue");
+        tracing::debug!("Putting webmention into queue");
         queue.put(&test_webmention).await?;
-        assert_eq!(queue.get_one().await?.as_ref().map(|j| j.job()), Some(&test_webmention));
-        println!("Creating a stream");
+        {
+            let mut job_description = queue.get_one().await?.unwrap();
+            assert_eq!(job_description.job(), &test_webmention);
+            assert_eq!(job_description.attempts().await?, 0);
+        }
+        tracing::debug!("Creating a stream");
         let mut stream = queue.clone().into_stream().await?;
 
-        let future = stream.next();
-        let guard = future.await.transpose()?.unwrap();
-        assert_eq!(guard.job(), &test_webmention);
-        if let Some(item) = queue.get_one().await? {
-            panic!("Unexpected item {:?} returned from job queue!", item)
-        };
-        drop(guard);
-        let guard = stream.next().await.transpose()?.unwrap();
-        assert_eq!(guard.job(), &test_webmention);
-        guard.done().await?;
+        {
+            let mut guard = stream.next().await.transpose()?.unwrap();
+            assert_eq!(guard.job(), &test_webmention);
+            assert_eq!(guard.attempts().await?, 1);
+            if let Some(item) = queue.get_one().await? {
+                panic!("Unexpected item {:?} returned from job queue!", item)
+            };
+        }
+
+        {
+            let mut guard = stream.next().await.transpose()?.unwrap();
+            assert_eq!(guard.job(), &test_webmention);
+            assert_eq!(guard.attempts().await?, 2);
+            guard.done().await?;
+        }
+
         match queue.get_one().await? {
             Some(item) => panic!("Unexpected item {:?} returned from job queue!", item),
             None => Ok(())
         }
     }
+
+    #[sqlx::test]
+    #[tracing_test::traced_test]
+    async fn test_no_hangups_in_queue(pool: sqlx::PgPool) -> Result<(), sqlx::Error> {
+        let test_webmention = Webmention {
+            source: "https://fireburn.ru/posts/lorem-ipsum".to_owned(),
+            target: "https://aaronparecki.com/posts/dolor-sit-amet".to_owned()
+        };
+
+        let queue = PostgresJobQueue::<Webmention>::from_pool(pool.clone()).await?;
+        tracing::debug!("Putting webmention into queue");
+        queue.put(&test_webmention).await?;
+        tracing::debug!("Creating a stream");
+        let mut stream = queue.clone().into_stream().await?;
+
+        // Synchronisation barrier that will be useful later
+        let barrier = Arc::new(tokio::sync::Barrier::new(2));
+        {
+            // Get one job guard from a queue
+            let mut guard = stream.next().await.transpose()?.unwrap();
+            assert_eq!(guard.job(), &test_webmention);
+            assert_eq!(guard.attempts().await?, 0);
+
+            tokio::task::spawn({
+                let barrier = barrier.clone();
+                async move {
+                    // Wait for the signal to drop the guard!
+                    barrier.wait().await;
+
+                    drop(guard)
+                }
+            });
+        }
+        tokio::time::timeout(std::time::Duration::from_secs(1), stream.next()).await.unwrap_err();
+
+        let future = tokio::task::spawn(
+            tokio::time::timeout(
+                std::time::Duration::from_secs(10), async move {
+                    stream.next().await.unwrap().unwrap()
+                }
+            )
+        );
+        // Let the other task drop the guard it is holding
+        barrier.wait().await;
+        let mut guard = future.await
+            .expect("Timeout on fetching item")
+            .expect("Job queue error");
+        assert_eq!(guard.job(), &test_webmention);
+        assert_eq!(guard.attempts().await?, 1);
+
+        Ok(())
+    }
 }
diff --git a/kittybox-rs/util/Cargo.toml b/kittybox-rs/util/Cargo.toml
index b0b725c..f0653db 100644
--- a/kittybox-rs/util/Cargo.toml
+++ b/kittybox-rs/util/Cargo.toml
@@ -13,6 +13,9 @@ serde = { version = "^1.0.125", features = ["derive"] }
 serde_json = "^1.0.64"
 axum-core = "^0.2.6"
 http = "^0.2.7"
+async-trait = "^0.1.50"
+futures-util = "^0.3.14"
+uuid = "^1.3.3"
 [dependencies.rand]
 version = "^0.8.5"
 optional = true
diff --git a/kittybox-rs/util/src/lib.rs b/kittybox-rs/util/src/lib.rs
index 1c2f6f7..617ee97 100644
--- a/kittybox-rs/util/src/lib.rs
+++ b/kittybox-rs/util/src/lib.rs
@@ -41,6 +41,9 @@ pub mod auth {
     }
 }
 
+/// A collection of traits for implementing a robust job queue.
+pub mod queue;
+
 #[cfg(feature = "fs")]
 /// Commonly-used operations with the file system in Kittybox's
 /// underlying storage mechanisms.
diff --git a/kittybox-rs/util/src/queue.rs b/kittybox-rs/util/src/queue.rs
new file mode 100644
index 0000000..c880597
--- /dev/null
+++ b/kittybox-rs/util/src/queue.rs
@@ -0,0 +1,66 @@
+use futures_util::Stream;
+use std::pin::Pin;
+use uuid::Uuid;
+
+#[async_trait::async_trait]
+/// A job queue that can store and return jobs.
+pub trait JobQueue<T: JobItem>: Send + Sync + Sized + Clone + 'static {
+    /// A type of job object that will be returned by the queue.
+    type Job: Job<T, Self>;
+    /// Error type that the queue can produce in its work.
+    type Error: std::error::Error + Send + Sync + Sized;
+
+    /// Get one item from the job queue, if the job queue has pending
+    /// items available.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if a job queue failed in some way. Having no
+    /// items is not a failure, in which case `Ok(None)` is returned.
+    async fn get_one(&self) -> Result<Option<Self::Job>, Self::Error>;
+    /// Put an item into a job queue, returning its UUID.
+    async fn put(&self, item: &T) -> Result<Uuid, Self::Error>;
+
+    /*
+    /// Check the amount of pending and stuck items in the job queue.
+    async fn len(&self) -> Result<(usize, usize), Self::Error>;
+    /// Returns whether the job queue has some pending items.
+    async fn is_empty(&self) -> Result<bool, Self::Error> {
+        Ok(self.len().await?.0 == 0)
+    }
+    /// Returns whether the job queue has some stuck items that
+    /// require manual cleanup.
+    async fn has_stuck(&self) -> Result<bool, Self::Error> {
+        Ok(self.len().await?.1 > 0)
+    }
+    */
+
+    /// Consume the job queue object and return a stream of unstuck
+    /// items from the job queue.
+    ///
+    /// Note that one item may be returned several times if it is not
+    /// marked as done.
+    async fn into_stream(self) -> Result<Pin<Box<dyn Stream<Item = Result<Self::Job, Self::Error>> + Send>>, Self::Error>;
+}
+
+#[async_trait::async_trait]
+/// A job description yielded from a job queue.
+///
+/// # Implementors
+///
+/// On [`Drop`], the job should be returned to a job queue. If your
+/// job queue tracks attempts, the counter should be incremented by
+/// one.
+///
+/// Figuring how to do this asynchronously from a synchronous trait
+/// is left as an exercise to the reader.
+pub trait Job<T: JobItem, Q: JobQueue<T>>: Send + Sync + Sized {
+    /// Get the object describing the task itself.
+    fn job(&self) -> &T;
+    /// Mark the job as done and remove it from the job queue.
+    async fn done(self) -> Result<(), Q::Error>;
+}
+
+/// An object describing the job itself, returned as part of a
+/// [`Job`].
+pub trait JobItem: Send + Sync + Sized + std::fmt::Debug {}