about summary refs log tree commit diff
path: root/src/micropub
diff options
context:
space:
mode:
authorVika <vika@fireburn.ru>2021-05-06 17:13:03 +0300
committerVika <vika@fireburn.ru>2021-05-06 17:13:03 +0300
commitaecb72d104dae2bcdeafdb01f0e33ad92ecf27d4 (patch)
treebfa3178bd47ebbe701e99307ba3842f717bba719 /src/micropub
parentd56fcb55c033e47d5b4ccadef5032f6f0412b642 (diff)
Added post-processing of new posts
This launches a background task to handle:
 - Downloading reply contexts (requires an MF2 parser, doesn't work yet)
 - Syndicating the post (currently no syndication targets are defined)
 - Sending WebSub notifications if a hub is present (WIP)
 - Sending webmentions (ok this one is fully implemented... I hope!)

 This background task should not impact processing times and should
 never conflict with futher updates of a post, since the database is
 guaranteed to be fully atomic. Inside of the task, you can run long
 asynchronous fetching and stuff, just don't block the whole thread.
Diffstat (limited to 'src/micropub')
-rw-r--r--src/micropub/post.rs149
1 files changed, 143 insertions, 6 deletions
diff --git a/src/micropub/post.rs b/src/micropub/post.rs
index 37cbe26..39a06ce 100644
--- a/src/micropub/post.rs
+++ b/src/micropub/post.rs
@@ -1,6 +1,9 @@
 use core::iter::Iterator;
 use std::str::FromStr;
 use std::convert::TryInto;
+use log::error;
+use futures::stream;
+use futures::StreamExt;
 use chrono::prelude::*;
 use http_types::Mime;
 use tide::prelude::json;
@@ -198,12 +201,9 @@ async fn new_post<S: Storage>(req: Request<ApplicationState<S>>, body: serde_jso
         }
     }
     // END WRITE BOUNDARY
-    //drop(storage);
-    // TODO: Post-processing the post (aka second write pass)
-    // - [ ] Send webmentions
-    // - [ ] Download rich reply contexts
-    // - [ ] Send WebSub notifications to the hub (if we happen to have one)
-    // - [ ] Syndicate the post if requested, add links to the syndicated copies
+
+    // do background processing on the post
+    async_std::task::spawn(post_process_new_post(req, post));
 
     return Ok(Response::builder(202)
         .header("Location", &uid)
@@ -211,6 +211,143 @@ async fn new_post<S: Storage>(req: Request<ApplicationState<S>>, body: serde_jso
         .build());
 }
 
+async fn post_process_new_post<S: Storage>(req: Request<ApplicationState<S>>, post: serde_json::Value) {
+    // TODO: Post-processing the post (aka second write pass)
+    // - [-] Download rich reply contexts
+    // - [-] Syndicate the post if requested, add links to the syndicated copies
+    // - [ ] Send WebSub notifications to the hub (if we happen to have one)
+    // - [x] Send webmentions
+    let http = &req.state().http_client;
+    let uid = post["properties"]["uid"][0].as_str().unwrap().to_string();
+    // 1. Download rich reply contexts
+    //    This needs to be done first, because at this step we can also determine webmention endpoints
+    //    and save them for later use. Additionally, the richer our content is, the better.
+    //    This needs to be done asynchronously, so the posting experience for the author will be as fast
+    //    as possible without making them wait for potentially slow downstream websites to load
+    // 1.1. Collect the list of contextually-significant post to load context from.
+    //      This will include reply-tos, liked, reposted and bookmarked content
+    let mut contextually_significant_posts: Vec<surf::Url> = vec![];
+    for prop in &["in-reply-to", "like-of", "repost-of", "bookmark-of"] {
+        if let Some(array) = post["properties"][prop].as_array() {
+            contextually_significant_posts.extend(
+                array.iter()
+                    .filter_map(|v| v.as_str()
+                        .and_then(|v| surf::Url::parse(v).ok()
+                    )
+                )
+            );
+        }
+    }
+    // 1.2. Fetch the posts with their bodies and save them in a new Vec<(surf::Url, String)>
+    let posts_with_bodies: Vec<(surf::Url, String)> = stream::iter(contextually_significant_posts.into_iter())
+        .filter_map(|v: surf::Url| async move {
+            if let Ok(res) = http.get(&v).send().await {
+                if res.status() != 200 {
+                    return None
+                } else {
+                    return Some((v, res))
+                }
+            } else {
+                return None
+            }
+        })
+        .filter_map(|(v, mut res): (surf::Url, surf::Response)| async move {
+            if let Ok(body) = res.body_string().await {
+                return Some((v, body))
+            } else {
+                return None
+            }
+        })
+        .collect().await;
+    // 1.3. Parse the bodies and include them in relevant places on the MF2 struct
+    //      This requires an MF2 parser, and there are none for Rust at the moment.
+    //
+    // TODO: integrate https://gitlab.com/vikanezrimaya/mf2-parser when it's ready
+
+    // 2. Syndicate the post
+    let syndicated_copies: Vec<serde_json::Value>;
+    if let Some(syndication_targets) = post["properties"]["syndicate-to"].as_array() {
+        syndicated_copies = stream::iter(syndication_targets.into_iter()
+            .filter_map(|v| v.as_str())
+            .filter_map(|t| surf::Url::parse(t).ok())
+            .collect::<Vec<_>>().into_iter()
+            .map(|_t: surf::Url| async move {
+                // TODO: Define supported syndication methods
+                // and syndicate the endpoint there
+                // Possible ideas:
+                //  - indieweb.xyz (might need a lot of space for the buttons though, investigate proposing grouping syndication targets)
+                //  - news.indieweb.org (IndieNews - needs a category linking to #indienews)
+                //  - Twitter via brid.gy (do I really need Twitter syndication tho?)
+                if false {
+                    Some("")
+                } else {
+                    None
+                }
+            })
+        ).buffer_unordered(3).filter_map(|v| async move { v }).map(|v| serde_json::Value::String(v.to_string())).collect::<Vec<_>>().await;
+    } else {
+        syndicated_copies = vec![]
+    }
+    // Save the post a second time here after syndication
+    // We use update_post here to prevent race conditions since its required to be atomic
+    let mut update = json!({
+        "action": "update",
+        "url": &uid
+    });
+    if !syndicated_copies.is_empty() {
+        update["add"] = json!({});
+        update["add"]["syndication"] = serde_json::Value::Array(syndicated_copies);
+    }
+    if !posts_with_bodies.is_empty() {
+        error!("Replacing context links with parsed MF2-JSON data is not yet implemented (but it's ok! it'll just be less pretty)")
+        /* TODO: Replace context links with parsed MF2-JSON data * /
+        update["replace"] = {}
+        update["replace"]["like-of"] = []
+        update["replace"]["in-reply-to"] = []
+        update["replace"]["bookmark-of"] = []
+        update["replace"]["repost-of"] = []
+        // */
+    }
+    // We don't need the original copy of the post anymore... I hope!
+    // This will act as a safeguard so I can't read stale data by accident anymore...
+    drop(post);
+    if let Err(err) = req.state().storage.update_post(&uid, update).await {
+        error!("Encountered error while post-processing a post: {}", err)
+        // At this point, we can still continue, we just won't have rich data for the post
+        // I wonder why could it even happen except in case of a database disconnection?
+    }
+    // 3. Send WebSub notifications
+    // TODO
+
+    // 4. Send webmentions
+    //    We'll need the bodies here to get their endpoints
+    let source = &uid;
+    stream::iter(posts_with_bodies.into_iter())
+        .filter_map(|(url, body): (surf::Url, String)| async move {
+            let pattern = easy_scraper::Pattern::new(r#"<link href="{url}" rel="webmention">"#).expect("Pattern for webmentions couldn't be parsed");
+            let endpoint = &pattern.matches(&body)[0]["url"];
+            if let Ok(endpoint) = url.join(endpoint) { Some((url, endpoint)) } else { None }
+        })
+        .map(|(target, endpoint)| async move {
+            let response = http.post(&endpoint)
+                .content_type("application/x-www-form-urlencoded")
+                .body(
+                    serde_urlencoded::to_string(vec![("source", source), ("target", &target.to_string())])
+                        .expect("Couldn't construct webmention form")
+                ).send().await;
+            // TODO improve error handling
+            if let Ok(response) = response {
+                if response.status() == 200 || response.status() == 201 || response.status() == 202 {
+                    Ok(())
+                } else {
+                    Err(())
+                }
+            } else {
+                Err(())
+            }
+        }).buffer_unordered(3).collect::<Vec<_>>().await;
+}
+
 async fn process_json<S: Storage>(req: Request<ApplicationState<S>>, body: serde_json::Value) -> Result {
     let is_action = body["action"].is_string() && body["url"].is_string();
     if is_action {