about summary refs log tree commit diff
path: root/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs202
1 files changed, 109 insertions, 93 deletions
diff --git a/src/main.rs b/src/main.rs
index bd3684e..984745a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,9 +1,11 @@
-use kittybox::{database::Storage, indieauth::backend::AuthBackend, media::storage::MediaStore, webmentions::Webmention, compose_kittybox};
-use tokio::{sync::Mutex, task::JoinSet};
+use kittybox::{
+    compose_kittybox, database::Storage, indieauth::backend::AuthBackend,
+    media::storage::MediaStore, webmentions::Webmention,
+};
 use std::{env, future::IntoFuture, sync::Arc};
+use tokio::{sync::Mutex, task::JoinSet};
 use tracing::error;
 
-
 #[tokio::main]
 async fn main() {
     use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Registry};
@@ -17,32 +19,28 @@ async fn main() {
                 .with_indent_lines(true)
                 .with_verbose_exit(true),
             #[cfg(not(debug_assertions))]
-            tracing_subscriber::fmt::layer().json()
-                .with_ansi(std::io::IsTerminal::is_terminal(&std::io::stdout().lock()))
+            tracing_subscriber::fmt::layer()
+                .json()
+                .with_ansi(std::io::IsTerminal::is_terminal(&std::io::stdout().lock())),
         );
     // In debug builds, also log to JSON, but to file.
     #[cfg(debug_assertions)]
-    let tracing_registry = tracing_registry.with(
-        tracing_subscriber::fmt::layer()
-            .json()
-            .with_writer({
-                let instant = std::time::SystemTime::now()
-                        .duration_since(std::time::UNIX_EPOCH)
-                        .unwrap();
-                move || std::fs::OpenOptions::new()
+    let tracing_registry =
+        tracing_registry.with(tracing_subscriber::fmt::layer().json().with_writer({
+            let instant = std::time::SystemTime::now()
+                .duration_since(std::time::UNIX_EPOCH)
+                .unwrap();
+            move || {
+                std::fs::OpenOptions::new()
                     .append(true)
                     .create(true)
-                    .open(
-                        format!(
-                            "{}.log.json",
-                            instant
-                            .as_secs_f64()
-                            .to_string()
-                            .replace('.', "_")
-                        )
-                    ).unwrap()
-            })
-    );
+                    .open(format!(
+                        "{}.log.json",
+                        instant.as_secs_f64().to_string().replace('.', "_")
+                    ))
+                    .unwrap()
+            }
+        }));
     tracing_registry.init();
 
     tracing::info!("Starting the kittybox server...");
@@ -79,12 +77,15 @@ async fn main() {
         });
 
     // TODO: load from environment
-    let cookie_key = axum_extra::extract::cookie::Key::from(&env::var("COOKIE_KEY")
-        .as_deref()
-        .map(|s| data_encoding::BASE64_MIME_PERMISSIVE.decode(s.as_bytes())
-            .expect("Invalid cookie key: must be base64 encoded")
-        )
-        .unwrap()
+    let cookie_key = axum_extra::extract::cookie::Key::from(
+        &env::var("COOKIE_KEY")
+            .as_deref()
+            .map(|s| {
+                data_encoding::BASE64_MIME_PERMISSIVE
+                    .decode(s.as_bytes())
+                    .expect("Invalid cookie key: must be base64 encoded")
+            })
+            .unwrap(),
     );
 
     let cancellation_token = tokio_util::sync::CancellationToken::new();
@@ -93,12 +94,11 @@ async fn main() {
 
     let http: reqwest_middleware::ClientWithMiddleware = {
         #[allow(unused_mut)]
-        let mut builder = reqwest::Client::builder()
-            .user_agent(concat!(
-                env!("CARGO_PKG_NAME"),
-                "/",
-                env!("CARGO_PKG_VERSION")
-            ));
+        let mut builder = reqwest::Client::builder().user_agent(concat!(
+            env!("CARGO_PKG_NAME"),
+            "/",
+            env!("CARGO_PKG_VERSION")
+        ));
         if let Ok(certs) = std::env::var("KITTYBOX_CUSTOM_PKI_ROOTS") {
             // TODO: add a root certificate if there's an environment variable pointing at it
             for path in certs.split(':') {
@@ -108,21 +108,19 @@ async fn main() {
                         tracing::error!("TLS root certificate {} not found, skipping...", path);
                         continue;
                     }
-                    Err(err) => panic!("Error loading TLS certificates: {}", err)
+                    Err(err) => panic!("Error loading TLS certificates: {}", err),
                 };
                 if metadata.is_dir() {
                     let mut dir = tokio::fs::read_dir(path).await.unwrap();
                     while let Ok(Some(file)) = dir.next_entry().await {
                         let pem = tokio::fs::read(file.path()).await.unwrap();
-                        builder = builder.add_root_certificate(
-                            reqwest::Certificate::from_pem(&pem).unwrap()
-                        );
+                        builder = builder
+                            .add_root_certificate(reqwest::Certificate::from_pem(&pem).unwrap());
                     }
                 } else {
                     let pem = tokio::fs::read(path).await.unwrap();
-                    builder = builder.add_root_certificate(
-                        reqwest::Certificate::from_pem(&pem).unwrap()
-                    );
+                    builder =
+                        builder.add_root_certificate(reqwest::Certificate::from_pem(&pem).unwrap());
                 }
             }
         }
@@ -151,7 +149,7 @@ async fn main() {
     let job_queue_type = job_queue_uri.scheme();
 
     macro_rules! compose_kittybox {
-        ($auth:ty, $store:ty, $media:ty, $queue:ty) => { {
+        ($auth:ty, $store:ty, $media:ty, $queue:ty) => {{
             type AuthBackend = $auth;
             type Storage = $store;
             type MediaStore = $media;
@@ -193,36 +191,43 @@ async fn main() {
             };
 
             type St = kittybox::AppState<AuthBackend, Storage, MediaStore, JobQueue>;
-            let stateful_router = compose_kittybox::<St, AuthBackend, Storage, MediaStore, JobQueue>().await;
-            let task = kittybox::webmentions::supervised_webmentions_task::<St, Storage, JobQueue>(&state, cancellation_token.clone());
+            let stateful_router =
+                compose_kittybox::<St, AuthBackend, Storage, MediaStore, JobQueue>().await;
+            let task = kittybox::webmentions::supervised_webmentions_task::<St, Storage, JobQueue>(
+                &state,
+                cancellation_token.clone(),
+            );
             let router = stateful_router.with_state(state);
 
             (router, task)
-        } }
+        }};
     }
 
-    let (router, webmentions_task): (axum::Router<()>, kittybox::webmentions::SupervisedTask) = match (authstore_type, backend_type, blobstore_type, job_queue_type) {
-        ("file", "file", "file", "postgres") => {
-            compose_kittybox!(
-                kittybox::indieauth::backend::fs::FileBackend,
-                kittybox::database::FileStorage,
-                kittybox::media::storage::file::FileStore,
-                kittybox::webmentions::queue::PostgresJobQueue<Webmention>
-            )
-        },
-        ("file", "postgres", "file", "postgres") => {
-            compose_kittybox!(
-                kittybox::indieauth::backend::fs::FileBackend,
-                kittybox::database::PostgresStorage,
-                kittybox::media::storage::file::FileStore,
-                kittybox::webmentions::queue::PostgresJobQueue<Webmention>
-            )
-        },
-        (_, _, _, _) => {
-            // TODO: refine this error.
-            panic!("Invalid type for AUTH_STORE_URI, BACKEND_URI, BLOBSTORE_URI or JOB_QUEUE_URI");
-        }
-    };
+    let (router, webmentions_task): (axum::Router<()>, kittybox::webmentions::SupervisedTask) =
+        match (authstore_type, backend_type, blobstore_type, job_queue_type) {
+            ("file", "file", "file", "postgres") => {
+                compose_kittybox!(
+                    kittybox::indieauth::backend::fs::FileBackend,
+                    kittybox::database::FileStorage,
+                    kittybox::media::storage::file::FileStore,
+                    kittybox::webmentions::queue::PostgresJobQueue<Webmention>
+                )
+            }
+            ("file", "postgres", "file", "postgres") => {
+                compose_kittybox!(
+                    kittybox::indieauth::backend::fs::FileBackend,
+                    kittybox::database::PostgresStorage,
+                    kittybox::media::storage::file::FileStore,
+                    kittybox::webmentions::queue::PostgresJobQueue<Webmention>
+                )
+            }
+            (_, _, _, _) => {
+                // TODO: refine this error.
+                panic!(
+                    "Invalid type for AUTH_STORE_URI, BACKEND_URI, BLOBSTORE_URI or JOB_QUEUE_URI"
+                );
+            }
+        };
 
     let mut servers: Vec<axum::serve::Serve<_, _, _>> = vec![];
 
@@ -238,7 +243,7 @@ async fn main() {
         //    .serve(router.clone().into_make_service())
         axum::serve(
             tokio::net::TcpListener::from_std(tcp).unwrap(),
-            router.clone()
+            router.clone(),
         )
     };
 
@@ -246,8 +251,8 @@ async fn main() {
     for i in 0..(listenfd.len()) {
         match listenfd.take_tcp_listener(i) {
             Ok(Some(tcp)) => servers.push(build_hyper(tcp)),
-            Ok(None) => {},
-            Err(err) => tracing::error!("Error binding to socket in fd {}: {}", i, err)
+            Ok(None) => {}
+            Err(err) => tracing::error!("Error binding to socket in fd {}: {}", i, err),
         }
     }
     // TODO this requires the `hyperlocal` crate
@@ -302,24 +307,35 @@ async fn main() {
     // to get rid of an extra reference to `jobset`
     drop(router);
     // Polling streams mutates them
-    let mut servers_futures = Box::pin(servers.into_iter()
-        .map(
-            #[cfg(not(tokio_unstable))] |server| tokio::task::spawn(
-                server.with_graceful_shutdown(cancellation_token.clone().cancelled_owned())
-                    .into_future()
-            ),
-            #[cfg(tokio_unstable)] |server| {
-                tokio::task::Builder::new()
-                    .name(format!("Kittybox HTTP acceptor: {:?}", server).as_str())
-                    .spawn(
-                        server.with_graceful_shutdown(
-                            cancellation_token.clone().cancelled_owned()
-                        ).into_future()
+    let mut servers_futures = Box::pin(
+        servers
+            .into_iter()
+            .map(
+                #[cfg(not(tokio_unstable))]
+                |server| {
+                    tokio::task::spawn(
+                        server
+                            .with_graceful_shutdown(cancellation_token.clone().cancelled_owned())
+                            .into_future(),
                     )
-                    .unwrap()
-            }
-        )
-        .collect::<futures_util::stream::FuturesUnordered<tokio::task::JoinHandle<Result<(), std::io::Error>>>>()
+                },
+                #[cfg(tokio_unstable)]
+                |server| {
+                    tokio::task::Builder::new()
+                        .name(format!("Kittybox HTTP acceptor: {:?}", server).as_str())
+                        .spawn(
+                            server
+                                .with_graceful_shutdown(
+                                    cancellation_token.clone().cancelled_owned(),
+                                )
+                                .into_future(),
+                        )
+                        .unwrap()
+                },
+            )
+            .collect::<futures_util::stream::FuturesUnordered<
+                tokio::task::JoinHandle<Result<(), std::io::Error>>,
+            >>(),
     );
 
     #[cfg(not(unix))]
@@ -329,10 +345,10 @@ async fn main() {
         use tokio::signal::unix::{signal, SignalKind};
 
         async move {
-            let mut interrupt = signal(SignalKind::interrupt())
-                .expect("Failed to set up SIGINT handler");
-            let mut terminate = signal(SignalKind::terminate())
-                .expect("Failed to setup SIGTERM handler");
+            let mut interrupt =
+                signal(SignalKind::interrupt()).expect("Failed to set up SIGINT handler");
+            let mut terminate =
+                signal(SignalKind::terminate()).expect("Failed to setup SIGTERM handler");
 
             tokio::select! {
                 _ = terminate.recv() => {},