about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
authorVika <vika@fireburn.ru>2022-05-12 01:37:59 +0300
committerVika <vika@fireburn.ru>2022-05-12 01:37:59 +0300
commit47285d5af803ed26d52b7f248a8f2de556e9057f (patch)
treef385c797742a4e98d1c78ba803396bd6ffe44b35 /src
parent34f1c6229bac92212c97cbacc77801d4a2921e4a (diff)
downloadkittybox-47285d5af803ed26d52b7f248a8f2de556e9057f.tar.zst
treewide: prepare for mf2 parsing and cleanup unused code
Diffstat (limited to 'src')
-rw-r--r--src/frontend/mod.rs8
-rw-r--r--src/indieauth.rs75
-rw-r--r--src/main.rs28
-rw-r--r--src/micropub/mod.rs105
4 files changed, 122 insertions, 94 deletions
diff --git a/src/frontend/mod.rs b/src/frontend/mod.rs
index 3c3072d..eec2f85 100644
--- a/src/frontend/mod.rs
+++ b/src/frontend/mod.rs
@@ -286,8 +286,10 @@ pub fn homepage<D: Storage>(db: D, endpoints: IndiewebEndpoints) -> impl Filter<
         })
 }
 
-pub fn onboarding<D: Storage, T: hyper::client::connect::Connect + Clone + Send + Sync + 'static>(
-    db: D, endpoints: IndiewebEndpoints, http: hyper::Client<T, hyper::Body>
+pub fn onboarding<D: Storage>(
+    db: D,
+    endpoints: IndiewebEndpoints,
+    http: reqwest::Client
 ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
     let inject_db = move || db.clone();
     warp::get()
@@ -304,7 +306,7 @@ pub fn onboarding<D: Storage, T: hyper::client::connect::Connect + Clone + Send
             .and(warp::any().map(inject_db))
             .and(warp::body::json::<OnboardingData>())
             .and(warp::any().map(move || http.clone()))
-            .and_then(|host: warp::host::Authority, db: D, body: OnboardingData, http: _| async move {
+            .and_then(|host: warp::host::Authority, db: D, body: OnboardingData, http: reqwest::Client| async move {
                 let user_uid = format!("https://{}/", host.as_str());
                 if db.post_exists(&user_uid).await.map_err(FrontendError::from)? {
                     
diff --git a/src/indieauth.rs b/src/indieauth.rs
index 4dfe11d..57c0301 100644
--- a/src/indieauth.rs
+++ b/src/indieauth.rs
@@ -62,8 +62,8 @@ impl From<serde_json::Error> for IndieAuthError {
     }
 }
 
-impl From<hyper::Error> for IndieAuthError {
-    fn from(err: hyper::Error) -> Self {
+impl From<reqwest::Error> for IndieAuthError {
+    fn from(err: reqwest::Error) -> Self {
         Self {
             msg: format!("{}", err),
             source: Some(Box::new(err)),
@@ -90,11 +90,10 @@ impl User {
     }
 }
 
-pub fn require_token<T>(token_endpoint: String, http: hyper::Client<T, hyper::Body>) -> impl Filter<Extract = (User,), Error = Rejection> + Clone
-where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static {
+pub fn require_token(token_endpoint: String, http: reqwest::Client) -> impl Filter<Extract = (User,), Error = Rejection> + Clone {
     // It might be OK to panic here, because we're still inside the initialisation sequence for now.
     // Proper error handling on the top of this should be used though.
-    let token_endpoint_uri = hyper::Uri::try_from(&token_endpoint)
+    let token_endpoint_uri = url::Url::parse(&token_endpoint)
         .expect("Couldn't parse the token endpoint URI!");
     warp::any()
         .map(move || token_endpoint_uri.clone())
@@ -110,60 +109,36 @@ where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static {
                 Err(err)
             }
         }).unify())
-        .and_then(|token_endpoint, http: hyper::Client<T, hyper::Body>, token| async move {
-            let request = hyper::Request::builder()
-                .method(hyper::Method::GET)
-                .uri(token_endpoint)
-                .header("Authorization", token)
-                .header("Accept", "application/json")
-                .body(hyper::Body::from(""))
-                // TODO is it acceptable to panic here?
-                .unwrap();
-
+        .and_then(|token_endpoint, http: reqwest::Client, token| async move {
             use hyper::StatusCode;
 
-            match http.request(request).await {
-                Ok(mut res) => match res.status() {
-                    StatusCode::OK => {
-                        use hyper::body::HttpBody;
-                        use bytes::BufMut;
-                        let mut buf: Vec<u8> = Vec::default();
-                        while let Some(chunk) = res.body_mut().data().await {
-                            if let Err(err) = chunk {
-                                return Err(IndieAuthError::from(err).into());
-                            }
-                            buf.put(chunk.unwrap());
-                        }
-                        match serde_json::from_slice(&buf) {
+            match http
+                .get(token_endpoint)
+                .header("Authorization", token)
+                .header("Accept", "application/json")
+                .send()
+                .await
+            {
+                Ok(res) => match res.status() {
+                    StatusCode::OK => match res.json::<serde_json::Value>().await {
+                        Ok(json) => match serde_json::from_value::<User>(json.clone()) {
                             Ok(user) => Ok(user),
                             Err(err) => {
-                                if let Ok(json) = serde_json::from_slice::<serde_json::Value>(&buf) {
-                                    if Some(false) == json["active"].as_bool() {
-                                        Err(IndieAuthError {
-                                            source: None,
-                                            kind: ErrorKind::NotAuthorized,
-                                            msg: "The token endpoint deemed the token as not \"active\".".to_string()
-                                        }.into())
-                                    } else {
-                                        Err(IndieAuthError::from(err).into())
-                                    }
+                                if let Some(false) = json["active"].as_bool() {
+                                    Err(IndieAuthError {
+                                        source: None,
+                                        kind: ErrorKind::NotAuthorized,
+                                        msg: "The token is not active for this user.".to_owned()
+                                    }.into())
                                 } else {
                                     Err(IndieAuthError::from(err).into())
                                 }
                             }
                         }
+                        Err(err) => Err(IndieAuthError::from(err).into())
                     },
                     StatusCode::BAD_REQUEST => {
-                        use hyper::body::HttpBody;
-                        use bytes::BufMut;
-                        let mut buf: Vec<u8> = Vec::default();
-                        while let Some(chunk) = res.body_mut().data().await {
-                            if let Err(err) = chunk {
-                                return Err(IndieAuthError::from(err).into());
-                            }
-                            buf.put(chunk.unwrap());
-                        }
-                        match serde_json::from_slice::<TokenEndpointError>(&buf) {
+                        match res.json::<TokenEndpointError>().await {
                             Ok(err) => {
                                 if err.error == "unauthorized" {
                                     Err(IndieAuthError {
@@ -211,8 +186,8 @@ mod tests {
     }
 
     #[inline]
-    fn get_http_client() -> hyper::Client<impl hyper::client::connect::Connect + Clone + Send + Sync + 'static, hyper::Body> {
-        hyper::Client::new()
+    fn get_http_client() -> reqwest::Client {
+        reqwest::Client::new()
     }
     
     #[tokio::test]
diff --git a/src/main.rs b/src/main.rs
index 0234abc..ed8c8e7 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,15 +1,15 @@
 use log::{debug, error, info};
 use std::{convert::Infallible, env, time::Duration};
 use url::Url;
-use hyper::client::{HttpConnector,connect::dns::GaiResolver};
-use hyper_rustls::HttpsConnector;
 use warp::{Filter, host::Authority};
 
 #[tokio::main]
 async fn main() {
+    // TODO turn into a feature so I can enable and disable it
     #[cfg(debug_assertions)]
     console_subscriber::init();
-    // TODO json logging in the future?
+
+    // TODO use tracing instead of log
     let logger_env = env_logger::Env::new().filter_or("RUST_LOG", "info");
     env_logger::init_from_env(logger_env);
 
@@ -97,18 +97,6 @@ async fn main() {
             }
         };
 
-    // TODO remove this and see what screams to replace it with reqwest
-    let http_client: hyper::Client<HttpsConnector<HttpConnector<GaiResolver>>, hyper::Body> = {
-        let builder = hyper::Client::builder();
-        let https = hyper_rustls::HttpsConnectorBuilder::new()
-            .with_webpki_roots()
-            .https_only()
-            .enable_http1()
-            .enable_http2()
-            .build();
-        builder.build(https)
-    };
-
     // This thing handles redirects automatically but is type-incompatible with hyper::Client
     // Bonus: less generics to be aware of, this thing hides its complexity
     let http: reqwest::Client = {
@@ -119,7 +107,7 @@ async fn main() {
                 "/",
                 env!("CARGO_PKG_VERSION")
             ));
-        // TODO add a root certificate if there's an environment variable pointing at it
+        // TODO: add a root certificate if there's an environment variable pointing at it
         //builder = builder.add_root_certificate(reqwest::Certificate::from_pem(todo!()));
 
         builder.build().unwrap()
@@ -155,14 +143,18 @@ async fn main() {
 
         let onboarding = warp::path("onboarding")
             .and(warp::path::end())
-            .and(kittybox::frontend::onboarding(database.clone(), endpoints.clone(), http_client.clone()));
+            .and(kittybox::frontend::onboarding(
+                database.clone(),
+                endpoints.clone(),
+                http.clone()
+            ));
         
         let micropub = warp::path("micropub")
             .and(warp::path::end()
                  .and(kittybox::micropub::micropub(
                      database.clone(),
                      token_endpoint.to_string(),
-                     http_client.clone()
+                     http.clone()
                  ))
                  .or(warp::get()
                      .and(warp::path("client"))
diff --git a/src/micropub/mod.rs b/src/micropub/mod.rs
index 7aad51a..7175e56 100644
--- a/src/micropub/mod.rs
+++ b/src/micropub/mod.rs
@@ -51,7 +51,7 @@ impl From<StorageError> for MicropubError {
                 crate::database::ErrorKind::NotFound => ErrorType::NotFound,
                 _ => ErrorType::InternalServerError
             },
-            error_description: format!("Backend error: {}", err.to_string())
+            error_description: format!("Backend error: {}", err)
         }
     }
 }
@@ -158,12 +158,12 @@ mod util {
 }
 
 // TODO actually save the post to the database and schedule post-processing
-pub(crate) async fn _post<D: Storage, T: hyper::client::connect::Connect + Clone + Send + Sync + 'static>(
+pub(crate) async fn _post<D: Storage>(
     user: crate::indieauth::User,
     uid: String,
     mf2: serde_json::Value,
     db: D,
-    http: hyper::Client<T, hyper::Body>
+    http: reqwest::Client
 ) -> Result<impl warp::Reply, MicropubError> {
     // Here, we have the following guarantees:
     // - The user is the same user for this host (guaranteed by ensure_same_user)
@@ -248,23 +248,78 @@ pub(crate) async fn _post<D: Storage, T: hyper::client::connect::Connect + Clone
         use bytes::{Buf, BufMut};
         use futures_util::StreamExt;
 
-        let mut contextually_significant_posts: Vec<hyper::Uri> = vec![];
+        let uid: &str = mf2["properties"]["uid"][0].as_str().unwrap();
+
+        let mut contextually_significant_posts: Vec<url::Url> = vec![];
         for prop in &["in-reply-to", "like-of", "repost-of", "bookmark-of"] {
             if let Some(array) = mf2["properties"][prop].as_array() {
                 contextually_significant_posts.extend(
                     array
                         .iter()
-                        .filter_map(|v| v.as_str().and_then(|v| v.parse::<hyper::Uri>().ok())),
+                        .filter_map(|v| v.as_str())
+                        .filter_map(|v| v.parse::<url::Url>().ok()),
                 );
             }
         }
+        // TODO parse HTML in e-content and add links found here
         contextually_significant_posts.sort_unstable_by_key(|u| u.to_string());
         contextually_significant_posts.dedup();
 
-        // TODO: Make a stream to fetch all these posts and convert them to MF2
-        drop(http);
+        #[derive(Debug)]
+        #[allow(dead_code)]
+        struct FetchedPostContext {
+            url: url::Url,
+            // TODO see if we can use non-strings for non-UTF-8 HTML
+            html: String,
+            mf2: serde_json::Value,
+            webmention: Option<hyper::Uri>
+        }
+
+        {
+            // TODO: Make a stream to fetch all these posts and convert them to MF2
+            let post_contexts = {
+                let http = &http;
+                tokio_stream::iter(contextually_significant_posts
+                                   .into_iter()
+                ).then(move |url: url::Url| async move {
+                    let html: String = todo!("Fetch the post using {:?}", http);
+                    // We just need to get the following here:
+                    // 1. We need to preserve the URL
+                    // 2. We need to get the HTML for MF2 processing
+                    // 3. We need to get the webmention endpoint address
+                    // All of that can be done in one go.
+
+                    // XXX stub!
+
+                    dbg!(FetchedPostContext {
+                        url, html,
+                        mf2: serde_json::to_value(microformats::from_html(&html, url).unwrap()).unwrap(),
+                        webmention: None
+                    })
+                })
+                    .collect::<Vec<FetchedPostContext>>()
+                    .await
+            };
 
-        todo!()
+            drop(post_contexts);
+        }
+        // At this point we can start syndicating the post.
+        // Currently we don't really support any syndication endpoints, but still!
+        if let Some(syndicate_to) = mf2["properties"]["mp-syndicate-to"].as_array() {
+            let http = &http;
+            tokio_stream::iter(syndicate_to)
+                .filter_map(|i| futures::future::ready(i.as_str()))
+                .for_each_concurrent(3, |s: &str| async move {
+                    #[allow(clippy::match_single_binding)]
+                    match s {
+                        _ => {
+                            todo!("Syndicate to generic webmention-aware service {}", s);
+                        }
+                        // TODO special handling for non-webmention-aware services like the birdsite
+                    }
+                })
+                .await;
+        }
     });
     
     Ok(reply)
@@ -378,11 +433,10 @@ async fn check_auth(host: warp::host::Authority, user: User) -> Result<User, war
 }
 
 #[cfg(any(not(debug_assertions), test))]
-fn ensure_same_user_as_host<T>(
+fn ensure_same_user_as_host(
     token_endpoint: String,
-    http: hyper::Client<T, hyper::Body>
-) -> impl Filter<Extract = (User,), Error = warp::Rejection> + Clone
-where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static {
+    http: reqwest::Client
+) -> impl Filter<Extract = (User,), Error = warp::Rejection> + Clone {
     crate::util::require_host()
         .and(crate::indieauth::require_token(token_endpoint, http))
         .and_then(check_auth)
@@ -443,12 +497,11 @@ async fn dispatch_post_body(
 }
 
 #[cfg_attr(all(debug_assertions, not(test)), allow(unused_variables))]
-pub fn post<D: 'static + Storage, T>(
+pub fn post<D: 'static + Storage>(
     db: D,
     token_endpoint: String,
-    http: hyper::Client<T, hyper::Body>
-) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone
-where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static {
+    http: reqwest::Client
+) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
     let inject_db = warp::any().map(move || db.clone());
     #[cfg(all(debug_assertions, not(test)))]
     let ensure_same_user = warp::any().map(|| crate::indieauth::User::new(
@@ -467,7 +520,7 @@ where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static {
         .and(inject_db)
         .and(warp::any().map(move || http.clone()))
         .and(ensure_same_user)
-        .and_then(|body: Either<MicropubAction, serde_json::Value>, db: D, http: hyper::Client<T, hyper::Body>, user: crate::indieauth::User| async move {
+        .and_then(|body: Either<MicropubAction, serde_json::Value>, db: D, http: reqwest::Client, user: crate::indieauth::User| async move {
             (match body {
                 Either::Left(action) => {
                     post_action(action, db, user).await.map(|p| Box::new(p) as Box<dyn warp::Reply>)
@@ -588,8 +641,11 @@ async fn _query<D: Storage>(
     }
 }
 
-pub fn query<D: Storage, T>(db: D, token_endpoint: String, http: hyper::Client<T, hyper::Body>) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone
-where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static {
+pub fn query<D: Storage>(
+    db: D,
+    token_endpoint: String,
+    http: reqwest::Client
+) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
     warp::get()
         .map(move || db.clone())
         .and(warp::query::<MicropubQuery>())
@@ -626,8 +682,11 @@ pub async fn recover(err: Rejection) -> Result<impl warp::Reply, Infallible> {
     Ok(warp::reply::with_status(warp::reply::json(&error), error.into()))
 }
 
-pub fn micropub<D: 'static + Storage, T>(db: D, token_endpoint: String, http: hyper::Client<T, hyper::Body>) -> impl Filter<Extract = (impl warp::Reply,), Error = Infallible> + Clone
-where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static {
+pub fn micropub<D: 'static + Storage>(
+    db: D,
+    token_endpoint: String,
+    http: reqwest::Client
+) -> impl Filter<Extract = (impl warp::Reply,), Error = Infallible> + Clone {
     query(db.clone(), token_endpoint.clone(), http.clone())
         .or(post(db, token_endpoint, http))
         .or(options())
@@ -685,7 +744,7 @@ mod tests {
                         let (uid, mf2) = super::post::normalize_mf2(post, &user);
 
                         super::_post(
-                            user, uid, mf2, db, hyper::Client::new()
+                            user, uid, mf2, db, reqwest::Client::new()
                         ).await.map_err(warp::reject::custom)
                     })
             )
@@ -730,7 +789,7 @@ mod tests {
                         let (uid, mf2) = super::post::normalize_mf2(post, &user);
 
                         super::_post(
-                            user, uid, mf2, db, hyper::Client::new()
+                            user, uid, mf2, db, reqwest::Client::new()
                         ).await.map_err(warp::reject::custom)
                     })
             )