diff options
Diffstat (limited to 'kittybox-rs/src/main.rs')
-rw-r--r-- | kittybox-rs/src/main.rs | 489 |
1 files changed, 0 insertions, 489 deletions
diff --git a/kittybox-rs/src/main.rs b/kittybox-rs/src/main.rs deleted file mode 100644 index 6389489..0000000 --- a/kittybox-rs/src/main.rs +++ /dev/null @@ -1,489 +0,0 @@ -use kittybox::database::FileStorage; -use std::{env, time::Duration, sync::Arc}; -use tracing::error; - -fn init_media<A: kittybox::indieauth::backend::AuthBackend>(auth_backend: A, blobstore_uri: &str) -> axum::Router { - match blobstore_uri.split_once(':').unwrap().0 { - "file" => { - let folder = std::path::PathBuf::from( - blobstore_uri.strip_prefix("file://").unwrap() - ); - let blobstore = kittybox::media::storage::file::FileStore::new(folder); - - kittybox::media::router::<_, _>(blobstore, auth_backend) - }, - other => unimplemented!("Unsupported backend: {other}") - } -} - -async fn compose_kittybox_with_auth<A>( - http: reqwest::Client, - auth_backend: A, - backend_uri: &str, - blobstore_uri: &str, - job_queue_uri: &str, - jobset: &Arc<tokio::sync::Mutex<tokio::task::JoinSet<()>>>, - cancellation_token: &tokio_util::sync::CancellationToken -) -> (axum::Router, kittybox::webmentions::SupervisedTask) -where A: kittybox::indieauth::backend::AuthBackend -{ - match backend_uri.split_once(':').unwrap().0 { - "file" => { - let database = { - let folder = backend_uri.strip_prefix("file://").unwrap(); - let path = std::path::PathBuf::from(folder); - - match kittybox::database::FileStorage::new(path).await { - Ok(db) => db, - Err(err) => { - error!("Error creating database: {:?}", err); - std::process::exit(1); - } - } - }; - - // Technically, if we don't construct the micropub router, - // we could use some wrapper that makes the database - // read-only. - // - // This would allow to exclude all code to write to the - // database and separate reader and writer processes of - // Kittybox to improve security. - let homepage: axum::routing::MethodRouter<_> = axum::routing::get( - kittybox::frontend::homepage::<FileStorage> - ) - .layer(axum::Extension(database.clone())); - let fallback = axum::routing::get( - kittybox::frontend::catchall::<FileStorage> - ) - .layer(axum::Extension(database.clone())); - - let micropub = kittybox::micropub::router( - database.clone(), - http.clone(), - auth_backend.clone(), - Arc::clone(jobset) - ); - let onboarding = kittybox::frontend::onboarding::router( - database.clone(), http.clone(), Arc::clone(jobset) - ); - - - let (webmention, task) = kittybox::webmentions::router( - kittybox::webmentions::queue::PostgresJobQueue::new(job_queue_uri).await.unwrap(), - database.clone(), - http.clone(), - cancellation_token.clone() - ); - - let router = axum::Router::new() - .route("/", homepage) - .fallback(fallback) - .route("/.kittybox/micropub", micropub) - .route("/.kittybox/onboarding", onboarding) - .nest("/.kittybox/media", init_media(auth_backend.clone(), blobstore_uri)) - .merge(kittybox::indieauth::router(auth_backend.clone(), database.clone(), http.clone())) - .merge(webmention) - .route( - "/.kittybox/health", - axum::routing::get(health_check::<kittybox::database::FileStorage>) - .layer(axum::Extension(database)) - ); - - (router, task) - }, - "redis" => unimplemented!("Redis backend is not supported."), - #[cfg(feature = "postgres")] - "postgres" => { - use kittybox::database::PostgresStorage; - - let database = { - match PostgresStorage::new(backend_uri).await { - Ok(db) => db, - Err(err) => { - error!("Error creating database: {:?}", err); - std::process::exit(1); - } - } - }; - - // Technically, if we don't construct the micropub router, - // we could use some wrapper that makes the database - // read-only. - // - // This would allow to exclude all code to write to the - // database and separate reader and writer processes of - // Kittybox to improve security. - let homepage: axum::routing::MethodRouter<_> = axum::routing::get( - kittybox::frontend::homepage::<PostgresStorage> - ) - .layer(axum::Extension(database.clone())); - let fallback = axum::routing::get( - kittybox::frontend::catchall::<PostgresStorage> - ) - .layer(axum::Extension(database.clone())); - - let micropub = kittybox::micropub::router( - database.clone(), - http.clone(), - auth_backend.clone(), - Arc::clone(jobset) - ); - let onboarding = kittybox::frontend::onboarding::router( - database.clone(), http.clone(), Arc::clone(jobset) - ); - - let (webmention, task) = kittybox::webmentions::router( - kittybox::webmentions::queue::PostgresJobQueue::new(job_queue_uri).await.unwrap(), - database.clone(), - http.clone(), - cancellation_token.clone() - ); - - let router = axum::Router::new() - .route("/", homepage) - .fallback(fallback) - .route("/.kittybox/micropub", micropub) - .route("/.kittybox/onboarding", onboarding) - .nest("/.kittybox/media", init_media(auth_backend.clone(), blobstore_uri)) - .merge(kittybox::indieauth::router(auth_backend.clone(), database.clone(), http.clone())) - .merge(webmention) - .route( - "/.kittybox/health", - axum::routing::get(health_check::<kittybox::database::PostgresStorage>) - .layer(axum::Extension(database)) - ); - - (router, task) - }, - other => unimplemented!("Unsupported backend: {other}") - } -} - -async fn compose_kittybox( - backend_uri: &str, - blobstore_uri: &str, - authstore_uri: &str, - job_queue_uri: &str, - jobset: &Arc<tokio::sync::Mutex<tokio::task::JoinSet<()>>>, - cancellation_token: &tokio_util::sync::CancellationToken -) -> (axum::Router, kittybox::webmentions::SupervisedTask) { - let http: reqwest::Client = { - #[allow(unused_mut)] - let mut builder = reqwest::Client::builder() - .user_agent(concat!( - env!("CARGO_PKG_NAME"), - "/", - env!("CARGO_PKG_VERSION") - )); - if let Ok(certs) = std::env::var("KITTYBOX_CUSTOM_PKI_ROOTS") { - // TODO: add a root certificate if there's an environment variable pointing at it - for path in certs.split(':') { - let metadata = match tokio::fs::metadata(path).await { - Ok(metadata) => metadata, - Err(err) if err.kind() == std::io::ErrorKind::NotFound => { - tracing::error!("TLS root certificate {} not found, skipping...", path); - continue; - } - Err(err) => panic!("Error loading TLS certificates: {}", err) - }; - if metadata.is_dir() { - let mut dir = tokio::fs::read_dir(path).await.unwrap(); - while let Ok(Some(file)) = dir.next_entry().await { - let pem = tokio::fs::read(file.path()).await.unwrap(); - builder = builder.add_root_certificate( - reqwest::Certificate::from_pem(&pem).unwrap() - ); - } - } else { - let pem = tokio::fs::read(path).await.unwrap(); - builder = builder.add_root_certificate( - reqwest::Certificate::from_pem(&pem).unwrap() - ); - } - } - } - - builder.build().unwrap() - }; - - let (router, task) = match authstore_uri.split_once(':').unwrap().0 { - "file" => { - let auth_backend = { - let folder = authstore_uri - .strip_prefix("file://") - .unwrap(); - kittybox::indieauth::backend::fs::FileBackend::new(folder) - }; - - compose_kittybox_with_auth(http, auth_backend, backend_uri, blobstore_uri, job_queue_uri, jobset, cancellation_token).await - } - other => unimplemented!("Unsupported backend: {other}") - }; - - let router = router - .route( - "/.kittybox/static/:path", - axum::routing::get(kittybox::frontend::statics) - ) - .route("/.kittybox/coffee", teapot_route()) - .nest("/.kittybox/micropub/client", kittybox::companion::router()) - .layer(tower_http::trace::TraceLayer::new_for_http()) - .layer(tower_http::catch_panic::CatchPanicLayer::new()); - - (router, task) -} - -fn teapot_route() -> axum::routing::MethodRouter { - axum::routing::get(|| async { - use axum::http::{header, StatusCode}; - (StatusCode::IM_A_TEAPOT, [(header::CONTENT_TYPE, "text/plain")], "Sorry, can't brew coffee yet!") - }) -} - -async fn health_check</*A, B, */D>( - //axum::Extension(auth): axum::Extension<A>, - //axum::Extension(blob): axum::Extension<B>, - axum::Extension(data): axum::Extension<D>, -) -> impl axum::response::IntoResponse -where - //A: kittybox::indieauth::backend::AuthBackend, - //B: kittybox::media::storage::MediaStore, - D: kittybox::database::Storage -{ - (axum::http::StatusCode::OK, std::borrow::Cow::Borrowed("OK")) -} - -#[tokio::main] -async fn main() { - use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Registry}; - - let tracing_registry = Registry::default() - .with(EnvFilter::from_default_env()) - .with( - #[cfg(debug_assertions)] - tracing_tree::HierarchicalLayer::new(2) - .with_bracketed_fields(true) - .with_indent_lines(true) - .with_verbose_exit(true), - #[cfg(not(debug_assertions))] - tracing_subscriber::fmt::layer().json() - .with_ansi(std::io::IsTerminal::is_terminal(&std::io::stdout().lock())) - ); - // In debug builds, also log to JSON, but to file. - #[cfg(debug_assertions)] - let tracing_registry = tracing_registry.with( - tracing_subscriber::fmt::layer() - .json() - .with_writer({ - let instant = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap(); - move || std::fs::OpenOptions::new() - .append(true) - .create(true) - .open( - format!( - "{}.log.json", - instant - .as_secs_f64() - .to_string() - .replace('.', "_") - ) - ).unwrap() - }) - ); - tracing_registry.init(); - - tracing::info!("Starting the kittybox server..."); - - let backend_uri: String = env::var("BACKEND_URI") - .unwrap_or_else(|_| { - error!("BACKEND_URI is not set, cannot find a database"); - std::process::exit(1); - }); - let blobstore_uri: String = env::var("BLOBSTORE_URI") - .unwrap_or_else(|_| { - error!("BLOBSTORE_URI is not set, can't find media store"); - std::process::exit(1); - }); - - let authstore_uri: String = env::var("AUTH_STORE_URI") - .unwrap_or_else(|_| { - error!("AUTH_STORE_URI is not set, can't find authentication store"); - std::process::exit(1); - }); - - let job_queue_uri: String = env::var("JOB_QUEUE_URI") - .unwrap_or_else(|_| { - error!("JOB_QUEUE_URI is not set, can't find job queue"); - std::process::exit(1); - }); - - let cancellation_token = tokio_util::sync::CancellationToken::new(); - let jobset = Arc::new(tokio::sync::Mutex::new(tokio::task::JoinSet::new())); - - let (router, webmentions_task) = compose_kittybox( - backend_uri.as_str(), - blobstore_uri.as_str(), - authstore_uri.as_str(), - job_queue_uri.as_str(), - &jobset, - &cancellation_token - ).await; - - let mut servers: Vec<hyper::server::Server<hyper::server::conn::AddrIncoming, _>> = vec![]; - - let build_hyper = |tcp: std::net::TcpListener| { - tracing::info!("Listening on {}", tcp.local_addr().unwrap()); - // Set the socket to non-blocking so tokio can poll it - // properly -- this is the async magic! - tcp.set_nonblocking(true).unwrap(); - - hyper::server::Server::from_tcp(tcp).unwrap() - // Otherwise Chrome keeps connections open for too long - .tcp_keepalive(Some(Duration::from_secs(30 * 60))) - .serve(router.clone().into_make_service()) - }; - - let mut listenfd = listenfd::ListenFd::from_env(); - for i in 0..(listenfd.len()) { - match listenfd.take_tcp_listener(i) { - Ok(Some(tcp)) => servers.push(build_hyper(tcp)), - Ok(None) => {}, - Err(err) => tracing::error!("Error binding to socket in fd {}: {}", i, err) - } - } - // TODO this requires the `hyperlocal` crate - //#[rustfmt::skip] - /*#[cfg(unix)] { - let build_hyper_unix = |unix: std::os::unix::net::UnixListener| { - { - use std::os::linux::net::SocketAddrExt; - - let local_addr = unix.local_addr().unwrap(); - if let Some(pathname) = local_addr.as_pathname() { - tracing::info!("Listening on unix:{}", pathname.display()); - } else if let Some(name) = { - #[cfg(linux)] - local_addr.as_abstract_name(); - #[cfg(not(linux))] - None::<&[u8]> - } { - tracing::info!("Listening on unix:@{}", String::from_utf8_lossy(name)); - } else { - tracing::info!("Listening on unnamed unix socket"); - } - } - unix.set_nonblocking(true).unwrap(); - - hyper::server::Server::builder(unix) - .serve(router.clone().into_make_service()) - }; - for i in 0..(listenfd.len()) { - match listenfd.take_unix_listener(i) { - Ok(Some(unix)) => servers.push(build_hyper_unix(unix)), - Ok(None) => {}, - Err(err) => tracing::error!("Error binding to socket in fd {}: {}", i, err) - } - } - }*/ - if servers.is_empty() { - servers.push(build_hyper({ - let listen_addr = env::var("SERVE_AT") - .ok() - .unwrap_or_else(|| "[::]:8080".to_string()) - .parse::<std::net::SocketAddr>() - .unwrap_or_else(|e| { - error!("Cannot parse SERVE_AT: {}", e); - std::process::exit(1); - }); - - std::net::TcpListener::bind(listen_addr).unwrap() - })) - } - // Drop the remaining copy of the router - // to get rid of an extra reference to `jobset` - drop(router); - // Polling streams mutates them - let mut servers_futures = Box::pin(servers.into_iter() - .map( - #[cfg(not(tokio_unstable))] |server| tokio::task::spawn( - server.with_graceful_shutdown(cancellation_token.clone().cancelled_owned()) - ), - #[cfg(tokio_unstable)] |server| { - tokio::task::Builder::new() - .name(format!("Kittybox HTTP acceptor: {}", server.local_addr()).as_str()) - .spawn( - server.with_graceful_shutdown( - cancellation_token.clone().cancelled_owned() - ) - ) - .unwrap() - } - ) - .collect::<futures_util::stream::FuturesUnordered<tokio::task::JoinHandle<Result<(), hyper::Error>>>>() - ); - - #[cfg(not(unix))] - let shutdown_signal = tokio::signal::ctrl_c(); - #[cfg(unix)] - let shutdown_signal = { - use tokio::signal::unix::{signal, SignalKind}; - - async move { - let mut interrupt = signal(SignalKind::interrupt()) - .expect("Failed to set up SIGINT handler"); - let mut terminate = signal(SignalKind::terminate()) - .expect("Failed to setup SIGTERM handler"); - - tokio::select! { - _ = terminate.recv() => {}, - _ = interrupt.recv() => {}, - } - } - }; - use futures_util::stream::StreamExt; - - let exitcode: i32 = tokio::select! { - // Poll the servers stream for errors. - // If any error out, shut down the entire operation - // - // We do this because there might not be a good way - // to recover from some errors without external help - Some(Err(e)) = servers_futures.next() => { - tracing::error!("Error in HTTP server: {}", e); - tracing::error!("Shutting down because of error."); - cancellation_token.cancel(); - - 1 - } - _ = cancellation_token.cancelled() => { - tracing::info!("Signal caught from watchdog."); - - 0 - } - _ = shutdown_signal => { - tracing::info!("Shutdown requested by signal."); - cancellation_token.cancel(); - - 0 - } - }; - - tracing::info!("Waiting for unfinished background tasks..."); - - let _ = tokio::join!( - webmentions_task, - Box::pin(futures_util::future::join_all( - servers_futures.iter_mut().collect::<Vec<_>>() - )), - ); - let mut jobset: tokio::task::JoinSet<()> = Arc::try_unwrap(jobset) - .expect("Dangling jobset references present") - .into_inner(); - while (jobset.join_next().await).is_some() {} - tracing::info!("Shutdown complete, exiting."); - std::process::exit(exitcode); - -} |