about summary refs log tree commit diff
path: root/kittybox-rs/util
diff options
context:
space:
mode:
authorVika <vika@fireburn.ru>2023-07-09 21:06:55 +0300
committerVika <vika@fireburn.ru>2023-07-09 21:06:55 +0300
commitb38b508366a80a2f1c163ae3623c79e883323201 (patch)
tree03d3ea5c90b9e64641b4d32c8accec5a2380e570 /kittybox-rs/util
parent89a35e9dbeb972b50008c7272a87f8eed5121d8c (diff)
downloadkittybox-b38b508366a80a2f1c163ae3623c79e883323201.tar.zst
webmentions/queue: move JobQueue trait into kittybox-util
The trait itself seems basic enough that it could be reused
elsewhere. Better to keep it in a separate crate.

`-util` is a dumping ground for various things anyway.
Diffstat (limited to 'kittybox-rs/util')
-rw-r--r--kittybox-rs/util/Cargo.toml3
-rw-r--r--kittybox-rs/util/src/lib.rs3
-rw-r--r--kittybox-rs/util/src/queue.rs66
3 files changed, 72 insertions, 0 deletions
diff --git a/kittybox-rs/util/Cargo.toml b/kittybox-rs/util/Cargo.toml
index b0b725c..f0653db 100644
--- a/kittybox-rs/util/Cargo.toml
+++ b/kittybox-rs/util/Cargo.toml
@@ -13,6 +13,9 @@ serde = { version = "^1.0.125", features = ["derive"] }
 serde_json = "^1.0.64"
 axum-core = "^0.2.6"
 http = "^0.2.7"
+async-trait = "^0.1.50"
+futures-util = "^0.3.14"
+uuid = "^1.3.3"
 [dependencies.rand]
 version = "^0.8.5"
 optional = true
diff --git a/kittybox-rs/util/src/lib.rs b/kittybox-rs/util/src/lib.rs
index 1c2f6f7..617ee97 100644
--- a/kittybox-rs/util/src/lib.rs
+++ b/kittybox-rs/util/src/lib.rs
@@ -41,6 +41,9 @@ pub mod auth {
     }
 }
 
+/// A collection of traits for implementing a robust job queue.
+pub mod queue;
+
 #[cfg(feature = "fs")]
 /// Commonly-used operations with the file system in Kittybox's
 /// underlying storage mechanisms.
diff --git a/kittybox-rs/util/src/queue.rs b/kittybox-rs/util/src/queue.rs
new file mode 100644
index 0000000..c880597
--- /dev/null
+++ b/kittybox-rs/util/src/queue.rs
@@ -0,0 +1,66 @@
+use futures_util::Stream;
+use std::pin::Pin;
+use uuid::Uuid;
+
+#[async_trait::async_trait]
+/// A job queue that can store and return jobs.
+pub trait JobQueue<T: JobItem>: Send + Sync + Sized + Clone + 'static {
+    /// A type of job object that will be returned by the queue.
+    type Job: Job<T, Self>;
+    /// Error type that the queue can produce in its work.
+    type Error: std::error::Error + Send + Sync + Sized;
+
+    /// Get one item from the job queue, if the job queue has pending
+    /// items available.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if a job queue failed in some way. Having no
+    /// items is not a failure, in which case `Ok(None)` is returned.
+    async fn get_one(&self) -> Result<Option<Self::Job>, Self::Error>;
+    /// Put an item into a job queue, returning its UUID.
+    async fn put(&self, item: &T) -> Result<Uuid, Self::Error>;
+
+    /*
+    /// Check the amount of pending and stuck items in the job queue.
+    async fn len(&self) -> Result<(usize, usize), Self::Error>;
+    /// Returns whether the job queue has some pending items.
+    async fn is_empty(&self) -> Result<bool, Self::Error> {
+        Ok(self.len().await?.0 == 0)
+    }
+    /// Returns whether the job queue has some stuck items that
+    /// require manual cleanup.
+    async fn has_stuck(&self) -> Result<bool, Self::Error> {
+        Ok(self.len().await?.1 > 0)
+    }
+    */
+
+    /// Consume the job queue object and return a stream of unstuck
+    /// items from the job queue.
+    ///
+    /// Note that one item may be returned several times if it is not
+    /// marked as done.
+    async fn into_stream(self) -> Result<Pin<Box<dyn Stream<Item = Result<Self::Job, Self::Error>> + Send>>, Self::Error>;
+}
+
+#[async_trait::async_trait]
+/// A job description yielded from a job queue.
+///
+/// # Implementors
+///
+/// On [`Drop`], the job should be returned to a job queue. If your
+/// job queue tracks attempts, the counter should be incremented by
+/// one.
+///
+/// Figuring how to do this asynchronously from a synchronous trait
+/// is left as an exercise to the reader.
+pub trait Job<T: JobItem, Q: JobQueue<T>>: Send + Sync + Sized {
+    /// Get the object describing the task itself.
+    fn job(&self) -> &T;
+    /// Mark the job as done and remove it from the job queue.
+    async fn done(self) -> Result<(), Q::Error>;
+}
+
+/// An object describing the job itself, returned as part of a
+/// [`Job`].
+pub trait JobItem: Send + Sync + Sized + std::fmt::Debug {}