use axum::{Form, response::{IntoResponse, Response}, Extension}; use axum::http::StatusCode; use self::queue::JobQueue; pub mod queue; #[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] #[cfg_attr(feature = "sqlx", derive(sqlx::FromRow))] pub struct Webmention { source: String, target: String, } impl queue::JobItem for Webmention {} impl queue::PostgresJobItem for Webmention { const DATABASE_NAME: &'static str = "kittybox.incoming_webmention_queue"; const NOTIFICATION_CHANNEL: &'static str = "incoming_webmention"; } async fn accept_webmention>( Extension(queue): Extension, Form(webmention): Form, ) -> Response { if let Err(err) = webmention.source.parse::() { return (StatusCode::BAD_REQUEST, err.to_string()).into_response() } if let Err(err) = webmention.target.parse::() { return (StatusCode::BAD_REQUEST, err.to_string()).into_response() } match queue.put(&webmention).await { Ok(id) => (StatusCode::ACCEPTED, [ ("Location", format!("/.kittybox/webmention/{id}")) ]).into_response(), Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, [ ("Content-Type", "text/plain") ], err.to_string()).into_response() } } pub fn router>(queue: Q, cancellation_token: tokio_util::sync::CancellationToken) -> (axum::Router, SupervisedTask) { // Automatically spawn a background task to handle webmentions let bgtask_handle = supervised_webmentions_task(queue.clone(), cancellation_token); let router = axum::Router::new() .route("/.kittybox/webmention", axum::routing::post(accept_webmention::) ) .layer(Extension(queue)); (router, bgtask_handle) } #[derive(thiserror::Error, Debug)] pub enum SupervisorError { #[error("the task was explicitly cancelled")] Cancelled } pub type SupervisedTask = tokio::task::JoinHandle>; pub fn supervisor(mut f: F, cancellation_token: tokio_util::sync::CancellationToken) -> SupervisedTask where E: std::error::Error + std::fmt::Debug + Send + 'static, A: std::future::Future> + Send + 'static, F: FnMut() -> A + Send + 'static { let supervisor_future = async move { loop { // Don't spawn the task if we are already cancelled, but // have somehow missed it (probably because the task // crashed and we immediately received a cancellation // request after noticing the crashed task) if cancellation_token.is_cancelled() { return Err(SupervisorError::Cancelled) } let task = tokio::task::spawn(f()); tokio::select! { _ = cancellation_token.cancelled() => { tracing::info!("Shutdown of background task {:?} requested.", std::any::type_name::()); return Err(SupervisorError::Cancelled) } task_result = task => match task_result { Err(e) => tracing::error!("background task {:?} exited unexpectedly: {}", std::any::type_name::(), e), Ok(Err(e)) => tracing::error!("background task {:?} returned error: {}", std::any::type_name::(), e), Ok(Ok(_)) => unreachable!("task's Ok is Infallible") } } tracing::debug!("Sleeping for a little while to back-off..."); tokio::time::sleep(std::time::Duration::from_secs(5)).await; } }; #[cfg(not(tokio_unstable))] return tokio::task::spawn(supervisor_future); #[cfg(tokio_unstable)] return tokio::task::Builder::new() .name(format!("supervisor for background task {}", std::any::type_name::()).as_str()) .spawn(supervisor_future) .unwrap(); } async fn process_webmentions_from_queue>(queue: Q) -> Result { use futures_util::StreamExt; use self::queue::Job; let mut stream = queue.into_stream().await?; while let Some(item) = stream.next().await.transpose()? { let job = item.job(); let (source, target) = ( job.source.parse::().unwrap(), job.target.parse::().unwrap() ); todo!() } unreachable!() } fn supervised_webmentions_task>(queue: Q, cancellation_token: tokio_util::sync::CancellationToken) -> SupervisedTask { supervisor::(move || process_webmentions_from_queue(queue.clone()), cancellation_token) }