Diffstat (limited to 'src')
-rw-r--r--   src/database/file/mod.rs      |  6
-rw-r--r--   src/database/memory.rs        |  6
-rw-r--r--   src/database/mod.rs           | 51
-rw-r--r--   src/database/postgres/mod.rs  |  8
-rw-r--r--   src/database/redis/mod.rs     |  2
-rw-r--r--   src/indieauth/backend.rs      | 50
-rw-r--r--   src/indieauth/backend/fs.rs   |  2
-rw-r--r--   src/media/storage/file.rs     |  2
-rw-r--r--   src/media/storage/mod.rs      | 28
-rw-r--r--   src/webmentions/queue.rs      | 14
10 files changed, 78 insertions, 91 deletions
diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs
index 117ba17..10d6079 100644
--- a/src/database/file/mod.rs
+++ b/src/database/file/mod.rs
@@ -1,7 +1,6 @@
 //#![warn(clippy::unwrap_used)]
 use crate::database::{ErrorKind, Result, settings, Storage, StorageError};
 use crate::micropub::{MicropubUpdate, MicropubPropertyDeletion};
-use async_trait::async_trait;
 use futures::{stream, StreamExt, TryStreamExt};
 use kittybox_util::MentionType;
 use serde_json::json;
@@ -245,7 +244,6 @@ async fn hydrate_author<S: Storage>(
     }
 }
 
-#[async_trait]
 impl Storage for FileStorage {
     async fn new(url: &'_ url::Url) -> Result<Self> {
         // TODO: sanity check
@@ -630,7 +628,7 @@ impl Storage for FileStorage {
     }
 
     #[tracing::instrument(skip(self))]
-    async fn get_setting<S: settings::Setting<'a>, 'a>(&self, user: &url::Url) -> Result<S> {
+    async fn get_setting<S: settings::Setting>(&self, user: &url::Url) -> Result<S> {
         debug!("User for getting settings: {}", user);
         let mut path = relative_path::RelativePathBuf::new();
         path.push(user.authority());
@@ -651,7 +649,7 @@ impl Storage for FileStorage {
     }
 
     #[tracing::instrument(skip(self))]
-    async fn set_setting<S: settings::Setting<'a> + 'a, 'a>(&self, user: &'a url::Url, value: S::Data) -> Result<()> {
+    async fn set_setting<S: settings::Setting>(&self, user: &url::Url, value: S::Data) -> Result<()> {
         let mut path = relative_path::RelativePathBuf::new();
         path.push(user.authority());
         path.push("settings");
diff --git a/src/database/memory.rs b/src/database/memory.rs
index be37fed..a4ffc7b 100644
--- a/src/database/memory.rs
+++ b/src/database/memory.rs
@@ -1,5 +1,4 @@
 #![allow(clippy::todo)]
-use async_trait::async_trait;
 use futures_util::FutureExt;
 use serde_json::json;
 use std::collections::HashMap;
@@ -14,7 +13,6 @@ pub struct MemoryStorage {
     pub channels: Arc<RwLock<HashMap<url::Url, Vec<String>>>>,
 }
 
-#[async_trait]
 impl Storage for MemoryStorage {
     async fn new(_url: &url::Url) -> Result<Self> {
         Ok(Self::default())
@@ -220,12 +218,12 @@ impl Storage for MemoryStorage {
     }
 
     #[allow(unused_variables)]
-    async fn get_setting<S: settings::Setting<'a>, 'a>(&'_ self, user: &url::Url) -> Result<S> {
+    async fn get_setting<S: settings::Setting>(&'_ self, user: &url::Url) -> Result<S> {
         todo!()
     }
 
     #[allow(unused_variables)]
-    async fn set_setting<S: settings::Setting<'a> + 'a, 'a>(&self, user: &'a url::Url, value: S::Data) -> Result<()> {
+    async fn set_setting<S: settings::Setting>(&self, user: &url::Url, value: S::Data) -> Result<()> {
         todo!()
     }
 
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 3b13cb3..058fc0c 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -1,7 +1,6 @@
 #![warn(missing_docs)]
 
-use std::borrow::Cow;
+use std::{borrow::Cow, future::Future};
-use async_trait::async_trait;
 use kittybox_util::MentionType;
 
 mod file;
@@ -54,7 +53,7 @@ pub mod settings {
     /// **Note**: this trait is sealed to prevent external
     /// implementations, as it wouldn't make sense to add new settings
     /// that aren't used by Kittybox itself.
-    pub trait Setting<'de>: private::Sealed + std::fmt::Debug + Default + Clone + serde::Serialize + serde::de::DeserializeOwned + /*From<Settings> +*/ Send + Sync {
+    pub trait Setting: private::Sealed + std::fmt::Debug + Default + Clone + serde::Serialize + serde::de::DeserializeOwned + /*From<Settings> +*/ Send + Sync + 'static {
         /// The data that the setting carries.
         type Data: std::fmt::Debug + Send + Sync;
         /// The string ID for the setting, usable as an identifier in the database.
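A note on the `Setting<'de>` → `Setting` change in the hunk just above: the trait already requires `serde::de::DeserializeOwned`, and with the new `'static` bound a setting owns all of its data, so the deserialization lifetime (and the `'a` parameters it forced onto `get_setting`/`set_setting`) no longer carries any information. A minimal sketch of the resulting shape — simplified, with the `private::Sealed` supertrait omitted and a made-up `storage_key` helper, so not the literal Kittybox code (assumes serde with the `derive` feature):

```rust
use serde::{de::DeserializeOwned, Deserialize, Serialize};

// Lifetime-free version of the trait: `DeserializeOwned + 'static`
// already guarantees a setting value borrows nothing.
trait Setting: std::fmt::Debug + Default + Clone + Serialize + DeserializeOwned + Send + Sync + 'static {
    /// The data that the setting carries.
    type Data: std::fmt::Debug + Send + Sync;
    /// The string ID for the setting, usable as an identifier in the database.
    const ID: &'static str;
}

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct SiteName(String);

impl Setting for SiteName {
    type Data = String;
    const ID: &'static str = "site_name";
}

// Generic code no longer needs to thread a `'de`/`'a` lifetime through:
fn storage_key<S: Setting>() -> &'static str {
    S::ID
}
```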
@@ -80,7 +79,7 @@ pub mod settings {
         }
     }
     impl private::Sealed for SiteName {}
-    impl Setting<'_> for SiteName {
+    impl Setting for SiteName {
         type Data = String;
         const ID: &'static str = "site_name";
 
@@ -96,7 +95,7 @@ pub mod settings {
     #[derive(Debug, Default, serde::Deserialize, serde::Serialize, Clone, Copy, PartialEq, Eq)]
     pub struct Webring(bool);
     impl private::Sealed for Webring {}
-    impl Setting<'_> for Webring {
+    impl Setting for Webring {
         type Data = bool;
         const ID: &'static str = "webring";
 
@@ -210,39 +209,38 @@ pub type Result<T> = std::result::Result<T, StorageError>;
 ///
 /// Implementations should note that all methods listed on this trait MUST be fully atomic
 /// or lock the database so that write conflicts or reading half-written data should not occur.
-#[async_trait]
 pub trait Storage: std::fmt::Debug + Clone + Send + Sync {
     /// Initialize Self from a URL, possibly performing initialization.
-    async fn new(url: &'_ url::Url) -> Result<Self>;
+    fn new(url: &url::Url) -> impl Future<Output = Result<Self>> + Send;
 
     /// Return the list of categories used in blog posts of a specified blog.
-    async fn categories(&self, url: &str) -> Result<Vec<String>>;
+    fn categories(&self, url: &str) -> impl Future<Output = Result<Vec<String>>> + Send;
 
     /// Check if a post exists in the database.
-    async fn post_exists(&self, url: &str) -> Result<bool>;
+    fn post_exists(&self, url: &str) -> impl Future<Output = Result<bool>> + Send;
 
     /// Load a post from the database in MF2-JSON format, deserialized from JSON.
-    async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>>;
+    fn get_post(&self, url: &str) -> impl Future<Output = Result<Option<serde_json::Value>>> + Send;
 
     /// Save a post to the database as an MF2-JSON structure.
     ///
     /// Note that the `post` object MUST have `post["properties"]["uid"][0]` defined.
-    async fn put_post(&self, post: &'_ serde_json::Value, user: &url::Url) -> Result<()>;
+    fn put_post(&self, post: &serde_json::Value, user: &url::Url) -> impl Future<Output = Result<()>> + Send;
 
     /// Add post to feed. Some database implementations might have optimized ways to do this.
     #[tracing::instrument(skip(self))]
-    async fn add_to_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> {
+    fn add_to_feed(&self, feed: &str, post: &str) -> impl Future<Output = Result<()>> + Send {
         tracing::debug!("Inserting {} into {} using `update_post`", post, feed);
         self.update_post(feed, serde_json::from_value(
             serde_json::json!({"add": {"children": [post]}})).unwrap()
-        ).await
+        )
     }
 
     /// Remove post from feed. Some database implementations might have optimized ways to do this.
     #[tracing::instrument(skip(self))]
-    async fn remove_from_feed(&self, feed: &str, post: &str) -> Result<()> {
+    fn remove_from_feed(&self, feed: &str, post: &str) -> impl Future<Output = Result<()>> + Send {
         tracing::debug!("Removing {} into {} using `update_post`", post, feed);
         self.update_post(feed, serde_json::from_value(
             serde_json::json!({"delete": {"children": [post]}})).unwrap()
-        ).await
+        )
     }
 
     /// Modify a post using an update object as defined in the
@@ -253,11 +251,11 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync {
     /// each other's changes or simply corrupting something. Rejecting
     /// is allowed in case of concurrent updates if waiting for a lock
     /// cannot be done.
-    async fn update_post(&self, url: &str, update: MicropubUpdate) -> Result<()>;
+    fn update_post(&self, url: &str, update: MicropubUpdate) -> impl Future<Output = Result<()>> + Send;
 
     /// Get a list of channels available for the user represented by
     /// the `user` domain to write to.
-    async fn get_channels(&self, user: &url::Url) -> Result<Vec<MicropubChannel>>;
+    fn get_channels(&self, user: &url::Url) -> impl Future<Output = Result<Vec<MicropubChannel>>> + Send;
 
     /// Fetch a feed at `url` and return an h-feed object containing
     /// `limit` posts after a post by url `after`, filtering the content
@@ -275,13 +273,14 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync {
     /// parallel from the database, preferably make this method use a
     /// connection pool to reduce overhead of creating a database
     /// connection per post for parallel fetching.
-    async fn read_feed_with_limit(
+    #[deprecated]
+    fn read_feed_with_limit(
         &self,
-        url: &'_ str,
+        url: &str,
         after: Option<&str>,
         limit: usize,
         user: Option<&url::Url>,
-    ) -> Result<Option<serde_json::Value>>;
+    ) -> impl Future<Output = Result<Option<serde_json::Value>>> + Send;
 
     /// Fetch a feed at `url` and return an h-feed object containing
     /// `limit` posts after a `cursor` (filtering the content in
@@ -301,22 +300,22 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync {
     /// parallel from the database, preferably make this method use a
     /// connection pool to reduce overhead of creating a database
     /// connection per post for parallel fetching.
-    async fn read_feed_with_cursor(
+    fn read_feed_with_cursor(
         &self,
         url: &'_ str,
         cursor: Option<&'_ str>,
         limit: usize,
         user: Option<&url::Url>
-    ) -> Result<Option<(serde_json::Value, Option<String>)>>;
+    ) -> impl Future<Output = Result<Option<(serde_json::Value, Option<String>)>>> + Send;
 
     /// Deletes a post from the database irreversibly. Must be idempotent.
-    async fn delete_post(&self, url: &'_ str) -> Result<()>;
+    fn delete_post(&self, url: &'_ str) -> impl Future<Output = Result<()>> + Send;
 
     /// Gets a setting from the setting store and passes the result.
-    async fn get_setting<S: Setting<'a>, 'a>(&'_ self, user: &url::Url) -> Result<S>;
+    fn get_setting<S: Setting>(&self, user: &url::Url) -> impl Future<Output = Result<S>> + Send;
 
     /// Commits a setting to the setting store.
-    async fn set_setting<S: Setting<'a> + 'a, 'a>(&self, user: &'a url::Url, value: S::Data) -> Result<()>;
+    fn set_setting<S: Setting>(&self, user: &url::Url, value: S::Data) -> impl Future<Output = Result<()>> + Send;
 
     /// Add (or update) a webmention on a certian post.
     ///
@@ -332,7 +331,7 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync {
     ///
     /// Besides, it may even allow for nice tricks like storing the
     /// webmentions separately and rehydrating them on feed reads.
-    async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()>;
+    fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> impl Future<Output = Result<()>> + Send;
 }
 
 #[cfg(test)]
diff --git a/src/database/postgres/mod.rs b/src/database/postgres/mod.rs
index b2d4339..83e04c7 100644
--- a/src/database/postgres/mod.rs
+++ b/src/database/postgres/mod.rs
@@ -45,7 +45,6 @@ impl PostgresStorage {
     }
 }
 
-#[async_trait::async_trait]
 impl Storage for PostgresStorage {
     /// Construct a new [`PostgresStorage`] from an URI string and run
     /// migrations on the database.
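The `Storage` trait above now declares each method as `fn … -> impl Future<Output = …> + Send` (return-position `impl Trait` in traits, stable since Rust 1.75) rather than going through `#[async_trait]`'s boxed futures. The implementations in this commit keep plain `async fn` bodies — `FileStorage`, `MemoryStorage` and `PostgresStorage` all do — which works because an `async fn` in an impl block may satisfy a trait method declared in the desugared form, as long as the future it produces really is `Send`. A rough, self-contained sketch of that pattern with an invented `Fetch` trait (not the real `Storage`):

```rust
use std::future::Future;

trait Fetch: Send + Sync {
    // Trait side: the desugared form, so the `Send` bound on the
    // returned future can be stated explicitly without boxing.
    fn fetch(&self, url: &str) -> impl Future<Output = Result<String, ()>> + Send;
}

#[derive(Clone, Debug)]
struct Dummy;

impl Fetch for Dummy {
    // Impl side: an ordinary `async fn` is accepted here; the compiler
    // checks that the future it produces is actually `Send`.
    async fn fetch(&self, url: &str) -> Result<String, ()> {
        Ok(format!("fetched {url}"))
    }
}

// Generic callers can rely on the `Send` promise without any boxing:
async fn fetch_twice<F: Fetch>(f: &F, url: &str) -> Result<(String, String), ()> {
    Ok((f.fetch(url).await?, f.fetch(url).await?))
}
```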
@@ -189,8 +188,9 @@ WHERE
         txn.commit().await.map_err(Into::into)
     }
 
+    #[tracing::instrument(skip(self))]
-    async fn update_post(&self, url: &'_ str, update: MicropubUpdate) -> Result<()> {
+    async fn update_post(&self, url: &str, update: MicropubUpdate) -> Result<()> {
         tracing::debug!("Updating post {}", url);
         let mut txn = self.db.begin().await?;
         let (uid, mut post) = sqlx::query_as::<_, (String, serde_json::Value)>("SELECT uid, mf2 FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1 FOR UPDATE")
@@ -348,7 +348,7 @@ LIMIT $2"
     }
 
     #[tracing::instrument(skip(self))]
-    async fn get_setting<S: Setting<'a>, 'a>(&'_ self, user: &url::Url) -> Result<S> {
+    async fn get_setting<S: Setting>(&'_ self, user: &url::Url) -> Result<S> {
         match sqlx::query_as::<_, (serde_json::Value,)>("SELECT kittybox.get_setting($1, $2)")
             .bind(user.authority())
             .bind(S::ID)
@@ -361,7 +361,7 @@ LIMIT $2"
     }
 
     #[tracing::instrument(skip(self))]
-    async fn set_setting<S: Setting<'a> + 'a, 'a>(&self, user: &'a url::Url, value: S::Data) -> Result<()> {
+    async fn set_setting<S: Setting>(&self, user: &url::Url, value: S::Data) -> Result<()> {
         sqlx::query("SELECT kittybox.set_setting($1, $2, $3)")
             .bind(user.authority())
             .bind(S::ID)
diff --git a/src/database/redis/mod.rs b/src/database/redis/mod.rs
index 39ee852..e92a773 100644
--- a/src/database/redis/mod.rs
+++ b/src/database/redis/mod.rs
@@ -1,4 +1,3 @@
-use async_trait::async_trait;
 use futures::stream;
 use futures_util::FutureExt;
 use futures_util::StreamExt;
@@ -63,7 +62,6 @@ pub struct RedisStorage {
     redis: mobc::Pool<RedisConnectionManager>,
 }
 
-#[async_trait]
 impl Storage for RedisStorage {
     async fn get_setting<'a>(&self, setting: &'a str, user: &'a str) -> Result<String> {
         let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?;
diff --git a/src/indieauth/backend.rs b/src/indieauth/backend.rs
index 5814dc2..b913256 100644
--- a/src/indieauth/backend.rs
+++ b/src/indieauth/backend.rs
@@ -1,3 +1,4 @@
+use std::future::Future;
 use std::collections::HashMap;
 use kittybox_indieauth::{
     AuthorizationRequest, TokenData
@@ -9,10 +10,9 @@ type Result<T> = std::io::Result<T>;
 pub mod fs;
 pub use fs::FileBackend;
 
-#[async_trait::async_trait]
 pub trait AuthBackend: Clone + Send + Sync + 'static {
     /// Initialize self from URL, possibly performing initialization.
-    async fn new(url: &'_ url::Url) -> Result<Self>;
+    fn new(url: &'_ url::Url) -> impl Future<Output = Result<Self>> + Send;
     // Authorization code management.
     /// Create a one-time OAuth2 authorization code for the passed
     /// authorization request, and save it for later retrieval.
@@ -20,33 +20,33 @@ pub trait AuthBackend: Clone + Send + Sync + 'static {
     /// Note for implementors: the [`AuthorizationRequest::me`] value
     /// is guaranteed to be [`Some(url::Url)`][Option::Some] and can
     /// be trusted to be correct and non-malicious.
-    async fn create_code(&self, data: AuthorizationRequest) -> Result<String>;
+    fn create_code(&self, data: AuthorizationRequest) -> impl Future<Output = Result<String>> + Send;
     /// Retreive an authorization request using the one-time
     /// code. Implementations must sanitize the `code` field to
     /// prevent exploits, and must check if the code should still be
     /// valid at this point in time (validity interval is left up to
     /// the implementation, but is recommended to be no more than 10
     /// minutes).
-    async fn get_code(&self, code: &str) -> Result<Option<AuthorizationRequest>>;
+    fn get_code(&self, code: &str) -> impl Future<Output = Result<Option<AuthorizationRequest>>> + Send;
     // Token management.
-    async fn create_token(&self, data: TokenData) -> Result<String>;
-    async fn get_token(&self, website: &url::Url, token: &str) -> Result<Option<TokenData>>;
-    async fn list_tokens(&self, website: &url::Url) -> Result<HashMap<String, TokenData>>;
-    async fn revoke_token(&self, website: &url::Url, token: &str) -> Result<()>;
+    fn create_token(&self, data: TokenData) -> impl Future<Output = Result<String>> + Send;
+    fn get_token(&self, website: &url::Url, token: &str) -> impl Future<Output = Result<Option<TokenData>>> + Send;
+    fn list_tokens(&self, website: &url::Url) -> impl Future<Output = Result<HashMap<String, TokenData>>> + Send;
+    fn revoke_token(&self, website: &url::Url, token: &str) -> impl Future<Output = Result<()>> + Send;
     // Refresh token management.
-    async fn create_refresh_token(&self, data: TokenData) -> Result<String>;
-    async fn get_refresh_token(&self, website: &url::Url, token: &str) -> Result<Option<TokenData>>;
-    async fn list_refresh_tokens(&self, website: &url::Url) -> Result<HashMap<String, TokenData>>;
-    async fn revoke_refresh_token(&self, website: &url::Url, token: &str) -> Result<()>;
+    fn create_refresh_token(&self, data: TokenData) -> impl Future<Output = Result<String>> + Send;
+    fn get_refresh_token(&self, website: &url::Url, token: &str) -> impl Future<Output = Result<Option<TokenData>>> + Send;
+    fn list_refresh_tokens(&self, website: &url::Url) -> impl Future<Output = Result<HashMap<String, TokenData>>> + Send;
+    fn revoke_refresh_token(&self, website: &url::Url, token: &str) -> impl Future<Output = Result<()>> + Send;
     // Password management.
     /// Verify a password.
     #[must_use]
-    async fn verify_password(&self, website: &url::Url, password: String) -> Result<bool>;
+    fn verify_password(&self, website: &url::Url, password: String) -> impl Future<Output = Result<bool>> + Send;
     /// Enroll a password credential for a user. Only one password
     /// credential must exist for a given user.
-    async fn enroll_password(&self, website: &url::Url, password: String) -> Result<()>;
+    fn enroll_password(&self, website: &url::Url, password: String) -> impl Future<Output = Result<()>> + Send;
     /// List currently enrolled credential types for a given user.
-    async fn list_user_credential_types(&self, website: &url::Url) -> Result<Vec<EnrolledCredential>>;
+    fn list_user_credential_types(&self, website: &url::Url) -> impl Future<Output = Result<Vec<EnrolledCredential>>> + Send;
     // WebAuthn credential management.
     #[cfg(feature = "webauthn")]
     /// Enroll a WebAuthn authenticator public key for this user.
@@ -56,30 +56,30 @@ pub trait AuthBackend: Clone + Send + Sync + 'static {
     /// This function can also be used to overwrite a passkey with an
     /// updated version after using
     /// [webauthn::prelude::Passkey::update_credential()].
-    async fn enroll_webauthn(&self, website: &url::Url, credential: webauthn::prelude::Passkey) -> Result<()>;
+    fn enroll_webauthn(&self, website: &url::Url, credential: webauthn::prelude::Passkey) -> impl Future<Output = Result<()>> + Send;
     #[cfg(feature = "webauthn")]
     /// List currently enrolled WebAuthn authenticators for a given user.
-    async fn list_webauthn_pubkeys(&self, website: &url::Url) -> Result<Vec<webauthn::prelude::Passkey>>;
+    fn list_webauthn_pubkeys(&self, website: &url::Url) -> impl Future<Output = Result<Vec<webauthn::prelude::Passkey>>> + Send;
     #[cfg(feature = "webauthn")]
     /// Persist registration challenge state for a little while so it
     /// can be used later.
     ///
     /// Challenges saved in this manner MUST expire after a little
     /// while. 10 minutes is recommended.
-    async fn persist_registration_challenge(
+    fn persist_registration_challenge(
         &self,
         website: &url::Url,
         state: webauthn::prelude::PasskeyRegistration
-    ) -> Result<String>;
+    ) -> impl Future<Output = Result<String>> + Send;
     #[cfg(feature = "webauthn")]
     /// Retrieve a persisted registration challenge.
     ///
     /// The challenge should be deleted after retrieval.
-    async fn retrieve_registration_challenge(
+    fn retrieve_registration_challenge(
         &self,
         website: &url::Url,
         challenge_id: &str
-    ) -> Result<webauthn::prelude::PasskeyRegistration>;
+    ) -> impl Future<Output = Result<webauthn::prelude::PasskeyRegistration>> + Send;
     #[cfg(feature = "webauthn")]
     /// Persist authentication challenge state for a little while so
     /// it can be used later.
@@ -89,19 +89,19 @@ pub trait AuthBackend: Clone + Send + Sync + 'static {
     ///
     /// To support multiple authentication options, this can return an
     /// opaque token that should be set as a cookie.
-    async fn persist_authentication_challenge(
+    fn persist_authentication_challenge(
         &self,
         website: &url::Url,
         state: webauthn::prelude::PasskeyAuthentication
-    ) -> Result<String>;
+    ) -> impl Future<Output = Result<String>> + Send;
     #[cfg(feature = "webauthn")]
     /// Retrieve a persisted authentication challenge.
     ///
     /// The challenge should be deleted after retrieval.
-    async fn retrieve_authentication_challenge(
+    fn retrieve_authentication_challenge(
         &self,
         website: &url::Url,
         challenge_id: &str
-    ) -> Result<webauthn::prelude::PasskeyAuthentication>;
+    ) -> impl Future<Output = Result<webauthn::prelude::PasskeyAuthentication>> + Send;
 }
diff --git a/src/indieauth/backend/fs.rs b/src/indieauth/backend/fs.rs
index ce519da..f74fbbc 100644
--- a/src/indieauth/backend/fs.rs
+++ b/src/indieauth/backend/fs.rs
@@ -1,7 +1,6 @@
 use std::{path::PathBuf, collections::HashMap, borrow::Cow, time::{SystemTime, Duration}};
 
 use super::{AuthBackend, Result, EnrolledCredential};
-use async_trait::async_trait;
 use kittybox_indieauth::{
     AuthorizationRequest, TokenData
 };
@@ -186,7 +185,6 @@ impl FileBackend {
     }
 }
 
-#[async_trait]
 impl AuthBackend for FileBackend {
     async fn new(path: &'_ url::Url) -> Result<Self> {
         let orig_path = path;
diff --git a/src/media/storage/file.rs b/src/media/storage/file.rs
index b9ab541..711b298 100644
--- a/src/media/storage/file.rs
+++ b/src/media/storage/file.rs
@@ -1,5 +1,4 @@
 use super::{Metadata, ErrorKind, MediaStore, MediaStoreError, Result};
-use async_trait::async_trait;
 use std::{path::PathBuf, fmt::Debug};
 use tokio::fs::OpenOptions;
 use tokio::io::{BufReader, BufWriter, AsyncWriteExt, AsyncSeekExt};
@@ -39,7 +38,6 @@ impl FileStore {
     }
 }
 
-#[async_trait]
 impl MediaStore for FileStore {
     async fn new(url: &'_ url::Url) -> Result<Self> {
         Ok(Self { base: url.path().into() })
diff --git a/src/media/storage/mod.rs b/src/media/storage/mod.rs
index 38410e6..c2a66ec 100644
--- a/src/media/storage/mod.rs
+++ b/src/media/storage/mod.rs
@@ -1,8 +1,8 @@
-use async_trait::async_trait;
 use axum::extract::multipart::Field;
 use tokio_stream::Stream;
 use bytes::Bytes;
 use serde::{Deserialize, Serialize};
+use std::future::Future;
 use std::ops::Bound;
 use std::pin::Pin;
 use std::fmt::Debug;
@@ -84,31 +84,33 @@ impl std::fmt::Display for MediaStoreError {
 
 pub type Result<T> = std::result::Result<T, MediaStoreError>;
 
-#[async_trait]
 pub trait MediaStore: 'static + Send + Sync + Clone {
     // Initialize self from a URL, possibly performing asynchronous initialization.
-    async fn new(url: &'_ url::Url) -> Result<Self>;
-    async fn write_streaming<T>(
+    fn new(url: &'_ url::Url) -> impl Future<Output = Result<Self>> + Send;
+
+    fn write_streaming<T>(
         &self,
         domain: &str,
         metadata: Metadata,
         content: T,
-    ) -> Result<String>
+    ) -> impl Future<Output = Result<String>> + Send
     where
         T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug;
 
-    async fn read_streaming(
+    fn read_streaming(
         &self,
         domain: &str,
         filename: &str,
-    ) -> Result<(Metadata, Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>)>;
+    ) -> impl Future<Output = Result<
+        (Metadata, Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>)
+    >> + Send;
 
-    async fn stream_range(
+    fn stream_range(
         &self,
         domain: &str,
         filename: &str,
         range: (Bound<u64>, Bound<u64>)
-    ) -> Result<Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>> {
+    ) -> impl Future<Output = Result<Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>>> + Send { async move {
         use futures::stream::TryStreamExt;
         use tracing::debug;
 
         let (metadata, mut stream) = self.read_streaming(domain, filename).await?;
@@ -163,17 +165,17 @@ pub trait MediaStore: 'static + Send + Sync + Clone {
         );
 
         return Ok(stream);
-    }
+    } }
 
     /// Read metadata for a file.
     ///
     /// The default implementation uses the `read_streaming` method
     /// and drops the stream containing file content.
-    async fn metadata(&self, domain: &str, filename: &str) -> Result<Metadata> {
+    fn metadata(&self, domain: &str, filename: &str) -> impl Future<Output = Result<Metadata>> + Send { async move {
         self.read_streaming(domain, filename)
             .await
             .map(|(meta, stream)| meta)
-    }
+    } }
 
-    async fn delete(&self, domain: &str, filename: &str) -> Result<()>;
+    fn delete(&self, domain: &str, filename: &str) -> impl Future<Output = Result<()>> + Send;
 }
diff --git a/src/webmentions/queue.rs b/src/webmentions/queue.rs
index af1387f..dfa2a48 100644
--- a/src/webmentions/queue.rs
+++ b/src/webmentions/queue.rs
@@ -1,6 +1,4 @@
-use std::pin::Pin;
-
-use futures_util::{Stream, StreamExt};
+use futures_util::StreamExt;
 use sqlx::{postgres::PgListener, ConnectOptions, Executor};
 use uuid::Uuid;
 
@@ -8,7 +6,7 @@ use super::Webmention;
 
 static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations/webmention");
 
-pub use kittybox_util::queue::{JobQueue, JobItem, Job};
+pub use kittybox_util::queue::{JobQueue, JobItem, Job, JobStream};
 
 pub trait PostgresJobItem: JobItem + sqlx::FromRow<'static, sqlx::postgres::PgRow> {
     const DATABASE_NAME: &'static str;
@@ -85,7 +83,6 @@ impl<T: PostgresJobItem> PostgresJob<T> {
     }
 }
 
-#[async_trait::async_trait]
 impl Job<Webmention, PostgresJobQueue<Webmention>> for PostgresJob<Webmention> {
     fn job(&self) -> &Webmention {
         &self.job
@@ -140,7 +137,6 @@ impl PostgresJobQueue<Webmention> {
     }
 }
 
-#[async_trait::async_trait]
 impl JobQueue<Webmention> for PostgresJobQueue<Webmention> {
     type Job = PostgresJob<Webmention>;
     type Error = sqlx::Error;
@@ -155,7 +151,7 @@ impl JobQueue<Webmention> for PostgresJobQueue<Webmention> {
             .await?
         {
             Some(job_row) => {
-                return Ok(Some(Self::Job {
+                Ok(Some(Self::Job {
                     id: job_row.id,
                     job: job_row.job,
                     txn: Some(txn),
@@ -174,11 +170,11 @@ impl JobQueue<Webmention> for PostgresJobQueue<Webmention> {
             .await
     }
 
-    async fn into_stream(self) -> Result<Pin<Box<dyn Stream<Item = Result<Self::Job, Self::Error>> + Send>>, Self::Error> {
+    async fn into_stream(self) -> Result<JobStream<Self::Job, Self::Error>, Self::Error> {
         let mut listener = PgListener::connect_with(&self.db).await?;
         listener.listen("incoming_webmention").await?;
 
-        let stream: Pin<Box<dyn Stream<Item = Result<Self::Job, Self::Error>> + Send>> = futures_util::stream::try_unfold((), {
+        let stream: JobStream<Self::Job, Self::Error> = futures_util::stream::try_unfold((), {
            let listener = std::sync::Arc::new(tokio::sync::Mutex::new(listener));
             move |_| {
                 let queue = self.clone();
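One more pattern that shows up in several hunks above (`Storage::add_to_feed`/`remove_from_feed`, `MediaStore::stream_range`, `MediaStore::metadata`): once a provided trait method is no longer an `async fn`, its default body either returns another method's future directly, without `.await`, or wraps its logic in an `async move` block, which is itself a value implementing `Future`. A rough illustration with an invented `KeyValue` trait, assuming Rust 1.75+ (not code from this repository):

```rust
use std::future::Future;

trait KeyValue: Sync {
    fn get(&self, key: &str) -> impl Future<Output = Option<String>> + Send;

    // Forwarding default: return the future produced by `get` as-is,
    // without `.await`, the way `Storage::add_to_feed` now returns
    // `self.update_post(...)` directly.
    fn get_cached(&self, key: &str) -> impl Future<Output = Option<String>> + Send {
        self.get(key)
    }

    // Default body with its own logic: wrap it in an `async move` block,
    // the shape `MediaStore::metadata` and `stream_range` take above.
    fn exists(&self, key: &str) -> impl Future<Output = bool> + Send {
        async move { self.get(key).await.is_some() }
    }
}
```

The `Sync` supertrait is what makes the `async move` body provably `Send`, since it holds `&Self` across an `.await`.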