about summary refs log tree commit diff
path: root/src/micropub/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/micropub/mod.rs')
-rw-r--r--  src/micropub/mod.rs  204
1 files changed, 160 insertions, 44 deletions
diff --git a/src/micropub/mod.rs b/src/micropub/mod.rs
index 7175e56..f426c77 100644
--- a/src/micropub/mod.rs
+++ b/src/micropub/mod.rs
@@ -1,7 +1,7 @@
 use std::convert::Infallible;
 use std::fmt::Display;
 use either::Either;
-use log::warn;
+use log::{info, warn, error};
 use warp::http::StatusCode;
 use warp::{Filter, Rejection, reject::InvalidQuery};
 use serde_json::json;
@@ -157,8 +157,38 @@ mod util {
     }
 }
 
+#[derive(Debug)]
+struct FetchedPostContext {
+    url: url::Url,
+    mf2: serde_json::Value,
+    webmention: Option<url::Url>
+}
+
+fn populate_reply_context(mf2: &serde_json::Value, prop: &str, ctxs: &[FetchedPostContext]) -> Option<serde_json::Value> {
+    if mf2["properties"][prop].is_array() {
+        Some(json!(
+            mf2["properties"][prop]
+                .as_array()
+            // Safe to unwrap because we checked its existence and type
+            // And it's not like we can make it disappear without unsafe code
+                .unwrap()
+                .iter()
            // NOTE: this is O(items × ctxs) — a linear scan of ctxs per reply URL.
            // Building a map keyed by ctx.url.as_str() would make each lookup O(1).
+                .map(|i| ctxs.iter()
+                     .find(|ctx| Some(ctx.url.as_str()) == i.as_str())
+                     .and_then(|ctx| ctx.mf2["items"].get(0))
+                     .or(Some(i))
+                     .unwrap())
+                .collect::<Vec<&serde_json::Value>>()
+        ))
+    } else {
+        None
+    }
+}
+
 // TODO actually save the post to the database and schedule post-processing
-pub(crate) async fn _post<D: Storage>(
+pub(crate) async fn _post<D: 'static + Storage>(
     user: crate::indieauth::User,
     uid: String,
     mf2: serde_json::Value,
@@ -238,22 +268,20 @@ pub(crate) async fn _post<D: Storage>(
     );
     
     // TODO: Post-processing the post (aka second write pass)
-    // - [ ] Download rich reply contexts
+    // - [x] Download rich reply contexts
     // - [ ] Syndicate the post if requested, add links to the syndicated copies
     // - [ ] Send WebSub notifications to the hub (if we happen to have one)
-    // - [ ] Send webmentions
-    #[allow(unused_imports)]
+    // - [x] Send webmentions
     tokio::task::spawn(async move {
-        use hyper::{Uri, Response, Body, body::HttpBody};
-        use bytes::{Buf, BufMut};
         use futures_util::StreamExt;
 
         let uid: &str = mf2["properties"]["uid"][0].as_str().unwrap();
 
-        let mut contextually_significant_posts: Vec<url::Url> = vec![];
-        for prop in &["in-reply-to", "like-of", "repost-of", "bookmark-of"] {
+        let context_props = ["in-reply-to", "like-of", "repost-of", "bookmark-of"];
+        let mut context_urls: Vec<url::Url> = vec![];
+        for prop in &context_props {
             if let Some(array) = mf2["properties"][prop].as_array() {
-                contextually_significant_posts.extend(
+                context_urls.extend(
                     array
                         .iter()
                         .filter_map(|v| v.as_str())
@@ -262,50 +290,62 @@ pub(crate) async fn _post<D: Storage>(
             }
         }
         // TODO parse HTML in e-content and add links found here
-        contextually_significant_posts.sort_unstable_by_key(|u| u.to_string());
-        contextually_significant_posts.dedup();
-
-        #[derive(Debug)]
-        #[allow(dead_code)]
-        struct FetchedPostContext {
-            url: url::Url,
-            // TODO see if we can use non-strings for non-UTF-8 HTML
-            html: String,
-            mf2: serde_json::Value,
-            webmention: Option<hyper::Uri>
-        }
+        context_urls.sort_unstable_by_key(|u| u.to_string());
+        context_urls.dedup();
 
-        {
-            // TODO: Make a stream to fetch all these posts and convert them to MF2
-            let post_contexts = {
-                let http = &http;
-                tokio_stream::iter(contextually_significant_posts
-                                   .into_iter()
-                ).then(move |url: url::Url| async move {
-                    let html: String = todo!("Fetch the post using {:?}", http);
-                    // We just need to get the following here:
+        // TODO: Make a stream to fetch all these posts and convert them to MF2
+        let post_contexts = {
+            let http = &http;
+            tokio_stream::iter(context_urls.into_iter())
+                .then(move |url: url::Url| http.get(url).send())
+                .filter_map(|response| futures::future::ready(response.ok()))
+                .filter(|response| futures::future::ready(response.status() == 200))
+                .filter_map(|response: reqwest::Response| async move {
                     // 1. We need to preserve the URL
                     // 2. We need to get the HTML for MF2 processing
                     // 3. We need to get the webmention endpoint address
                     // All of that can be done in one go.
-
-                    // XXX stub!
-
-                    dbg!(FetchedPostContext {
-                        url, html,
-                        mf2: serde_json::to_value(microformats::from_html(&html, url).unwrap()).unwrap(),
-                        webmention: None
-                    })
+                    let url = response.url().clone();
+                    // TODO parse link headers
+                    let links = response
+                        .headers()
+                        .get_all(hyper::http::header::LINK)
+                        .iter()
+                        .cloned()
+                        .collect::<Vec<hyper::http::HeaderValue>>();
+                    let html = response.text().await;
+                    if html.is_err() {
+                        return None;
+                    }
+                    let html = html.unwrap();
+                    let mf2 = microformats::from_html(&html, url.clone()).unwrap();
+                    // TODO use first Link: header if available
+                    let webmention: Option<url::Url> = mf2.rels.by_rels().get("webmention")
+                        .and_then(|i| i.first().cloned());
+
+                    dbg!(Some(FetchedPostContext {
+                        url, mf2: serde_json::to_value(mf2).unwrap(), webmention
+                    }))
                 })
-                    .collect::<Vec<FetchedPostContext>>()
-                    .await
-            };
+                .collect::<Vec<FetchedPostContext>>()
+                .await
+        };
 
-            drop(post_contexts);
+        let mut update = json!({ "replace": {} });
+        for prop in &context_props {
+            if let Some(json) = populate_reply_context(&mf2, prop, &post_contexts) {
+                update["replace"][prop] = json;
+            }
         }
+        if !update["replace"].as_object().unwrap().is_empty() {
+            if let Err(err) = db.update_post(uid, update).await {
+                error!("Failed to update post with rich reply contexts: {}", err);
+            }
+        }
+
         // At this point we can start syndicating the post.
         // Currently we don't really support any syndication endpoints, but still!
-        if let Some(syndicate_to) = mf2["properties"]["mp-syndicate-to"].as_array() {
+        /*if let Some(syndicate_to) = mf2["properties"]["mp-syndicate-to"].as_array() {
             let http = &http;
             tokio_stream::iter(syndicate_to)
                 .filter_map(|i| futures::future::ready(i.as_str()))
@@ -319,6 +359,37 @@ pub(crate) async fn _post<D: Storage>(
                     }
                 })
                 .await;
+        }*/
+
+        {
+            let http = &http;
+            tokio_stream::iter(
+                post_contexts.into_iter()
+                    .filter(|ctx| ctx.webmention.is_some()))
+                .for_each_concurrent(2, |ctx| async move {
+                    let mut map = std::collections::HashMap::new();
+                    map.insert("source", uid);
+                    map.insert("target", ctx.url.as_str());
+
+                    match http.post(ctx.webmention.unwrap().clone())
+                        .form(&map)
+                        .send()
+                        .await
+                    {
+                        Ok(res) => {
+                            if !res.status().is_success() {
+                                warn!(
+                                    "Failed to send a webmention for {}: got HTTP {}",
+                                    ctx.url, res.status()
+                                );
+                            } else {
+                                info!("Sent a webmention to {}, got HTTP {}", ctx.url, res.status())
+                            }
+                        },
+                        Err(err) => warn!("Failed to send a webmention for {}: {}", ctx.url, err)
+                    }
+                })
+                .await;
         }
     });
     
@@ -717,6 +788,51 @@ mod tests {
     use warp::{Filter, Reply};
     use serde_json::json;
 
+    use super::FetchedPostContext;
+
+    #[test]
+    fn test_populate_reply_context() {
+        let already_expanded_reply_ctx = json!({
+            "type": ["h-entry"],
+            "properties": {
+                "content": ["Hello world!"]
+            }
+        });
+        let mf2 = json!({
+            "type": ["h-entry"],
+            "properties": {
+                "like-of": [
+                    "https://fireburn.ru/posts/example",
+                    already_expanded_reply_ctx,
+                    "https://fireburn.ru/posts/non-existent"
+                ]
+            }
+        });
+        let test_ctx = json!({
+            "type": ["h-entry"],
+            "properties": {
+                "content": ["This is a post which was reacted to."]
+            }
+        });
+        let reply_contexts = vec![
+            FetchedPostContext {
+                url: "https://fireburn.ru/posts/example".parse().unwrap(),
+                mf2: json!({
+                    "items": [
+                        test_ctx
+                    ]
+                }),
+                webmention: None
+            }
+        ];
+
+        let like_of = super::populate_reply_context(&mf2, "like-of", &reply_contexts).unwrap();
+
+        assert_eq!(like_of[0], test_ctx);
+        assert_eq!(like_of[1], already_expanded_reply_ctx);
+        assert_eq!(like_of[2], "https://fireburn.ru/posts/non-existent");
+    }
+
     #[tokio::test]
     async fn check_post_reject_scope() {
         let inject_db = {