diff options
-rw-r--r-- | configuration.nix | 33 | ||||
-rwxr-xr-x | kittybox-rs/dev.sh | 1 | ||||
-rw-r--r-- | kittybox-rs/src/lib.rs | 1 | ||||
-rw-r--r-- | kittybox-rs/src/main.rs | 63 | ||||
-rw-r--r-- | smoke-test.nix | 13 |
5 files changed, 76 insertions, 35 deletions
diff --git a/configuration.nix b/configuration.nix index e86f4b7..bdd353a 100644 --- a/configuration.nix +++ b/configuration.nix @@ -76,6 +76,16 @@ in { credentials are stored per-site. ''; }; + jobQueueUri = mkOption { + type = types.nullOr types.str; + default = "postgres://localhost/kittybox?host=/run/postgresql"; + description = lib.mdDoc '' + Set the job queue backend. Available options are: + - `postgres://` - PostgreSQL based job queue. It shares the schema + with the Kittybox PostgreSQL backend, so Kittybox can reuse the + same database for both. + ''; + }; microsubServer = mkOption { type = types.nullOr types.str; default = null; @@ -86,20 +96,6 @@ in { https://aperture.p3k.io/ if you don't have one yet! ''; }; - webmentionEndpoint = mkOption { - type = types.nullOr types.str; - default = null; - example = "https://webmention.io/example.com/webmention"; - description = '' - The URL of your webmention endpoint, which allows you to - receive notifications about your site's content being featured - or interacted with elsewhere on the IndieWeb. - - By default Kittybox expects the Webmention endpoint to post - updates using an internal token. kittybox-webmention is an - endpoint capable of that. - ''; - }; internalTokenFile = mkOption { type = types.nullOr types.str; default = null; @@ -110,14 +106,14 @@ in { type = types.str; default = "/var/lib/kittybox/cookie_secret_key"; example = "/run/secrets/kittybox-cookie-secret"; - description = "A secret file to encrypt cookies with the contents of. Should be at least 32 bytes in length. A random persistent file will be generated if this variable is left untouched."; + description = "A secret file to encrypt cookies with the contents of. Should be at least 32 bytes in length. A random persistent file will be generated if it doesn't exist or is invalid."; }; }; }; config = lib.mkIf cfg.enable { assertions = [ { - assertion = lib.strings.hasPrefix cfg.backendUri "postgres://" -> cfg.package.hasPostgres; + assertion = (lib.strings.hasPrefix cfg.backendUri "postgres://" || lib.strings.hasPrefix cfg.jobQueueUri "postgres://") -> cfg.package.hasPostgres; message = "To use the Postgres backend, Kittybox has to be compiled with Postgres support enabled."; } ]; @@ -143,7 +139,8 @@ in { restartTriggers = [ cfg.package - cfg.backendUri cfg.blobstoreUri cfg.authstoreUri + cfg.backendUri cfg.blobstoreUri + cfg.authstoreUri cfg.jobQueueUri cfg.internalTokenFile cfg.bind cfg.port cfg.cookieSecretFile @@ -151,10 +148,10 @@ in { environment = { MICROSUB_ENDPOINT = cfg.microsubServer; - WEBMENTION_ENDPOINT = cfg.webmentionEndpoint; BACKEND_URI = cfg.backendUri; BLOBSTORE_URI = cfg.blobstoreUri; AUTH_STORE_URI = cfg.authstoreUri; + JOB_QUEUE_URI = cfg.jobQueueUri; RUST_LOG = "${cfg.logLevel}"; COOKIE_SECRET_FILE = "${cfg.cookieSecretFile}"; }; diff --git a/kittybox-rs/dev.sh b/kittybox-rs/dev.sh index 65d6143..52933ce 100755 --- a/kittybox-rs/dev.sh +++ b/kittybox-rs/dev.sh @@ -2,6 +2,7 @@ export RUST_LOG="kittybox=debug,retainer::cache=warn,h2=info,rustls=info,tokio=info,tower_http::trace=debug,sqlx=trace" #export BACKEND_URI=file://./test-dir export BACKEND_URI="postgres://localhost?dbname=kittybox&host=/run/postgresql" +export JOB_QUEUE_URI="postgres://localhost?dbname=kittybox&host=/run/postgresql" export BLOBSTORE_URI=file://./media-store export AUTH_STORE_URI=file://./auth-store export COOKIE_SECRET=1234567890abcdefghijklmnopqrstuvwxyz diff --git a/kittybox-rs/src/lib.rs b/kittybox-rs/src/lib.rs index 3d5638a..c1bd965 100644 --- a/kittybox-rs/src/lib.rs +++ b/kittybox-rs/src/lib.rs @@ -7,6 +7,7 @@ pub mod frontend; pub mod media; pub mod micropub; pub mod indieauth; +pub mod webmentions; pub mod companion { use std::{collections::HashMap, sync::Arc}; diff --git a/kittybox-rs/src/main.rs b/kittybox-rs/src/main.rs index 7c6ddb6..8c32556 100644 --- a/kittybox-rs/src/main.rs +++ b/kittybox-rs/src/main.rs @@ -21,8 +21,10 @@ async fn compose_kittybox_with_auth<A>( auth_backend: A, backend_uri: &str, blobstore_uri: &str, - jobset: &Arc<tokio::sync::Mutex<tokio::task::JoinSet<()>>> -) -> axum::Router + job_queue_uri: &str, + jobset: &Arc<tokio::sync::Mutex<tokio::task::JoinSet<()>>>, + cancellation_token: &tokio_util::sync::CancellationToken +) -> (axum::Router, kittybox::webmentions::SupervisedTask) where A: kittybox::indieauth::backend::AuthBackend { match backend_uri.split_once(':').unwrap().0 { @@ -66,18 +68,29 @@ where A: kittybox::indieauth::backend::AuthBackend database.clone(), http.clone(), Arc::clone(jobset) ); - axum::Router::new() + + let (webmention, task) = kittybox::webmentions::router( + kittybox::webmentions::queue::PostgresJobQueue::new(job_queue_uri).await.unwrap(), + database.clone(), + http.clone(), + cancellation_token.clone() + ); + + let router = axum::Router::new() .route("/", homepage) .fallback(fallback) .route("/.kittybox/micropub", micropub) .route("/.kittybox/onboarding", onboarding) .nest("/.kittybox/media", init_media(auth_backend.clone(), blobstore_uri)) .merge(kittybox::indieauth::router(auth_backend.clone(), database.clone(), http.clone())) + .merge(webmention) .route( "/.kittybox/health", axum::routing::get(health_check::<kittybox::database::FileStorage>) .layer(axum::Extension(database)) - ) + ); + + (router, task) }, "redis" => unimplemented!("Redis backend is not supported."), #[cfg(feature = "postgres")] @@ -120,6 +133,13 @@ where A: kittybox::indieauth::backend::AuthBackend database.clone(), http.clone(), Arc::clone(jobset) ); + let (webmention, task) = kittybox::webmentions::router( + kittybox::webmentions::queue::PostgresJobQueue::new(job_queue_uri).await.unwrap(), + database.clone(), + http.clone(), + cancellation_token.clone() + ); + let router = axum::Router::new() .route("/", homepage) .fallback(fallback) @@ -127,13 +147,14 @@ where A: kittybox::indieauth::backend::AuthBackend .route("/.kittybox/onboarding", onboarding) .nest("/.kittybox/media", init_media(auth_backend.clone(), blobstore_uri)) .merge(kittybox::indieauth::router(auth_backend.clone(), database.clone(), http.clone())) + .merge(webmention) .route( "/.kittybox/health", axum::routing::get(health_check::<kittybox::database::PostgresStorage>) .layer(axum::Extension(database)) ); - router + (router, task) }, other => unimplemented!("Unsupported backend: {other}") } @@ -143,9 +164,10 @@ async fn compose_kittybox( backend_uri: &str, blobstore_uri: &str, authstore_uri: &str, + job_queue_uri: &str, jobset: &Arc<tokio::sync::Mutex<tokio::task::JoinSet<()>>>, cancellation_token: &tokio_util::sync::CancellationToken -) -> axum::Router { +) -> (axum::Router, kittybox::webmentions::SupervisedTask) { let http: reqwest::Client = { #[allow(unused_mut)] let mut builder = reqwest::Client::builder() @@ -185,7 +207,7 @@ async fn compose_kittybox( builder.build().unwrap() }; - let router = match authstore_uri.split_once(':').unwrap().0 { + let (router, task) = match authstore_uri.split_once(':').unwrap().0 { "file" => { let auth_backend = { let folder = authstore_uri @@ -194,12 +216,12 @@ async fn compose_kittybox( kittybox::indieauth::backend::fs::FileBackend::new(folder) }; - compose_kittybox_with_auth(http, auth_backend, backend_uri, blobstore_uri, jobset).await + compose_kittybox_with_auth(http, auth_backend, backend_uri, blobstore_uri, job_queue_uri, jobset, cancellation_token).await } other => unimplemented!("Unsupported backend: {other}") }; - router + let router = router .route( "/.kittybox/static/:path", axum::routing::get(kittybox::frontend::statics) @@ -207,7 +229,9 @@ async fn compose_kittybox( .route("/.kittybox/coffee", teapot_route()) .nest("/.kittybox/micropub/client", kittybox::companion::router()) .layer(tower_http::trace::TraceLayer::new_for_http()) - .layer(tower_http::catch_panic::CatchPanicLayer::new()) + .layer(tower_http::catch_panic::CatchPanicLayer::new()); + + (router, task) } fn teapot_route() -> axum::routing::MethodRouter { @@ -265,16 +289,20 @@ async fn main() { std::process::exit(1); }); + let job_queue_uri: String = env::var("JOB_QUEUE_URI") + .unwrap_or_else(|_| { + error!("JOB_QUEUE_URI is not set, can't find job queue"); std::process::exit(1); }); let cancellation_token = tokio_util::sync::CancellationToken::new(); let jobset = Arc::new(tokio::sync::Mutex::new(tokio::task::JoinSet::new())); - let router = compose_kittybox( + let (router, webmentions_task) = compose_kittybox( backend_uri.as_str(), blobstore_uri.as_str(), authstore_uri.as_str(), + job_queue_uri.as_str(), &jobset, &cancellation_token ).await; @@ -402,24 +430,25 @@ async fn main() { tracing::error!("Error in HTTP server: {}", e); tracing::error!("Shutting down because of error."); cancellation_token.cancel(); - let _ = Box::pin(futures_util::future::join_all( - servers_futures.iter_mut().collect::<Vec<_>>() - )).await; 1 } _ = shutdown_signal => { tracing::info!("Shutdown requested by signal."); cancellation_token.cancel(); - let _ = Box::pin(futures_util::future::join_all( - servers_futures.iter_mut().collect::<Vec<_>>() - )).await; 0 } }; tracing::info!("Waiting for unfinished background tasks..."); + + let _ = tokio::join!( + webmentions_task, + Box::pin(futures_util::future::join_all( + servers_futures.iter_mut().collect::<Vec<_>>() + )), + ); let mut jobset: tokio::task::JoinSet<()> = Arc::try_unwrap(jobset) .expect("Dangling jobset references present") .into_inner(); diff --git a/smoke-test.nix b/smoke-test.nix index 139117f..b043a31 100644 --- a/smoke-test.nix +++ b/smoke-test.nix @@ -6,6 +6,19 @@ kittybox: kittybox = { config, pkgs, lib, ... }: { imports = [ kittybox.nixosModules.default ]; + services.postgresql = { + enable = true; + ensureDatabases = ["kittybox"]; + ensureUsers = [ { + name = "kittybox"; + ensurePermissions = { + "DATABASE kittybox" = "ALL PRIVILEGES"; + }; + } ]; + }; + + systemd.services.kittybox.wants = [ "postgresql.service" ]; + systemd.services.kittybox.after = [ "postgresql.service" ]; systemd.services.kittybox.wantedBy = lib.mkForce []; services.kittybox = { |