From 94ebc5e653191fcaacfa91dddebf88dca7e7b7fe Mon Sep 17 00:00:00 2001 From: Vika Date: Mon, 17 Jul 2023 01:52:09 +0300 Subject: Put Micropub background processing tasks in a JoinSet MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This allows using tree-structured concurrency to keep background tasks in check and allow them to finish running before shutting down — a necessary prerequisite for shutdown-on-idle. (A background task may take a bit too long to complete, and we may need to wait for it.) --- kittybox-rs/src/frontend/onboarding.rs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) (limited to 'kittybox-rs/src/frontend') diff --git a/kittybox-rs/src/frontend/onboarding.rs b/kittybox-rs/src/frontend/onboarding.rs index d5cde02..e44e866 100644 --- a/kittybox-rs/src/frontend/onboarding.rs +++ b/kittybox-rs/src/frontend/onboarding.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::database::{settings, Storage}; use axum::{ extract::{Extension, Host}, @@ -7,6 +9,7 @@ use axum::{ }; use kittybox_frontend_renderer::{ErrorPage, OnboardingPage, Template}; use serde::Deserialize; +use tokio::{task::JoinSet, sync::Mutex}; use tracing::{debug, error}; use super::FrontendError; @@ -51,6 +54,7 @@ async fn onboard( user_uid: url::Url, data: OnboardingData, http: reqwest::Client, + jobset: Arc>>, ) -> Result<(), FrontendError> { // Create a user to pass to the backend // At this point the site belongs to nobody, so it is safe to do @@ -115,7 +119,7 @@ async fn onboard( } let (uid, post) = crate::micropub::normalize_mf2(data.first_post, &user); tracing::debug!("Posting first post {}...", uid); - crate::micropub::_post(&user, uid, post, db, http) + crate::micropub::_post(&user, uid, post, db, http, jobset) .await .map_err(|e| FrontendError { msg: "Error while posting the first post".to_string(), @@ -130,6 +134,7 @@ pub async fn post( Extension(db): Extension, Host(host): Host, Extension(http): Extension, + Extension(jobset): Extension>>>, Json(data): Json, ) -> axum::response::Response { let user_uid = format!("https://{}/", host.as_str()); @@ -137,7 +142,7 @@ pub async fn post( if db.post_exists(&user_uid).await.unwrap() { IntoResponse::into_response((StatusCode::FOUND, [("Location", "/")])) } else { - match onboard(db, user_uid.parse().unwrap(), data, http).await { + match onboard(db, user_uid.parse().unwrap(), data, http, jobset).await { Ok(()) => IntoResponse::into_response((StatusCode::FOUND, [("Location", "/")])), Err(err) => { error!("Onboarding error: {}", err); @@ -163,9 +168,14 @@ pub async fn post( } } -pub fn router(database: S, http: reqwest::Client) -> axum::routing::MethodRouter { +pub fn router( + database: S, + http: reqwest::Client, + jobset: Arc>>, +) -> axum::routing::MethodRouter { axum::routing::get(get) .post(post::) .layer::<_, _, std::convert::Infallible>(axum::Extension(database)) - .layer(axum::Extension(http)) + .layer::<_, _, std::convert::Infallible>(axum::Extension(http)) + .layer(axum::Extension(jobset)) } -- cgit 1.4.1