use kittybox::database::FileStorage; use std::{env, time::Duration}; use tracing::{debug, error, info}; fn init_media(auth_backend: A, blobstore_uri: &str) -> axum::Router { match blobstore_uri.split_once(':').unwrap().0 { "file" => { let folder = std::path::PathBuf::from( blobstore_uri.strip_prefix("file://").unwrap() ); let blobstore = kittybox::media::storage::file::FileStore::new(folder); kittybox::media::router::<_, _>(blobstore, auth_backend) }, other => unimplemented!("Unsupported backend: {other}") } } async fn compose_kittybox_with_auth( http: reqwest::Client, auth_backend: A, backend_uri: &str, blobstore_uri: &str ) -> axum::Router where A: kittybox::indieauth::backend::AuthBackend { match backend_uri.split_once(':').unwrap().0 { "file" => { let database = { let folder = backend_uri.strip_prefix("file://").unwrap(); let path = std::path::PathBuf::from(folder); match kittybox::database::FileStorage::new(path).await { Ok(db) => db, Err(err) => { error!("Error creating database: {:?}", err); std::process::exit(1); } } }; // Technically, if we don't construct the micropub router, // we could use some wrapper that makes the database // read-only. // // This would allow to exclude all code to write to the // database and separate reader and writer processes of // Kittybox to improve security. let homepage: axum::routing::MethodRouter<_> = axum::routing::get( kittybox::frontend::homepage:: ) .layer(axum::Extension(database.clone())); let fallback = axum::routing::get( kittybox::frontend::catchall:: ) .layer(axum::Extension(database.clone())); let micropub = kittybox::micropub::router( database.clone(), http.clone(), auth_backend.clone() ); let onboarding = kittybox::frontend::onboarding::router( database.clone(), http.clone() ); axum::Router::new() .route("/", homepage) .fallback(fallback) .route("/.kittybox/micropub", micropub) .route("/.kittybox/onboarding", onboarding) .nest("/.kittybox/media", init_media(auth_backend.clone(), blobstore_uri)) .merge(kittybox::indieauth::router(auth_backend.clone(), database.clone(), http.clone())) .route( "/.kittybox/health", axum::routing::get(health_check::) .layer(axum::Extension(database)) ) }, "redis" => unimplemented!("Redis backend is not supported."), #[cfg(feature = "postgres")] "postgres" => { use kittybox::database::PostgresStorage; let database = { match PostgresStorage::new(backend_uri).await { Ok(db) => db, Err(err) => { error!("Error creating database: {:?}", err); std::process::exit(1); } } }; // Technically, if we don't construct the micropub router, // we could use some wrapper that makes the database // read-only. // // This would allow to exclude all code to write to the // database and separate reader and writer processes of // Kittybox to improve security. let homepage: axum::routing::MethodRouter<_> = axum::routing::get( kittybox::frontend::homepage:: ) .layer(axum::Extension(database.clone())); let fallback = axum::routing::get( kittybox::frontend::catchall:: ) .layer(axum::Extension(database.clone())); let micropub = kittybox::micropub::router( database.clone(), http.clone(), auth_backend.clone() ); let onboarding = kittybox::frontend::onboarding::router( database.clone(), http.clone() ); axum::Router::new() .route("/", homepage) .fallback(fallback) .route("/.kittybox/micropub", micropub) .route("/.kittybox/onboarding", onboarding) .nest("/.kittybox/media", init_media(auth_backend.clone(), blobstore_uri)) .merge(kittybox::indieauth::router(auth_backend.clone(), database.clone(), http.clone())) .route( "/.kittybox/health", axum::routing::get(health_check::) .layer(axum::Extension(database)) ) }, other => unimplemented!("Unsupported backend: {other}") } } async fn compose_kittybox( backend_uri: &str, blobstore_uri: &str, authstore_uri: &str, cancellation_token: &tokio_util::sync::CancellationToken ) -> axum::Router { let http: reqwest::Client = { #[allow(unused_mut)] let mut builder = reqwest::Client::builder().user_agent(concat!( env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION") )); // TODO: add a root certificate if there's an environment variable pointing at it //builder = builder.add_root_certificate(reqwest::Certificate::from_pem(todo!())); builder.build().unwrap() }; let router = match authstore_uri.split_once(':').unwrap().0 { "file" => { let auth_backend = { let folder = authstore_uri .strip_prefix("file://") .unwrap(); kittybox::indieauth::backend::fs::FileBackend::new(folder) }; compose_kittybox_with_auth(http, auth_backend, backend_uri, blobstore_uri).await } other => unimplemented!("Unsupported backend: {other}") }; router .route( "/.kittybox/static/:path", axum::routing::get(kittybox::frontend::statics) ) .route("/.kittybox/coffee", teapot_route()) .nest("/.kittybox/micropub/client", kittybox::companion::router()) .layer(tower_http::trace::TraceLayer::new_for_http()) .layer(tower_http::catch_panic::CatchPanicLayer::new()) } fn teapot_route() -> axum::routing::MethodRouter { axum::routing::get(|| async { use axum::http::{header, StatusCode}; (StatusCode::IM_A_TEAPOT, [(header::CONTENT_TYPE, "text/plain")], "Sorry, can't brew coffee yet!") }) } async fn health_check( //axum::Extension(auth): axum::Extension, //axum::Extension(blob): axum::Extension, axum::Extension(data): axum::Extension, ) -> impl axum::response::IntoResponse where //A: kittybox::indieauth::backend::AuthBackend, //B: kittybox::media::storage::MediaStore, D: kittybox::database::Storage { (axum::http::StatusCode::OK, std::borrow::Cow::Borrowed("OK")) } #[tokio::main] async fn main() { use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Registry}; Registry::default() .with(EnvFilter::from_default_env()) //.with(tracing_subscriber::fmt::layer().json()) .with( #[cfg(debug_assertions)] tracing_tree::HierarchicalLayer::new(2), #[cfg(not(debug_assertions))] tracing_subscriber::fmt::layer().json() ) .init(); //let _ = tracing_log::LogTracer::init(); info!("Starting the kittybox server..."); let backend_uri: String = env::var("BACKEND_URI") .unwrap_or_else(|_| { error!("BACKEND_URI is not set, cannot find a database"); std::process::exit(1); }); let blobstore_uri: String = env::var("BLOBSTORE_URI") .unwrap_or_else(|_| { error!("BLOBSTORE_URI is not set, can't find media store"); std::process::exit(1); }); let authstore_uri: String = env::var("AUTH_STORE_URI") .unwrap_or_else(|_| { error!("AUTH_STORE_URI is not set, can't find authentication store"); std::process::exit(1); }); std::process::exit(1); }); let cancellation_token = tokio_util::sync::CancellationToken::new(); let router = compose_kittybox( backend_uri.as_str(), blobstore_uri.as_str(), authstore_uri.as_str(), &cancellation_token ).await; let mut servers: Vec> = vec![]; let build_hyper = |tcp: std::net::TcpListener| { tracing::info!("Listening on {}", tcp.local_addr().unwrap()); // Set the socket to non-blocking so tokio can poll it // properly -- this is the async magic! tcp.set_nonblocking(true).unwrap(); hyper::server::Server::from_tcp(tcp).unwrap() // Otherwise Chrome keeps connections open for too long .tcp_keepalive(Some(Duration::from_secs(30 * 60))) .serve(router.clone().into_make_service()) }; let mut listenfd = listenfd::ListenFd::from_env(); for i in 0..(listenfd.len()) { match listenfd.take_tcp_listener(i) { Ok(Some(tcp)) => servers.push(build_hyper(tcp)), Ok(None) => {}, Err(err) => { tracing::error!("Error binding to socket in fd {}: {}", i, err); } } } if servers.is_empty() { servers.push(build_hyper({ let listen_addr = env::var("SERVE_AT") .ok() .unwrap_or_else(|| "[::]:8080".to_string()) .parse::() .unwrap_or_else(|e| { error!("Cannot parse SERVE_AT: {}", e); std::process::exit(1); }); std::net::TcpListener::bind(listen_addr).unwrap() })) } // Polling streams mutates them let mut servers_futures = Box::pin(servers.into_iter() .map( #[cfg(not(tokio_unstable))] |server| tokio::task::spawn( server.with_graceful_shutdown(cancellation_token.clone().cancelled_owned()) ), #[cfg(tokio_unstable)] |server| { tokio::task::Builder::new() // We leak the String here. It is acceptable, as the string // is reasonably small and needs to live forever. .name({ let name = format!("Kittybox HTTP acceptor: {}", server.local_addr()); // Polyfill for unstablized [`String::leak`] // // SAFETY: the bytes come from a [`String`], which is valid UTF-8. unsafe { std::str::from_utf8_unchecked(name.into_bytes().leak()) } }) .spawn( server.with_graceful_shutdown( cancellation_token.clone().cancelled_owned() ) ) .unwrap() } ) .collect::>>>() ); #[cfg(not(unix))] let shutdown_signal = tokio::signal::ctrl_c(); #[cfg(unix)] let shutdown_signal = { use tokio::signal::unix::{signal, SignalKind}; async move { let mut interrupt = signal(SignalKind::interrupt()) .expect("Failed to set up SIGINT handler"); let mut terminate = signal(SignalKind::terminate()) .expect("Failed to setup SIGTERM handler"); tokio::select! { _ = terminate.recv() => {}, _ = interrupt.recv() => {}, } } }; use futures_util::stream::StreamExt; tokio::select! { // Poll the servers stream for errors. // If any error out, shut down the entire operation // // We do this because there might not be a good way // to recover from some errors without external help Some(Err(e)) = servers_futures.next() => { tracing::error!("Error in HTTP server: {}", e); tracing::error!("Shutting down because of error."); cancellation_token.cancel(); let _ = Box::pin(futures_util::future::join_all( servers_futures.iter_mut().collect::>() )).await; std::process::exit(1); } _ = shutdown_signal => { info!("Shutdown requested by signal."); cancellation_token.cancel(); let _ = Box::pin(futures_util::future::join_all( servers_futures.iter_mut().collect::>() )).await; info!("Shutdown complete, exiting."); std::process::exit(0); } } }