From d9e7fa9247925fc07f968efdcf831f3cb14539ef Mon Sep 17 00:00:00 2001 From: Vika Date: Sat, 1 Jul 2023 20:33:18 +0300 Subject: WIP: incoming webmention support --- kittybox-rs/src/webmentions/mod.rs | 75 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 kittybox-rs/src/webmentions/mod.rs diff --git a/kittybox-rs/src/webmentions/mod.rs b/kittybox-rs/src/webmentions/mod.rs new file mode 100644 index 0000000..1c4886e --- /dev/null +++ b/kittybox-rs/src/webmentions/mod.rs @@ -0,0 +1,75 @@ +use axum::{Form, response::{IntoResponse, Response}, Extension}; +use axum::http::StatusCode; + +use self::queue::JobQueue; +pub mod queue; + +#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "sqlx", derive(sqlx::FromRow))] +pub struct Webmention { + source: String, + target: String, +} + +async fn accept_webmention>( + Form(webmention): Form, + Extension(queue): Extension +) -> Response { + if let Err(err) = webmention.source.parse::() { + return (StatusCode::BAD_REQUEST, err.to_string()).into_response() + } + if let Err(err) = webmention.target.parse::() { + return (StatusCode::BAD_REQUEST, err.to_string()).into_response() + } + + match queue.put(&webmention).await { + Ok(id) => (StatusCode::ACCEPTED, [ + ("Location", format!("/.kittybox/webmention/{id}")) + ]).into_response(), + Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, [ + ("Content-Type", "text/plain") + ], err.to_string()).into_response() + } +} + +pub fn router>(queue: Q) -> axum::Router { + // Automatically spawn a background task to handle webmentions + tokio::task::spawn(supervised_webmentions_task(queue.clone())); + + axum::Router::new() + .route("/.kittybox/webmention", + axum::routing::post(accept_webmention::) + ) + .layer(Extension(queue)) +} + +async fn supervisor> + Send + 'static, F: FnMut() -> A>(mut f: F) -> std::convert::Infallible { + loop { + match tokio::task::spawn(f()).await { + Err(e) => tracing::error!("background task exited unexpectedly: {}", e), + Ok(Err(e)) => tracing::error!("background task returned error: {}", e), + Ok(Ok(_)) => unreachable!("task's Ok is Infallible") + } + } +} + +async fn process_webmentions_from_queue>(queue: Q) -> Result { + use futures_util::StreamExt; + use self::queue::Job; + + let mut stream = queue.into_stream().await?; + while let Some(item) = stream.next().await.transpose()? { + let job = item.job(); + let (source, target) = ( + job.source.parse::().unwrap(), + job.target.parse::().unwrap() + ); + + todo!() + } + unreachable!() +} + +async fn supervised_webmentions_task>(queue: Q) { + supervisor::(|| process_webmentions_from_queue(queue.clone())).await; +} -- cgit 1.4.1