diff options
Diffstat (limited to 'kittybox-rs/src/webmentions/mod.rs')
-rw-r--r-- | kittybox-rs/src/webmentions/mod.rs | 85 |
1 files changed, 79 insertions, 6 deletions
diff --git a/kittybox-rs/src/webmentions/mod.rs b/kittybox-rs/src/webmentions/mod.rs index becddbc..a47fadb 100644 --- a/kittybox-rs/src/webmentions/mod.rs +++ b/kittybox-rs/src/webmentions/mod.rs @@ -1,6 +1,8 @@ use axum::{Form, response::{IntoResponse, Response}, Extension}; use axum::http::StatusCode; +use tracing::error; +use crate::database::{Storage, StorageError}; use self::queue::JobQueue; pub mod queue; @@ -38,9 +40,12 @@ async fn accept_webmention<Q: JobQueue<Webmention>>( } } -pub fn router<Q: JobQueue<Webmention>>(queue: Q, cancellation_token: tokio_util::sync::CancellationToken) -> (axum::Router, SupervisedTask) { +pub fn router<Q: JobQueue<Webmention>, S: Storage + 'static>( + queue: Q, db: S, http: reqwest::Client, + cancellation_token: tokio_util::sync::CancellationToken +) -> (axum::Router, SupervisedTask) { // Automatically spawn a background task to handle webmentions - let bgtask_handle = supervised_webmentions_task(queue.clone(), cancellation_token); + let bgtask_handle = supervised_webmentions_task(queue.clone(), db, http, cancellation_token); let router = axum::Router::new() .route("/.kittybox/webmention", @@ -100,7 +105,17 @@ where .unwrap(); } -async fn process_webmentions_from_queue<Q: JobQueue<Webmention>>(queue: Q) -> Result<std::convert::Infallible, Q::Error> { +mod check; + +#[derive(thiserror::Error, Debug)] +enum Error<Q: std::error::Error + std::fmt::Debug + Send + 'static> { + #[error("queue error: {0}")] + Queue(#[from] Q), + #[error("storage error: {0}")] + Storage(StorageError) +} + +async fn process_webmentions_from_queue<Q: JobQueue<Webmention>, S: Storage + 'static>(queue: Q, db: S, http: reqwest::Client) -> Result<std::convert::Infallible, Error<Q::Error>> { use futures_util::StreamExt; use self::queue::Job; @@ -112,11 +127,69 @@ async fn process_webmentions_from_queue<Q: JobQueue<Webmention>>(queue: Q) -> Re job.target.parse::<url::Url>().unwrap() ); - todo!() + let (code, text) = match http.get(source.clone()).send().await { + Ok(response) => { + let code = response.status(); + if ![StatusCode::OK, StatusCode::GONE].iter().any(|i| i == &code) { + error!("error processing webmention: webpage fetch returned {}", code); + continue; + } + match response.text().await { + Ok(text) => (code, text), + Err(err) => { + error!("error processing webmention: error fetching webpage text: {}", err); + continue + } + } + } + Err(err) => { + error!("error processing webmention: error requesting webpage: {}", err); + continue + } + }; + + if code == StatusCode::GONE { + todo!("removing webmentions is not implemented yet"); + // db.remove_webmention(target.as_str(), source.as_str()).await.map_err(Error::<Q::Error>::Storage)?; + } else { + // Verify webmention + let (mention_type, mut mention) = match tokio::task::block_in_place({ + || check::check_mention(text, &source, &target) + }) { + Ok(Some(mention_type)) => mention_type, + Ok(None) => { + error!("webmention {} -> {} invalid, rejecting", source, target); + item.done().await?; + continue; + } + Err(err) => { + error!("error processing webmention: error checking webmention: {}", err); + continue; + } + }; + + { + mention["type"] = serde_json::json!(["h-cite"]); + + if !mention["properties"].as_object().unwrap().contains_key("uid") { + let url = mention["properties"]["url"][0].as_str().unwrap_or_else(|| target.as_str()).to_owned(); + let props = mention["properties"].as_object_mut().unwrap(); + props.insert("uid".to_owned(), serde_json::Value::Array( + vec![serde_json::Value::String(url)]) + ); + } + } + + db.add_or_update_webmention(target.as_str(), mention_type, mention).await.map_err(Error::<Q::Error>::Storage)?; + } } unreachable!() } -fn supervised_webmentions_task<Q: JobQueue<Webmention>>(queue: Q, cancellation_token: tokio_util::sync::CancellationToken) -> SupervisedTask { - supervisor::<Q::Error, _, _>(move || process_webmentions_from_queue(queue.clone()), cancellation_token) +fn supervised_webmentions_task<Q: JobQueue<Webmention>, S: Storage + 'static>( + queue: Q, db: S, + http: reqwest::Client, + cancellation_token: tokio_util::sync::CancellationToken +) -> SupervisedTask { + supervisor::<Error<Q::Error>, _, _>(move || process_webmentions_from_queue(queue.clone(), db.clone(), http.clone()), cancellation_token) } |