about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--configuration.nix33
-rwxr-xr-xkittybox-rs/dev.sh1
-rw-r--r--kittybox-rs/src/lib.rs1
-rw-r--r--kittybox-rs/src/main.rs63
-rw-r--r--smoke-test.nix13
5 files changed, 76 insertions, 35 deletions
diff --git a/configuration.nix b/configuration.nix
index e86f4b7..bdd353a 100644
--- a/configuration.nix
+++ b/configuration.nix
@@ -76,6 +76,16 @@ in {
              credentials are stored per-site.
         '';
       };
+      jobQueueUri = mkOption {
+        type = types.nullOr types.str;
+        default = "postgres://localhost/kittybox?host=/run/postgresql";
+        description = lib.mdDoc ''
+          Set the job queue backend. Available options are:
+           - `postgres://` - PostgreSQL based job queue. It shares the schema
+             with the Kittybox PostgreSQL backend, so Kittybox can reuse the
+             same database for both.
+        '';
+      };
       microsubServer = mkOption {
         type = types.nullOr types.str;
         default = null;
@@ -86,20 +96,6 @@ in {
           https://aperture.p3k.io/ if you don't have one yet!
         '';
       };
-      webmentionEndpoint = mkOption {
-        type = types.nullOr types.str;
-        default = null;
-        example = "https://webmention.io/example.com/webmention";
-        description = ''
-          The URL of your webmention endpoint, which allows you to
-          receive notifications about your site's content being featured
-          or interacted with elsewhere on the IndieWeb.
-
-          By default Kittybox expects the Webmention endpoint to post
-          updates using an internal token. kittybox-webmention is an
-          endpoint capable of that.
-        '';
-      };
       internalTokenFile = mkOption {
         type = types.nullOr types.str;
         default = null;
@@ -110,14 +106,14 @@ in {
         type = types.str;
         default = "/var/lib/kittybox/cookie_secret_key";
         example = "/run/secrets/kittybox-cookie-secret";
-        description = "A secret file to encrypt cookies with the contents of. Should be at least 32 bytes in length. A random persistent file will be generated if this variable is left untouched.";
+        description = "A secret file to encrypt cookies with the contents of. Should be at least 32 bytes in length. A random persistent file will be generated if it doesn't exist or is invalid.";
       };
     };
   };
   config = lib.mkIf cfg.enable {
     assertions = [
       {
-        assertion = lib.strings.hasPrefix cfg.backendUri "postgres://" -> cfg.package.hasPostgres;
+        assertion = (lib.strings.hasPrefix cfg.backendUri "postgres://" || lib.strings.hasPrefix cfg.jobQueueUri "postgres://") -> cfg.package.hasPostgres;
         message = "To use the Postgres backend, Kittybox has to be compiled with Postgres support enabled.";
       }
     ];
@@ -143,7 +139,8 @@ in {
 
       restartTriggers = [
         cfg.package
-        cfg.backendUri cfg.blobstoreUri cfg.authstoreUri
+        cfg.backendUri cfg.blobstoreUri
+        cfg.authstoreUri cfg.jobQueueUri
         cfg.internalTokenFile
         cfg.bind cfg.port
         cfg.cookieSecretFile
@@ -151,10 +148,10 @@ in {
 
       environment = {
         MICROSUB_ENDPOINT = cfg.microsubServer;
-        WEBMENTION_ENDPOINT = cfg.webmentionEndpoint;
         BACKEND_URI = cfg.backendUri;
         BLOBSTORE_URI = cfg.blobstoreUri;
         AUTH_STORE_URI = cfg.authstoreUri;
+        JOB_QUEUE_URI = cfg.jobQueueUri;
         RUST_LOG = "${cfg.logLevel}";
         COOKIE_SECRET_FILE = "${cfg.cookieSecretFile}";
       };
diff --git a/kittybox-rs/dev.sh b/kittybox-rs/dev.sh
index 65d6143..52933ce 100755
--- a/kittybox-rs/dev.sh
+++ b/kittybox-rs/dev.sh
@@ -2,6 +2,7 @@
 export RUST_LOG="kittybox=debug,retainer::cache=warn,h2=info,rustls=info,tokio=info,tower_http::trace=debug,sqlx=trace"
 #export BACKEND_URI=file://./test-dir
 export BACKEND_URI="postgres://localhost?dbname=kittybox&host=/run/postgresql"
+export JOB_QUEUE_URI="postgres://localhost?dbname=kittybox&host=/run/postgresql"
 export BLOBSTORE_URI=file://./media-store
 export AUTH_STORE_URI=file://./auth-store
 export COOKIE_SECRET=1234567890abcdefghijklmnopqrstuvwxyz
diff --git a/kittybox-rs/src/lib.rs b/kittybox-rs/src/lib.rs
index 3d5638a..c1bd965 100644
--- a/kittybox-rs/src/lib.rs
+++ b/kittybox-rs/src/lib.rs
@@ -7,6 +7,7 @@ pub mod frontend;
 pub mod media;
 pub mod micropub;
 pub mod indieauth;
+pub mod webmentions;
 
 pub mod companion {
     use std::{collections::HashMap, sync::Arc};
diff --git a/kittybox-rs/src/main.rs b/kittybox-rs/src/main.rs
index 7c6ddb6..8c32556 100644
--- a/kittybox-rs/src/main.rs
+++ b/kittybox-rs/src/main.rs
@@ -21,8 +21,10 @@ async fn compose_kittybox_with_auth<A>(
     auth_backend: A,
     backend_uri: &str,
     blobstore_uri: &str,
-    jobset: &Arc<tokio::sync::Mutex<tokio::task::JoinSet<()>>>
-) -> axum::Router
+    job_queue_uri: &str,
+    jobset: &Arc<tokio::sync::Mutex<tokio::task::JoinSet<()>>>,
+    cancellation_token: &tokio_util::sync::CancellationToken
+) -> (axum::Router, kittybox::webmentions::SupervisedTask)
 where A: kittybox::indieauth::backend::AuthBackend
 {
     match backend_uri.split_once(':').unwrap().0 {
@@ -66,18 +68,29 @@ where A: kittybox::indieauth::backend::AuthBackend
                 database.clone(), http.clone(), Arc::clone(jobset)
             );
 
-            axum::Router::new()
+
+            let (webmention, task) = kittybox::webmentions::router(
+                kittybox::webmentions::queue::PostgresJobQueue::new(job_queue_uri).await.unwrap(),
+                database.clone(),
+                http.clone(),
+                cancellation_token.clone()
+            );
+
+            let router = axum::Router::new()
                 .route("/", homepage)
                 .fallback(fallback)
                 .route("/.kittybox/micropub", micropub)
                 .route("/.kittybox/onboarding", onboarding)
                 .nest("/.kittybox/media", init_media(auth_backend.clone(), blobstore_uri))
                 .merge(kittybox::indieauth::router(auth_backend.clone(), database.clone(), http.clone()))
+                .merge(webmention)
                 .route(
                     "/.kittybox/health",
                     axum::routing::get(health_check::<kittybox::database::FileStorage>)
                         .layer(axum::Extension(database))
-                )
+                );
+
+            (router, task)
         },
         "redis" => unimplemented!("Redis backend is not supported."),
         #[cfg(feature = "postgres")]
@@ -120,6 +133,13 @@ where A: kittybox::indieauth::backend::AuthBackend
                 database.clone(), http.clone(), Arc::clone(jobset)
             );
 
+            let (webmention, task) = kittybox::webmentions::router(
+                kittybox::webmentions::queue::PostgresJobQueue::new(job_queue_uri).await.unwrap(),
+                database.clone(),
+                http.clone(),
+                cancellation_token.clone()
+            );
+
             let router = axum::Router::new()
                 .route("/", homepage)
                 .fallback(fallback)
@@ -127,13 +147,14 @@ where A: kittybox::indieauth::backend::AuthBackend
                 .route("/.kittybox/onboarding", onboarding)
                 .nest("/.kittybox/media", init_media(auth_backend.clone(), blobstore_uri))
                 .merge(kittybox::indieauth::router(auth_backend.clone(), database.clone(), http.clone()))
+                .merge(webmention)
                 .route(
                     "/.kittybox/health",
                     axum::routing::get(health_check::<kittybox::database::PostgresStorage>)
                         .layer(axum::Extension(database))
                 );
 
-            router
+            (router, task)
         },
         other => unimplemented!("Unsupported backend: {other}")
     }
@@ -143,9 +164,10 @@ async fn compose_kittybox(
     backend_uri: &str,
     blobstore_uri: &str,
     authstore_uri: &str,
+    job_queue_uri: &str,
     jobset: &Arc<tokio::sync::Mutex<tokio::task::JoinSet<()>>>,
     cancellation_token: &tokio_util::sync::CancellationToken
-) -> axum::Router {
+) -> (axum::Router, kittybox::webmentions::SupervisedTask) {
     let http: reqwest::Client = {
         #[allow(unused_mut)]
         let mut builder = reqwest::Client::builder()
@@ -185,7 +207,7 @@ async fn compose_kittybox(
         builder.build().unwrap()
     };
 
-    let router = match authstore_uri.split_once(':').unwrap().0 {
+    let (router, task) = match authstore_uri.split_once(':').unwrap().0 {
         "file" => {
             let auth_backend = {
                 let folder = authstore_uri
@@ -194,12 +216,12 @@ async fn compose_kittybox(
                 kittybox::indieauth::backend::fs::FileBackend::new(folder)
             };
 
-            compose_kittybox_with_auth(http, auth_backend, backend_uri, blobstore_uri, jobset).await
+            compose_kittybox_with_auth(http, auth_backend, backend_uri, blobstore_uri, job_queue_uri, jobset, cancellation_token).await
         }
         other => unimplemented!("Unsupported backend: {other}")
     };
 
-    router
+    let router = router
         .route(
             "/.kittybox/static/:path",
             axum::routing::get(kittybox::frontend::statics)
@@ -207,7 +229,9 @@ async fn compose_kittybox(
         .route("/.kittybox/coffee", teapot_route())
         .nest("/.kittybox/micropub/client", kittybox::companion::router())
         .layer(tower_http::trace::TraceLayer::new_for_http())
-        .layer(tower_http::catch_panic::CatchPanicLayer::new())
+        .layer(tower_http::catch_panic::CatchPanicLayer::new());
+
+    (router, task)
 }
 
 fn teapot_route() -> axum::routing::MethodRouter {
@@ -265,16 +289,20 @@ async fn main() {
             std::process::exit(1);
         });
 
+    let job_queue_uri: String = env::var("JOB_QUEUE_URI")
+        .unwrap_or_else(|_| {
+            error!("JOB_QUEUE_URI is not set, can't find job queue");
             std::process::exit(1);
         });
 
     let cancellation_token = tokio_util::sync::CancellationToken::new();
     let jobset = Arc::new(tokio::sync::Mutex::new(tokio::task::JoinSet::new()));
 
-    let router = compose_kittybox(
+    let (router, webmentions_task) = compose_kittybox(
         backend_uri.as_str(),
         blobstore_uri.as_str(),
         authstore_uri.as_str(),
+        job_queue_uri.as_str(),
         &jobset,
         &cancellation_token
     ).await;
@@ -402,24 +430,25 @@ async fn main() {
             tracing::error!("Error in HTTP server: {}", e);
             tracing::error!("Shutting down because of error.");
             cancellation_token.cancel();
-            let _ = Box::pin(futures_util::future::join_all(
-                servers_futures.iter_mut().collect::<Vec<_>>()
-            )).await;
 
             1
         }
         _ = shutdown_signal => {
             tracing::info!("Shutdown requested by signal.");
             cancellation_token.cancel();
-            let _ = Box::pin(futures_util::future::join_all(
-                servers_futures.iter_mut().collect::<Vec<_>>()
-            )).await;
 
             0
         }
     };
 
     tracing::info!("Waiting for unfinished background tasks...");
+
+    let _ = tokio::join!(
+        webmentions_task,
+        Box::pin(futures_util::future::join_all(
+            servers_futures.iter_mut().collect::<Vec<_>>()
+        )),
+    );
     let mut jobset: tokio::task::JoinSet<()> = Arc::try_unwrap(jobset)
         .expect("Dangling jobset references present")
         .into_inner();
diff --git a/smoke-test.nix b/smoke-test.nix
index 139117f..b043a31 100644
--- a/smoke-test.nix
+++ b/smoke-test.nix
@@ -6,6 +6,19 @@ kittybox:
     kittybox = { config, pkgs, lib, ... }: {
       imports = [ kittybox.nixosModules.default ];
 
+      services.postgresql = {
+        enable = true;
+        ensureDatabases = ["kittybox"];
+        ensureUsers = [ {
+          name = "kittybox";
+          ensurePermissions = {
+            "DATABASE kittybox" = "ALL PRIVILEGES";
+          };
+        } ];
+      };
+
+      systemd.services.kittybox.wants = [ "postgresql.service" ];
+      systemd.services.kittybox.after = [ "postgresql.service" ];
       systemd.services.kittybox.wantedBy = lib.mkForce [];
 
       services.kittybox = {