about summary refs log tree commit diff
path: root/src/webmentions/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/webmentions/mod.rs')
-rw-r--r--src/webmentions/mod.rs195
1 files changed, 195 insertions, 0 deletions
diff --git a/src/webmentions/mod.rs b/src/webmentions/mod.rs
new file mode 100644
index 0000000..95ea870
--- /dev/null
+++ b/src/webmentions/mod.rs
@@ -0,0 +1,195 @@
+use axum::{Form, response::{IntoResponse, Response}, Extension};
+use axum::http::StatusCode;
+use tracing::error;
+
+use crate::database::{Storage, StorageError};
+use self::queue::JobQueue;
+pub mod queue;
+
+#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+#[cfg_attr(feature = "sqlx", derive(sqlx::FromRow))]
+pub struct Webmention {
+    source: String,
+    target: String,
+}
+
+impl queue::JobItem for Webmention {}
+impl queue::PostgresJobItem for Webmention {
+    const DATABASE_NAME: &'static str = "kittybox_webmention.incoming_webmention_queue";
+    const NOTIFICATION_CHANNEL: &'static str = "incoming_webmention";
+}
+
+async fn accept_webmention<Q: JobQueue<Webmention>>(
+    Extension(queue): Extension<Q>,
+    Form(webmention): Form<Webmention>,
+) -> Response {
+    if let Err(err) = webmention.source.parse::<url::Url>() {
+        return (StatusCode::BAD_REQUEST, err.to_string()).into_response()
+    }
+    if let Err(err) = webmention.target.parse::<url::Url>() {
+        return (StatusCode::BAD_REQUEST, err.to_string()).into_response()
+    }
+
+    match queue.put(&webmention).await {
+        Ok(id) => (StatusCode::ACCEPTED, [
+            ("Location", format!("/.kittybox/webmention/{id}"))
+        ]).into_response(),
+        Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, [
+            ("Content-Type", "text/plain")
+        ], err.to_string()).into_response()
+    }
+}
+
+pub fn router<Q: JobQueue<Webmention>, S: Storage + 'static>(
+    queue: Q, db: S, http: reqwest::Client,
+    cancellation_token: tokio_util::sync::CancellationToken
+) -> (axum::Router, SupervisedTask) {
+    // Automatically spawn a background task to handle webmentions
+    let bgtask_handle = supervised_webmentions_task(queue.clone(), db, http, cancellation_token);
+
+    let router = axum::Router::new()
+        .route("/.kittybox/webmention",
+            axum::routing::post(accept_webmention::<Q>)
+        )
+        .layer(Extension(queue));
+
+    (router, bgtask_handle)
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum SupervisorError {
+    #[error("the task was explicitly cancelled")]
+    Cancelled
+}
+
+pub type SupervisedTask = tokio::task::JoinHandle<Result<std::convert::Infallible, SupervisorError>>;
+
+pub fn supervisor<E, A, F>(mut f: F, cancellation_token: tokio_util::sync::CancellationToken) -> SupervisedTask
+where
+    E: std::error::Error + std::fmt::Debug + Send + 'static,
+    A: std::future::Future<Output = Result<std::convert::Infallible, E>> + Send + 'static,
+    F: FnMut() -> A + Send + 'static
+{
+
+    let supervisor_future = async move {
+        loop {
+            // Don't spawn the task if we are already cancelled, but
+            // have somehow missed it (probably because the task
+            // crashed and we immediately received a cancellation
+            // request after noticing the crashed task)
+            if cancellation_token.is_cancelled() {
+                return Err(SupervisorError::Cancelled)
+            }
+            let task = tokio::task::spawn(f());
+            tokio::select! {
+                _ = cancellation_token.cancelled() => {
+                    tracing::info!("Shutdown of background task {:?} requested.", std::any::type_name::<A>());
+                    return Err(SupervisorError::Cancelled)
+                }
+                task_result = task => match task_result {
+                    Err(e) => tracing::error!("background task {:?} exited unexpectedly: {}", std::any::type_name::<A>(), e),
+                    Ok(Err(e)) => tracing::error!("background task {:?} returned error: {}", std::any::type_name::<A>(), e),
+                    Ok(Ok(_)) => unreachable!("task's Ok is Infallible")
+                }
+            }
+            tracing::debug!("Sleeping for a little while to back-off...");
+            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+        }
+    };
+    #[cfg(not(tokio_unstable))]
+    return tokio::task::spawn(supervisor_future);
+    #[cfg(tokio_unstable)]
+    return tokio::task::Builder::new()
+        .name(format!("supervisor for background task {}", std::any::type_name::<A>()).as_str())
+        .spawn(supervisor_future)
+        .unwrap();
+}
+
+mod check;
+
+#[derive(thiserror::Error, Debug)]
+enum Error<Q: std::error::Error + std::fmt::Debug + Send + 'static> {
+    #[error("queue error: {0}")]
+    Queue(#[from] Q),
+    #[error("storage error: {0}")]
+    Storage(StorageError)
+}
+
+async fn process_webmentions_from_queue<Q: JobQueue<Webmention>, S: Storage + 'static>(queue: Q, db: S, http: reqwest::Client) -> Result<std::convert::Infallible, Error<Q::Error>> {
+    use futures_util::StreamExt;
+    use self::queue::Job;
+
+    let mut stream = queue.into_stream().await?;
+    while let Some(item) = stream.next().await.transpose()? {
+        let job = item.job();
+        let (source, target) = (
+            job.source.parse::<url::Url>().unwrap(),
+            job.target.parse::<url::Url>().unwrap()
+        );
+
+        let (code, text) = match http.get(source.clone()).send().await {
+            Ok(response) => {
+                let code = response.status();
+                if ![StatusCode::OK, StatusCode::GONE].iter().any(|i| i == &code) {
+                    error!("error processing webmention: webpage fetch returned {}", code);
+                    continue;
+                }
+                match response.text().await {
+                    Ok(text) => (code, text),
+                    Err(err) => {
+                        error!("error processing webmention: error fetching webpage text: {}", err);
+                        continue
+                    }
+                }
+            }
+            Err(err) => {
+                error!("error processing webmention: error requesting webpage: {}", err);
+                continue
+            }
+        };
+
+        if code == StatusCode::GONE {
+            todo!("removing webmentions is not implemented yet");
+            // db.remove_webmention(target.as_str(), source.as_str()).await.map_err(Error::<Q::Error>::Storage)?;
+        } else {
+            // Verify webmention
+            let (mention_type, mut mention) = match tokio::task::block_in_place({
+                || check::check_mention(text, &source, &target)
+            }) {
+                Ok(Some(mention_type)) => mention_type,
+                Ok(None) => {
+                    error!("webmention {} -> {} invalid, rejecting", source, target);
+                    item.done().await?;
+                    continue;
+                }
+                Err(err) => {
+                    error!("error processing webmention: error checking webmention: {}", err);
+                    continue;
+                }
+            };
+
+            {
+                mention["type"] = serde_json::json!(["h-cite"]);
+
+                if !mention["properties"].as_object().unwrap().contains_key("uid") {
+                    let url = mention["properties"]["url"][0].as_str().unwrap_or_else(|| target.as_str()).to_owned();
+                    let props = mention["properties"].as_object_mut().unwrap();
+                    props.insert("uid".to_owned(), serde_json::Value::Array(
+                        vec![serde_json::Value::String(url)])
+                    );
+                }
+            }
+
+            db.add_or_update_webmention(target.as_str(), mention_type, mention).await.map_err(Error::<Q::Error>::Storage)?;
+        }
+    }
+    unreachable!()
+}
+
+fn supervised_webmentions_task<Q: JobQueue<Webmention>, S: Storage + 'static>(
+    queue: Q, db: S,
+    http: reqwest::Client,
+    cancellation_token: tokio_util::sync::CancellationToken
+) -> SupervisedTask {
+    supervisor::<Error<Q::Error>, _, _>(move || process_webmentions_from_queue(queue.clone(), db.clone(), http.clone()), cancellation_token)
+}