about summary refs log blame commit diff
path: root/util/src/queue.rs
blob: edbec86e7d1378e0a0c25e85ae8f7ae4ab8d1547 (plain) (tree)
1
2
3
4
5
6
7
8
                        


                         

                                                                            












                                                                       
                                                                                             
                                                         
                                                                                      

                                                                     
                                                                                      














                                                                     
                                                                                                                
 












                                                                   
                                                                       



                                                              
use std::future::Future;
use futures_util::Stream;
use std::pin::Pin;
use uuid::Uuid;

/// A stream of jobs from the job queue.
pub type JobStream<J, E> = Pin<Box<dyn Stream<Item = Result<J, E>> + Send>>;

/// A job queue that can store and return jobs.
pub trait JobQueue<T: JobItem>: Send + Sync + Sized + Clone + 'static {
    /// A type of job object that will be returned by the queue.
    type Job: Job<T, Self>;
    /// Error type that the queue can produce in its work.
    type Error: std::error::Error + Send + Sync + Sized;

    /// Get one item from the job queue, if the job queue has pending
    /// items available.
    ///
    /// # Errors
    ///
    /// Returns an error if a job queue failed in some way. Having no
    /// items is not a failure, in which case `Ok(None)` is returned.
    #[must_use = "Undone jobs get put back into the queue."]
    fn get_one(&self) -> impl Future<Output = Result<Option<Self::Job>, Self::Error>> + Send;
    /// Put an item into a job queue, returning its UUID.
    fn put(&self, item: &T) -> impl Future<Output = Result<Uuid, Self::Error>> + Send;

    /*
    /// Check the amount of pending and stuck items in the job queue.
    fn len(&self) -> impl Future<Output = Result<(usize, usize), Self::Error>> + Send;
    /// Returns whether the job queue has some pending items.
    async fn is_empty(&self) -> Result<bool, Self::Error> {
        Ok(self.len().await?.0 == 0)
    }
    /// Returns whether the job queue has some stuck items that
    /// require manual cleanup.
    async fn has_stuck(&self) -> Result<bool, Self::Error> {
        Ok(self.len().await?.1 > 0)
    }
    */

    /// Consume the job queue object and return a stream of unstuck
    /// items from the job queue.
    ///
    /// Note that one item may be returned several times if it is not
    /// marked as done.
    fn into_stream(self) -> impl Future<Output = Result<JobStream<Self::Job, Self::Error>, Self::Error>> + Send;
}

/// A job description yielded from a job queue.
///
/// # Implementors
///
/// On [`Drop`], the job should be returned to a job queue. If your
/// job queue tracks attempts, the counter should be incremented by
/// one.
///
/// Figuring how to do this asynchronously from a synchronous trait
/// is left as an exercise to the reader.
pub trait Job<T: JobItem, Q: JobQueue<T>>: Send + Sync + Sized {
    /// Get the object describing the task itself.
    fn job(&self) -> &T;
    /// Mark the job as done and remove it from the job queue.
    fn done(self) -> impl Future<Output = Result<(), Q::Error>> + Send;
}

/// An object describing the job itself, returned as part of a
/// [`Job`].
pub trait JobItem: Send + Sync + Sized + std::fmt::Debug {}