diff options
author | Vika <vika@fireburn.ru> | 2023-07-01 20:33:18 +0300 |
---|---|---|
committer | Vika <vika@fireburn.ru> | 2023-07-09 01:42:47 +0300 |
commit | d9e7fa9247925fc07f968efdcf831f3cb14539ef (patch) | |
tree | 1c784aa537e59dc53c79f22b73f5ab4736222bcd /kittybox-rs/src/webmentions | |
parent | a23a0438847a8ae8409edf6445ea4f43990e249c (diff) | |
download | kittybox-d9e7fa9247925fc07f968efdcf831f3cb14539ef.tar.zst |
WIP: incoming webmention support
Diffstat (limited to 'kittybox-rs/src/webmentions')
-rw-r--r-- | kittybox-rs/src/webmentions/mod.rs | 75 |
1 files changed, 75 insertions, 0 deletions
diff --git a/kittybox-rs/src/webmentions/mod.rs b/kittybox-rs/src/webmentions/mod.rs new file mode 100644 index 0000000..1c4886e --- /dev/null +++ b/kittybox-rs/src/webmentions/mod.rs @@ -0,0 +1,75 @@ +use axum::{Form, response::{IntoResponse, Response}, Extension}; +use axum::http::StatusCode; + +use self::queue::JobQueue; +pub mod queue; + +#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "sqlx", derive(sqlx::FromRow))] +pub struct Webmention { + source: String, + target: String, +} + +async fn accept_webmention<Q: JobQueue<Webmention>>( + Form(webmention): Form<Webmention>, + Extension(queue): Extension<Q> +) -> Response { + if let Err(err) = webmention.source.parse::<url::Url>() { + return (StatusCode::BAD_REQUEST, err.to_string()).into_response() + } + if let Err(err) = webmention.target.parse::<url::Url>() { + return (StatusCode::BAD_REQUEST, err.to_string()).into_response() + } + + match queue.put(&webmention).await { + Ok(id) => (StatusCode::ACCEPTED, [ + ("Location", format!("/.kittybox/webmention/{id}")) + ]).into_response(), + Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, [ + ("Content-Type", "text/plain") + ], err.to_string()).into_response() + } +} + +pub fn router<Q: JobQueue<Webmention>>(queue: Q) -> axum::Router { + // Automatically spawn a background task to handle webmentions + tokio::task::spawn(supervised_webmentions_task(queue.clone())); + + axum::Router::new() + .route("/.kittybox/webmention", + axum::routing::post(accept_webmention::<Q>) + ) + .layer(Extension(queue)) +} + +async fn supervisor<E: std::error::Error + std::fmt::Debug + Send + 'static, A: futures_util::Future<Output = Result<std::convert::Infallible, E>> + Send + 'static, F: FnMut() -> A>(mut f: F) -> std::convert::Infallible { + loop { + match tokio::task::spawn(f()).await { + Err(e) => tracing::error!("background task exited unexpectedly: {}", e), + Ok(Err(e)) => tracing::error!("background task returned error: {}", e), + Ok(Ok(_)) => unreachable!("task's Ok is Infallible") + } + } +} + +async fn process_webmentions_from_queue<Q: JobQueue<Webmention>>(queue: Q) -> Result<std::convert::Infallible, Q::Error> { + use futures_util::StreamExt; + use self::queue::Job; + + let mut stream = queue.into_stream().await?; + while let Some(item) = stream.next().await.transpose()? { + let job = item.job(); + let (source, target) = ( + job.source.parse::<url::Url>().unwrap(), + job.target.parse::<url::Url>().unwrap() + ); + + todo!() + } + unreachable!() +} + +async fn supervised_webmentions_task<Q: JobQueue<Webmention>>(queue: Q) { + supervisor::<Q::Error, _, _>(|| process_webmentions_from_queue(queue.clone())).await; +} |