diff options
author | Vika <vika@fireburn.ru> | 2025-04-09 23:31:02 +0300 |
---|---|---|
committer | Vika <vika@fireburn.ru> | 2025-04-09 23:31:57 +0300 |
commit | 8826d9446e6c492db2243b9921e59ce496027bef (patch) | |
tree | 63738aa9001cb73b11cb0e974e93129bcdf1adbb /src/webmentions/mod.rs | |
parent | 519cadfbb298f50cbf819dde757037ab56e2863e (diff) | |
download | kittybox-8826d9446e6c492db2243b9921e59ce496027bef.tar.zst |
cargo fmt
Change-Id: I80e81ebba3f0cdf8c094451c9fe3ee4126b8c888
Diffstat (limited to 'src/webmentions/mod.rs')
-rw-r--r-- | src/webmentions/mod.rs | 130 |
1 files changed, 91 insertions, 39 deletions
diff --git a/src/webmentions/mod.rs b/src/webmentions/mod.rs index 91b274b..57f9a57 100644 --- a/src/webmentions/mod.rs +++ b/src/webmentions/mod.rs @@ -1,9 +1,14 @@ -use axum::{extract::{FromRef, State}, response::{IntoResponse, Response}, routing::post, Form}; use axum::http::StatusCode; +use axum::{ + extract::{FromRef, State}, + response::{IntoResponse, Response}, + routing::post, + Form, +}; use tracing::error; -use crate::database::{Storage, StorageError}; use self::queue::JobQueue; +use crate::database::{Storage, StorageError}; pub mod queue; #[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -24,40 +29,46 @@ async fn accept_webmention<Q: JobQueue<Webmention>>( Form(webmention): Form<Webmention>, ) -> Response { if let Err(err) = webmention.source.parse::<url::Url>() { - return (StatusCode::BAD_REQUEST, err.to_string()).into_response() + return (StatusCode::BAD_REQUEST, err.to_string()).into_response(); } if let Err(err) = webmention.target.parse::<url::Url>() { - return (StatusCode::BAD_REQUEST, err.to_string()).into_response() + return (StatusCode::BAD_REQUEST, err.to_string()).into_response(); } match queue.put(&webmention).await { Ok(_id) => StatusCode::ACCEPTED.into_response(), - Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, [ - ("Content-Type", "text/plain") - ], err.to_string()).into_response() + Err(err) => ( + StatusCode::INTERNAL_SERVER_ERROR, + [("Content-Type", "text/plain")], + err.to_string(), + ) + .into_response(), } } -pub fn router<St: Clone + Send + Sync + 'static, Q: JobQueue<Webmention> + FromRef<St>>() -> axum::Router<St> { - axum::Router::new() - .route("/.kittybox/webmention", post(accept_webmention::<Q>)) +pub fn router<St: Clone + Send + Sync + 'static, Q: JobQueue<Webmention> + FromRef<St>>( +) -> axum::Router<St> { + axum::Router::new().route("/.kittybox/webmention", post(accept_webmention::<Q>)) } #[derive(thiserror::Error, Debug)] pub enum SupervisorError { #[error("the task was explicitly cancelled")] - Cancelled + Cancelled, } -pub type SupervisedTask = tokio::task::JoinHandle<Result<std::convert::Infallible, SupervisorError>>; +pub type SupervisedTask = + tokio::task::JoinHandle<Result<std::convert::Infallible, SupervisorError>>; -pub fn supervisor<E, A, F>(mut f: F, cancellation_token: tokio_util::sync::CancellationToken) -> SupervisedTask +pub fn supervisor<E, A, F>( + mut f: F, + cancellation_token: tokio_util::sync::CancellationToken, +) -> SupervisedTask where E: std::error::Error + std::fmt::Debug + Send + 'static, A: std::future::Future<Output = Result<std::convert::Infallible, E>> + Send + 'static, - F: FnMut() -> A + Send + 'static + F: FnMut() -> A + Send + 'static, { - let supervisor_future = async move { loop { // Don't spawn the task if we are already cancelled, but @@ -65,7 +76,7 @@ where // crashed and we immediately received a cancellation // request after noticing the crashed task) if cancellation_token.is_cancelled() { - return Err(SupervisorError::Cancelled) + return Err(SupervisorError::Cancelled); } let task = tokio::task::spawn(f()); tokio::select! { @@ -87,7 +98,13 @@ where return tokio::task::spawn(supervisor_future); #[cfg(tokio_unstable)] return tokio::task::Builder::new() - .name(format!("supervisor for background task {}", std::any::type_name::<A>()).as_str()) + .name( + format!( + "supervisor for background task {}", + std::any::type_name::<A>() + ) + .as_str(), + ) .spawn(supervisor_future) .unwrap(); } @@ -99,39 +116,55 @@ enum Error<Q: std::error::Error + std::fmt::Debug + Send + 'static> { #[error("queue error: {0}")] Queue(#[from] Q), #[error("storage error: {0}")] - Storage(StorageError) + Storage(StorageError), } -async fn process_webmentions_from_queue<Q: JobQueue<Webmention>, S: Storage + 'static>(queue: Q, db: S, http: reqwest_middleware::ClientWithMiddleware) -> Result<std::convert::Infallible, Error<Q::Error>> { - use futures_util::StreamExt; +async fn process_webmentions_from_queue<Q: JobQueue<Webmention>, S: Storage + 'static>( + queue: Q, + db: S, + http: reqwest_middleware::ClientWithMiddleware, +) -> Result<std::convert::Infallible, Error<Q::Error>> { use self::queue::Job; + use futures_util::StreamExt; let mut stream = queue.into_stream().await?; while let Some(item) = stream.next().await.transpose()? { let job = item.job(); let (source, target) = ( job.source.parse::<url::Url>().unwrap(), - job.target.parse::<url::Url>().unwrap() + job.target.parse::<url::Url>().unwrap(), ); let (code, text) = match http.get(source.clone()).send().await { Ok(response) => { let code = response.status(); - if ![StatusCode::OK, StatusCode::GONE].iter().any(|i| i == &code) { - error!("error processing webmention: webpage fetch returned {}", code); + if ![StatusCode::OK, StatusCode::GONE] + .iter() + .any(|i| i == &code) + { + error!( + "error processing webmention: webpage fetch returned {}", + code + ); continue; } match response.text().await { Ok(text) => (code, text), Err(err) => { - error!("error processing webmention: error fetching webpage text: {}", err); - continue + error!( + "error processing webmention: error fetching webpage text: {}", + err + ); + continue; } } } Err(err) => { - error!("error processing webmention: error requesting webpage: {}", err); - continue + error!( + "error processing webmention: error requesting webpage: {}", + err + ); + continue; } }; @@ -150,7 +183,10 @@ async fn process_webmentions_from_queue<Q: JobQueue<Webmention>, S: Storage + 's continue; } Err(err) => { - error!("error processing webmention: error checking webmention: {}", err); + error!( + "error processing webmention: error checking webmention: {}", + err + ); continue; } }; @@ -158,31 +194,47 @@ async fn process_webmentions_from_queue<Q: JobQueue<Webmention>, S: Storage + 's { mention["type"] = serde_json::json!(["h-cite"]); - if !mention["properties"].as_object().unwrap().contains_key("uid") { - let url = mention["properties"]["url"][0].as_str().unwrap_or_else(|| target.as_str()).to_owned(); + if !mention["properties"] + .as_object() + .unwrap() + .contains_key("uid") + { + let url = mention["properties"]["url"][0] + .as_str() + .unwrap_or_else(|| target.as_str()) + .to_owned(); let props = mention["properties"].as_object_mut().unwrap(); - props.insert("uid".to_owned(), serde_json::Value::Array( - vec![serde_json::Value::String(url)]) + props.insert( + "uid".to_owned(), + serde_json::Value::Array(vec![serde_json::Value::String(url)]), ); } } - db.add_or_update_webmention(target.as_str(), mention_type, mention).await.map_err(Error::<Q::Error>::Storage)?; + db.add_or_update_webmention(target.as_str(), mention_type, mention) + .await + .map_err(Error::<Q::Error>::Storage)?; } } unreachable!() } -pub fn supervised_webmentions_task<St: Send + Sync + 'static, S: Storage + FromRef<St> + 'static, Q: JobQueue<Webmention> + FromRef<St> + 'static>( +pub fn supervised_webmentions_task< + St: Send + Sync + 'static, + S: Storage + FromRef<St> + 'static, + Q: JobQueue<Webmention> + FromRef<St> + 'static, +>( state: &St, - cancellation_token: tokio_util::sync::CancellationToken + cancellation_token: tokio_util::sync::CancellationToken, ) -> SupervisedTask -where reqwest_middleware::ClientWithMiddleware: FromRef<St> +where + reqwest_middleware::ClientWithMiddleware: FromRef<St>, { let queue = Q::from_ref(state); let storage = S::from_ref(state); let http = reqwest_middleware::ClientWithMiddleware::from_ref(state); - supervisor::<Error<Q::Error>, _, _>(move || process_webmentions_from_queue( - queue.clone(), storage.clone(), http.clone() - ), cancellation_token) + supervisor::<Error<Q::Error>, _, _>( + move || process_webmentions_from_queue(queue.clone(), storage.clone(), http.clone()), + cancellation_token, + ) } |