about summary refs log tree commit diff
path: root/kittybox-rs/src/main.rs
diff options
context:
space:
mode:
authorVika <vika@fireburn.ru>2023-07-22 12:32:36 +0300
committerVika <vika@fireburn.ru>2023-07-22 12:34:56 +0300
commitb9e66068484ea4111cd4adf63abc1afdac056b22 (patch)
treefac3a9311be580079260a690ec5f33a15b71833f /kittybox-rs/src/main.rs
parent3c8122e54cdba2c635e723eb33f1d690f05767a7 (diff)
Mount webmention handling routes and tasks
Diffstat (limited to 'kittybox-rs/src/main.rs')
-rw-r--r--kittybox-rs/src/main.rs63
1 files changed, 46 insertions, 17 deletions
diff --git a/kittybox-rs/src/main.rs b/kittybox-rs/src/main.rs
index 7c6ddb6..8c32556 100644
--- a/kittybox-rs/src/main.rs
+++ b/kittybox-rs/src/main.rs
@@ -21,8 +21,10 @@ async fn compose_kittybox_with_auth<A>(
     auth_backend: A,
     backend_uri: &str,
     blobstore_uri: &str,
-    jobset: &Arc<tokio::sync::Mutex<tokio::task::JoinSet<()>>>
-) -> axum::Router
+    job_queue_uri: &str,
+    jobset: &Arc<tokio::sync::Mutex<tokio::task::JoinSet<()>>>,
+    cancellation_token: &tokio_util::sync::CancellationToken
+) -> (axum::Router, kittybox::webmentions::SupervisedTask)
 where A: kittybox::indieauth::backend::AuthBackend
 {
     match backend_uri.split_once(':').unwrap().0 {
@@ -66,18 +68,29 @@ where A: kittybox::indieauth::backend::AuthBackend
                 database.clone(), http.clone(), Arc::clone(jobset)
             );
 
-            axum::Router::new()
+
+            let (webmention, task) = kittybox::webmentions::router(
+                kittybox::webmentions::queue::PostgresJobQueue::new(job_queue_uri).await.unwrap(),
+                database.clone(),
+                http.clone(),
+                cancellation_token.clone()
+            );
+
+            let router = axum::Router::new()
                 .route("/", homepage)
                 .fallback(fallback)
                 .route("/.kittybox/micropub", micropub)
                 .route("/.kittybox/onboarding", onboarding)
                 .nest("/.kittybox/media", init_media(auth_backend.clone(), blobstore_uri))
                 .merge(kittybox::indieauth::router(auth_backend.clone(), database.clone(), http.clone()))
+                .merge(webmention)
                 .route(
                     "/.kittybox/health",
                     axum::routing::get(health_check::<kittybox::database::FileStorage>)
                         .layer(axum::Extension(database))
-                )
+                );
+
+            (router, task)
         },
         "redis" => unimplemented!("Redis backend is not supported."),
         #[cfg(feature = "postgres")]
@@ -120,6 +133,13 @@ where A: kittybox::indieauth::backend::AuthBackend
                 database.clone(), http.clone(), Arc::clone(jobset)
             );
 
+            let (webmention, task) = kittybox::webmentions::router(
+                kittybox::webmentions::queue::PostgresJobQueue::new(job_queue_uri).await.unwrap(),
+                database.clone(),
+                http.clone(),
+                cancellation_token.clone()
+            );
+
             let router = axum::Router::new()
                 .route("/", homepage)
                 .fallback(fallback)
@@ -127,13 +147,14 @@ where A: kittybox::indieauth::backend::AuthBackend
                 .route("/.kittybox/onboarding", onboarding)
                 .nest("/.kittybox/media", init_media(auth_backend.clone(), blobstore_uri))
                 .merge(kittybox::indieauth::router(auth_backend.clone(), database.clone(), http.clone()))
+                .merge(webmention)
                 .route(
                     "/.kittybox/health",
                     axum::routing::get(health_check::<kittybox::database::PostgresStorage>)
                         .layer(axum::Extension(database))
                 );
 
-            router
+            (router, task)
         },
         other => unimplemented!("Unsupported backend: {other}")
     }
@@ -143,9 +164,10 @@ async fn compose_kittybox(
     backend_uri: &str,
     blobstore_uri: &str,
     authstore_uri: &str,
+    job_queue_uri: &str,
     jobset: &Arc<tokio::sync::Mutex<tokio::task::JoinSet<()>>>,
     cancellation_token: &tokio_util::sync::CancellationToken
-) -> axum::Router {
+) -> (axum::Router, kittybox::webmentions::SupervisedTask) {
     let http: reqwest::Client = {
         #[allow(unused_mut)]
         let mut builder = reqwest::Client::builder()
@@ -185,7 +207,7 @@ async fn compose_kittybox(
         builder.build().unwrap()
     };
 
-    let router = match authstore_uri.split_once(':').unwrap().0 {
+    let (router, task) = match authstore_uri.split_once(':').unwrap().0 {
         "file" => {
             let auth_backend = {
                 let folder = authstore_uri
@@ -194,12 +216,12 @@ async fn compose_kittybox(
                 kittybox::indieauth::backend::fs::FileBackend::new(folder)
             };
 
-            compose_kittybox_with_auth(http, auth_backend, backend_uri, blobstore_uri, jobset).await
+            compose_kittybox_with_auth(http, auth_backend, backend_uri, blobstore_uri, job_queue_uri, jobset, cancellation_token).await
         }
         other => unimplemented!("Unsupported backend: {other}")
     };
 
-    router
+    let router = router
         .route(
             "/.kittybox/static/:path",
             axum::routing::get(kittybox::frontend::statics)
@@ -207,7 +229,9 @@ async fn compose_kittybox(
         .route("/.kittybox/coffee", teapot_route())
         .nest("/.kittybox/micropub/client", kittybox::companion::router())
         .layer(tower_http::trace::TraceLayer::new_for_http())
-        .layer(tower_http::catch_panic::CatchPanicLayer::new())
+        .layer(tower_http::catch_panic::CatchPanicLayer::new());
+
+    (router, task)
 }
 
 fn teapot_route() -> axum::routing::MethodRouter {
@@ -265,16 +289,20 @@ async fn main() {
             std::process::exit(1);
         });
 
+    let job_queue_uri: String = env::var("JOB_QUEUE_URI")
+        .unwrap_or_else(|_| {
+            error!("JOB_QUEUE_URI is not set, can't find job queue");
             std::process::exit(1);
         });
 
     let cancellation_token = tokio_util::sync::CancellationToken::new();
     let jobset = Arc::new(tokio::sync::Mutex::new(tokio::task::JoinSet::new()));
 
-    let router = compose_kittybox(
+    let (router, webmentions_task) = compose_kittybox(
         backend_uri.as_str(),
         blobstore_uri.as_str(),
         authstore_uri.as_str(),
+        job_queue_uri.as_str(),
         &jobset,
         &cancellation_token
     ).await;
@@ -402,24 +430,25 @@ async fn main() {
             tracing::error!("Error in HTTP server: {}", e);
             tracing::error!("Shutting down because of error.");
             cancellation_token.cancel();
-            let _ = Box::pin(futures_util::future::join_all(
-                servers_futures.iter_mut().collect::<Vec<_>>()
-            )).await;
 
             1
         }
         _ = shutdown_signal => {
             tracing::info!("Shutdown requested by signal.");
             cancellation_token.cancel();
-            let _ = Box::pin(futures_util::future::join_all(
-                servers_futures.iter_mut().collect::<Vec<_>>()
-            )).await;
 
             0
         }
     };
 
     tracing::info!("Waiting for unfinished background tasks...");
+
+    let _ = tokio::join!(
+        webmentions_task,
+        Box::pin(futures_util::future::join_all(
+            servers_futures.iter_mut().collect::<Vec<_>>()
+        )),
+    );
     let mut jobset: tokio::task::JoinSet<()> = Arc::try_unwrap(jobset)
         .expect("Dangling jobset references present")
         .into_inner();