use std::{pin::Pin, str::FromStr}; use futures_util::{Stream, StreamExt}; use sqlx::postgres::PgListener; use uuid::Uuid; use super::Webmention; static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!(); #[async_trait::async_trait] pub trait JobQueue: Send + Sync + Sized + Clone + 'static { type Job: Job; type Error: std::error::Error + Send + Sync + Sized; async fn get_one(&self) -> Result, Self::Error>; async fn put(&self, item: &T) -> Result; async fn into_stream(self) -> Result> + Send>>, Self::Error>; } #[async_trait::async_trait] pub trait Job>: Send + Sync + Sized { fn job(&self) -> &T; async fn done(self) -> Result<(), Q::Error>; } pub trait JobItem: Send + Sync + Sized + std::fmt::Debug { const DATABASE_NAME: &'static str; } #[derive(Debug)] pub struct PostgresJobItem { id: Uuid, job: T, // This will normally always be Some, except on drop txn: Option>, runtime_handle: tokio::runtime::Handle, } impl Drop for PostgresJobItem { // This is an emulation of "async drop" — the struct retains a // runtime handle, which it uses to block on a future that does // the actual cleanup. // // Of course, this is not portable between runtimes, but I don't // care about that, since Kittybox is designed to work within the // Tokio ecosystem. fn drop(&mut self) { tracing::error!("Job {:?} failed, incrementing attempts...", &self); if let Some(mut txn) = self.txn.take() { let id = self.id; self.runtime_handle.spawn(async move { tracing::debug!("Constructing query to increment attempts for job {}...", id); // UPDATE "T::DATABASE_NAME" WHERE id = $1 SET attempts = attempts + 1 sqlx::query_builder::QueryBuilder::new("UPDATE ") // This is safe from a SQL injection standpoint, since it is a constant. .push(T::DATABASE_NAME) .push(" SET attempts = attempts + 1") .push(" WHERE id = ") .push_bind(id) .build() .execute(&mut txn) .await .unwrap(); txn.commit().await.unwrap(); }); } } } #[async_trait::async_trait] impl Job> for PostgresJobItem { fn job(&self) -> &Webmention { &self.job } async fn done(mut self) -> Result<(), as JobQueue>::Error> { tracing::debug!("Deleting {} from the job queue", self.id); sqlx::query("DELETE FROM kittybox.incoming_webmention_queue WHERE id = $1") .bind(self.id) .execute(self.txn.as_mut().unwrap()) .await?; self.txn.take().unwrap().commit().await } } pub struct PostgresJobQueue { db: sqlx::PgPool, _phantom: std::marker::PhantomData } impl Clone for PostgresJobQueue { fn clone(&self) -> Self { Self { db: self.db.clone(), _phantom: std::marker::PhantomData } } } impl PostgresJobQueue { pub async fn new(uri: &str) -> Result { let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)?; if let Ok(password_file) = std::env::var("PGPASS_FILE") { let password = tokio::fs::read_to_string(password_file).await.unwrap(); options = options.password(&password); } else if let Ok(password) = std::env::var("PGPASS") { options = options.password(&password) } Ok(Self::from_pool( sqlx::postgres::PgPoolOptions::new() .max_connections(50) .connect_with(options) .await? ).await?) } pub(crate) async fn from_pool(db: sqlx::PgPool) -> Result { MIGRATOR.run(&db).await?; Ok(Self { db, _phantom: std::marker::PhantomData }) } } #[async_trait::async_trait] impl JobQueue for PostgresJobQueue { type Job = PostgresJobItem; type Error = sqlx::Error; async fn get_one(&self) -> Result, Self::Error> { let mut txn = self.db.begin().await?; match sqlx::query_as::<_, (Uuid, String, String)>( "SELECT id, source, target FROM kittybox.incoming_webmention_queue WHERE attempts < 5 FOR UPDATE SKIP LOCKED LIMIT 1" ) .fetch_optional(&mut txn) .await? .map(|(id, source, target)| (id, Webmention { source, target })) { Some((id, webmention)) => { return Ok(Some(Self::Job { id, job: webmention, txn: Some(txn), runtime_handle: tokio::runtime::Handle::current(), })) }, None => Ok(None) } } async fn put(&self, item: &Webmention) -> Result { sqlx::query_scalar::<_, Uuid>("INSERT INTO kittybox.incoming_webmention_queue (source, target) VALUES ($1, $2) RETURNING id") .bind(item.source.as_str()) .bind(item.target.as_str()) .fetch_one(&self.db) .await } async fn into_stream(self) -> Result> + Send>>, Self::Error> { let mut listener = PgListener::connect_with(&self.db).await?; listener.listen("incoming_webmention").await?; let stream: Pin> + Send>> = futures_util::stream::try_unfold((), { let listener = std::sync::Arc::new(tokio::sync::Mutex::new(listener)); move |_| { let queue = self.clone(); let listener = listener.clone(); async move { loop { match queue.get_one().await? { Some(item) => return Ok(Some((item, ()))), None => { listener.lock().await.recv().await?; continue } } } } } }).boxed(); Ok(stream) } } #[cfg(test)] mod tests { use super::{Webmention, PostgresJobQueue, Job, JobQueue}; use futures_util::StreamExt; #[sqlx::test] async fn test_webmention_queue(pool: sqlx::PgPool) -> Result<(), sqlx::Error> { let test_webmention = Webmention { source: "https://fireburn.ru/posts/lorem-ipsum".to_owned(), target: "https://aaronparecki.com/posts/dolor-sit-amet".to_owned() }; let queue = PostgresJobQueue::::from_pool(pool).await?; println!("Putting webmention into queue"); queue.put(&test_webmention).await?; assert_eq!(queue.get_one().await?.as_ref().map(|j| j.job()), Some(&test_webmention)); println!("Creating a stream"); let mut stream = queue.clone().into_stream().await?; let future = stream.next(); let guard = future.await.transpose()?.unwrap(); assert_eq!(guard.job(), &test_webmention); if let Some(item) = queue.get_one().await? { panic!("Unexpected item {:?} returned from job queue!", item) }; drop(guard); let guard = stream.next().await.transpose()?.unwrap(); assert_eq!(guard.job(), &test_webmention); guard.done().await?; match queue.get_one().await? { Some(item) => panic!("Unexpected item {:?} returned from job queue!", item), None => Ok(()) } } }