diff options
Diffstat (limited to 'src/webmentions')
-rw-r--r-- | src/webmentions/queue.rs | 14 |
1 files changed, 5 insertions, 9 deletions
diff --git a/src/webmentions/queue.rs b/src/webmentions/queue.rs index af1387f..dfa2a48 100644 --- a/src/webmentions/queue.rs +++ b/src/webmentions/queue.rs @@ -1,6 +1,4 @@ -use std::pin::Pin; - -use futures_util::{Stream, StreamExt}; +use futures_util::StreamExt; use sqlx::{postgres::PgListener, ConnectOptions, Executor}; use uuid::Uuid; @@ -8,7 +6,7 @@ use super::Webmention; static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations/webmention"); -pub use kittybox_util::queue::{JobQueue, JobItem, Job}; +pub use kittybox_util::queue::{JobQueue, JobItem, Job, JobStream}; pub trait PostgresJobItem: JobItem + sqlx::FromRow<'static, sqlx::postgres::PgRow> { const DATABASE_NAME: &'static str; @@ -85,7 +83,6 @@ impl<T: PostgresJobItem> PostgresJob<T> { } } -#[async_trait::async_trait] impl Job<Webmention, PostgresJobQueue<Webmention>> for PostgresJob<Webmention> { fn job(&self) -> &Webmention { &self.job @@ -140,7 +137,6 @@ impl PostgresJobQueue<Webmention> { } } -#[async_trait::async_trait] impl JobQueue<Webmention> for PostgresJobQueue<Webmention> { type Job = PostgresJob<Webmention>; type Error = sqlx::Error; @@ -155,7 +151,7 @@ impl JobQueue<Webmention> for PostgresJobQueue<Webmention> { .await? { Some(job_row) => { - return Ok(Some(Self::Job { + Ok(Some(Self::Job { id: job_row.id, job: job_row.job, txn: Some(txn), @@ -174,11 +170,11 @@ impl JobQueue<Webmention> for PostgresJobQueue<Webmention> { .await } - async fn into_stream(self) -> Result<Pin<Box<dyn Stream<Item = Result<Self::Job, Self::Error>> + Send>>, Self::Error> { + async fn into_stream(self) -> Result<JobStream<Self::Job, Self::Error>, Self::Error> { let mut listener = PgListener::connect_with(&self.db).await?; listener.listen("incoming_webmention").await?; - let stream: Pin<Box<dyn Stream<Item = Result<Self::Job, Self::Error>> + Send>> = futures_util::stream::try_unfold((), { + let stream: JobStream<Self::Job, Self::Error> = futures_util::stream::try_unfold((), { let listener = std::sync::Arc::new(tokio::sync::Mutex::new(listener)); move |_| { let queue = self.clone(); |