From 47285d5af803ed26d52b7f248a8f2de556e9057f Mon Sep 17 00:00:00 2001 From: Vika Date: Thu, 12 May 2022 01:37:59 +0300 Subject: treewide: prepare for mf2 parsing and cleanup unused code --- src/frontend/mod.rs | 8 ++-- src/indieauth.rs | 75 +++++++++++++------------------------ src/main.rs | 28 +++++--------- src/micropub/mod.rs | 105 ++++++++++++++++++++++++++++++++++++++++------------ 4 files changed, 122 insertions(+), 94 deletions(-) (limited to 'src') diff --git a/src/frontend/mod.rs b/src/frontend/mod.rs index 3c3072d..eec2f85 100644 --- a/src/frontend/mod.rs +++ b/src/frontend/mod.rs @@ -286,8 +286,10 @@ pub fn homepage(db: D, endpoints: IndiewebEndpoints) -> impl Filter< }) } -pub fn onboarding( - db: D, endpoints: IndiewebEndpoints, http: hyper::Client +pub fn onboarding( + db: D, + endpoints: IndiewebEndpoints, + http: reqwest::Client ) -> impl Filter + Clone { let inject_db = move || db.clone(); warp::get() @@ -304,7 +306,7 @@ pub fn onboarding()) .and(warp::any().map(move || http.clone())) - .and_then(|host: warp::host::Authority, db: D, body: OnboardingData, http: _| async move { + .and_then(|host: warp::host::Authority, db: D, body: OnboardingData, http: reqwest::Client| async move { let user_uid = format!("https://{}/", host.as_str()); if db.post_exists(&user_uid).await.map_err(FrontendError::from)? { diff --git a/src/indieauth.rs b/src/indieauth.rs index 4dfe11d..57c0301 100644 --- a/src/indieauth.rs +++ b/src/indieauth.rs @@ -62,8 +62,8 @@ impl From for IndieAuthError { } } -impl From for IndieAuthError { - fn from(err: hyper::Error) -> Self { +impl From for IndieAuthError { + fn from(err: reqwest::Error) -> Self { Self { msg: format!("{}", err), source: Some(Box::new(err)), @@ -90,11 +90,10 @@ impl User { } } -pub fn require_token(token_endpoint: String, http: hyper::Client) -> impl Filter + Clone -where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { +pub fn require_token(token_endpoint: String, http: reqwest::Client) -> impl Filter + Clone { // It might be OK to panic here, because we're still inside the initialisation sequence for now. // Proper error handling on the top of this should be used though. - let token_endpoint_uri = hyper::Uri::try_from(&token_endpoint) + let token_endpoint_uri = url::Url::parse(&token_endpoint) .expect("Couldn't parse the token endpoint URI!"); warp::any() .map(move || token_endpoint_uri.clone()) @@ -110,60 +109,36 @@ where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { Err(err) } }).unify()) - .and_then(|token_endpoint, http: hyper::Client, token| async move { - let request = hyper::Request::builder() - .method(hyper::Method::GET) - .uri(token_endpoint) - .header("Authorization", token) - .header("Accept", "application/json") - .body(hyper::Body::from("")) - // TODO is it acceptable to panic here? - .unwrap(); - + .and_then(|token_endpoint, http: reqwest::Client, token| async move { use hyper::StatusCode; - match http.request(request).await { - Ok(mut res) => match res.status() { - StatusCode::OK => { - use hyper::body::HttpBody; - use bytes::BufMut; - let mut buf: Vec = Vec::default(); - while let Some(chunk) = res.body_mut().data().await { - if let Err(err) = chunk { - return Err(IndieAuthError::from(err).into()); - } - buf.put(chunk.unwrap()); - } - match serde_json::from_slice(&buf) { + match http + .get(token_endpoint) + .header("Authorization", token) + .header("Accept", "application/json") + .send() + .await + { + Ok(res) => match res.status() { + StatusCode::OK => match res.json::().await { + Ok(json) => match serde_json::from_value::(json.clone()) { Ok(user) => Ok(user), Err(err) => { - if let Ok(json) = serde_json::from_slice::(&buf) { - if Some(false) == json["active"].as_bool() { - Err(IndieAuthError { - source: None, - kind: ErrorKind::NotAuthorized, - msg: "The token endpoint deemed the token as not \"active\".".to_string() - }.into()) - } else { - Err(IndieAuthError::from(err).into()) - } + if let Some(false) = json["active"].as_bool() { + Err(IndieAuthError { + source: None, + kind: ErrorKind::NotAuthorized, + msg: "The token is not active for this user.".to_owned() + }.into()) } else { Err(IndieAuthError::from(err).into()) } } } + Err(err) => Err(IndieAuthError::from(err).into()) }, StatusCode::BAD_REQUEST => { - use hyper::body::HttpBody; - use bytes::BufMut; - let mut buf: Vec = Vec::default(); - while let Some(chunk) = res.body_mut().data().await { - if let Err(err) = chunk { - return Err(IndieAuthError::from(err).into()); - } - buf.put(chunk.unwrap()); - } - match serde_json::from_slice::(&buf) { + match res.json::().await { Ok(err) => { if err.error == "unauthorized" { Err(IndieAuthError { @@ -211,8 +186,8 @@ mod tests { } #[inline] - fn get_http_client() -> hyper::Client { - hyper::Client::new() + fn get_http_client() -> reqwest::Client { + reqwest::Client::new() } #[tokio::test] diff --git a/src/main.rs b/src/main.rs index 0234abc..ed8c8e7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,15 @@ use log::{debug, error, info}; use std::{convert::Infallible, env, time::Duration}; use url::Url; -use hyper::client::{HttpConnector,connect::dns::GaiResolver}; -use hyper_rustls::HttpsConnector; use warp::{Filter, host::Authority}; #[tokio::main] async fn main() { + // TODO turn into a feature so I can enable and disable it #[cfg(debug_assertions)] console_subscriber::init(); - // TODO json logging in the future? + + // TODO use tracing instead of log let logger_env = env_logger::Env::new().filter_or("RUST_LOG", "info"); env_logger::init_from_env(logger_env); @@ -97,18 +97,6 @@ async fn main() { } }; - // TODO remove this and see what screams to replace it with reqwest - let http_client: hyper::Client>, hyper::Body> = { - let builder = hyper::Client::builder(); - let https = hyper_rustls::HttpsConnectorBuilder::new() - .with_webpki_roots() - .https_only() - .enable_http1() - .enable_http2() - .build(); - builder.build(https) - }; - // This thing handles redirects automatically but is type-incompatible with hyper::Client // Bonus: less generics to be aware of, this thing hides its complexity let http: reqwest::Client = { @@ -119,7 +107,7 @@ async fn main() { "/", env!("CARGO_PKG_VERSION") )); - // TODO add a root certificate if there's an environment variable pointing at it + // TODO: add a root certificate if there's an environment variable pointing at it //builder = builder.add_root_certificate(reqwest::Certificate::from_pem(todo!())); builder.build().unwrap() @@ -155,14 +143,18 @@ async fn main() { let onboarding = warp::path("onboarding") .and(warp::path::end()) - .and(kittybox::frontend::onboarding(database.clone(), endpoints.clone(), http_client.clone())); + .and(kittybox::frontend::onboarding( + database.clone(), + endpoints.clone(), + http.clone() + )); let micropub = warp::path("micropub") .and(warp::path::end() .and(kittybox::micropub::micropub( database.clone(), token_endpoint.to_string(), - http_client.clone() + http.clone() )) .or(warp::get() .and(warp::path("client")) diff --git a/src/micropub/mod.rs b/src/micropub/mod.rs index 7aad51a..7175e56 100644 --- a/src/micropub/mod.rs +++ b/src/micropub/mod.rs @@ -51,7 +51,7 @@ impl From for MicropubError { crate::database::ErrorKind::NotFound => ErrorType::NotFound, _ => ErrorType::InternalServerError }, - error_description: format!("Backend error: {}", err.to_string()) + error_description: format!("Backend error: {}", err) } } } @@ -158,12 +158,12 @@ mod util { } // TODO actually save the post to the database and schedule post-processing -pub(crate) async fn _post( +pub(crate) async fn _post( user: crate::indieauth::User, uid: String, mf2: serde_json::Value, db: D, - http: hyper::Client + http: reqwest::Client ) -> Result { // Here, we have the following guarantees: // - The user is the same user for this host (guaranteed by ensure_same_user) @@ -248,23 +248,78 @@ pub(crate) async fn _post = vec![]; + let uid: &str = mf2["properties"]["uid"][0].as_str().unwrap(); + + let mut contextually_significant_posts: Vec = vec![]; for prop in &["in-reply-to", "like-of", "repost-of", "bookmark-of"] { if let Some(array) = mf2["properties"][prop].as_array() { contextually_significant_posts.extend( array .iter() - .filter_map(|v| v.as_str().and_then(|v| v.parse::().ok())), + .filter_map(|v| v.as_str()) + .filter_map(|v| v.parse::().ok()), ); } } + // TODO parse HTML in e-content and add links found here contextually_significant_posts.sort_unstable_by_key(|u| u.to_string()); contextually_significant_posts.dedup(); - // TODO: Make a stream to fetch all these posts and convert them to MF2 - drop(http); + #[derive(Debug)] + #[allow(dead_code)] + struct FetchedPostContext { + url: url::Url, + // TODO see if we can use non-strings for non-UTF-8 HTML + html: String, + mf2: serde_json::Value, + webmention: Option + } + + { + // TODO: Make a stream to fetch all these posts and convert them to MF2 + let post_contexts = { + let http = &http; + tokio_stream::iter(contextually_significant_posts + .into_iter() + ).then(move |url: url::Url| async move { + let html: String = todo!("Fetch the post using {:?}", http); + // We just need to get the following here: + // 1. We need to preserve the URL + // 2. We need to get the HTML for MF2 processing + // 3. We need to get the webmention endpoint address + // All of that can be done in one go. + + // XXX stub! + + dbg!(FetchedPostContext { + url, html, + mf2: serde_json::to_value(microformats::from_html(&html, url).unwrap()).unwrap(), + webmention: None + }) + }) + .collect::>() + .await + }; - todo!() + drop(post_contexts); + } + // At this point we can start syndicating the post. + // Currently we don't really support any syndication endpoints, but still! + if let Some(syndicate_to) = mf2["properties"]["mp-syndicate-to"].as_array() { + let http = &http; + tokio_stream::iter(syndicate_to) + .filter_map(|i| futures::future::ready(i.as_str())) + .for_each_concurrent(3, |s: &str| async move { + #[allow(clippy::match_single_binding)] + match s { + _ => { + todo!("Syndicate to generic webmention-aware service {}", s); + } + // TODO special handling for non-webmention-aware services like the birdsite + } + }) + .await; + } }); Ok(reply) @@ -378,11 +433,10 @@ async fn check_auth(host: warp::host::Authority, user: User) -> Result( +fn ensure_same_user_as_host( token_endpoint: String, - http: hyper::Client -) -> impl Filter + Clone -where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { + http: reqwest::Client +) -> impl Filter + Clone { crate::util::require_host() .and(crate::indieauth::require_token(token_endpoint, http)) .and_then(check_auth) @@ -443,12 +497,11 @@ async fn dispatch_post_body( } #[cfg_attr(all(debug_assertions, not(test)), allow(unused_variables))] -pub fn post( +pub fn post( db: D, token_endpoint: String, - http: hyper::Client -) -> impl Filter + Clone -where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { + http: reqwest::Client +) -> impl Filter + Clone { let inject_db = warp::any().map(move || db.clone()); #[cfg(all(debug_assertions, not(test)))] let ensure_same_user = warp::any().map(|| crate::indieauth::User::new( @@ -467,7 +520,7 @@ where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { .and(inject_db) .and(warp::any().map(move || http.clone())) .and(ensure_same_user) - .and_then(|body: Either, db: D, http: hyper::Client, user: crate::indieauth::User| async move { + .and_then(|body: Either, db: D, http: reqwest::Client, user: crate::indieauth::User| async move { (match body { Either::Left(action) => { post_action(action, db, user).await.map(|p| Box::new(p) as Box) @@ -588,8 +641,11 @@ async fn _query( } } -pub fn query(db: D, token_endpoint: String, http: hyper::Client) -> impl Filter + Clone -where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { +pub fn query( + db: D, + token_endpoint: String, + http: reqwest::Client +) -> impl Filter + Clone { warp::get() .map(move || db.clone()) .and(warp::query::()) @@ -626,8 +682,11 @@ pub async fn recover(err: Rejection) -> Result { Ok(warp::reply::with_status(warp::reply::json(&error), error.into())) } -pub fn micropub(db: D, token_endpoint: String, http: hyper::Client) -> impl Filter + Clone -where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { +pub fn micropub( + db: D, + token_endpoint: String, + http: reqwest::Client +) -> impl Filter + Clone { query(db.clone(), token_endpoint.clone(), http.clone()) .or(post(db, token_endpoint, http)) .or(options()) @@ -685,7 +744,7 @@ mod tests { let (uid, mf2) = super::post::normalize_mf2(post, &user); super::_post( - user, uid, mf2, db, hyper::Client::new() + user, uid, mf2, db, reqwest::Client::new() ).await.map_err(warp::reject::custom) }) ) @@ -730,7 +789,7 @@ mod tests { let (uid, mf2) = super::post::normalize_mf2(post, &user); super::_post( - user, uid, mf2, db, hyper::Client::new() + user, uid, mf2, db, reqwest::Client::new() ).await.map_err(warp::reject::custom) }) ) -- cgit 1.4.1