author     Vika <vika@fireburn.ru>  2023-07-22 12:27:38 +0300
committer  Vika <vika@fireburn.ru>  2023-07-22 12:27:38 +0300
commit     769c036a58ec9f3ee47769372e2738b6089c9f1f (patch)
tree       0bf578c4d400037c289e5e322e94b9f4f54785cb
parent     39ddd3689aa4ef38580ea90087e1e204b55fcfc7 (diff)
download   kittybox-769c036a58ec9f3ee47769372e2738b6089c9f1f.tar.zst
webmentions: check webmentions and save them to the database
-rw-r--r--  kittybox-rs/src/webmentions/check.rs | 104
-rw-r--r--  kittybox-rs/src/webmentions/mod.rs   |  85
2 files changed, 183 insertions(+), 6 deletions(-)
diff --git a/kittybox-rs/src/webmentions/check.rs b/kittybox-rs/src/webmentions/check.rs
new file mode 100644
index 0000000..eb4afcf
--- /dev/null
+++ b/kittybox-rs/src/webmentions/check.rs
@@ -0,0 +1,104 @@
+use std::{cell::RefCell, rc::Rc};
+use microformats::{types::PropertyValue, html5ever::{self, tendril::TendrilSink}};
+use kittybox_util::MentionType;
+
+#[derive(thiserror::Error, Debug)]
+pub enum Error {
+    #[error("microformats error: {0}")]
+    Microformats(#[from] microformats::Error),
+    // #[error("json error: {0}")]
+    // Json(#[from] serde_json::Error),
+    #[error("url parse error: {0}")]
+    UrlParse(#[from] url::ParseError),
+}
+
+pub fn check_mention(document: impl AsRef<str>, base_url: &url::Url, link: &url::Url) -> Result<Option<(MentionType, serde_json::Value)>, Error> {
+    // First, check the document for MF2 markup
+    let document = microformats::from_html(document.as_ref(), base_url.clone())?;
+
+    // Get an iterator of all items
+    let items_iter = document.items.iter()
+        .map(AsRef::as_ref)
+        .map(RefCell::borrow);
+
+    for item in items_iter {
+        let props = item.properties.borrow();
+        for (prop, interaction_type) in [
+            ("in-reply-to", MentionType::Reply), ("like-of", MentionType::Like),
+            ("bookmark-of", MentionType::Bookmark), ("repost-of", MentionType::Repost)
+        ] {
+            if let Some(propvals) = props.get(prop) {
+                for val in propvals {
+                    if let PropertyValue::Url(url) = val {
+                        if url == link {
+                            return Ok(Some((interaction_type, serde_json::to_value(&*item).unwrap())))
+                        }
+                    }
+                }
+            }
+        }
+        // Process `content`
+        if let Some(PropertyValue::Fragment(content)) = props.get("content")
+            .map(Vec::as_slice)
+            .unwrap_or_default()
+            .first()
+        {
+            let root = html5ever::parse_document(html5ever::rcdom::RcDom::default(), Default::default())
+                .from_utf8()
+                .one(content.html.to_owned().as_bytes())
+                .document;
+
+            // This is a trick to unwrap recursion into a loop.
+            //
+            // A list of unprocessed nodes is made. Then, in each
+            // iteration, the list is "taken" and replaced with an
+            // empty list, which is populated with nodes for the next
+            // iteration of the loop.
+            //
+            // An empty list means all nodes have been processed.
+            let mut unprocessed_nodes: Vec<Rc<html5ever::rcdom::Node>> = root.children.borrow().iter().cloned().collect();
+            while !unprocessed_nodes.is_empty() {
+                // "Take" the list out of its memory slot, replace it with an empty list
+                let nodes = std::mem::take(&mut unprocessed_nodes);
+                'nodes_loop: for node in nodes.into_iter() {
+                    // Add children nodes to the list for the next iteration
+                    unprocessed_nodes.extend(node.children.borrow().iter().cloned());
+
+                    if let html5ever::rcdom::NodeData::Element { ref name, ref attrs, .. } = node.data {
+                        // If it's not `<a>`, skip it
+                        if name.local != *"a" { continue; }
+                        let mut is_mention: bool = false;
+                        for attr in attrs.borrow().iter() {
+                            if attr.name.local == *"rel" {
+                                // Don't count `rel="nofollow"` links: crawlers are told
+                                // to ignore them, so they are useless for driving visitors
+                                if attr.value
+                                    .as_ref()
+                                    .split([',', ' '])
+                                    .any(|v| v == "nofollow")
+                                {
+                                    // Skip the entire node.
+                                    continue 'nodes_loop;
+                                }
+                            }
+                            // If it's not `<a href="...">`, skip it
+                            if attr.name.local != *"href" { continue; }
+                            // Be forgiving in parsing URLs, and resolve them against the base URL
+                            if let Ok(url) = base_url.join(attr.value.as_ref()) {
+                                if &url == link {
+                                    is_mention = true;
+                                }
+                            }
+                        }
+                        if is_mention {
+                            return Ok(Some((MentionType::Mention, serde_json::to_value(&*item).unwrap())));
+                        }
+                    }
+                }
+            }
+
+        }
+    }
+
+    Ok(None)
+}
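
For reference, a minimal sketch of how check_mention might be exercised from inside the webmentions module. The HTML snippet, URLs and the demo function are invented for illustration; this example is not part of the commit:

use kittybox_util::MentionType;

fn demo() -> Result<(), Box<dyn std::error::Error>> {
    // An h-entry that replies to the target URL
    let html = r#"<article class="h-entry">
      <a class="u-in-reply-to" href="https://example.com/post">a reply</a>
    </article>"#;
    let base: url::Url = "https://commenter.example/reply".parse()?;
    let target: url::Url = "https://example.com/post".parse()?;

    match check::check_mention(html, &base, &target)? {
        // The matching MF2 item comes back serialized as a serde_json::Value
        Some((MentionType::Reply, item)) => println!("valid reply: {}", item),
        Some(_) => println!("valid mention of another type"),
        None => println!("target not mentioned in the document"),
    }
    Ok(())
}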
diff --git a/kittybox-rs/src/webmentions/mod.rs b/kittybox-rs/src/webmentions/mod.rs
index becddbc..a47fadb 100644
--- a/kittybox-rs/src/webmentions/mod.rs
+++ b/kittybox-rs/src/webmentions/mod.rs
@@ -1,6 +1,8 @@
 use axum::{Form, response::{IntoResponse, Response}, Extension};
 use axum::http::StatusCode;
+use tracing::error;
 
+use crate::database::{Storage, StorageError};
 use self::queue::JobQueue;
 pub mod queue;
 
@@ -38,9 +40,12 @@ async fn accept_webmention<Q: JobQueue<Webmention>>(
     }
 }
 
-pub fn router<Q: JobQueue<Webmention>>(queue: Q, cancellation_token: tokio_util::sync::CancellationToken) -> (axum::Router, SupervisedTask) {
+pub fn router<Q: JobQueue<Webmention>, S: Storage + 'static>(
+    queue: Q, db: S, http: reqwest::Client,
+    cancellation_token: tokio_util::sync::CancellationToken
+) -> (axum::Router, SupervisedTask) {
     // Automatically spawn a background task to handle webmentions
-    let bgtask_handle = supervised_webmentions_task(queue.clone(), cancellation_token);
+    let bgtask_handle = supervised_webmentions_task(queue.clone(), db, http, cancellation_token);
 
     let router = axum::Router::new()
         .route("/.kittybox/webmention",
@@ -100,7 +105,17 @@ where
         .unwrap();
 }
 
-async fn process_webmentions_from_queue<Q: JobQueue<Webmention>>(queue: Q) -> Result<std::convert::Infallible, Q::Error> {
+mod check;
+
+#[derive(thiserror::Error, Debug)]
+enum Error<Q: std::error::Error + std::fmt::Debug + Send + 'static> {
+    #[error("queue error: {0}")]
+    Queue(#[from] Q),
+    #[error("storage error: {0}")]
+    Storage(StorageError)
+}
+
+async fn process_webmentions_from_queue<Q: JobQueue<Webmention>, S: Storage + 'static>(queue: Q, db: S, http: reqwest::Client) -> Result<std::convert::Infallible, Error<Q::Error>> {
     use futures_util::StreamExt;
     use self::queue::Job;
 
@@ -112,11 +127,69 @@ async fn process_webmentions_from_queue<Q: JobQueue<Webmention>>(queue: Q) -> Re
             job.target.parse::<url::Url>().unwrap()
         );
 
-        todo!()
+        let (code, text) = match http.get(source.clone()).send().await {
+            Ok(response) => {
+                let code = response.status();
+                if ![StatusCode::OK, StatusCode::GONE].contains(&code) {
+                    error!("error processing webmention: webpage fetch returned {}", code);
+                    continue;
+                }
+                match response.text().await {
+                    Ok(text) => (code, text),
+                    Err(err) => {
+                        error!("error processing webmention: error fetching webpage text: {}", err);
+                        continue
+                    }
+                }
+            }
+            Err(err) => {
+                error!("error processing webmention: error requesting webpage: {}", err);
+                continue
+            }
+        };
+
+        if code == StatusCode::GONE {
+            todo!("removing webmentions is not implemented yet");
+            // db.remove_webmention(target.as_str(), source.as_str()).await.map_err(Error::<Q::Error>::Storage)?;
+        } else {
+            // Verify webmention
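+            // (check_mention parses HTML on the current thread;
+            // block_in_place keeps that from stalling the async executor)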
+            let (mention_type, mut mention) = match tokio::task::block_in_place(
+                || check::check_mention(text, &source, &target)
+            ) {
+                Ok(Some(checked)) => checked,
+                Ok(None) => {
+                    error!("webmention {} -> {} invalid, rejecting", source, target);
+                    item.done().await?;
+                    continue;
+                }
+                Err(err) => {
+                    error!("error processing webmention: error checking webmention: {}", err);
+                    continue;
+                }
+            };
+
+            {
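+                // Normalize the stored item: saved webmentions become h-cite objects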
+                mention["type"] = serde_json::json!(["h-cite"]);
+
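+                // Synthesize a `uid` property when missing, from the
+                // mention's own `url` (falling back to the target URL)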
+                if !mention["properties"].as_object().unwrap().contains_key("uid") {
+                    let url = mention["properties"]["url"][0].as_str().unwrap_or_else(|| target.as_str()).to_owned();
+                    let props = mention["properties"].as_object_mut().unwrap();
+                    props.insert("uid".to_owned(), serde_json::Value::Array(
+                        vec![serde_json::Value::String(url)])
+                    );
+                }
+            }
+
+            db.add_or_update_webmention(target.as_str(), mention_type, mention).await.map_err(Error::<Q::Error>::Storage)?;
+        }
     }
     unreachable!()
 }
 
-fn supervised_webmentions_task<Q: JobQueue<Webmention>>(queue: Q, cancellation_token: tokio_util::sync::CancellationToken) -> SupervisedTask {
-    supervisor::<Q::Error, _, _>(move || process_webmentions_from_queue(queue.clone()), cancellation_token)
+fn supervised_webmentions_task<Q: JobQueue<Webmention>, S: Storage + 'static>(
+    queue: Q, db: S,
+    http: reqwest::Client,
+    cancellation_token: tokio_util::sync::CancellationToken
+) -> SupervisedTask {
+    supervisor::<Error<Q::Error>, _, _>(move || process_webmentions_from_queue(queue.clone(), db.clone(), http.clone()), cancellation_token)
 }