From aecb72d104dae2bcdeafdb01f0e33ad92ecf27d4 Mon Sep 17 00:00:00 2001
From: Vika
Date: Thu, 6 May 2021 17:13:03 +0300
Subject: Added post-processing of new posts

This launches a background task to handle:
- Downloading reply contexts (requires an MF2 parser, doesn't work yet)
- Syndicating the post (currently no syndication targets are defined)
- Sending WebSub notifications if a hub is present (WIP)
- Sending webmentions (ok this one is fully implemented... I hope!)

This background task should not impact processing times and should never
conflict with further updates of a post, since the database is guaranteed
to be fully atomic. Inside of the task, you can run long asynchronous
fetching and stuff, just don't block the whole thread.
---
 src/micropub/post.rs | 149 ++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 143 insertions(+), 6 deletions(-)

(limited to 'src')

diff --git a/src/micropub/post.rs b/src/micropub/post.rs
index 37cbe26..39a06ce 100644
--- a/src/micropub/post.rs
+++ b/src/micropub/post.rs
@@ -1,6 +1,9 @@
 use core::iter::Iterator;
 use std::str::FromStr;
 use std::convert::TryInto;
+use log::error;
+use futures::stream;
+use futures::StreamExt;
 use chrono::prelude::*;
 use http_types::Mime;
 use tide::prelude::json;
@@ -198,12 +201,9 @@ async fn new_post(req: Request>, body: serde_jso
         }
     }
     // END WRITE BOUNDARY
-    //drop(storage);
-    // TODO: Post-processing the post (aka second write pass)
-    // - [ ] Send webmentions
-    // - [ ] Download rich reply contexts
-    // - [ ] Send WebSub notifications to the hub (if we happen to have one)
-    // - [ ] Syndicate the post if requested, add links to the syndicated copies
+
+    // do background processing on the post
+    async_std::task::spawn(post_process_new_post(req, post));
 
     return Ok(Response::builder(202)
         .header("Location", &uid)
         .build());
 }
@@ -211,6 +211,143 @@ async fn new_post(req: Request>, body: serde_jso
+async fn post_process_new_post<S: Storage>(req: Request<ApplicationState<S>>, post: serde_json::Value) {
+    // TODO: Post-processing the post (aka second write pass)
+    // - [-] Download rich reply contexts
+    // - [-] Syndicate the post if requested, add links to the syndicated copies
+    // - [ ] Send WebSub notifications to the hub (if we happen to have one)
+    // - [x] Send webmentions
+    let http = &req.state().http_client;
+    let uid = post["properties"]["uid"][0].as_str().unwrap().to_string();
+    // 1. Download rich reply contexts
+    // This needs to be done first, because at this step we can also determine webmention endpoints
+    // and save them for later use. Additionally, the richer our content is, the better.
+    // This needs to be done asynchronously, so the posting experience for the author will be as fast
+    // as possible without making them wait for potentially slow downstream websites to load
+    // 1.1. Collect the list of contextually-significant posts to load context from.
+    // This will include reply-tos, liked, reposted and bookmarked content
+    let mut contextually_significant_posts: Vec<surf::Url> = vec![];
+    for prop in &["in-reply-to", "like-of", "repost-of", "bookmark-of"] {
+        if let Some(array) = post["properties"][prop].as_array() {
+            contextually_significant_posts.extend(
+                array.iter()
+                    .filter_map(|v| v.as_str()
+                        .and_then(|v| surf::Url::parse(v).ok()
+                        )
+                    )
+            );
+        }
+    }
+    // 1.2. Fetch the posts with their bodies and save them in a new Vec<(surf::Url, String)>
+    let posts_with_bodies: Vec<(surf::Url, String)> = stream::iter(contextually_significant_posts.into_iter())
+        .filter_map(|v: surf::Url| async move {
+            if let Ok(res) = http.get(&v).send().await {
+                if res.status() != 200 {
+                    return None
+                } else {
+                    return Some((v, res))
+                }
+            } else {
+                return None
+            }
+        })
+        .filter_map(|(v, mut res): (surf::Url, surf::Response)| async move {
+            if let Ok(body) = res.body_string().await {
+                return Some((v, body))
+            } else {
+                return None
+            }
+        })
+        .collect().await;
+    // 1.3. Parse the bodies and include them in relevant places on the MF2 struct
+    // This requires an MF2 parser, and there are none for Rust at the moment.
+    //
+    // TODO: integrate https://gitlab.com/vikanezrimaya/mf2-parser when it's ready
+
+    // 2. Syndicate the post
+    let syndicated_copies: Vec<serde_json::Value>;
+    if let Some(syndication_targets) = post["properties"]["syndicate-to"].as_array() {
+        syndicated_copies = stream::iter(syndication_targets.into_iter()
+            .filter_map(|v| v.as_str())
+            .filter_map(|t| surf::Url::parse(t).ok())
+            .collect::<Vec<_>>().into_iter()
+            .map(|_t: surf::Url| async move {
+                // TODO: Define supported syndication methods
+                // and syndicate the endpoint there
+                // Possible ideas:
+                //  - indieweb.xyz (might need a lot of space for the buttons though, investigate proposing grouping syndication targets)
+                //  - news.indieweb.org (IndieNews - needs a category linking to #indienews)
+                //  - Twitter via brid.gy (do I really need Twitter syndication tho?)
+                if false {
+                    Some("")
+                } else {
+                    None
+                }
+            })
+        ).buffer_unordered(3).filter_map(|v| async move { v }).map(|v| serde_json::Value::String(v.to_string())).collect::<Vec<_>>().await;
+    } else {
+        syndicated_copies = vec![]
+    }
+    // Save the post a second time here after syndication
+    // We use update_post here to prevent race conditions since it's required to be atomic
+    let mut update = json!({
+        "action": "update",
+        "url": &uid
+    });
+    if !syndicated_copies.is_empty() {
+        update["add"] = json!({});
+        update["add"]["syndication"] = serde_json::Value::Array(syndicated_copies);
+    }
+    if !posts_with_bodies.is_empty() {
+        error!("Replacing context links with parsed MF2-JSON data is not yet implemented (but it's ok! it'll just be less pretty)")
+        /* TODO: Replace context links with parsed MF2-JSON data * /
+        update["replace"] = {}
+        update["replace"]["like-of"] = []
+        update["replace"]["in-reply-to"] = []
+        update["replace"]["bookmark-of"] = []
+        update["replace"]["repost-of"] = []
+        // */
+    }
+    // We don't need the original copy of the post anymore... I hope!
+    // This will act as a safeguard so I can't read stale data by accident anymore...
+    drop(post);
+    if let Err(err) = req.state().storage.update_post(&uid, update).await {
+        error!("Encountered error while post-processing a post: {}", err)
+        // At this point, we can still continue, we just won't have rich data for the post
+        // I wonder why it could even happen except in case of a database disconnection?
+    }
+    // 3. Send WebSub notifications
+    // TODO
+
+    // 4. Send webmentions
+    // We'll need the bodies here to get their endpoints
+    let source = &uid;
+    stream::iter(posts_with_bodies.into_iter())
+        .filter_map(|(url, body): (surf::Url, String)| async move {
+            let pattern = easy_scraper::Pattern::new(r#"<link rel="webmention" href="{{url}}">"#).expect("Pattern for webmentions couldn't be parsed");
+            let endpoint = &pattern.matches(&body)[0]["url"];
+            if let Ok(endpoint) = url.join(endpoint) { Some((url, endpoint)) } else { None }
+        })
+        .map(|(target, endpoint)| async move {
+            let response = http.post(&endpoint)
+                .content_type("application/x-www-form-urlencoded")
+                .body(
+                    serde_urlencoded::to_string(vec![("source", source), ("target", &target.to_string())])
+                        .expect("Couldn't construct webmention form")
+                ).send().await;
+            // TODO improve error handling
+            if let Ok(response) = response {
+                if response.status() == 200 || response.status() == 201 || response.status() == 202 {
+                    Ok(())
+                } else {
+                    Err(())
+                }
+            } else {
+                Err(())
+            }
+        }).buffer_unordered(3).collect::<Vec<_>>().await;
+}
+
 async fn process_json<S: Storage>(req: Request<ApplicationState<S>>, body: serde_json::Value) -> Result {
     let is_action = body["action"].is_string() && body["url"].is_string();
     if is_action {
--
cgit 1.4.1
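
The "second write pass" above never rewrites the whole post: it builds a Micropub-style update document and hands it to the storage backend's atomic `update_post`, which is what lets it coexist with later edits. For a hypothetical post with one successfully syndicated copy (both URLs below are invented for illustration), the document it ends up submitting would look roughly like this:

```rust
use serde_json::json;

fn main() {
    // Illustrative only: the shape of the update that post_process_new_post
    // submits after a successful syndication. Both URLs are made up.
    let update = json!({
        "action": "update",
        "url": "https://example.com/posts/hello-world",
        "add": {
            "syndication": ["https://syndication.example.net/hello-world"]
        }
    });
    println!("{}", update);
}
```

Only the `add` member is populated for now; the commented-out `replace` members would join it once reply-context parsing is implemented.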
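Step 3 (WebSub) is still a bare TODO in the patch. Notifying a hub is a form-encoded POST of `hub.mode=publish` and `hub.url=<updated URL>` to the hub's endpoint. Below is a minimal sketch of what that step could look like, reusing the same `surf` client and `serde_urlencoded` calls already used for webmentions; the helper's name, its signature, and the set of accepted status codes are assumptions, not part of the patch:

```rust
use log::error;

// Sketch only: ping a WebSub hub about an updated topic URL.
// Assumes the hub URL has already been discovered for this feed;
// the helper's name and signature are illustrative, not from the patch.
async fn notify_websub_hub(http: &surf::Client, hub: &surf::Url, topic: &str) {
    let form = serde_urlencoded::to_string(vec![("hub.mode", "publish"), ("hub.url", topic)])
        .expect("Couldn't construct WebSub form");
    match http.post(hub)
        .content_type("application/x-www-form-urlencoded")
        .body(form)
        .send()
        .await
    {
        // Hubs commonly answer 202 Accepted; 200 and 204 are treated as success here too.
        Ok(response) if response.status() == 200 || response.status() == 202 || response.status() == 204 => {}
        Ok(response) => error!("WebSub hub returned an unexpected status: {}", response.status()),
        Err(err) => error!("Couldn't send a WebSub notification: {}", err),
    }
}
```

If hub support lands, something like this would slot in at the `// 3. Send WebSub notifications` marker, preferably after `update_post` succeeds so subscribers are only pinged about content that has actually been saved.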