From a16272f3d0f32cb39b8da39021b45625f74ac140 Mon Sep 17 00:00:00 2001 From: Vika Date: Sat, 14 May 2022 20:05:50 +0300 Subject: feat: webmention sending and reply context enrichment These features share some code since they both require fetching reply contexts, so it makes sense to implement them together. TODO cover webmention sending with integration tests --- src/frontend/mod.rs | 2 +- src/micropub/mod.rs | 204 ++++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 161 insertions(+), 45 deletions(-) diff --git a/src/frontend/mod.rs b/src/frontend/mod.rs index eec2f85..b87f9c6 100644 --- a/src/frontend/mod.rs +++ b/src/frontend/mod.rs @@ -286,7 +286,7 @@ pub fn homepage(db: D, endpoints: IndiewebEndpoints) -> impl Filter< }) } -pub fn onboarding( +pub fn onboarding( db: D, endpoints: IndiewebEndpoints, http: reqwest::Client diff --git a/src/micropub/mod.rs b/src/micropub/mod.rs index 7175e56..f426c77 100644 --- a/src/micropub/mod.rs +++ b/src/micropub/mod.rs @@ -1,7 +1,7 @@ use std::convert::Infallible; use std::fmt::Display; use either::Either; -use log::warn; +use log::{info, warn, error}; use warp::http::StatusCode; use warp::{Filter, Rejection, reject::InvalidQuery}; use serde_json::json; @@ -157,8 +157,38 @@ mod util { } } +#[derive(Debug)] +struct FetchedPostContext { + url: url::Url, + mf2: serde_json::Value, + webmention: Option +} + +fn populate_reply_context(mf2: &serde_json::Value, prop: &str, ctxs: &[FetchedPostContext]) -> Option { + if mf2["properties"][prop].is_array() { + Some(json!( + mf2["properties"][prop] + .as_array() + // Safe to unwrap because we checked its existence and type + // And it's not like we can make it disappear without unsafe code + .unwrap() + .iter() + // This seems to be O(n^2) and I don't like it. + // Nevertheless, I lack required knowledge to optimize it. Also, it works, so... + .map(|i| ctxs.iter() + .find(|ctx| Some(ctx.url.as_str()) == i.as_str()) + .and_then(|ctx| ctx.mf2["items"].get(0)) + .or(Some(i)) + .unwrap()) + .collect::>() + )) + } else { + None + } +} + // TODO actually save the post to the database and schedule post-processing -pub(crate) async fn _post( +pub(crate) async fn _post( user: crate::indieauth::User, uid: String, mf2: serde_json::Value, @@ -238,22 +268,20 @@ pub(crate) async fn _post( ); // TODO: Post-processing the post (aka second write pass) - // - [ ] Download rich reply contexts + // - [x] Download rich reply contexts // - [ ] Syndicate the post if requested, add links to the syndicated copies // - [ ] Send WebSub notifications to the hub (if we happen to have one) - // - [ ] Send webmentions - #[allow(unused_imports)] + // - [x] Send webmentions tokio::task::spawn(async move { - use hyper::{Uri, Response, Body, body::HttpBody}; - use bytes::{Buf, BufMut}; use futures_util::StreamExt; let uid: &str = mf2["properties"]["uid"][0].as_str().unwrap(); - let mut contextually_significant_posts: Vec = vec![]; - for prop in &["in-reply-to", "like-of", "repost-of", "bookmark-of"] { + let context_props = ["in-reply-to", "like-of", "repost-of", "bookmark-of"]; + let mut context_urls: Vec = vec![]; + for prop in &context_props { if let Some(array) = mf2["properties"][prop].as_array() { - contextually_significant_posts.extend( + context_urls.extend( array .iter() .filter_map(|v| v.as_str()) @@ -262,50 +290,62 @@ pub(crate) async fn _post( } } // TODO parse HTML in e-content and add links found here - contextually_significant_posts.sort_unstable_by_key(|u| u.to_string()); - contextually_significant_posts.dedup(); - - #[derive(Debug)] - #[allow(dead_code)] - struct FetchedPostContext { - url: url::Url, - // TODO see if we can use non-strings for non-UTF-8 HTML - html: String, - mf2: serde_json::Value, - webmention: Option - } + context_urls.sort_unstable_by_key(|u| u.to_string()); + context_urls.dedup(); - { - // TODO: Make a stream to fetch all these posts and convert them to MF2 - let post_contexts = { - let http = &http; - tokio_stream::iter(contextually_significant_posts - .into_iter() - ).then(move |url: url::Url| async move { - let html: String = todo!("Fetch the post using {:?}", http); - // We just need to get the following here: + // TODO: Make a stream to fetch all these posts and convert them to MF2 + let post_contexts = { + let http = &http; + tokio_stream::iter(context_urls.into_iter()) + .then(move |url: url::Url| http.get(url).send()) + .filter_map(|response| futures::future::ready(response.ok())) + .filter(|response| futures::future::ready(response.status() == 200)) + .filter_map(|response: reqwest::Response| async move { // 1. We need to preserve the URL // 2. We need to get the HTML for MF2 processing // 3. We need to get the webmention endpoint address // All of that can be done in one go. - - // XXX stub! - - dbg!(FetchedPostContext { - url, html, - mf2: serde_json::to_value(microformats::from_html(&html, url).unwrap()).unwrap(), - webmention: None - }) + let url = response.url().clone(); + // TODO parse link headers + let links = response + .headers() + .get_all(hyper::http::header::LINK) + .iter() + .cloned() + .collect::>(); + let html = response.text().await; + if html.is_err() { + return None; + } + let html = html.unwrap(); + let mf2 = microformats::from_html(&html, url.clone()).unwrap(); + // TODO use first Link: header if available + let webmention: Option = mf2.rels.by_rels().get("webmention") + .and_then(|i| i.first().cloned()); + + dbg!(Some(FetchedPostContext { + url, mf2: serde_json::to_value(mf2).unwrap(), webmention + })) }) - .collect::>() - .await - }; + .collect::>() + .await + }; - drop(post_contexts); + let mut update = json!({ "replace": {} }); + for prop in &context_props { + if let Some(json) = populate_reply_context(&mf2, prop, &post_contexts) { + update["replace"][prop] = json; + } } + if !update["replace"].as_object().unwrap().is_empty() { + if let Err(err) = db.update_post(uid, update).await { + error!("Failed to update post with rich reply contexts: {}", err); + } + } + // At this point we can start syndicating the post. // Currently we don't really support any syndication endpoints, but still! - if let Some(syndicate_to) = mf2["properties"]["mp-syndicate-to"].as_array() { + /*if let Some(syndicate_to) = mf2["properties"]["mp-syndicate-to"].as_array() { let http = &http; tokio_stream::iter(syndicate_to) .filter_map(|i| futures::future::ready(i.as_str())) @@ -319,6 +359,37 @@ pub(crate) async fn _post( } }) .await; + }*/ + + { + let http = &http; + tokio_stream::iter( + post_contexts.into_iter() + .filter(|ctx| ctx.webmention.is_some())) + .for_each_concurrent(2, |ctx| async move { + let mut map = std::collections::HashMap::new(); + map.insert("source", uid); + map.insert("target", ctx.url.as_str()); + + match http.post(ctx.webmention.unwrap().clone()) + .form(&map) + .send() + .await + { + Ok(res) => { + if !res.status().is_success() { + warn!( + "Failed to send a webmention for {}: got HTTP {}", + ctx.url, res.status() + ); + } else { + info!("Sent a webmention to {}, got HTTP {}", ctx.url, res.status()) + } + }, + Err(err) => warn!("Failed to send a webmention for {}: {}", ctx.url, err) + } + }) + .await; } }); @@ -717,6 +788,51 @@ mod tests { use warp::{Filter, Reply}; use serde_json::json; + use super::FetchedPostContext; + + #[test] + fn test_populate_reply_context() { + let already_expanded_reply_ctx = json!({ + "type": ["h-entry"], + "properties": { + "content": ["Hello world!"] + } + }); + let mf2 = json!({ + "type": ["h-entry"], + "properties": { + "like-of": [ + "https://fireburn.ru/posts/example", + already_expanded_reply_ctx, + "https://fireburn.ru/posts/non-existent" + ] + } + }); + let test_ctx = json!({ + "type": ["h-entry"], + "properties": { + "content": ["This is a post which was reacted to."] + } + }); + let reply_contexts = vec![ + FetchedPostContext { + url: "https://fireburn.ru/posts/example".parse().unwrap(), + mf2: json!({ + "items": [ + test_ctx + ] + }), + webmention: None + } + ]; + + let like_of = super::populate_reply_context(&mf2, "like-of", &reply_contexts).unwrap(); + + assert_eq!(like_of[0], test_ctx); + assert_eq!(like_of[1], already_expanded_reply_ctx); + assert_eq!(like_of[2], "https://fireburn.ru/posts/non-existent"); + } + #[tokio::test] async fn check_post_reject_scope() { let inject_db = { -- cgit 1.4.1