diff options
author | Vika <vika@fireburn.ru> | 2023-07-09 21:06:55 +0300 |
---|---|---|
committer | Vika <vika@fireburn.ru> | 2023-07-09 21:06:55 +0300 |
commit | b38b508366a80a2f1c163ae3623c79e883323201 (patch) | |
tree | 03d3ea5c90b9e64641b4d32c8accec5a2380e570 /kittybox-rs/util | |
parent | 89a35e9dbeb972b50008c7272a87f8eed5121d8c (diff) | |
download | kittybox-b38b508366a80a2f1c163ae3623c79e883323201.tar.zst |
webmentions/queue: move JobQueue trait into kittybox-util
The trait itself seems basic enough that it could be reused elsewhere. Better to keep it in a separate crate. `-util` is a dumping ground for various things anyway.
Diffstat (limited to 'kittybox-rs/util')
-rw-r--r-- | kittybox-rs/util/Cargo.toml | 3 | ||||
-rw-r--r-- | kittybox-rs/util/src/lib.rs | 3 | ||||
-rw-r--r-- | kittybox-rs/util/src/queue.rs | 66 |
3 files changed, 72 insertions, 0 deletions
diff --git a/kittybox-rs/util/Cargo.toml b/kittybox-rs/util/Cargo.toml index b0b725c..f0653db 100644 --- a/kittybox-rs/util/Cargo.toml +++ b/kittybox-rs/util/Cargo.toml @@ -13,6 +13,9 @@ serde = { version = "^1.0.125", features = ["derive"] } serde_json = "^1.0.64" axum-core = "^0.2.6" http = "^0.2.7" +async-trait = "^0.1.50" +futures-util = "^0.3.14" +uuid = "^1.3.3" [dependencies.rand] version = "^0.8.5" optional = true diff --git a/kittybox-rs/util/src/lib.rs b/kittybox-rs/util/src/lib.rs index 1c2f6f7..617ee97 100644 --- a/kittybox-rs/util/src/lib.rs +++ b/kittybox-rs/util/src/lib.rs @@ -41,6 +41,9 @@ pub mod auth { } } +/// A collection of traits for implementing a robust job queue. +pub mod queue; + #[cfg(feature = "fs")] /// Commonly-used operations with the file system in Kittybox's /// underlying storage mechanisms. diff --git a/kittybox-rs/util/src/queue.rs b/kittybox-rs/util/src/queue.rs new file mode 100644 index 0000000..c880597 --- /dev/null +++ b/kittybox-rs/util/src/queue.rs @@ -0,0 +1,66 @@ +use futures_util::Stream; +use std::pin::Pin; +use uuid::Uuid; + +#[async_trait::async_trait] +/// A job queue that can store and return jobs. +pub trait JobQueue<T: JobItem>: Send + Sync + Sized + Clone + 'static { + /// A type of job object that will be returned by the queue. + type Job: Job<T, Self>; + /// Error type that the queue can produce in its work. + type Error: std::error::Error + Send + Sync + Sized; + + /// Get one item from the job queue, if the job queue has pending + /// items available. + /// + /// # Errors + /// + /// Returns an error if a job queue failed in some way. Having no + /// items is not a failure, in which case `Ok(None)` is returned. + async fn get_one(&self) -> Result<Option<Self::Job>, Self::Error>; + /// Put an item into a job queue, returning its UUID. + async fn put(&self, item: &T) -> Result<Uuid, Self::Error>; + + /* + /// Check the amount of pending and stuck items in the job queue. + async fn len(&self) -> Result<(usize, usize), Self::Error>; + /// Returns whether the job queue has some pending items. + async fn is_empty(&self) -> Result<bool, Self::Error> { + Ok(self.len().await?.0 == 0) + } + /// Returns whether the job queue has some stuck items that + /// require manual cleanup. + async fn has_stuck(&self) -> Result<bool, Self::Error> { + Ok(self.len().await?.1 > 0) + } + */ + + /// Consume the job queue object and return a stream of unstuck + /// items from the job queue. + /// + /// Note that one item may be returned several times if it is not + /// marked as done. + async fn into_stream(self) -> Result<Pin<Box<dyn Stream<Item = Result<Self::Job, Self::Error>> + Send>>, Self::Error>; +} + +#[async_trait::async_trait] +/// A job description yielded from a job queue. +/// +/// # Implementors +/// +/// On [`Drop`], the job should be returned to a job queue. If your +/// job queue tracks attempts, the counter should be incremented by +/// one. +/// +/// Figuring how to do this asynchronously from a synchronous trait +/// is left as an exercise to the reader. +pub trait Job<T: JobItem, Q: JobQueue<T>>: Send + Sync + Sized { + /// Get the object describing the task itself. + fn job(&self) -> &T; + /// Mark the job as done and remove it from the job queue. + async fn done(self) -> Result<(), Q::Error>; +} + +/// An object describing the job itself, returned as part of a +/// [`Job`]. +pub trait JobItem: Send + Sync + Sized + std::fmt::Debug {} |