about summary refs log tree commit diff
path: root/src/webmentions/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/webmentions/mod.rs')
-rw-r--r--src/webmentions/mod.rs130
1 files changed, 91 insertions, 39 deletions
diff --git a/src/webmentions/mod.rs b/src/webmentions/mod.rs
index 91b274b..57f9a57 100644
--- a/src/webmentions/mod.rs
+++ b/src/webmentions/mod.rs
@@ -1,9 +1,14 @@
-use axum::{extract::{FromRef, State}, response::{IntoResponse, Response}, routing::post, Form};
 use axum::http::StatusCode;
+use axum::{
+    extract::{FromRef, State},
+    response::{IntoResponse, Response},
+    routing::post,
+    Form,
+};
 use tracing::error;
 
-use crate::database::{Storage, StorageError};
 use self::queue::JobQueue;
+use crate::database::{Storage, StorageError};
 pub mod queue;
 
 #[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -24,40 +29,46 @@ async fn accept_webmention<Q: JobQueue<Webmention>>(
     Form(webmention): Form<Webmention>,
 ) -> Response {
     if let Err(err) = webmention.source.parse::<url::Url>() {
-        return (StatusCode::BAD_REQUEST, err.to_string()).into_response()
+        return (StatusCode::BAD_REQUEST, err.to_string()).into_response();
     }
     if let Err(err) = webmention.target.parse::<url::Url>() {
-        return (StatusCode::BAD_REQUEST, err.to_string()).into_response()
+        return (StatusCode::BAD_REQUEST, err.to_string()).into_response();
     }
 
     match queue.put(&webmention).await {
         Ok(_id) => StatusCode::ACCEPTED.into_response(),
-        Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, [
-            ("Content-Type", "text/plain")
-        ], err.to_string()).into_response()
+        Err(err) => (
+            StatusCode::INTERNAL_SERVER_ERROR,
+            [("Content-Type", "text/plain")],
+            err.to_string(),
+        )
+            .into_response(),
     }
 }
 
-pub fn router<St: Clone + Send + Sync + 'static, Q: JobQueue<Webmention> + FromRef<St>>() -> axum::Router<St> {
-    axum::Router::new()
-        .route("/.kittybox/webmention", post(accept_webmention::<Q>))
+pub fn router<St: Clone + Send + Sync + 'static, Q: JobQueue<Webmention> + FromRef<St>>(
+) -> axum::Router<St> {
+    axum::Router::new().route("/.kittybox/webmention", post(accept_webmention::<Q>))
 }
 
 #[derive(thiserror::Error, Debug)]
 pub enum SupervisorError {
     #[error("the task was explicitly cancelled")]
-    Cancelled
+    Cancelled,
 }
 
-pub type SupervisedTask = tokio::task::JoinHandle<Result<std::convert::Infallible, SupervisorError>>;
+pub type SupervisedTask =
+    tokio::task::JoinHandle<Result<std::convert::Infallible, SupervisorError>>;
 
-pub fn supervisor<E, A, F>(mut f: F, cancellation_token: tokio_util::sync::CancellationToken) -> SupervisedTask
+pub fn supervisor<E, A, F>(
+    mut f: F,
+    cancellation_token: tokio_util::sync::CancellationToken,
+) -> SupervisedTask
 where
     E: std::error::Error + std::fmt::Debug + Send + 'static,
     A: std::future::Future<Output = Result<std::convert::Infallible, E>> + Send + 'static,
-    F: FnMut() -> A + Send + 'static
+    F: FnMut() -> A + Send + 'static,
 {
-
     let supervisor_future = async move {
         loop {
             // Don't spawn the task if we are already cancelled, but
@@ -65,7 +76,7 @@ where
             // crashed and we immediately received a cancellation
             // request after noticing the crashed task)
             if cancellation_token.is_cancelled() {
-                return Err(SupervisorError::Cancelled)
+                return Err(SupervisorError::Cancelled);
             }
             let task = tokio::task::spawn(f());
             tokio::select! {
@@ -87,7 +98,13 @@ where
     return tokio::task::spawn(supervisor_future);
     #[cfg(tokio_unstable)]
     return tokio::task::Builder::new()
-        .name(format!("supervisor for background task {}", std::any::type_name::<A>()).as_str())
+        .name(
+            format!(
+                "supervisor for background task {}",
+                std::any::type_name::<A>()
+            )
+            .as_str(),
+        )
         .spawn(supervisor_future)
         .unwrap();
 }
@@ -99,39 +116,55 @@ enum Error<Q: std::error::Error + std::fmt::Debug + Send + 'static> {
     #[error("queue error: {0}")]
     Queue(#[from] Q),
     #[error("storage error: {0}")]
-    Storage(StorageError)
+    Storage(StorageError),
 }
 
-async fn process_webmentions_from_queue<Q: JobQueue<Webmention>, S: Storage + 'static>(queue: Q, db: S, http: reqwest_middleware::ClientWithMiddleware) -> Result<std::convert::Infallible, Error<Q::Error>> {
-    use futures_util::StreamExt;
+async fn process_webmentions_from_queue<Q: JobQueue<Webmention>, S: Storage + 'static>(
+    queue: Q,
+    db: S,
+    http: reqwest_middleware::ClientWithMiddleware,
+) -> Result<std::convert::Infallible, Error<Q::Error>> {
     use self::queue::Job;
+    use futures_util::StreamExt;
 
     let mut stream = queue.into_stream().await?;
     while let Some(item) = stream.next().await.transpose()? {
         let job = item.job();
         let (source, target) = (
             job.source.parse::<url::Url>().unwrap(),
-            job.target.parse::<url::Url>().unwrap()
+            job.target.parse::<url::Url>().unwrap(),
         );
 
         let (code, text) = match http.get(source.clone()).send().await {
             Ok(response) => {
                 let code = response.status();
-                if ![StatusCode::OK, StatusCode::GONE].iter().any(|i| i == &code) {
-                    error!("error processing webmention: webpage fetch returned {}", code);
+                if ![StatusCode::OK, StatusCode::GONE]
+                    .iter()
+                    .any(|i| i == &code)
+                {
+                    error!(
+                        "error processing webmention: webpage fetch returned {}",
+                        code
+                    );
                     continue;
                 }
                 match response.text().await {
                     Ok(text) => (code, text),
                     Err(err) => {
-                        error!("error processing webmention: error fetching webpage text: {}", err);
-                        continue
+                        error!(
+                            "error processing webmention: error fetching webpage text: {}",
+                            err
+                        );
+                        continue;
                     }
                 }
             }
             Err(err) => {
-                error!("error processing webmention: error requesting webpage: {}", err);
-                continue
+                error!(
+                    "error processing webmention: error requesting webpage: {}",
+                    err
+                );
+                continue;
             }
         };
 
@@ -150,7 +183,10 @@ async fn process_webmentions_from_queue<Q: JobQueue<Webmention>, S: Storage + 's
                     continue;
                 }
                 Err(err) => {
-                    error!("error processing webmention: error checking webmention: {}", err);
+                    error!(
+                        "error processing webmention: error checking webmention: {}",
+                        err
+                    );
                     continue;
                 }
             };
@@ -158,31 +194,47 @@ async fn process_webmentions_from_queue<Q: JobQueue<Webmention>, S: Storage + 's
             {
                 mention["type"] = serde_json::json!(["h-cite"]);
 
-                if !mention["properties"].as_object().unwrap().contains_key("uid") {
-                    let url = mention["properties"]["url"][0].as_str().unwrap_or_else(|| target.as_str()).to_owned();
+                if !mention["properties"]
+                    .as_object()
+                    .unwrap()
+                    .contains_key("uid")
+                {
+                    let url = mention["properties"]["url"][0]
+                        .as_str()
+                        .unwrap_or_else(|| target.as_str())
+                        .to_owned();
                     let props = mention["properties"].as_object_mut().unwrap();
-                    props.insert("uid".to_owned(), serde_json::Value::Array(
-                        vec![serde_json::Value::String(url)])
+                    props.insert(
+                        "uid".to_owned(),
+                        serde_json::Value::Array(vec![serde_json::Value::String(url)]),
                     );
                 }
             }
 
-            db.add_or_update_webmention(target.as_str(), mention_type, mention).await.map_err(Error::<Q::Error>::Storage)?;
+            db.add_or_update_webmention(target.as_str(), mention_type, mention)
+                .await
+                .map_err(Error::<Q::Error>::Storage)?;
         }
     }
     unreachable!()
 }
 
-pub fn supervised_webmentions_task<St: Send + Sync + 'static, S: Storage + FromRef<St> + 'static, Q: JobQueue<Webmention> + FromRef<St> + 'static>(
+pub fn supervised_webmentions_task<
+    St: Send + Sync + 'static,
+    S: Storage + FromRef<St> + 'static,
+    Q: JobQueue<Webmention> + FromRef<St> + 'static,
+>(
     state: &St,
-    cancellation_token: tokio_util::sync::CancellationToken
+    cancellation_token: tokio_util::sync::CancellationToken,
 ) -> SupervisedTask
-where reqwest_middleware::ClientWithMiddleware: FromRef<St>
+where
+    reqwest_middleware::ClientWithMiddleware: FromRef<St>,
 {
     let queue = Q::from_ref(state);
     let storage = S::from_ref(state);
     let http = reqwest_middleware::ClientWithMiddleware::from_ref(state);
-    supervisor::<Error<Q::Error>, _, _>(move || process_webmentions_from_queue(
-        queue.clone(), storage.clone(), http.clone()
-    ), cancellation_token)
+    supervisor::<Error<Q::Error>, _, _>(
+        move || process_webmentions_from_queue(queue.clone(), storage.clone(), http.clone()),
+        cancellation_token,
+    )
 }