diff options
author | Vika <vika@fireburn.ru> | 2023-07-29 21:59:56 +0300 |
---|---|---|
committer | Vika <vika@fireburn.ru> | 2023-07-29 21:59:56 +0300 |
commit | 0617663b249f9ca488e5de652108b17d67fbaf45 (patch) | |
tree | 11564b6c8fa37bf9203a0a4cc1c4e9cc088cb1a5 /kittybox-rs/src/webmentions/mod.rs | |
parent | 26c2b79f6a6380ae3224e9309b9f3352f5717bd7 (diff) | |
download | kittybox-0617663b249f9ca488e5de652108b17d67fbaf45.tar.zst |
Moved the entire Kittybox tree into the root
Diffstat (limited to 'kittybox-rs/src/webmentions/mod.rs')
-rw-r--r-- | kittybox-rs/src/webmentions/mod.rs | 195 |
1 files changed, 0 insertions, 195 deletions
diff --git a/kittybox-rs/src/webmentions/mod.rs b/kittybox-rs/src/webmentions/mod.rs deleted file mode 100644 index 95ea870..0000000 --- a/kittybox-rs/src/webmentions/mod.rs +++ /dev/null @@ -1,195 +0,0 @@ -use axum::{Form, response::{IntoResponse, Response}, Extension}; -use axum::http::StatusCode; -use tracing::error; - -use crate::database::{Storage, StorageError}; -use self::queue::JobQueue; -pub mod queue; - -#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -#[cfg_attr(feature = "sqlx", derive(sqlx::FromRow))] -pub struct Webmention { - source: String, - target: String, -} - -impl queue::JobItem for Webmention {} -impl queue::PostgresJobItem for Webmention { - const DATABASE_NAME: &'static str = "kittybox_webmention.incoming_webmention_queue"; - const NOTIFICATION_CHANNEL: &'static str = "incoming_webmention"; -} - -async fn accept_webmention<Q: JobQueue<Webmention>>( - Extension(queue): Extension<Q>, - Form(webmention): Form<Webmention>, -) -> Response { - if let Err(err) = webmention.source.parse::<url::Url>() { - return (StatusCode::BAD_REQUEST, err.to_string()).into_response() - } - if let Err(err) = webmention.target.parse::<url::Url>() { - return (StatusCode::BAD_REQUEST, err.to_string()).into_response() - } - - match queue.put(&webmention).await { - Ok(id) => (StatusCode::ACCEPTED, [ - ("Location", format!("/.kittybox/webmention/{id}")) - ]).into_response(), - Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, [ - ("Content-Type", "text/plain") - ], err.to_string()).into_response() - } -} - -pub fn router<Q: JobQueue<Webmention>, S: Storage + 'static>( - queue: Q, db: S, http: reqwest::Client, - cancellation_token: tokio_util::sync::CancellationToken -) -> (axum::Router, SupervisedTask) { - // Automatically spawn a background task to handle webmentions - let bgtask_handle = supervised_webmentions_task(queue.clone(), db, http, cancellation_token); - - let router = axum::Router::new() - .route("/.kittybox/webmention", - axum::routing::post(accept_webmention::<Q>) - ) - .layer(Extension(queue)); - - (router, bgtask_handle) -} - -#[derive(thiserror::Error, Debug)] -pub enum SupervisorError { - #[error("the task was explicitly cancelled")] - Cancelled -} - -pub type SupervisedTask = tokio::task::JoinHandle<Result<std::convert::Infallible, SupervisorError>>; - -pub fn supervisor<E, A, F>(mut f: F, cancellation_token: tokio_util::sync::CancellationToken) -> SupervisedTask -where - E: std::error::Error + std::fmt::Debug + Send + 'static, - A: std::future::Future<Output = Result<std::convert::Infallible, E>> + Send + 'static, - F: FnMut() -> A + Send + 'static -{ - - let supervisor_future = async move { - loop { - // Don't spawn the task if we are already cancelled, but - // have somehow missed it (probably because the task - // crashed and we immediately received a cancellation - // request after noticing the crashed task) - if cancellation_token.is_cancelled() { - return Err(SupervisorError::Cancelled) - } - let task = tokio::task::spawn(f()); - tokio::select! { - _ = cancellation_token.cancelled() => { - tracing::info!("Shutdown of background task {:?} requested.", std::any::type_name::<A>()); - return Err(SupervisorError::Cancelled) - } - task_result = task => match task_result { - Err(e) => tracing::error!("background task {:?} exited unexpectedly: {}", std::any::type_name::<A>(), e), - Ok(Err(e)) => tracing::error!("background task {:?} returned error: {}", std::any::type_name::<A>(), e), - Ok(Ok(_)) => unreachable!("task's Ok is Infallible") - } - } - tracing::debug!("Sleeping for a little while to back-off..."); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - } - }; - #[cfg(not(tokio_unstable))] - return tokio::task::spawn(supervisor_future); - #[cfg(tokio_unstable)] - return tokio::task::Builder::new() - .name(format!("supervisor for background task {}", std::any::type_name::<A>()).as_str()) - .spawn(supervisor_future) - .unwrap(); -} - -mod check; - -#[derive(thiserror::Error, Debug)] -enum Error<Q: std::error::Error + std::fmt::Debug + Send + 'static> { - #[error("queue error: {0}")] - Queue(#[from] Q), - #[error("storage error: {0}")] - Storage(StorageError) -} - -async fn process_webmentions_from_queue<Q: JobQueue<Webmention>, S: Storage + 'static>(queue: Q, db: S, http: reqwest::Client) -> Result<std::convert::Infallible, Error<Q::Error>> { - use futures_util::StreamExt; - use self::queue::Job; - - let mut stream = queue.into_stream().await?; - while let Some(item) = stream.next().await.transpose()? { - let job = item.job(); - let (source, target) = ( - job.source.parse::<url::Url>().unwrap(), - job.target.parse::<url::Url>().unwrap() - ); - - let (code, text) = match http.get(source.clone()).send().await { - Ok(response) => { - let code = response.status(); - if ![StatusCode::OK, StatusCode::GONE].iter().any(|i| i == &code) { - error!("error processing webmention: webpage fetch returned {}", code); - continue; - } - match response.text().await { - Ok(text) => (code, text), - Err(err) => { - error!("error processing webmention: error fetching webpage text: {}", err); - continue - } - } - } - Err(err) => { - error!("error processing webmention: error requesting webpage: {}", err); - continue - } - }; - - if code == StatusCode::GONE { - todo!("removing webmentions is not implemented yet"); - // db.remove_webmention(target.as_str(), source.as_str()).await.map_err(Error::<Q::Error>::Storage)?; - } else { - // Verify webmention - let (mention_type, mut mention) = match tokio::task::block_in_place({ - || check::check_mention(text, &source, &target) - }) { - Ok(Some(mention_type)) => mention_type, - Ok(None) => { - error!("webmention {} -> {} invalid, rejecting", source, target); - item.done().await?; - continue; - } - Err(err) => { - error!("error processing webmention: error checking webmention: {}", err); - continue; - } - }; - - { - mention["type"] = serde_json::json!(["h-cite"]); - - if !mention["properties"].as_object().unwrap().contains_key("uid") { - let url = mention["properties"]["url"][0].as_str().unwrap_or_else(|| target.as_str()).to_owned(); - let props = mention["properties"].as_object_mut().unwrap(); - props.insert("uid".to_owned(), serde_json::Value::Array( - vec![serde_json::Value::String(url)]) - ); - } - } - - db.add_or_update_webmention(target.as_str(), mention_type, mention).await.map_err(Error::<Q::Error>::Storage)?; - } - } - unreachable!() -} - -fn supervised_webmentions_task<Q: JobQueue<Webmention>, S: Storage + 'static>( - queue: Q, db: S, - http: reqwest::Client, - cancellation_token: tokio_util::sync::CancellationToken -) -> SupervisedTask { - supervisor::<Error<Q::Error>, _, _>(move || process_webmentions_from_queue(queue.clone(), db.clone(), http.clone()), cancellation_token) -} |