diff options
author | Vika <vika@fireburn.ru> | 2023-07-22 12:27:38 +0300 |
---|---|---|
committer | Vika <vika@fireburn.ru> | 2023-07-22 12:27:38 +0300 |
commit | 769c036a58ec9f3ee47769372e2738b6089c9f1f (patch) | |
tree | 0bf578c4d400037c289e5e322e94b9f4f54785cb | |
parent | 39ddd3689aa4ef38580ea90087e1e204b55fcfc7 (diff) | |
download | kittybox-769c036a58ec9f3ee47769372e2738b6089c9f1f.tar.zst |
webmentions: check webmentions and save them to the database
-rw-r--r-- | kittybox-rs/src/webmentions/check.rs | 104 | ||||
-rw-r--r-- | kittybox-rs/src/webmentions/mod.rs | 85 |
2 files changed, 183 insertions, 6 deletions
diff --git a/kittybox-rs/src/webmentions/check.rs b/kittybox-rs/src/webmentions/check.rs new file mode 100644 index 0000000..eb4afcf --- /dev/null +++ b/kittybox-rs/src/webmentions/check.rs @@ -0,0 +1,104 @@ +use std::{cell::RefCell, rc::Rc}; +use microformats::{types::PropertyValue, html5ever::{self, tendril::TendrilSink}}; +use kittybox_util::MentionType; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("microformats error: {0}")] + Microformats(#[from] microformats::Error), + // #[error("json error: {0}")] + // Json(#[from] serde_json::Error), + #[error("url parse error: {0}")] + UrlParse(#[from] url::ParseError), +} + +pub fn check_mention(document: impl AsRef<str>, base_url: &url::Url, link: &url::Url) -> Result<Option<(MentionType, serde_json::Value)>, Error> { + // First, check the document for MF2 markup + let document = microformats::from_html(document.as_ref(), base_url.clone())?; + + // Get an iterator of all items + let items_iter = document.items.iter() + .map(AsRef::as_ref) + .map(RefCell::borrow); + + for item in items_iter { + let props = item.properties.borrow(); + for (prop, interaction_type) in [ + ("in-reply-to", MentionType::Reply), ("like-of", MentionType::Like), + ("bookmark-of", MentionType::Bookmark), ("repost-of", MentionType::Repost) + ] { + if let Some(propvals) = props.get(prop) { + for val in propvals { + if let PropertyValue::Url(url) = val { + if url == link { + return Ok(Some((interaction_type, serde_json::to_value(&*item).unwrap()))) + } + } + } + } + } + // Process `content` + if let Some(PropertyValue::Fragment(content)) = props.get("content") + .map(Vec::as_slice) + .unwrap_or_default() + .first() + { + let root = html5ever::parse_document(html5ever::rcdom::RcDom::default(), Default::default()) + .from_utf8() + .one(content.html.to_owned().as_bytes()) + .document; + + // This is a trick to unwrap recursion into a loop + // + // A list of unprocessed node is made. Then, in each + // iteration, the list is "taken" and replaced with an + // empty list, which is populated with nodes for the next + // iteration of the loop. + // + // Empty list means all nodes were processed. + let mut unprocessed_nodes: Vec<Rc<html5ever::rcdom::Node>> = root.children.borrow().iter().cloned().collect(); + while !unprocessed_nodes.is_empty() { + // "Take" the list out of its memory slot, replace it with an empty list + let nodes = std::mem::take(&mut unprocessed_nodes); + 'nodes_loop: for node in nodes.into_iter() { + // Add children nodes to the list for the next iteration + unprocessed_nodes.extend(node.children.borrow().iter().cloned()); + + if let html5ever::rcdom::NodeData::Element { ref name, ref attrs, .. } = node.data { + // If it's not `<a>`, skip it + if name.local != *"a" { continue; } + let mut is_mention: bool = false; + for attr in attrs.borrow().iter() { + if attr.name.local == *"rel" { + // Don't count `rel="nofollow"` links — a web crawler should ignore them + // and so for purposes of driving visitors they are useless + if attr.value + .as_ref() + .split([',', ' ']) + .any(|v| v == "nofollow") + { + // Skip the entire node. + continue 'nodes_loop; + } + } + // if it's not `<a href="...">`, skip it + if attr.name.local != *"href" { continue; } + // Be forgiving in parsing URLs, and resolve them against the base URL + if let Ok(url) = base_url.join(attr.value.as_ref()) { + if &url == link { + is_mention = true; + } + } + } + if is_mention { + return Ok(Some((MentionType::Mention, serde_json::to_value(&*item).unwrap()))); + } + } + } + } + + } + } + + Ok(None) +} diff --git a/kittybox-rs/src/webmentions/mod.rs b/kittybox-rs/src/webmentions/mod.rs index becddbc..a47fadb 100644 --- a/kittybox-rs/src/webmentions/mod.rs +++ b/kittybox-rs/src/webmentions/mod.rs @@ -1,6 +1,8 @@ use axum::{Form, response::{IntoResponse, Response}, Extension}; use axum::http::StatusCode; +use tracing::error; +use crate::database::{Storage, StorageError}; use self::queue::JobQueue; pub mod queue; @@ -38,9 +40,12 @@ async fn accept_webmention<Q: JobQueue<Webmention>>( } } -pub fn router<Q: JobQueue<Webmention>>(queue: Q, cancellation_token: tokio_util::sync::CancellationToken) -> (axum::Router, SupervisedTask) { +pub fn router<Q: JobQueue<Webmention>, S: Storage + 'static>( + queue: Q, db: S, http: reqwest::Client, + cancellation_token: tokio_util::sync::CancellationToken +) -> (axum::Router, SupervisedTask) { // Automatically spawn a background task to handle webmentions - let bgtask_handle = supervised_webmentions_task(queue.clone(), cancellation_token); + let bgtask_handle = supervised_webmentions_task(queue.clone(), db, http, cancellation_token); let router = axum::Router::new() .route("/.kittybox/webmention", @@ -100,7 +105,17 @@ where .unwrap(); } -async fn process_webmentions_from_queue<Q: JobQueue<Webmention>>(queue: Q) -> Result<std::convert::Infallible, Q::Error> { +mod check; + +#[derive(thiserror::Error, Debug)] +enum Error<Q: std::error::Error + std::fmt::Debug + Send + 'static> { + #[error("queue error: {0}")] + Queue(#[from] Q), + #[error("storage error: {0}")] + Storage(StorageError) +} + +async fn process_webmentions_from_queue<Q: JobQueue<Webmention>, S: Storage + 'static>(queue: Q, db: S, http: reqwest::Client) -> Result<std::convert::Infallible, Error<Q::Error>> { use futures_util::StreamExt; use self::queue::Job; @@ -112,11 +127,69 @@ async fn process_webmentions_from_queue<Q: JobQueue<Webmention>>(queue: Q) -> Re job.target.parse::<url::Url>().unwrap() ); - todo!() + let (code, text) = match http.get(source.clone()).send().await { + Ok(response) => { + let code = response.status(); + if ![StatusCode::OK, StatusCode::GONE].iter().any(|i| i == &code) { + error!("error processing webmention: webpage fetch returned {}", code); + continue; + } + match response.text().await { + Ok(text) => (code, text), + Err(err) => { + error!("error processing webmention: error fetching webpage text: {}", err); + continue + } + } + } + Err(err) => { + error!("error processing webmention: error requesting webpage: {}", err); + continue + } + }; + + if code == StatusCode::GONE { + todo!("removing webmentions is not implemented yet"); + // db.remove_webmention(target.as_str(), source.as_str()).await.map_err(Error::<Q::Error>::Storage)?; + } else { + // Verify webmention + let (mention_type, mut mention) = match tokio::task::block_in_place({ + || check::check_mention(text, &source, &target) + }) { + Ok(Some(mention_type)) => mention_type, + Ok(None) => { + error!("webmention {} -> {} invalid, rejecting", source, target); + item.done().await?; + continue; + } + Err(err) => { + error!("error processing webmention: error checking webmention: {}", err); + continue; + } + }; + + { + mention["type"] = serde_json::json!(["h-cite"]); + + if !mention["properties"].as_object().unwrap().contains_key("uid") { + let url = mention["properties"]["url"][0].as_str().unwrap_or_else(|| target.as_str()).to_owned(); + let props = mention["properties"].as_object_mut().unwrap(); + props.insert("uid".to_owned(), serde_json::Value::Array( + vec![serde_json::Value::String(url)]) + ); + } + } + + db.add_or_update_webmention(target.as_str(), mention_type, mention).await.map_err(Error::<Q::Error>::Storage)?; + } } unreachable!() } -fn supervised_webmentions_task<Q: JobQueue<Webmention>>(queue: Q, cancellation_token: tokio_util::sync::CancellationToken) -> SupervisedTask { - supervisor::<Q::Error, _, _>(move || process_webmentions_from_queue(queue.clone()), cancellation_token) +fn supervised_webmentions_task<Q: JobQueue<Webmention>, S: Storage + 'static>( + queue: Q, db: S, + http: reqwest::Client, + cancellation_token: tokio_util::sync::CancellationToken +) -> SupervisedTask { + supervisor::<Error<Q::Error>, _, _>(move || process_webmentions_from_queue(queue.clone(), db.clone(), http.clone()), cancellation_token) } |