about summary refs log tree commit diff
path: root/src/webmentions
diff options
context:
space:
mode:
Diffstat (limited to 'src/webmentions')
-rw-r--r--src/webmentions/queue.rs14
1 files changed, 5 insertions, 9 deletions
diff --git a/src/webmentions/queue.rs b/src/webmentions/queue.rs
index af1387f..dfa2a48 100644
--- a/src/webmentions/queue.rs
+++ b/src/webmentions/queue.rs
@@ -1,6 +1,4 @@
-use std::pin::Pin;
-
-use futures_util::{Stream, StreamExt};
+use futures_util::StreamExt;
 use sqlx::{postgres::PgListener, ConnectOptions, Executor};
 use uuid::Uuid;
 
@@ -8,7 +6,7 @@ use super::Webmention;
 
 static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations/webmention");
 
-pub use kittybox_util::queue::{JobQueue, JobItem, Job};
+pub use kittybox_util::queue::{JobQueue, JobItem, Job, JobStream};
 
 pub trait PostgresJobItem: JobItem + sqlx::FromRow<'static, sqlx::postgres::PgRow> {
     const DATABASE_NAME: &'static str;
@@ -85,7 +83,6 @@ impl<T: PostgresJobItem> PostgresJob<T> {
     }
 }
 
-#[async_trait::async_trait]
 impl Job<Webmention, PostgresJobQueue<Webmention>> for PostgresJob<Webmention> {
     fn job(&self) -> &Webmention {
         &self.job
@@ -140,7 +137,6 @@ impl PostgresJobQueue<Webmention> {
     }
 }
 
-#[async_trait::async_trait]
 impl JobQueue<Webmention> for PostgresJobQueue<Webmention> {
     type Job = PostgresJob<Webmention>;
     type Error = sqlx::Error;
@@ -155,7 +151,7 @@ impl JobQueue<Webmention> for PostgresJobQueue<Webmention> {
             .await?
         {
             Some(job_row) => {
-                return Ok(Some(Self::Job {
+                Ok(Some(Self::Job {
                     id: job_row.id,
                     job: job_row.job,
                     txn: Some(txn),
@@ -174,11 +170,11 @@ impl JobQueue<Webmention> for PostgresJobQueue<Webmention> {
             .await
     }
 
-    async fn into_stream(self) -> Result<Pin<Box<dyn Stream<Item = Result<Self::Job, Self::Error>> + Send>>, Self::Error> {
+    async fn into_stream(self) -> Result<JobStream<Self::Job, Self::Error>, Self::Error> {
         let mut listener = PgListener::connect_with(&self.db).await?;
         listener.listen("incoming_webmention").await?;
 
-        let stream: Pin<Box<dyn Stream<Item = Result<Self::Job, Self::Error>> + Send>> = futures_util::stream::try_unfold((), {
+        let stream: JobStream<Self::Job, Self::Error> = futures_util::stream::try_unfold((), {
             let listener = std::sync::Arc::new(tokio::sync::Mutex::new(listener));
             move |_| {
                 let queue = self.clone();