use std::{pin::Pin, str::FromStr};
use futures_util::{Stream, StreamExt};
use sqlx::{postgres::PgListener, Executor};
use uuid::Uuid;
use super::Webmention;
static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations/webmention");
pub use kittybox_util::queue::{JobQueue, JobItem, Job};
pub trait PostgresJobItem: JobItem + sqlx::FromRow<'static, sqlx::postgres::PgRow> {
const DATABASE_NAME: &'static str;
const NOTIFICATION_CHANNEL: &'static str;
struct PostgresJobRow<T: PostgresJobItem> {
id: Uuid,
job: T
pub struct PostgresJob<T: PostgresJobItem> {
id: Uuid,
job: T,
// This will normally always be Some, except on drop
txn: Option<sqlx::Transaction<'static, sqlx::Postgres>>,
runtime_handle: tokio::runtime::Handle,
impl<T: PostgresJobItem> Drop for PostgresJob<T> {
// This is an emulation of "async drop" — the struct retains a
// runtime handle, which it uses to block on a future that does
// the actual cleanup.
// Of course, this is not portable between runtimes, but I don't
// care about that, since Kittybox is designed to work within the
// Tokio ecosystem.
fn drop(&mut self) {
tracing::error!("Job {:?} failed, incrementing attempts...", &self);
if let Some(mut txn) = self.txn.take() {
let id =;
self.runtime_handle.spawn(async move {
tracing::debug!("Constructing query to increment attempts for job {}...", id);
// UPDATE "T::DATABASE_NAME" WHERE id = $1 SET attempts = attempts + 1
sqlx::query_builder::QueryBuilder::new("UPDATE ")
// This is safe from a SQL injection standpoint, since it is a constant.
.push(" SET attempts = attempts + 1")
.push(" WHERE id = ")
.execute(&mut *txn)
sqlx::query_builder::QueryBuilder::new("NOTIFY ")
.execute(&mut *txn)
impl<T: PostgresJobItem> PostgresJob<T> {
async fn attempts(&mut self) -> Result<usize, sqlx::Error> {
sqlx::query_builder::QueryBuilder::new("SELECT attempts FROM ")
.push(" WHERE id = ")
// It's safe to unwrap here, because we "take" the txn only on drop or commit,
// where it's passed by value, not by reference.
.map(|(i,)| i as usize)
impl Job<Webmention, PostgresJobQueue<Webmention>> for PostgresJob<Webmention> {
fn job(&self) -> &Webmention {
async fn done(mut self) -> Result<(), <PostgresJobQueue<Webmention> as JobQueue<Webmention>>::Error> {
tracing::debug!("Deleting {} from the job queue",;
sqlx::query("DELETE FROM kittybox_webmention.incoming_webmention_queue WHERE id = $1")
pub struct PostgresJobQueue<T> {
db: sqlx::PgPool,
_phantom: std::marker::PhantomData<T>
impl<T> Clone for PostgresJobQueue<T> {
fn clone(&self) -> Self {
Self {
db: self.db.clone(),
_phantom: std::marker::PhantomData
impl PostgresJobQueue<Webmention> {
pub async fn new(uri: &str) -> Result<Self, sqlx::Error> {
let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)?
.options([("search_path", "kittybox_webmention")]);
if let Ok(password_file) = std::env::var("PGPASS_FILE") {
let password = tokio::fs::read_to_string(password_file).await.unwrap();
options = options.password(&password);
} else if let Ok(password) = std::env::var("PGPASS") {
options = options.password(&password)
pub(crate) async fn from_pool(db: sqlx::PgPool) -> Result<Self, sqlx::Error> {
db.execute(sqlx::query("CREATE SCHEMA IF NOT EXISTS kittybox_webmention")).await?;;
Ok(Self { db, _phantom: std::marker::PhantomData })
impl JobQueue<Webmention> for PostgresJobQueue<Webmention> {
type Job = PostgresJob<Webmention>;
type Error = sqlx::Error;
async fn get_one(&self) -> Result<Option<Self::Job>, Self::Error> {
let mut txn = self.db.begin().await?;
match sqlx::query_as::<_, PostgresJobRow<Webmention>>(
"SELECT id, source, target FROM kittybox_webmention.incoming_webmention_queue WHERE attempts < 5 FOR UPDATE SKIP LOCKED LIMIT 1"
.fetch_optional(&mut *txn)
Some(job_row) => {
return Ok(Some(Self::Job {
job: job_row.job,
txn: Some(txn),
runtime_handle: tokio::runtime::Handle::current(),
None => Ok(None)
async fn put(&self, item: &Webmention) -> Result<Uuid, Self::Error> {
sqlx::query_scalar::<_, Uuid>("INSERT INTO kittybox_webmention.incoming_webmention_queue (source, target) VALUES ($1, $2) RETURNING id")
async fn into_stream(self) -> Result<Pin<Box<dyn Stream<Item = Result<Self::Job, Self::Error>> + Send>>, Self::Error> {
let mut listener = PgListener::connect_with(&self.db).await?;
let stream: Pin<Box<dyn Stream<Item = Result<Self::Job, Self::Error>> + Send>> = futures_util::stream::try_unfold((), {
let listener = std::sync::Arc::new(tokio::sync::Mutex::new(listener));
move |_| {
let queue = self.clone();
let listener = listener.clone();
async move {
loop {
match queue.get_one().await? {
Some(item) => return Ok(Some((item, ()))),
None => {
mod tests {
use std::sync::Arc;
use super::{Webmention, PostgresJobQueue, Job, JobQueue, MIGRATOR};
use futures_util::StreamExt;
#[sqlx::test(migrator = "MIGRATOR")]
async fn test_webmention_queue(pool: sqlx::PgPool) -> Result<(), sqlx::Error> {
let test_webmention = Webmention {
source: "".to_owned(),
target: "".to_owned()
let queue = PostgresJobQueue::<Webmention>::from_pool(pool).await?;
tracing::debug!("Putting webmention into queue");
let mut job_description = queue.get_one().await?.unwrap();
assert_eq!(job_description.job(), &test_webmention);
assert_eq!(job_description.attempts().await?, 0);
tracing::debug!("Creating a stream");
let mut stream = queue.clone().into_stream().await?;
let mut guard =;
assert_eq!(guard.job(), &test_webmention);
assert_eq!(guard.attempts().await?, 1);
if let Some(item) = queue.get_one().await? {
panic!("Unexpected item {:?} returned from job queue!", item)
let mut guard =;
assert_eq!(guard.job(), &test_webmention);
assert_eq!(guard.attempts().await?, 2);
match queue.get_one().await? {
Some(item) => panic!("Unexpected item {:?} returned from job queue!", item),
None => Ok(())
#[sqlx::test(migrator = "MIGRATOR")]
async fn test_no_hangups_in_queue(pool: sqlx::PgPool) -> Result<(), sqlx::Error> {
let test_webmention = Webmention {
source: "".to_owned(),
target: "".to_owned()
let queue = PostgresJobQueue::<Webmention>::from_pool(pool.clone()).await?;
tracing::debug!("Putting webmention into queue");
tracing::debug!("Creating a stream");
let mut stream = queue.clone().into_stream().await?;
// Synchronisation barrier that will be useful later
let barrier = Arc::new(tokio::sync::Barrier::new(2));
// Get one job guard from a queue
let mut guard =;
assert_eq!(guard.job(), &test_webmention);
assert_eq!(guard.attempts().await?, 0);
let barrier = barrier.clone();
async move {
// Wait for the signal to drop the guard!
let future = tokio::task::spawn(
std::time::Duration::from_secs(10), async move {
// Let the other task drop the guard it is holding
let mut guard = future.await
.expect("Timeout on fetching item")
.expect("Job queue error");
assert_eq!(guard.job(), &test_webmention);
assert_eq!(guard.attempts().await?, 1);