about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/database/file/mod.rs6
-rw-r--r--src/database/memory.rs6
-rw-r--r--src/database/mod.rs51
-rw-r--r--src/database/postgres/mod.rs8
-rw-r--r--src/database/redis/mod.rs2
-rw-r--r--src/indieauth/backend.rs50
-rw-r--r--src/indieauth/backend/fs.rs2
-rw-r--r--src/media/storage/file.rs2
-rw-r--r--src/media/storage/mod.rs28
-rw-r--r--src/webmentions/queue.rs14
10 files changed, 78 insertions, 91 deletions
diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs
index 117ba17..10d6079 100644
--- a/src/database/file/mod.rs
+++ b/src/database/file/mod.rs
@@ -1,7 +1,6 @@
 //#![warn(clippy::unwrap_used)]
 use crate::database::{ErrorKind, Result, settings, Storage, StorageError};
 use crate::micropub::{MicropubUpdate, MicropubPropertyDeletion};
-use async_trait::async_trait;
 use futures::{stream, StreamExt, TryStreamExt};
 use kittybox_util::MentionType;
 use serde_json::json;
@@ -245,7 +244,6 @@ async fn hydrate_author<S: Storage>(
     }
 }
 
-#[async_trait]
 impl Storage for FileStorage {
     async fn new(url: &'_ url::Url) -> Result<Self> {
         // TODO: sanity check
@@ -630,7 +628,7 @@ impl Storage for FileStorage {
     }
 
     #[tracing::instrument(skip(self))]
-    async fn get_setting<S: settings::Setting<'a>, 'a>(&self, user: &url::Url) -> Result<S> {
+    async fn get_setting<S: settings::Setting>(&self, user: &url::Url) -> Result<S> {
         debug!("User for getting settings: {}", user);
         let mut path = relative_path::RelativePathBuf::new();
         path.push(user.authority());
@@ -651,7 +649,7 @@ impl Storage for FileStorage {
     }
 
     #[tracing::instrument(skip(self))]
-    async fn set_setting<S: settings::Setting<'a> + 'a, 'a>(&self, user: &'a url::Url, value: S::Data) -> Result<()> {
+    async fn set_setting<S: settings::Setting>(&self, user: &url::Url, value: S::Data) -> Result<()> {
         let mut path = relative_path::RelativePathBuf::new();
         path.push(user.authority());
         path.push("settings");
diff --git a/src/database/memory.rs b/src/database/memory.rs
index be37fed..a4ffc7b 100644
--- a/src/database/memory.rs
+++ b/src/database/memory.rs
@@ -1,5 +1,4 @@
 #![allow(clippy::todo)]
-use async_trait::async_trait;
 use futures_util::FutureExt;
 use serde_json::json;
 use std::collections::HashMap;
@@ -14,7 +13,6 @@ pub struct MemoryStorage {
     pub channels: Arc<RwLock<HashMap<url::Url, Vec<String>>>>,
 }
 
-#[async_trait]
 impl Storage for MemoryStorage {
     async fn new(_url: &url::Url) -> Result<Self> {
         Ok(Self::default())
@@ -220,12 +218,12 @@ impl Storage for MemoryStorage {
     }
 
     #[allow(unused_variables)]
-    async fn get_setting<S: settings::Setting<'a>, 'a>(&'_ self, user: &url::Url) -> Result<S> {
+    async fn get_setting<S: settings::Setting>(&'_ self, user: &url::Url) -> Result<S> {
         todo!()
     }
 
     #[allow(unused_variables)]
-    async fn set_setting<S: settings::Setting<'a> + 'a, 'a>(&self, user: &'a url::Url, value: S::Data) -> Result<()> {
+    async fn set_setting<S: settings::Setting>(&self, user: &url::Url, value: S::Data) -> Result<()> {
         todo!()
     }
 
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 3b13cb3..058fc0c 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -1,7 +1,6 @@
 #![warn(missing_docs)]
-use std::borrow::Cow;
+use std::{borrow::Cow, future::Future};
 
-use async_trait::async_trait;
 use kittybox_util::MentionType;
 
 mod file;
@@ -54,7 +53,7 @@ pub mod settings {
     /// **Note**: this trait is sealed to prevent external
     /// implementations, as it wouldn't make sense to add new settings
     /// that aren't used by Kittybox itself.
-    pub trait Setting<'de>: private::Sealed + std::fmt::Debug + Default + Clone + serde::Serialize + serde::de::DeserializeOwned + /*From<Settings> +*/ Send + Sync {
+    pub trait Setting: private::Sealed + std::fmt::Debug + Default + Clone + serde::Serialize + serde::de::DeserializeOwned + /*From<Settings> +*/ Send + Sync + 'static {
         /// The data that the setting carries.
         type Data: std::fmt::Debug + Send + Sync;
         /// The string ID for the setting, usable as an identifier in the database.
@@ -80,7 +79,7 @@ pub mod settings {
         }
     }
     impl private::Sealed for SiteName {}
-    impl Setting<'_> for SiteName {
+    impl Setting for SiteName {
         type Data = String;
         const ID: &'static str = "site_name";
 
@@ -96,7 +95,7 @@ pub mod settings {
     #[derive(Debug, Default, serde::Deserialize, serde::Serialize, Clone, Copy, PartialEq, Eq)]
     pub struct Webring(bool);
     impl private::Sealed for Webring {}
-    impl Setting<'_> for Webring {
+    impl Setting for Webring {
         type Data = bool;
         const ID: &'static str = "webring";
 
@@ -210,39 +209,38 @@ pub type Result<T> = std::result::Result<T, StorageError>;
 ///
 /// Implementations should note that all methods listed on this trait MUST be fully atomic
 /// or lock the database so that write conflicts or reading half-written data should not occur.
-#[async_trait]
 pub trait Storage: std::fmt::Debug + Clone + Send + Sync {
     /// Initialize Self from a URL, possibly performing initialization.
-    async fn new(url: &'_ url::Url) -> Result<Self>;
+    fn new(url: &url::Url) -> impl Future<Output = Result<Self>> + Send;
     /// Return the list of categories used in blog posts of a specified blog.
-    async fn categories(&self, url: &str) -> Result<Vec<String>>;
+    fn categories(&self, url: &str) -> impl Future<Output = Result<Vec<String>>> + Send;
 
     /// Check if a post exists in the database.
-    async fn post_exists(&self, url: &str) -> Result<bool>;
+    fn post_exists(&self, url: &str) -> impl Future<Output = Result<bool>> + Send;
 
     /// Load a post from the database in MF2-JSON format, deserialized from JSON.
-    async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>>;
+    fn get_post(&self, url: &str) -> impl Future<Output = Result<Option<serde_json::Value>>> + Send;
 
     /// Save a post to the database as an MF2-JSON structure.
     ///
     /// Note that the `post` object MUST have `post["properties"]["uid"][0]` defined.
-    async fn put_post(&self, post: &'_ serde_json::Value, user: &url::Url) -> Result<()>;
+    fn put_post(&self, post: &serde_json::Value, user: &url::Url) -> impl Future<Output = Result<()>> + Send;
 
     /// Add post to feed. Some database implementations might have optimized ways to do this.
     #[tracing::instrument(skip(self))]
-    async fn add_to_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> {
+    fn add_to_feed(&self, feed: &str, post: &str) -> impl Future<Output = Result<()>> + Send {
         tracing::debug!("Inserting {} into {} using `update_post`", post, feed);
         self.update_post(feed, serde_json::from_value(
             serde_json::json!({"add": {"children": [post]}})).unwrap()
-        ).await
+        )
     }
     /// Remove post from feed. Some database implementations might have optimized ways to do this.
     #[tracing::instrument(skip(self))]
-    async fn remove_from_feed(&self, feed: &str, post: &str) -> Result<()> {
+    fn remove_from_feed(&self, feed: &str, post: &str) -> impl Future<Output = Result<()>> + Send {
         tracing::debug!("Removing {} into {} using `update_post`", post, feed);
         self.update_post(feed, serde_json::from_value(
             serde_json::json!({"delete": {"children": [post]}})).unwrap()
-        ).await
+        )
     }
 
     /// Modify a post using an update object as defined in the
@@ -253,11 +251,11 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync {
     /// each other's changes or simply corrupting something. Rejecting
     /// is allowed in case of concurrent updates if waiting for a lock
     /// cannot be done.
-    async fn update_post(&self, url: &str, update: MicropubUpdate) -> Result<()>;
+    fn update_post(&self, url: &str, update: MicropubUpdate) -> impl Future<Output = Result<()>> + Send;
 
     /// Get a list of channels available for the user represented by
     /// the `user` domain to write to.
-    async fn get_channels(&self, user: &url::Url) -> Result<Vec<MicropubChannel>>;
+    fn get_channels(&self, user: &url::Url) -> impl Future<Output = Result<Vec<MicropubChannel>>> + Send;
 
     /// Fetch a feed at `url` and return an h-feed object containing
     /// `limit` posts after a post by url `after`, filtering the content
@@ -275,13 +273,14 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync {
     /// parallel from the database, preferably make this method use a
     /// connection pool to reduce overhead of creating a database
     /// connection per post for parallel fetching.
-    async fn read_feed_with_limit(
+    #[deprecated]
+    fn read_feed_with_limit(
         &self,
-        url: &'_ str,
+        url: &str,
         after: Option<&str>,
         limit: usize,
         user: Option<&url::Url>,
-    ) -> Result<Option<serde_json::Value>>;
+    ) -> impl Future<Output = Result<Option<serde_json::Value>>> + Send;
 
     /// Fetch a feed at `url` and return an h-feed object containing
     /// `limit` posts after a `cursor` (filtering the content in
@@ -301,22 +300,22 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync {
     /// parallel from the database, preferably make this method use a
     /// connection pool to reduce overhead of creating a database
     /// connection per post for parallel fetching.
-    async fn read_feed_with_cursor(
+    fn read_feed_with_cursor(
         &self,
         url: &'_ str,
         cursor: Option<&'_ str>,
         limit: usize,
         user: Option<&url::Url>
-    ) -> Result<Option<(serde_json::Value, Option<String>)>>;
+    ) -> impl Future<Output = Result<Option<(serde_json::Value, Option<String>)>>> + Send;
 
     /// Deletes a post from the database irreversibly. Must be idempotent.
-    async fn delete_post(&self, url: &'_ str) -> Result<()>;
+    fn delete_post(&self, url: &'_ str) -> impl Future<Output = Result<()>> + Send;
 
     /// Gets a setting from the setting store and passes the result.
-    async fn get_setting<S: Setting<'a>, 'a>(&'_ self, user: &url::Url) -> Result<S>;
+    fn get_setting<S: Setting>(&self, user: &url::Url) -> impl Future<Output = Result<S>> + Send;
 
     /// Commits a setting to the setting store.
-    async fn set_setting<S: Setting<'a> + 'a, 'a>(&self, user: &'a url::Url, value: S::Data) -> Result<()>;
+    fn set_setting<S: Setting>(&self, user: &url::Url, value: S::Data) -> impl Future<Output = Result<()>> + Send;
 
     /// Add (or update) a webmention on a certian post.
     ///
@@ -332,7 +331,7 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync {
     ///
     /// Besides, it may even allow for nice tricks like storing the
     /// webmentions separately and rehydrating them on feed reads.
-    async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()>;
+    fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> impl Future<Output = Result<()>> + Send;
 }
 
 #[cfg(test)]
diff --git a/src/database/postgres/mod.rs b/src/database/postgres/mod.rs
index b2d4339..83e04c7 100644
--- a/src/database/postgres/mod.rs
+++ b/src/database/postgres/mod.rs
@@ -45,7 +45,6 @@ impl PostgresStorage {
     }
 }
 
-#[async_trait::async_trait]
 impl Storage for PostgresStorage {
     /// Construct a new [`PostgresStorage`] from an URI string and run
     /// migrations on the database.
@@ -189,8 +188,9 @@ WHERE
 
         txn.commit().await.map_err(Into::into)
     }
+
     #[tracing::instrument(skip(self))]
-    async fn update_post(&self, url: &'_ str, update: MicropubUpdate) -> Result<()> {
+    async fn update_post(&self, url: &str, update: MicropubUpdate) -> Result<()> {
         tracing::debug!("Updating post {}", url);
         let mut txn = self.db.begin().await?;
         let (uid, mut post) = sqlx::query_as::<_, (String, serde_json::Value)>("SELECT uid, mf2 FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1 FOR UPDATE")
@@ -348,7 +348,7 @@ LIMIT $2"
     }
 
     #[tracing::instrument(skip(self))]
-    async fn get_setting<S: Setting<'a>, 'a>(&'_ self, user: &url::Url) -> Result<S> {
+    async fn get_setting<S: Setting>(&'_ self, user: &url::Url) -> Result<S> {
         match sqlx::query_as::<_, (serde_json::Value,)>("SELECT kittybox.get_setting($1, $2)")
             .bind(user.authority())
             .bind(S::ID)
@@ -361,7 +361,7 @@ LIMIT $2"
     }
 
     #[tracing::instrument(skip(self))]
-    async fn set_setting<S: Setting<'a> + 'a, 'a>(&self, user: &'a url::Url, value: S::Data) -> Result<()> {
+    async fn set_setting<S: Setting>(&self, user: &url::Url, value: S::Data) -> Result<()> {
         sqlx::query("SELECT kittybox.set_setting($1, $2, $3)")
             .bind(user.authority())
             .bind(S::ID)
diff --git a/src/database/redis/mod.rs b/src/database/redis/mod.rs
index 39ee852..e92a773 100644
--- a/src/database/redis/mod.rs
+++ b/src/database/redis/mod.rs
@@ -1,4 +1,3 @@
-use async_trait::async_trait;
 use futures::stream;
 use futures_util::FutureExt;
 use futures_util::StreamExt;
@@ -63,7 +62,6 @@ pub struct RedisStorage {
     redis: mobc::Pool<RedisConnectionManager>,
 }
 
-#[async_trait]
 impl Storage for RedisStorage {
     async fn get_setting<'a>(&self, setting: &'a str, user: &'a str) -> Result<String> {
         let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?;
diff --git a/src/indieauth/backend.rs b/src/indieauth/backend.rs
index 5814dc2..b913256 100644
--- a/src/indieauth/backend.rs
+++ b/src/indieauth/backend.rs
@@ -1,3 +1,4 @@
+use std::future::Future;
 use std::collections::HashMap;
 use kittybox_indieauth::{
     AuthorizationRequest, TokenData
@@ -9,10 +10,9 @@ type Result<T> = std::io::Result<T>;
 pub mod fs;
 pub use fs::FileBackend;
 
-#[async_trait::async_trait]
 pub trait AuthBackend: Clone + Send + Sync + 'static {
     /// Initialize self from URL, possibly performing initialization.
-    async fn new(url: &'_ url::Url) -> Result<Self>;
+    fn new(url: &'_ url::Url) -> impl Future<Output = Result<Self>> + Send;
     // Authorization code management.
     /// Create a one-time OAuth2 authorization code for the passed
     /// authorization request, and save it for later retrieval.
@@ -20,33 +20,33 @@ pub trait AuthBackend: Clone + Send + Sync + 'static {
     /// Note for implementors: the [`AuthorizationRequest::me`] value
     /// is guaranteed to be [`Some(url::Url)`][Option::Some] and can
     /// be trusted to be correct and non-malicious.
-    async fn create_code(&self, data: AuthorizationRequest) -> Result<String>;
+    fn create_code(&self, data: AuthorizationRequest) -> impl Future<Output = Result<String>> + Send;
     /// Retreive an authorization request using the one-time
     /// code. Implementations must sanitize the `code` field to
     /// prevent exploits, and must check if the code should still be
     /// valid at this point in time (validity interval is left up to
     /// the implementation, but is recommended to be no more than 10
     /// minutes).
-    async fn get_code(&self, code: &str) -> Result<Option<AuthorizationRequest>>;
+    fn get_code(&self, code: &str) -> impl Future<Output = Result<Option<AuthorizationRequest>>> + Send;
     // Token management.
-    async fn create_token(&self, data: TokenData) -> Result<String>;
-    async fn get_token(&self, website: &url::Url, token: &str) -> Result<Option<TokenData>>;
-    async fn list_tokens(&self, website: &url::Url) -> Result<HashMap<String, TokenData>>;
-    async fn revoke_token(&self, website: &url::Url, token: &str) -> Result<()>;
+    fn create_token(&self, data: TokenData) -> impl Future<Output = Result<String>> + Send;
+    fn get_token(&self, website: &url::Url, token: &str) -> impl Future<Output = Result<Option<TokenData>>> + Send;
+    fn list_tokens(&self, website: &url::Url) -> impl Future<Output = Result<HashMap<String, TokenData>>> + Send;
+    fn revoke_token(&self, website: &url::Url, token: &str) -> impl Future<Output = Result<()>> + Send;
     // Refresh token management.
-    async fn create_refresh_token(&self, data: TokenData) -> Result<String>;
-    async fn get_refresh_token(&self, website: &url::Url, token: &str) -> Result<Option<TokenData>>;
-    async fn list_refresh_tokens(&self, website: &url::Url) -> Result<HashMap<String, TokenData>>;
-    async fn revoke_refresh_token(&self, website: &url::Url, token: &str) -> Result<()>;
+    fn create_refresh_token(&self, data: TokenData) -> impl Future<Output = Result<String>> + Send;
+    fn get_refresh_token(&self, website: &url::Url, token: &str) -> impl Future<Output = Result<Option<TokenData>>> + Send;
+    fn list_refresh_tokens(&self, website: &url::Url) -> impl Future<Output = Result<HashMap<String, TokenData>>> + Send;
+    fn revoke_refresh_token(&self, website: &url::Url, token: &str) -> impl Future<Output = Result<()>> + Send;
     // Password management.
     /// Verify a password.
     #[must_use]
-    async fn verify_password(&self, website: &url::Url, password: String) -> Result<bool>;
+    fn verify_password(&self, website: &url::Url, password: String) -> impl Future<Output = Result<bool>> + Send;
     /// Enroll a password credential for a user. Only one password
     /// credential must exist for a given user.
-    async fn enroll_password(&self, website: &url::Url, password: String) -> Result<()>;
+    fn enroll_password(&self, website: &url::Url, password: String) -> impl Future<Output = Result<()>> + Send;
     /// List currently enrolled credential types for a given user.
-    async fn list_user_credential_types(&self, website: &url::Url) -> Result<Vec<EnrolledCredential>>;
+    fn list_user_credential_types(&self, website: &url::Url) -> impl Future<Output = Result<Vec<EnrolledCredential>>> + Send;
     // WebAuthn credential management.
     #[cfg(feature = "webauthn")]
     /// Enroll a WebAuthn authenticator public key for this user.
@@ -56,30 +56,30 @@ pub trait AuthBackend: Clone + Send + Sync + 'static {
     /// This function can also be used to overwrite a passkey with an
     /// updated version after using
     /// [webauthn::prelude::Passkey::update_credential()].
-    async fn enroll_webauthn(&self, website: &url::Url, credential: webauthn::prelude::Passkey) -> Result<()>;
+    fn enroll_webauthn(&self, website: &url::Url, credential: webauthn::prelude::Passkey) -> impl Future<Output = Result<()>> + Send;
     #[cfg(feature = "webauthn")]
     /// List currently enrolled WebAuthn authenticators for a given user.
-    async fn list_webauthn_pubkeys(&self, website: &url::Url) -> Result<Vec<webauthn::prelude::Passkey>>;
+    fn list_webauthn_pubkeys(&self, website: &url::Url) -> impl Future<Output = Result<Vec<webauthn::prelude::Passkey>>> + Send;
     #[cfg(feature = "webauthn")]
     /// Persist registration challenge state for a little while so it
     /// can be used later.
     ///
     /// Challenges saved in this manner MUST expire after a little
     /// while. 10 minutes is recommended.
-    async fn persist_registration_challenge(
+    fn persist_registration_challenge(
         &self,
         website: &url::Url,
         state: webauthn::prelude::PasskeyRegistration
-    ) -> Result<String>;
+    ) -> impl Future<Output = Result<String>> + Send;
     #[cfg(feature = "webauthn")]
     /// Retrieve a persisted registration challenge.
     ///
     /// The challenge should be deleted after retrieval.
-    async fn retrieve_registration_challenge(
+    fn retrieve_registration_challenge(
         &self,
         website: &url::Url,
         challenge_id: &str
-    ) -> Result<webauthn::prelude::PasskeyRegistration>;
+    ) -> impl Future<Output = Result<webauthn::prelude::PasskeyRegistration>> + Send;
     #[cfg(feature = "webauthn")]
     /// Persist authentication challenge state for a little while so
     /// it can be used later.
@@ -89,19 +89,19 @@ pub trait AuthBackend: Clone + Send + Sync + 'static {
     ///
     /// To support multiple authentication options, this can return an
     /// opaque token that should be set as a cookie.
-    async fn persist_authentication_challenge(
+    fn persist_authentication_challenge(
         &self,
         website: &url::Url,
         state: webauthn::prelude::PasskeyAuthentication
-    ) -> Result<String>;
+    ) -> impl Future<Output = Result<String>> + Send;
     #[cfg(feature = "webauthn")]
     /// Retrieve a persisted authentication challenge.
     ///
     /// The challenge should be deleted after retrieval.
-    async fn retrieve_authentication_challenge(
+    fn retrieve_authentication_challenge(
         &self,
         website: &url::Url,
         challenge_id: &str
-    ) -> Result<webauthn::prelude::PasskeyAuthentication>;
+    ) -> impl Future<Output = Result<webauthn::prelude::PasskeyAuthentication>> + Send;
 
 }
diff --git a/src/indieauth/backend/fs.rs b/src/indieauth/backend/fs.rs
index ce519da..f74fbbc 100644
--- a/src/indieauth/backend/fs.rs
+++ b/src/indieauth/backend/fs.rs
@@ -1,7 +1,6 @@
 use std::{path::PathBuf, collections::HashMap, borrow::Cow, time::{SystemTime, Duration}};
 
 use super::{AuthBackend, Result, EnrolledCredential};
-use async_trait::async_trait;
 use kittybox_indieauth::{
     AuthorizationRequest, TokenData
 };
@@ -186,7 +185,6 @@ impl FileBackend {
     }
 }
 
-#[async_trait]
 impl AuthBackend for FileBackend {
     async fn new(path: &'_ url::Url) -> Result<Self> {
         let orig_path = path;
diff --git a/src/media/storage/file.rs b/src/media/storage/file.rs
index b9ab541..711b298 100644
--- a/src/media/storage/file.rs
+++ b/src/media/storage/file.rs
@@ -1,5 +1,4 @@
 use super::{Metadata, ErrorKind, MediaStore, MediaStoreError, Result};
-use async_trait::async_trait;
 use std::{path::PathBuf, fmt::Debug};
 use tokio::fs::OpenOptions;
 use tokio::io::{BufReader, BufWriter, AsyncWriteExt, AsyncSeekExt};
@@ -39,7 +38,6 @@ impl FileStore {
     }
 }
 
-#[async_trait]
 impl MediaStore for FileStore {
     async fn new(url: &'_ url::Url) -> Result<Self> {
         Ok(Self { base: url.path().into() })
diff --git a/src/media/storage/mod.rs b/src/media/storage/mod.rs
index 38410e6..c2a66ec 100644
--- a/src/media/storage/mod.rs
+++ b/src/media/storage/mod.rs
@@ -1,8 +1,8 @@
-use async_trait::async_trait;
 use axum::extract::multipart::Field;
 use tokio_stream::Stream;
 use bytes::Bytes;
 use serde::{Deserialize, Serialize};
+use std::future::Future;
 use std::ops::Bound;
 use std::pin::Pin;
 use std::fmt::Debug;
@@ -84,31 +84,33 @@ impl std::fmt::Display for MediaStoreError {
 
 pub type Result<T> = std::result::Result<T, MediaStoreError>;
 
-#[async_trait]
 pub trait MediaStore: 'static + Send + Sync + Clone {
     // Initialize self from a URL, possibly performing asynchronous initialization.
-    async fn new(url: &'_ url::Url) -> Result<Self>;
-    async fn write_streaming<T>(
+    fn new(url: &'_ url::Url) -> impl Future<Output = Result<Self>> + Send;
+
+    fn write_streaming<T>(
         &self,
         domain: &str,
         metadata: Metadata,
         content: T,
-    ) -> Result<String>
+    ) -> impl Future<Output = Result<String>> + Send
     where
         T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug;
 
-    async fn read_streaming(
+    fn read_streaming(
         &self,
         domain: &str,
         filename: &str,
-    ) -> Result<(Metadata, Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>)>;
+    ) -> impl Future<Output = Result<
+        (Metadata, Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>)
+        >> + Send;
 
-    async fn stream_range(
+    fn stream_range(
         &self,
         domain: &str,
         filename: &str,
         range: (Bound<u64>, Bound<u64>)
-    ) -> Result<Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>> {
+    ) -> impl Future<Output = Result<Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>>> + Send { async move {
         use futures::stream::TryStreamExt;
         use tracing::debug;
         let (metadata, mut stream) = self.read_streaming(domain, filename).await?;
@@ -163,17 +165,17 @@ pub trait MediaStore: 'static + Send + Sync + Clone {
         );
 
         return Ok(stream);
-    }
+    } }
 
     /// Read metadata for a file.
     ///
     /// The default implementation uses the `read_streaming` method
     /// and drops the stream containing file content.
-    async fn metadata(&self, domain: &str, filename: &str) -> Result<Metadata> {
+    fn metadata(&self, domain: &str, filename: &str) -> impl Future<Output = Result<Metadata>> + Send { async move {
         self.read_streaming(domain, filename)
             .await
             .map(|(meta, stream)| meta)
-    }
+    } }
 
-    async fn delete(&self, domain: &str, filename: &str) -> Result<()>;
+    fn delete(&self, domain: &str, filename: &str) -> impl Future<Output = Result<()>> + Send;
 }
diff --git a/src/webmentions/queue.rs b/src/webmentions/queue.rs
index af1387f..dfa2a48 100644
--- a/src/webmentions/queue.rs
+++ b/src/webmentions/queue.rs
@@ -1,6 +1,4 @@
-use std::pin::Pin;
-
-use futures_util::{Stream, StreamExt};
+use futures_util::StreamExt;
 use sqlx::{postgres::PgListener, ConnectOptions, Executor};
 use uuid::Uuid;
 
@@ -8,7 +6,7 @@ use super::Webmention;
 
 static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations/webmention");
 
-pub use kittybox_util::queue::{JobQueue, JobItem, Job};
+pub use kittybox_util::queue::{JobQueue, JobItem, Job, JobStream};
 
 pub trait PostgresJobItem: JobItem + sqlx::FromRow<'static, sqlx::postgres::PgRow> {
     const DATABASE_NAME: &'static str;
@@ -85,7 +83,6 @@ impl<T: PostgresJobItem> PostgresJob<T> {
     }
 }
 
-#[async_trait::async_trait]
 impl Job<Webmention, PostgresJobQueue<Webmention>> for PostgresJob<Webmention> {
     fn job(&self) -> &Webmention {
         &self.job
@@ -140,7 +137,6 @@ impl PostgresJobQueue<Webmention> {
     }
 }
 
-#[async_trait::async_trait]
 impl JobQueue<Webmention> for PostgresJobQueue<Webmention> {
     type Job = PostgresJob<Webmention>;
     type Error = sqlx::Error;
@@ -155,7 +151,7 @@ impl JobQueue<Webmention> for PostgresJobQueue<Webmention> {
             .await?
         {
             Some(job_row) => {
-                return Ok(Some(Self::Job {
+                Ok(Some(Self::Job {
                     id: job_row.id,
                     job: job_row.job,
                     txn: Some(txn),
@@ -174,11 +170,11 @@ impl JobQueue<Webmention> for PostgresJobQueue<Webmention> {
             .await
     }
 
-    async fn into_stream(self) -> Result<Pin<Box<dyn Stream<Item = Result<Self::Job, Self::Error>> + Send>>, Self::Error> {
+    async fn into_stream(self) -> Result<JobStream<Self::Job, Self::Error>, Self::Error> {
         let mut listener = PgListener::connect_with(&self.db).await?;
         listener.listen("incoming_webmention").await?;
 
-        let stream: Pin<Box<dyn Stream<Item = Result<Self::Job, Self::Error>> + Send>> = futures_util::stream::try_unfold((), {
+        let stream: JobStream<Self::Job, Self::Error> = futures_util::stream::try_unfold((), {
             let listener = std::sync::Arc::new(tokio::sync::Mutex::new(listener));
             move |_| {
                 let queue = self.clone();