use axum::{Form, response::{IntoResponse, Response}, Extension}; use axum::http::StatusCode; use self::queue::JobQueue; pub mod queue; #[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] #[cfg_attr(feature = "sqlx", derive(sqlx::FromRow))] pub struct Webmention { source: String, target: String, } async fn accept_webmention>( Form(webmention): Form, Extension(queue): Extension ) -> Response { if let Err(err) = webmention.source.parse::() { return (StatusCode::BAD_REQUEST, err.to_string()).into_response() } if let Err(err) = webmention.target.parse::() { return (StatusCode::BAD_REQUEST, err.to_string()).into_response() } match queue.put(&webmention).await { Ok(id) => (StatusCode::ACCEPTED, [ ("Location", format!("/.kittybox/webmention/{id}")) ]).into_response(), Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, [ ("Content-Type", "text/plain") ], err.to_string()).into_response() } } pub fn router>(queue: Q) -> axum::Router { // Automatically spawn a background task to handle webmentions tokio::task::spawn(supervised_webmentions_task(queue.clone())); axum::Router::new() .route("/.kittybox/webmention", axum::routing::post(accept_webmention::) ) .layer(Extension(queue)) } async fn supervisor> + Send + 'static, F: FnMut() -> A>(mut f: F) -> std::convert::Infallible { loop { match tokio::task::spawn(f()).await { Err(e) => tracing::error!("background task exited unexpectedly: {}", e), Ok(Err(e)) => tracing::error!("background task returned error: {}", e), Ok(Ok(_)) => unreachable!("task's Ok is Infallible") } } } async fn process_webmentions_from_queue>(queue: Q) -> Result { use futures_util::StreamExt; use self::queue::Job; let mut stream = queue.into_stream().await?; while let Some(item) = stream.next().await.transpose()? { let job = item.job(); let (source, target) = ( job.source.parse::().unwrap(), job.target.parse::().unwrap() ); todo!() } unreachable!() } async fn supervised_webmentions_task>(queue: Q) { supervisor::(|| process_webmentions_from_queue(queue.clone())).await; }