From b9e66068484ea4111cd4adf63abc1afdac056b22 Mon Sep 17 00:00:00 2001 From: Vika Date: Sat, 22 Jul 2023 12:32:36 +0300 Subject: Mount webmention handling routes and tasks --- kittybox-rs/dev.sh | 1 + kittybox-rs/src/lib.rs | 1 + kittybox-rs/src/main.rs | 63 ++++++++++++++++++++++++++++++++++++------------- 3 files changed, 48 insertions(+), 17 deletions(-) (limited to 'kittybox-rs') diff --git a/kittybox-rs/dev.sh b/kittybox-rs/dev.sh index 65d6143..52933ce 100755 --- a/kittybox-rs/dev.sh +++ b/kittybox-rs/dev.sh @@ -2,6 +2,7 @@ export RUST_LOG="kittybox=debug,retainer::cache=warn,h2=info,rustls=info,tokio=info,tower_http::trace=debug,sqlx=trace" #export BACKEND_URI=file://./test-dir export BACKEND_URI="postgres://localhost?dbname=kittybox&host=/run/postgresql" +export JOB_QUEUE_URI="postgres://localhost?dbname=kittybox&host=/run/postgresql" export BLOBSTORE_URI=file://./media-store export AUTH_STORE_URI=file://./auth-store export COOKIE_SECRET=1234567890abcdefghijklmnopqrstuvwxyz diff --git a/kittybox-rs/src/lib.rs b/kittybox-rs/src/lib.rs index 3d5638a..c1bd965 100644 --- a/kittybox-rs/src/lib.rs +++ b/kittybox-rs/src/lib.rs @@ -7,6 +7,7 @@ pub mod frontend; pub mod media; pub mod micropub; pub mod indieauth; +pub mod webmentions; pub mod companion { use std::{collections::HashMap, sync::Arc}; diff --git a/kittybox-rs/src/main.rs b/kittybox-rs/src/main.rs index 7c6ddb6..8c32556 100644 --- a/kittybox-rs/src/main.rs +++ b/kittybox-rs/src/main.rs @@ -21,8 +21,10 @@ async fn compose_kittybox_with_auth( auth_backend: A, backend_uri: &str, blobstore_uri: &str, - jobset: &Arc>> -) -> axum::Router + job_queue_uri: &str, + jobset: &Arc>>, + cancellation_token: &tokio_util::sync::CancellationToken +) -> (axum::Router, kittybox::webmentions::SupervisedTask) where A: kittybox::indieauth::backend::AuthBackend { match backend_uri.split_once(':').unwrap().0 { @@ -66,18 +68,29 @@ where A: kittybox::indieauth::backend::AuthBackend database.clone(), http.clone(), Arc::clone(jobset) ); - axum::Router::new() + + let (webmention, task) = kittybox::webmentions::router( + kittybox::webmentions::queue::PostgresJobQueue::new(job_queue_uri).await.unwrap(), + database.clone(), + http.clone(), + cancellation_token.clone() + ); + + let router = axum::Router::new() .route("/", homepage) .fallback(fallback) .route("/.kittybox/micropub", micropub) .route("/.kittybox/onboarding", onboarding) .nest("/.kittybox/media", init_media(auth_backend.clone(), blobstore_uri)) .merge(kittybox::indieauth::router(auth_backend.clone(), database.clone(), http.clone())) + .merge(webmention) .route( "/.kittybox/health", axum::routing::get(health_check::) .layer(axum::Extension(database)) - ) + ); + + (router, task) }, "redis" => unimplemented!("Redis backend is not supported."), #[cfg(feature = "postgres")] @@ -120,6 +133,13 @@ where A: kittybox::indieauth::backend::AuthBackend database.clone(), http.clone(), Arc::clone(jobset) ); + let (webmention, task) = kittybox::webmentions::router( + kittybox::webmentions::queue::PostgresJobQueue::new(job_queue_uri).await.unwrap(), + database.clone(), + http.clone(), + cancellation_token.clone() + ); + let router = axum::Router::new() .route("/", homepage) .fallback(fallback) @@ -127,13 +147,14 @@ where A: kittybox::indieauth::backend::AuthBackend .route("/.kittybox/onboarding", onboarding) .nest("/.kittybox/media", init_media(auth_backend.clone(), blobstore_uri)) .merge(kittybox::indieauth::router(auth_backend.clone(), database.clone(), http.clone())) + .merge(webmention) .route( "/.kittybox/health", axum::routing::get(health_check::) .layer(axum::Extension(database)) ); - router + (router, task) }, other => unimplemented!("Unsupported backend: {other}") } @@ -143,9 +164,10 @@ async fn compose_kittybox( backend_uri: &str, blobstore_uri: &str, authstore_uri: &str, + job_queue_uri: &str, jobset: &Arc>>, cancellation_token: &tokio_util::sync::CancellationToken -) -> axum::Router { +) -> (axum::Router, kittybox::webmentions::SupervisedTask) { let http: reqwest::Client = { #[allow(unused_mut)] let mut builder = reqwest::Client::builder() @@ -185,7 +207,7 @@ async fn compose_kittybox( builder.build().unwrap() }; - let router = match authstore_uri.split_once(':').unwrap().0 { + let (router, task) = match authstore_uri.split_once(':').unwrap().0 { "file" => { let auth_backend = { let folder = authstore_uri @@ -194,12 +216,12 @@ async fn compose_kittybox( kittybox::indieauth::backend::fs::FileBackend::new(folder) }; - compose_kittybox_with_auth(http, auth_backend, backend_uri, blobstore_uri, jobset).await + compose_kittybox_with_auth(http, auth_backend, backend_uri, blobstore_uri, job_queue_uri, jobset, cancellation_token).await } other => unimplemented!("Unsupported backend: {other}") }; - router + let router = router .route( "/.kittybox/static/:path", axum::routing::get(kittybox::frontend::statics) @@ -207,7 +229,9 @@ async fn compose_kittybox( .route("/.kittybox/coffee", teapot_route()) .nest("/.kittybox/micropub/client", kittybox::companion::router()) .layer(tower_http::trace::TraceLayer::new_for_http()) - .layer(tower_http::catch_panic::CatchPanicLayer::new()) + .layer(tower_http::catch_panic::CatchPanicLayer::new()); + + (router, task) } fn teapot_route() -> axum::routing::MethodRouter { @@ -265,16 +289,20 @@ async fn main() { std::process::exit(1); }); + let job_queue_uri: String = env::var("JOB_QUEUE_URI") + .unwrap_or_else(|_| { + error!("JOB_QUEUE_URI is not set, can't find job queue"); std::process::exit(1); }); let cancellation_token = tokio_util::sync::CancellationToken::new(); let jobset = Arc::new(tokio::sync::Mutex::new(tokio::task::JoinSet::new())); - let router = compose_kittybox( + let (router, webmentions_task) = compose_kittybox( backend_uri.as_str(), blobstore_uri.as_str(), authstore_uri.as_str(), + job_queue_uri.as_str(), &jobset, &cancellation_token ).await; @@ -402,24 +430,25 @@ async fn main() { tracing::error!("Error in HTTP server: {}", e); tracing::error!("Shutting down because of error."); cancellation_token.cancel(); - let _ = Box::pin(futures_util::future::join_all( - servers_futures.iter_mut().collect::>() - )).await; 1 } _ = shutdown_signal => { tracing::info!("Shutdown requested by signal."); cancellation_token.cancel(); - let _ = Box::pin(futures_util::future::join_all( - servers_futures.iter_mut().collect::>() - )).await; 0 } }; tracing::info!("Waiting for unfinished background tasks..."); + + let _ = tokio::join!( + webmentions_task, + Box::pin(futures_util::future::join_all( + servers_futures.iter_mut().collect::>() + )), + ); let mut jobset: tokio::task::JoinSet<()> = Arc::try_unwrap(jobset) .expect("Dangling jobset references present") .into_inner(); -- cgit 1.4.1