about summary refs log tree commit diff
diff options
context:
space:
mode:
authorVika <vika@fireburn.ru>2021-05-06 17:13:03 +0300
committerVika <vika@fireburn.ru>2021-05-06 17:13:03 +0300
commitaecb72d104dae2bcdeafdb01f0e33ad92ecf27d4 (patch)
treebfa3178bd47ebbe701e99307ba3842f717bba719
parentd56fcb55c033e47d5b4ccadef5032f6f0412b642 (diff)
downloadkittybox-aecb72d104dae2bcdeafdb01f0e33ad92ecf27d4.tar.zst
Added post-processing of new posts
This launches a background task to handle:
 - Downloading reply contexts (requires an MF2 parser, doesn't work yet)
 - Syndicating the post (currently no syndication targets are defined)
 - Sending WebSub notifications if a hub is present (WIP)
 - Sending webmentions (ok this one is fully implemented... I hope!)

 This background task should not impact processing times and should
 never conflict with futher updates of a post, since the database is
 guaranteed to be fully atomic. Inside of the task, you can run long
 asynchronous fetching and stuff, just don't block the whole thread.
-rw-r--r--src/micropub/post.rs149
1 files changed, 143 insertions, 6 deletions
diff --git a/src/micropub/post.rs b/src/micropub/post.rs
index 37cbe26..39a06ce 100644
--- a/src/micropub/post.rs
+++ b/src/micropub/post.rs
@@ -1,6 +1,9 @@
 use core::iter::Iterator;
 use std::str::FromStr;
 use std::convert::TryInto;
+use log::error;
+use futures::stream;
+use futures::StreamExt;
 use chrono::prelude::*;
 use http_types::Mime;
 use tide::prelude::json;
@@ -198,12 +201,9 @@ async fn new_post<S: Storage>(req: Request<ApplicationState<S>>, body: serde_jso
         }
     }
     // END WRITE BOUNDARY
-    //drop(storage);
-    // TODO: Post-processing the post (aka second write pass)
-    // - [ ] Send webmentions
-    // - [ ] Download rich reply contexts
-    // - [ ] Send WebSub notifications to the hub (if we happen to have one)
-    // - [ ] Syndicate the post if requested, add links to the syndicated copies
+
+    // do background processing on the post
+    async_std::task::spawn(post_process_new_post(req, post));
 
     return Ok(Response::builder(202)
         .header("Location", &uid)
@@ -211,6 +211,143 @@ async fn new_post<S: Storage>(req: Request<ApplicationState<S>>, body: serde_jso
         .build());
 }
 
+async fn post_process_new_post<S: Storage>(req: Request<ApplicationState<S>>, post: serde_json::Value) {
+    // TODO: Post-processing the post (aka second write pass)
+    // - [-] Download rich reply contexts
+    // - [-] Syndicate the post if requested, add links to the syndicated copies
+    // - [ ] Send WebSub notifications to the hub (if we happen to have one)
+    // - [x] Send webmentions
+    let http = &req.state().http_client;
+    let uid = post["properties"]["uid"][0].as_str().unwrap().to_string();
+    // 1. Download rich reply contexts
+    //    This needs to be done first, because at this step we can also determine webmention endpoints
+    //    and save them for later use. Additionally, the richer our content is, the better.
+    //    This needs to be done asynchronously, so the posting experience for the author will be as fast
+    //    as possible without making them wait for potentially slow downstream websites to load
+    // 1.1. Collect the list of contextually-significant post to load context from.
+    //      This will include reply-tos, liked, reposted and bookmarked content
+    let mut contextually_significant_posts: Vec<surf::Url> = vec![];
+    for prop in &["in-reply-to", "like-of", "repost-of", "bookmark-of"] {
+        if let Some(array) = post["properties"][prop].as_array() {
+            contextually_significant_posts.extend(
+                array.iter()
+                    .filter_map(|v| v.as_str()
+                        .and_then(|v| surf::Url::parse(v).ok()
+                    )
+                )
+            );
+        }
+    }
+    // 1.2. Fetch the posts with their bodies and save them in a new Vec<(surf::Url, String)>
+    let posts_with_bodies: Vec<(surf::Url, String)> = stream::iter(contextually_significant_posts.into_iter())
+        .filter_map(|v: surf::Url| async move {
+            if let Ok(res) = http.get(&v).send().await {
+                if res.status() != 200 {
+                    return None
+                } else {
+                    return Some((v, res))
+                }
+            } else {
+                return None
+            }
+        })
+        .filter_map(|(v, mut res): (surf::Url, surf::Response)| async move {
+            if let Ok(body) = res.body_string().await {
+                return Some((v, body))
+            } else {
+                return None
+            }
+        })
+        .collect().await;
+    // 1.3. Parse the bodies and include them in relevant places on the MF2 struct
+    //      This requires an MF2 parser, and there are none for Rust at the moment.
+    //
+    // TODO: integrate https://gitlab.com/vikanezrimaya/mf2-parser when it's ready
+
+    // 2. Syndicate the post
+    let syndicated_copies: Vec<serde_json::Value>;
+    if let Some(syndication_targets) = post["properties"]["syndicate-to"].as_array() {
+        syndicated_copies = stream::iter(syndication_targets.into_iter()
+            .filter_map(|v| v.as_str())
+            .filter_map(|t| surf::Url::parse(t).ok())
+            .collect::<Vec<_>>().into_iter()
+            .map(|_t: surf::Url| async move {
+                // TODO: Define supported syndication methods
+                // and syndicate the endpoint there
+                // Possible ideas:
+                //  - indieweb.xyz (might need a lot of space for the buttons though, investigate proposing grouping syndication targets)
+                //  - news.indieweb.org (IndieNews - needs a category linking to #indienews)
+                //  - Twitter via brid.gy (do I really need Twitter syndication tho?)
+                if false {
+                    Some("")
+                } else {
+                    None
+                }
+            })
+        ).buffer_unordered(3).filter_map(|v| async move { v }).map(|v| serde_json::Value::String(v.to_string())).collect::<Vec<_>>().await;
+    } else {
+        syndicated_copies = vec![]
+    }
+    // Save the post a second time here after syndication
+    // We use update_post here to prevent race conditions since its required to be atomic
+    let mut update = json!({
+        "action": "update",
+        "url": &uid
+    });
+    if !syndicated_copies.is_empty() {
+        update["add"] = json!({});
+        update["add"]["syndication"] = serde_json::Value::Array(syndicated_copies);
+    }
+    if !posts_with_bodies.is_empty() {
+        error!("Replacing context links with parsed MF2-JSON data is not yet implemented (but it's ok! it'll just be less pretty)")
+        /* TODO: Replace context links with parsed MF2-JSON data * /
+        update["replace"] = {}
+        update["replace"]["like-of"] = []
+        update["replace"]["in-reply-to"] = []
+        update["replace"]["bookmark-of"] = []
+        update["replace"]["repost-of"] = []
+        // */
+    }
+    // We don't need the original copy of the post anymore... I hope!
+    // This will act as a safeguard so I can't read stale data by accident anymore...
+    drop(post);
+    if let Err(err) = req.state().storage.update_post(&uid, update).await {
+        error!("Encountered error while post-processing a post: {}", err)
+        // At this point, we can still continue, we just won't have rich data for the post
+        // I wonder why could it even happen except in case of a database disconnection?
+    }
+    // 3. Send WebSub notifications
+    // TODO
+
+    // 4. Send webmentions
+    //    We'll need the bodies here to get their endpoints
+    let source = &uid;
+    stream::iter(posts_with_bodies.into_iter())
+        .filter_map(|(url, body): (surf::Url, String)| async move {
+            let pattern = easy_scraper::Pattern::new(r#"<link href="{url}" rel="webmention">"#).expect("Pattern for webmentions couldn't be parsed");
+            let endpoint = &pattern.matches(&body)[0]["url"];
+            if let Ok(endpoint) = url.join(endpoint) { Some((url, endpoint)) } else { None }
+        })
+        .map(|(target, endpoint)| async move {
+            let response = http.post(&endpoint)
+                .content_type("application/x-www-form-urlencoded")
+                .body(
+                    serde_urlencoded::to_string(vec![("source", source), ("target", &target.to_string())])
+                        .expect("Couldn't construct webmention form")
+                ).send().await;
+            // TODO improve error handling
+            if let Ok(response) = response {
+                if response.status() == 200 || response.status() == 201 || response.status() == 202 {
+                    Ok(())
+                } else {
+                    Err(())
+                }
+            } else {
+                Err(())
+            }
+        }).buffer_unordered(3).collect::<Vec<_>>().await;
+}
+
 async fn process_json<S: Storage>(req: Request<ApplicationState<S>>, body: serde_json::Value) -> Result {
     let is_action = body["action"].is_string() && body["url"].is_string();
     if is_action {