about summary refs log tree commit diff
path: root/kittybox-rs/src/webmentions/queue.rs
diff options
context:
space:
mode:
Diffstat (limited to 'kittybox-rs/src/webmentions/queue.rs')
-rw-r--r--kittybox-rs/src/webmentions/queue.rs303
1 files changed, 0 insertions, 303 deletions
diff --git a/kittybox-rs/src/webmentions/queue.rs b/kittybox-rs/src/webmentions/queue.rs
deleted file mode 100644
index b811e71..0000000
--- a/kittybox-rs/src/webmentions/queue.rs
+++ /dev/null
@@ -1,303 +0,0 @@
-use std::{pin::Pin, str::FromStr};
-
-use futures_util::{Stream, StreamExt};
-use sqlx::{postgres::PgListener, Executor};
-use uuid::Uuid;
-
-use super::Webmention;
-
-static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations/webmention");
-
-pub use kittybox_util::queue::{JobQueue, JobItem, Job};
-
-pub trait PostgresJobItem: JobItem + sqlx::FromRow<'static, sqlx::postgres::PgRow> {
-    const DATABASE_NAME: &'static str;
-    const NOTIFICATION_CHANNEL: &'static str;
-}
-
-#[derive(sqlx::FromRow)]
-struct PostgresJobRow<T: PostgresJobItem> {
-    id: Uuid,
-    #[sqlx(flatten)]
-    job: T
-}
-
-#[derive(Debug)]
-pub struct PostgresJob<T: PostgresJobItem> {
-    id: Uuid,
-    job: T,
-    // This will normally always be Some, except on drop
-    txn: Option<sqlx::Transaction<'static, sqlx::Postgres>>,
-    runtime_handle: tokio::runtime::Handle,
-}
-
-
-impl<T: PostgresJobItem> Drop for PostgresJob<T> {
-    // This is an emulation of "async drop" — the struct retains a
-    // runtime handle, which it uses to block on a future that does
-    // the actual cleanup.
-    //
-    // Of course, this is not portable between runtimes, but I don't
-    // care about that, since Kittybox is designed to work within the
-    // Tokio ecosystem.
-    fn drop(&mut self) {
-        tracing::error!("Job {:?} failed, incrementing attempts...", &self);
-        if let Some(mut txn) = self.txn.take() {
-            let id = self.id;
-            self.runtime_handle.spawn(async move {
-                tracing::debug!("Constructing query to increment attempts for job {}...", id);
-                // UPDATE "T::DATABASE_NAME" WHERE id = $1 SET attempts = attempts + 1
-                sqlx::query_builder::QueryBuilder::new("UPDATE ")
-                    // This is safe from a SQL injection standpoint, since it is a constant.
-                    .push(T::DATABASE_NAME)
-                    .push(" SET attempts = attempts + 1")
-                    .push(" WHERE id = ")
-                    .push_bind(id)
-                    .build()
-                    .execute(&mut *txn)
-                    .await
-                    .unwrap();
-                sqlx::query_builder::QueryBuilder::new("NOTIFY ")
-                    .push(T::NOTIFICATION_CHANNEL)
-                    .build()
-                    .execute(&mut *txn)
-                    .await
-                    .unwrap();
-                txn.commit().await.unwrap();
-            });
-        }
-    }
-}
-
-#[cfg(test)]
-impl<T: PostgresJobItem> PostgresJob<T> {
-    async fn attempts(&mut self) -> Result<usize, sqlx::Error> {
-        sqlx::query_builder::QueryBuilder::new("SELECT attempts FROM ")
-            .push(T::DATABASE_NAME)
-            .push(" WHERE id = ")
-            .push_bind(self.id)
-            .build_query_as::<(i32,)>()
-            // It's safe to unwrap here, because we "take" the txn only on drop or commit,
-            // where it's passed by value, not by reference.
-            .fetch_one(self.txn.as_deref_mut().unwrap())
-            .await
-            .map(|(i,)| i as usize)
-    }
-}
-
-#[async_trait::async_trait]
-impl Job<Webmention, PostgresJobQueue<Webmention>> for PostgresJob<Webmention> {
-    fn job(&self) -> &Webmention {
-        &self.job
-    }
-    async fn done(mut self) -> Result<(), <PostgresJobQueue<Webmention> as JobQueue<Webmention>>::Error> {
-        tracing::debug!("Deleting {} from the job queue", self.id);
-        sqlx::query("DELETE FROM kittybox_webmention.incoming_webmention_queue WHERE id = $1")
-            .bind(self.id)
-            .execute(self.txn.as_deref_mut().unwrap())
-            .await?;
-
-        self.txn.take().unwrap().commit().await
-    }
-}
-
-pub struct PostgresJobQueue<T> {
-    db: sqlx::PgPool,
-    _phantom: std::marker::PhantomData<T>
-}
-impl<T> Clone for PostgresJobQueue<T> {
-    fn clone(&self) -> Self {
-        Self {
-            db: self.db.clone(),
-            _phantom: std::marker::PhantomData
-        }
-    }
-}
-
-impl PostgresJobQueue<Webmention> {
-    pub async fn new(uri: &str) -> Result<Self, sqlx::Error> {
-        let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)?
-            .options([("search_path", "kittybox_webmention")]);
-        if let Ok(password_file) = std::env::var("PGPASS_FILE") {
-            let password = tokio::fs::read_to_string(password_file).await.unwrap();
-            options = options.password(&password);
-        } else if let Ok(password) = std::env::var("PGPASS") {
-            options = options.password(&password)
-        }
-        Self::from_pool(
-            sqlx::postgres::PgPoolOptions::new()
-                .max_connections(50)
-                .connect_with(options)
-                .await?
-        ).await
-
-    }
-
-    pub(crate) async fn from_pool(db: sqlx::PgPool) -> Result<Self, sqlx::Error> {
-        db.execute(sqlx::query("CREATE SCHEMA IF NOT EXISTS kittybox_webmention")).await?;
-        MIGRATOR.run(&db).await?;
-        Ok(Self { db, _phantom: std::marker::PhantomData })
-    }
-}
-
-#[async_trait::async_trait]
-impl JobQueue<Webmention> for PostgresJobQueue<Webmention> {
-    type Job = PostgresJob<Webmention>;
-    type Error = sqlx::Error;
-
-    async fn get_one(&self) -> Result<Option<Self::Job>, Self::Error> {
-        let mut txn = self.db.begin().await?;
-
-        match sqlx::query_as::<_, PostgresJobRow<Webmention>>(
-            "SELECT id, source, target FROM kittybox_webmention.incoming_webmention_queue WHERE attempts < 5 FOR UPDATE SKIP LOCKED LIMIT 1"
-        )
-            .fetch_optional(&mut *txn)
-            .await?
-        {
-            Some(job_row) => {
-                return Ok(Some(Self::Job {
-                    id: job_row.id,
-                    job: job_row.job,
-                    txn: Some(txn),
-                    runtime_handle: tokio::runtime::Handle::current(),
-                }))
-            },
-            None => Ok(None)
-        }
-    }
-
-    async fn put(&self, item: &Webmention) -> Result<Uuid, Self::Error> {
-        sqlx::query_scalar::<_, Uuid>("INSERT INTO kittybox_webmention.incoming_webmention_queue (source, target) VALUES ($1, $2) RETURNING id")
-            .bind(item.source.as_str())
-            .bind(item.target.as_str())
-            .fetch_one(&self.db)
-            .await
-    }
-
-    async fn into_stream(self) -> Result<Pin<Box<dyn Stream<Item = Result<Self::Job, Self::Error>> + Send>>, Self::Error> {
-        let mut listener = PgListener::connect_with(&self.db).await?;
-        listener.listen("incoming_webmention").await?;
-
-        let stream: Pin<Box<dyn Stream<Item = Result<Self::Job, Self::Error>> + Send>> = futures_util::stream::try_unfold((), {
-            let listener = std::sync::Arc::new(tokio::sync::Mutex::new(listener));
-            move |_| {
-                let queue = self.clone();
-                let listener = listener.clone();
-                async move {
-                    loop {
-                        match queue.get_one().await? {
-                            Some(item) => return Ok(Some((item, ()))),
-                            None => {
-                                listener.lock().await.recv().await?;
-                                continue
-                            }
-                        }
-                    }
-                }
-            }
-        }).boxed();
-
-        Ok(stream)
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::sync::Arc;
-
-    use super::{Webmention, PostgresJobQueue, Job, JobQueue, MIGRATOR};
-    use futures_util::StreamExt;
-
-    #[sqlx::test(migrator = "MIGRATOR")]
-    #[tracing_test::traced_test]
-    async fn test_webmention_queue(pool: sqlx::PgPool) -> Result<(), sqlx::Error> {
-        let test_webmention = Webmention {
-            source: "https://fireburn.ru/posts/lorem-ipsum".to_owned(),
-            target: "https://aaronparecki.com/posts/dolor-sit-amet".to_owned()
-        };
-
-        let queue = PostgresJobQueue::<Webmention>::from_pool(pool).await?;
-        tracing::debug!("Putting webmention into queue");
-        queue.put(&test_webmention).await?;
-        {
-            let mut job_description = queue.get_one().await?.unwrap();
-            assert_eq!(job_description.job(), &test_webmention);
-            assert_eq!(job_description.attempts().await?, 0);
-        }
-        tracing::debug!("Creating a stream");
-        let mut stream = queue.clone().into_stream().await?;
-
-        {
-            let mut guard = stream.next().await.transpose()?.unwrap();
-            assert_eq!(guard.job(), &test_webmention);
-            assert_eq!(guard.attempts().await?, 1);
-            if let Some(item) = queue.get_one().await? {
-                panic!("Unexpected item {:?} returned from job queue!", item)
-            };
-        }
-
-        {
-            let mut guard = stream.next().await.transpose()?.unwrap();
-            assert_eq!(guard.job(), &test_webmention);
-            assert_eq!(guard.attempts().await?, 2);
-            guard.done().await?;
-        }
-
-        match queue.get_one().await? {
-            Some(item) => panic!("Unexpected item {:?} returned from job queue!", item),
-            None => Ok(())
-        }
-    }
-
-    #[sqlx::test(migrator = "MIGRATOR")]
-    #[tracing_test::traced_test]
-    async fn test_no_hangups_in_queue(pool: sqlx::PgPool) -> Result<(), sqlx::Error> {
-        let test_webmention = Webmention {
-            source: "https://fireburn.ru/posts/lorem-ipsum".to_owned(),
-            target: "https://aaronparecki.com/posts/dolor-sit-amet".to_owned()
-        };
-
-        let queue = PostgresJobQueue::<Webmention>::from_pool(pool.clone()).await?;
-        tracing::debug!("Putting webmention into queue");
-        queue.put(&test_webmention).await?;
-        tracing::debug!("Creating a stream");
-        let mut stream = queue.clone().into_stream().await?;
-
-        // Synchronisation barrier that will be useful later
-        let barrier = Arc::new(tokio::sync::Barrier::new(2));
-        {
-            // Get one job guard from a queue
-            let mut guard = stream.next().await.transpose()?.unwrap();
-            assert_eq!(guard.job(), &test_webmention);
-            assert_eq!(guard.attempts().await?, 0);
-
-            tokio::task::spawn({
-                let barrier = barrier.clone();
-                async move {
-                    // Wait for the signal to drop the guard!
-                    barrier.wait().await;
-
-                    drop(guard)
-                }
-            });
-        }
-        tokio::time::timeout(std::time::Duration::from_secs(1), stream.next()).await.unwrap_err();
-
-        let future = tokio::task::spawn(
-            tokio::time::timeout(
-                std::time::Duration::from_secs(10), async move {
-                    stream.next().await.unwrap().unwrap()
-                }
-            )
-        );
-        // Let the other task drop the guard it is holding
-        barrier.wait().await;
-        let mut guard = future.await
-            .expect("Timeout on fetching item")
-            .expect("Job queue error");
-        assert_eq!(guard.job(), &test_webmention);
-        assert_eq!(guard.attempts().await?, 1);
-
-        Ok(())
-    }
-}