diff options
Diffstat (limited to 'src/micropub')
-rw-r--r-- | src/micropub/mod.rs | 204 |
1 files changed, 160 insertions, 44 deletions
diff --git a/src/micropub/mod.rs b/src/micropub/mod.rs index 7175e56..f426c77 100644 --- a/src/micropub/mod.rs +++ b/src/micropub/mod.rs @@ -1,7 +1,7 @@ use std::convert::Infallible; use std::fmt::Display; use either::Either; -use log::warn; +use log::{info, warn, error}; use warp::http::StatusCode; use warp::{Filter, Rejection, reject::InvalidQuery}; use serde_json::json; @@ -157,8 +157,38 @@ mod util { } } +#[derive(Debug)] +struct FetchedPostContext { + url: url::Url, + mf2: serde_json::Value, + webmention: Option<url::Url> +} + +fn populate_reply_context(mf2: &serde_json::Value, prop: &str, ctxs: &[FetchedPostContext]) -> Option<serde_json::Value> { + if mf2["properties"][prop].is_array() { + Some(json!( + mf2["properties"][prop] + .as_array() + // Safe to unwrap because we checked its existence and type + // And it's not like we can make it disappear without unsafe code + .unwrap() + .iter() + // This seems to be O(n^2) and I don't like it. + // Nevertheless, I lack required knowledge to optimize it. Also, it works, so... + .map(|i| ctxs.iter() + .find(|ctx| Some(ctx.url.as_str()) == i.as_str()) + .and_then(|ctx| ctx.mf2["items"].get(0)) + .or(Some(i)) + .unwrap()) + .collect::<Vec<&serde_json::Value>>() + )) + } else { + None + } +} + // TODO actually save the post to the database and schedule post-processing -pub(crate) async fn _post<D: Storage>( +pub(crate) async fn _post<D: 'static + Storage>( user: crate::indieauth::User, uid: String, mf2: serde_json::Value, @@ -238,22 +268,20 @@ pub(crate) async fn _post<D: Storage>( ); // TODO: Post-processing the post (aka second write pass) - // - [ ] Download rich reply contexts + // - [x] Download rich reply contexts // - [ ] Syndicate the post if requested, add links to the syndicated copies // - [ ] Send WebSub notifications to the hub (if we happen to have one) - // - [ ] Send webmentions - #[allow(unused_imports)] + // - [x] Send webmentions tokio::task::spawn(async move { - use hyper::{Uri, Response, Body, body::HttpBody}; - use bytes::{Buf, BufMut}; use futures_util::StreamExt; let uid: &str = mf2["properties"]["uid"][0].as_str().unwrap(); - let mut contextually_significant_posts: Vec<url::Url> = vec![]; - for prop in &["in-reply-to", "like-of", "repost-of", "bookmark-of"] { + let context_props = ["in-reply-to", "like-of", "repost-of", "bookmark-of"]; + let mut context_urls: Vec<url::Url> = vec![]; + for prop in &context_props { if let Some(array) = mf2["properties"][prop].as_array() { - contextually_significant_posts.extend( + context_urls.extend( array .iter() .filter_map(|v| v.as_str()) @@ -262,50 +290,62 @@ pub(crate) async fn _post<D: Storage>( } } // TODO parse HTML in e-content and add links found here - contextually_significant_posts.sort_unstable_by_key(|u| u.to_string()); - contextually_significant_posts.dedup(); - - #[derive(Debug)] - #[allow(dead_code)] - struct FetchedPostContext { - url: url::Url, - // TODO see if we can use non-strings for non-UTF-8 HTML - html: String, - mf2: serde_json::Value, - webmention: Option<hyper::Uri> - } + context_urls.sort_unstable_by_key(|u| u.to_string()); + context_urls.dedup(); - { - // TODO: Make a stream to fetch all these posts and convert them to MF2 - let post_contexts = { - let http = &http; - tokio_stream::iter(contextually_significant_posts - .into_iter() - ).then(move |url: url::Url| async move { - let html: String = todo!("Fetch the post using {:?}", http); - // We just need to get the following here: + // TODO: Make a stream to fetch all these posts and convert them to MF2 + let post_contexts = { + let http = &http; + tokio_stream::iter(context_urls.into_iter()) + .then(move |url: url::Url| http.get(url).send()) + .filter_map(|response| futures::future::ready(response.ok())) + .filter(|response| futures::future::ready(response.status() == 200)) + .filter_map(|response: reqwest::Response| async move { // 1. We need to preserve the URL // 2. We need to get the HTML for MF2 processing // 3. We need to get the webmention endpoint address // All of that can be done in one go. - - // XXX stub! - - dbg!(FetchedPostContext { - url, html, - mf2: serde_json::to_value(microformats::from_html(&html, url).unwrap()).unwrap(), - webmention: None - }) + let url = response.url().clone(); + // TODO parse link headers + let links = response + .headers() + .get_all(hyper::http::header::LINK) + .iter() + .cloned() + .collect::<Vec<hyper::http::HeaderValue>>(); + let html = response.text().await; + if html.is_err() { + return None; + } + let html = html.unwrap(); + let mf2 = microformats::from_html(&html, url.clone()).unwrap(); + // TODO use first Link: header if available + let webmention: Option<url::Url> = mf2.rels.by_rels().get("webmention") + .and_then(|i| i.first().cloned()); + + dbg!(Some(FetchedPostContext { + url, mf2: serde_json::to_value(mf2).unwrap(), webmention + })) }) - .collect::<Vec<FetchedPostContext>>() - .await - }; + .collect::<Vec<FetchedPostContext>>() + .await + }; - drop(post_contexts); + let mut update = json!({ "replace": {} }); + for prop in &context_props { + if let Some(json) = populate_reply_context(&mf2, prop, &post_contexts) { + update["replace"][prop] = json; + } } + if !update["replace"].as_object().unwrap().is_empty() { + if let Err(err) = db.update_post(uid, update).await { + error!("Failed to update post with rich reply contexts: {}", err); + } + } + // At this point we can start syndicating the post. // Currently we don't really support any syndication endpoints, but still! - if let Some(syndicate_to) = mf2["properties"]["mp-syndicate-to"].as_array() { + /*if let Some(syndicate_to) = mf2["properties"]["mp-syndicate-to"].as_array() { let http = &http; tokio_stream::iter(syndicate_to) .filter_map(|i| futures::future::ready(i.as_str())) @@ -319,6 +359,37 @@ pub(crate) async fn _post<D: Storage>( } }) .await; + }*/ + + { + let http = &http; + tokio_stream::iter( + post_contexts.into_iter() + .filter(|ctx| ctx.webmention.is_some())) + .for_each_concurrent(2, |ctx| async move { + let mut map = std::collections::HashMap::new(); + map.insert("source", uid); + map.insert("target", ctx.url.as_str()); + + match http.post(ctx.webmention.unwrap().clone()) + .form(&map) + .send() + .await + { + Ok(res) => { + if !res.status().is_success() { + warn!( + "Failed to send a webmention for {}: got HTTP {}", + ctx.url, res.status() + ); + } else { + info!("Sent a webmention to {}, got HTTP {}", ctx.url, res.status()) + } + }, + Err(err) => warn!("Failed to send a webmention for {}: {}", ctx.url, err) + } + }) + .await; } }); @@ -717,6 +788,51 @@ mod tests { use warp::{Filter, Reply}; use serde_json::json; + use super::FetchedPostContext; + + #[test] + fn test_populate_reply_context() { + let already_expanded_reply_ctx = json!({ + "type": ["h-entry"], + "properties": { + "content": ["Hello world!"] + } + }); + let mf2 = json!({ + "type": ["h-entry"], + "properties": { + "like-of": [ + "https://fireburn.ru/posts/example", + already_expanded_reply_ctx, + "https://fireburn.ru/posts/non-existent" + ] + } + }); + let test_ctx = json!({ + "type": ["h-entry"], + "properties": { + "content": ["This is a post which was reacted to."] + } + }); + let reply_contexts = vec![ + FetchedPostContext { + url: "https://fireburn.ru/posts/example".parse().unwrap(), + mf2: json!({ + "items": [ + test_ctx + ] + }), + webmention: None + } + ]; + + let like_of = super::populate_reply_context(&mf2, "like-of", &reply_contexts).unwrap(); + + assert_eq!(like_of[0], test_ctx); + assert_eq!(like_of[1], already_expanded_reply_ctx); + assert_eq!(like_of[2], "https://fireburn.ru/posts/non-existent"); + } + #[tokio::test] async fn check_post_reject_scope() { let inject_db = { |