about summary refs log tree commit diff
path: root/src/webmentions/queue.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/webmentions/queue.rs')
-rw-r--r--src/webmentions/queue.rs60
1 files changed, 34 insertions, 26 deletions
diff --git a/src/webmentions/queue.rs b/src/webmentions/queue.rs
index 52bcdfa..a33de1a 100644
--- a/src/webmentions/queue.rs
+++ b/src/webmentions/queue.rs
@@ -6,7 +6,7 @@ use super::Webmention;
 
 static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations/webmention");
 
-pub use kittybox_util::queue::{JobQueue, JobItem, Job, JobStream};
+pub use kittybox_util::queue::{Job, JobItem, JobQueue, JobStream};
 
 pub trait PostgresJobItem: JobItem + sqlx::FromRow<'static, sqlx::postgres::PgRow> {
     const DATABASE_NAME: &'static str;
@@ -17,7 +17,7 @@ pub trait PostgresJobItem: JobItem + sqlx::FromRow<'static, sqlx::postgres::PgRo
 struct PostgresJobRow<T: PostgresJobItem> {
     id: Uuid,
     #[sqlx(flatten)]
-    job: T
+    job: T,
 }
 
 #[derive(Debug)]
@@ -29,7 +29,6 @@ pub struct PostgresJob<T: PostgresJobItem> {
     runtime_handle: tokio::runtime::Handle,
 }
 
-
 impl<T: PostgresJobItem> Drop for PostgresJob<T> {
     // This is an emulation of "async drop" — the struct retains a
     // runtime handle, which it uses to block on a future that does
@@ -87,7 +86,9 @@ impl Job<Webmention, PostgresJobQueue<Webmention>> for PostgresJob<Webmention> {
     fn job(&self) -> &Webmention {
         &self.job
     }
-    async fn done(mut self) -> Result<(), <PostgresJobQueue<Webmention> as JobQueue<Webmention>>::Error> {
+    async fn done(
+        mut self,
+    ) -> Result<(), <PostgresJobQueue<Webmention> as JobQueue<Webmention>>::Error> {
         tracing::debug!("Deleting {} from the job queue", self.id);
         sqlx::query("DELETE FROM kittybox_webmention.incoming_webmention_queue WHERE id = $1")
             .bind(self.id)
@@ -100,13 +101,13 @@ impl Job<Webmention, PostgresJobQueue<Webmention>> for PostgresJob<Webmention> {
 
 pub struct PostgresJobQueue<T> {
     db: sqlx::PgPool,
-    _phantom: std::marker::PhantomData<T>
+    _phantom: std::marker::PhantomData<T>,
 }
 impl<T> Clone for PostgresJobQueue<T> {
     fn clone(&self) -> Self {
         Self {
             db: self.db.clone(),
-            _phantom: std::marker::PhantomData
+            _phantom: std::marker::PhantomData,
         }
     }
 }
@@ -120,15 +121,21 @@ impl PostgresJobQueue<Webmention> {
             sqlx::postgres::PgPoolOptions::new()
                 .max_connections(50)
                 .connect_with(options)
-                .await?
-        ).await
-
+                .await?,
+        )
+        .await
     }
 
     pub(crate) async fn from_pool(db: sqlx::PgPool) -> Result<Self, sqlx::Error> {
-        db.execute(sqlx::query("CREATE SCHEMA IF NOT EXISTS kittybox_webmention")).await?;
+        db.execute(sqlx::query(
+            "CREATE SCHEMA IF NOT EXISTS kittybox_webmention",
+        ))
+        .await?;
         MIGRATOR.run(&db).await?;
-        Ok(Self { db, _phantom: std::marker::PhantomData })
+        Ok(Self {
+            db,
+            _phantom: std::marker::PhantomData,
+        })
     }
 }
 
@@ -180,13 +187,14 @@ impl JobQueue<Webmention> for PostgresJobQueue<Webmention> {
                             Some(item) => return Ok(Some((item, ()))),
                             None => {
                                 listener.lock().await.recv().await?;
-                                continue
+                                continue;
                             }
                         }
                     }
                 }
             }
-        }).boxed();
+        })
+        .boxed();
 
         Ok(stream)
     }
@@ -196,7 +204,7 @@ impl JobQueue<Webmention> for PostgresJobQueue<Webmention> {
 mod tests {
     use std::sync::Arc;
 
-    use super::{Webmention, PostgresJobQueue, Job, JobQueue, MIGRATOR};
+    use super::{Job, JobQueue, PostgresJobQueue, Webmention, MIGRATOR};
     use futures_util::StreamExt;
 
     #[sqlx::test(migrator = "MIGRATOR")]
@@ -204,7 +212,7 @@ mod tests {
     async fn test_webmention_queue(pool: sqlx::PgPool) -> Result<(), sqlx::Error> {
         let test_webmention = Webmention {
             source: "https://fireburn.ru/posts/lorem-ipsum".to_owned(),
-            target: "https://aaronparecki.com/posts/dolor-sit-amet".to_owned()
+            target: "https://aaronparecki.com/posts/dolor-sit-amet".to_owned(),
         };
 
         let queue = PostgresJobQueue::<Webmention>::from_pool(pool).await?;
@@ -236,7 +244,7 @@ mod tests {
 
         match queue.get_one().await? {
             Some(item) => panic!("Unexpected item {:?} returned from job queue!", item),
-            None => Ok(())
+            None => Ok(()),
         }
     }
 
@@ -245,7 +253,7 @@ mod tests {
     async fn test_no_hangups_in_queue(pool: sqlx::PgPool) -> Result<(), sqlx::Error> {
         let test_webmention = Webmention {
             source: "https://fireburn.ru/posts/lorem-ipsum".to_owned(),
-            target: "https://aaronparecki.com/posts/dolor-sit-amet".to_owned()
+            target: "https://aaronparecki.com/posts/dolor-sit-amet".to_owned(),
         };
 
         let queue = PostgresJobQueue::<Webmention>::from_pool(pool.clone()).await?;
@@ -272,18 +280,18 @@ mod tests {
                 }
             });
         }
-        tokio::time::timeout(std::time::Duration::from_secs(1), stream.next()).await.unwrap_err();
+        tokio::time::timeout(std::time::Duration::from_secs(1), stream.next())
+            .await
+            .unwrap_err();
 
-        let future = tokio::task::spawn(
-            tokio::time::timeout(
-                std::time::Duration::from_secs(10), async move {
-                    stream.next().await.unwrap().unwrap()
-                }
-            )
-        );
+        let future = tokio::task::spawn(tokio::time::timeout(
+            std::time::Duration::from_secs(10),
+            async move { stream.next().await.unwrap().unwrap() },
+        ));
         // Let the other task drop the guard it is holding
         barrier.wait().await;
-        let mut guard = future.await
+        let mut guard = future
+            .await
             .expect("Timeout on fetching item")
             .expect("Job queue error");
         assert_eq!(guard.job(), &test_webmention);