diff options
author | Vika <vika@fireburn.ru> | 2023-07-22 12:32:36 +0300 |
---|---|---|
committer | Vika <vika@fireburn.ru> | 2023-07-22 12:34:56 +0300 |
commit | b9e66068484ea4111cd4adf63abc1afdac056b22 (patch) | |
tree | fac3a9311be580079260a690ec5f33a15b71833f /kittybox-rs/src/main.rs | |
parent | 3c8122e54cdba2c635e723eb33f1d690f05767a7 (diff) | |
download | kittybox-b9e66068484ea4111cd4adf63abc1afdac056b22.tar.zst |
Mount webmention handling routes and tasks
Diffstat (limited to 'kittybox-rs/src/main.rs')
-rw-r--r-- | kittybox-rs/src/main.rs | 63 |
1 files changed, 46 insertions, 17 deletions
diff --git a/kittybox-rs/src/main.rs b/kittybox-rs/src/main.rs index 7c6ddb6..8c32556 100644 --- a/kittybox-rs/src/main.rs +++ b/kittybox-rs/src/main.rs @@ -21,8 +21,10 @@ async fn compose_kittybox_with_auth<A>( auth_backend: A, backend_uri: &str, blobstore_uri: &str, - jobset: &Arc<tokio::sync::Mutex<tokio::task::JoinSet<()>>> -) -> axum::Router + job_queue_uri: &str, + jobset: &Arc<tokio::sync::Mutex<tokio::task::JoinSet<()>>>, + cancellation_token: &tokio_util::sync::CancellationToken +) -> (axum::Router, kittybox::webmentions::SupervisedTask) where A: kittybox::indieauth::backend::AuthBackend { match backend_uri.split_once(':').unwrap().0 { @@ -66,18 +68,29 @@ where A: kittybox::indieauth::backend::AuthBackend database.clone(), http.clone(), Arc::clone(jobset) ); - axum::Router::new() + + let (webmention, task) = kittybox::webmentions::router( + kittybox::webmentions::queue::PostgresJobQueue::new(job_queue_uri).await.unwrap(), + database.clone(), + http.clone(), + cancellation_token.clone() + ); + + let router = axum::Router::new() .route("/", homepage) .fallback(fallback) .route("/.kittybox/micropub", micropub) .route("/.kittybox/onboarding", onboarding) .nest("/.kittybox/media", init_media(auth_backend.clone(), blobstore_uri)) .merge(kittybox::indieauth::router(auth_backend.clone(), database.clone(), http.clone())) + .merge(webmention) .route( "/.kittybox/health", axum::routing::get(health_check::<kittybox::database::FileStorage>) .layer(axum::Extension(database)) - ) + ); + + (router, task) }, "redis" => unimplemented!("Redis backend is not supported."), #[cfg(feature = "postgres")] @@ -120,6 +133,13 @@ where A: kittybox::indieauth::backend::AuthBackend database.clone(), http.clone(), Arc::clone(jobset) ); + let (webmention, task) = kittybox::webmentions::router( + kittybox::webmentions::queue::PostgresJobQueue::new(job_queue_uri).await.unwrap(), + database.clone(), + http.clone(), + cancellation_token.clone() + ); + let router = axum::Router::new() .route("/", homepage) .fallback(fallback) @@ -127,13 +147,14 @@ where A: kittybox::indieauth::backend::AuthBackend .route("/.kittybox/onboarding", onboarding) .nest("/.kittybox/media", init_media(auth_backend.clone(), blobstore_uri)) .merge(kittybox::indieauth::router(auth_backend.clone(), database.clone(), http.clone())) + .merge(webmention) .route( "/.kittybox/health", axum::routing::get(health_check::<kittybox::database::PostgresStorage>) .layer(axum::Extension(database)) ); - router + (router, task) }, other => unimplemented!("Unsupported backend: {other}") } @@ -143,9 +164,10 @@ async fn compose_kittybox( backend_uri: &str, blobstore_uri: &str, authstore_uri: &str, + job_queue_uri: &str, jobset: &Arc<tokio::sync::Mutex<tokio::task::JoinSet<()>>>, cancellation_token: &tokio_util::sync::CancellationToken -) -> axum::Router { +) -> (axum::Router, kittybox::webmentions::SupervisedTask) { let http: reqwest::Client = { #[allow(unused_mut)] let mut builder = reqwest::Client::builder() @@ -185,7 +207,7 @@ async fn compose_kittybox( builder.build().unwrap() }; - let router = match authstore_uri.split_once(':').unwrap().0 { + let (router, task) = match authstore_uri.split_once(':').unwrap().0 { "file" => { let auth_backend = { let folder = authstore_uri @@ -194,12 +216,12 @@ async fn compose_kittybox( kittybox::indieauth::backend::fs::FileBackend::new(folder) }; - compose_kittybox_with_auth(http, auth_backend, backend_uri, blobstore_uri, jobset).await + compose_kittybox_with_auth(http, auth_backend, backend_uri, blobstore_uri, job_queue_uri, jobset, cancellation_token).await } other => unimplemented!("Unsupported backend: {other}") }; - router + let router = router .route( "/.kittybox/static/:path", axum::routing::get(kittybox::frontend::statics) @@ -207,7 +229,9 @@ async fn compose_kittybox( .route("/.kittybox/coffee", teapot_route()) .nest("/.kittybox/micropub/client", kittybox::companion::router()) .layer(tower_http::trace::TraceLayer::new_for_http()) - .layer(tower_http::catch_panic::CatchPanicLayer::new()) + .layer(tower_http::catch_panic::CatchPanicLayer::new()); + + (router, task) } fn teapot_route() -> axum::routing::MethodRouter { @@ -265,16 +289,20 @@ async fn main() { std::process::exit(1); }); + let job_queue_uri: String = env::var("JOB_QUEUE_URI") + .unwrap_or_else(|_| { + error!("JOB_QUEUE_URI is not set, can't find job queue"); std::process::exit(1); }); let cancellation_token = tokio_util::sync::CancellationToken::new(); let jobset = Arc::new(tokio::sync::Mutex::new(tokio::task::JoinSet::new())); - let router = compose_kittybox( + let (router, webmentions_task) = compose_kittybox( backend_uri.as_str(), blobstore_uri.as_str(), authstore_uri.as_str(), + job_queue_uri.as_str(), &jobset, &cancellation_token ).await; @@ -402,24 +430,25 @@ async fn main() { tracing::error!("Error in HTTP server: {}", e); tracing::error!("Shutting down because of error."); cancellation_token.cancel(); - let _ = Box::pin(futures_util::future::join_all( - servers_futures.iter_mut().collect::<Vec<_>>() - )).await; 1 } _ = shutdown_signal => { tracing::info!("Shutdown requested by signal."); cancellation_token.cancel(); - let _ = Box::pin(futures_util::future::join_all( - servers_futures.iter_mut().collect::<Vec<_>>() - )).await; 0 } }; tracing::info!("Waiting for unfinished background tasks..."); + + let _ = tokio::join!( + webmentions_task, + Box::pin(futures_util::future::join_all( + servers_futures.iter_mut().collect::<Vec<_>>() + )), + ); let mut jobset: tokio::task::JoinSet<()> = Arc::try_unwrap(jobset) .expect("Dangling jobset references present") .into_inner(); |