about summary refs log tree commit diff
path: root/kittybox-rs
diff options
context:
space:
mode:
authorVika <vika@fireburn.ru>2023-07-01 20:33:18 +0300
committerVika <vika@fireburn.ru>2023-07-09 01:42:47 +0300
commitd9e7fa9247925fc07f968efdcf831f3cb14539ef (patch)
tree1c784aa537e59dc53c79f22b73f5ab4736222bcd /kittybox-rs
parenta23a0438847a8ae8409edf6445ea4f43990e249c (diff)
downloadkittybox-d9e7fa9247925fc07f968efdcf831f3cb14539ef.tar.zst
WIP: incoming webmention support
Diffstat (limited to 'kittybox-rs')
-rw-r--r--kittybox-rs/src/webmentions/mod.rs75
1 files changed, 75 insertions, 0 deletions
diff --git a/kittybox-rs/src/webmentions/mod.rs b/kittybox-rs/src/webmentions/mod.rs
new file mode 100644
index 0000000..1c4886e
--- /dev/null
+++ b/kittybox-rs/src/webmentions/mod.rs
@@ -0,0 +1,75 @@
+use axum::{Form, response::{IntoResponse, Response}, Extension};
+use axum::http::StatusCode;
+
+use self::queue::JobQueue;
+pub mod queue;
+
+#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+#[cfg_attr(feature = "sqlx", derive(sqlx::FromRow))]
+pub struct Webmention {
+    source: String,
+    target: String,
+}
+
+async fn accept_webmention<Q: JobQueue<Webmention>>(
+    Form(webmention): Form<Webmention>,
+    Extension(queue): Extension<Q>
+) -> Response {
+    if let Err(err) = webmention.source.parse::<url::Url>() {
+        return (StatusCode::BAD_REQUEST, err.to_string()).into_response()
+    }
+    if let Err(err) = webmention.target.parse::<url::Url>() {
+        return (StatusCode::BAD_REQUEST, err.to_string()).into_response()
+    }
+
+    match queue.put(&webmention).await {
+        Ok(id) => (StatusCode::ACCEPTED, [
+            ("Location", format!("/.kittybox/webmention/{id}"))
+        ]).into_response(),
+        Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, [
+            ("Content-Type", "text/plain")
+        ], err.to_string()).into_response()
+    }
+}
+
+pub fn router<Q: JobQueue<Webmention>>(queue: Q) -> axum::Router {
+    // Automatically spawn a background task to handle webmentions
+    tokio::task::spawn(supervised_webmentions_task(queue.clone()));
+
+    axum::Router::new()
+        .route("/.kittybox/webmention",
+            axum::routing::post(accept_webmention::<Q>)
+        )
+        .layer(Extension(queue))
+}
+
+async fn supervisor<E: std::error::Error + std::fmt::Debug + Send + 'static, A: futures_util::Future<Output = Result<std::convert::Infallible, E>> + Send + 'static, F: FnMut() -> A>(mut f: F) -> std::convert::Infallible {
+    loop {
+        match tokio::task::spawn(f()).await {
+            Err(e) => tracing::error!("background task exited unexpectedly: {}", e),
+            Ok(Err(e)) => tracing::error!("background task returned error: {}", e),
+            Ok(Ok(_)) => unreachable!("task's Ok is Infallible")
+        }
+    }
+}
+
+async fn process_webmentions_from_queue<Q: JobQueue<Webmention>>(queue: Q) -> Result<std::convert::Infallible, Q::Error> {
+    use futures_util::StreamExt;
+    use self::queue::Job;
+
+    let mut stream = queue.into_stream().await?;
+    while let Some(item) = stream.next().await.transpose()? {
+        let job = item.job();
+        let (source, target) = (
+            job.source.parse::<url::Url>().unwrap(),
+            job.target.parse::<url::Url>().unwrap()
+        );
+
+        todo!()
+    }
+    unreachable!()
+}
+
+async fn supervised_webmentions_task<Q: JobQueue<Webmention>>(queue: Q) {
+    supervisor::<Q::Error, _, _>(|| process_webmentions_from_queue(queue.clone())).await;
+}