about summary refs log tree commit diff
path: root/src/micropub
diff options
context:
space:
mode:
Diffstat (limited to 'src/micropub')
-rw-r--r--src/micropub/mod.rs105
1 files changed, 82 insertions, 23 deletions
diff --git a/src/micropub/mod.rs b/src/micropub/mod.rs
index 7aad51a..7175e56 100644
--- a/src/micropub/mod.rs
+++ b/src/micropub/mod.rs
@@ -51,7 +51,7 @@ impl From<StorageError> for MicropubError {
                 crate::database::ErrorKind::NotFound => ErrorType::NotFound,
                 _ => ErrorType::InternalServerError
             },
-            error_description: format!("Backend error: {}", err.to_string())
+            error_description: format!("Backend error: {}", err)
         }
     }
 }
@@ -158,12 +158,12 @@ mod util {
 }
 
 // TODO actually save the post to the database and schedule post-processing
-pub(crate) async fn _post<D: Storage, T: hyper::client::connect::Connect + Clone + Send + Sync + 'static>(
+pub(crate) async fn _post<D: Storage>(
     user: crate::indieauth::User,
     uid: String,
     mf2: serde_json::Value,
     db: D,
-    http: hyper::Client<T, hyper::Body>
+    http: reqwest::Client
 ) -> Result<impl warp::Reply, MicropubError> {
     // Here, we have the following guarantees:
     // - The user is the same user for this host (guaranteed by ensure_same_user)
@@ -248,23 +248,78 @@ pub(crate) async fn _post<D: Storage, T: hyper::client::connect::Connect + Clone
         use bytes::{Buf, BufMut};
         use futures_util::StreamExt;
 
-        let mut contextually_significant_posts: Vec<hyper::Uri> = vec![];
+        let uid: &str = mf2["properties"]["uid"][0].as_str().unwrap();
+
+        let mut contextually_significant_posts: Vec<url::Url> = vec![];
         for prop in &["in-reply-to", "like-of", "repost-of", "bookmark-of"] {
             if let Some(array) = mf2["properties"][prop].as_array() {
                 contextually_significant_posts.extend(
                     array
                         .iter()
-                        .filter_map(|v| v.as_str().and_then(|v| v.parse::<hyper::Uri>().ok())),
+                        .filter_map(|v| v.as_str())
+                        .filter_map(|v| v.parse::<url::Url>().ok()),
                 );
             }
         }
+        // TODO parse HTML in e-content and add links found here
         contextually_significant_posts.sort_unstable_by_key(|u| u.to_string());
         contextually_significant_posts.dedup();
 
-        // TODO: Make a stream to fetch all these posts and convert them to MF2
-        drop(http);
+        #[derive(Debug)]
+        #[allow(dead_code)]
+        struct FetchedPostContext {
+            url: url::Url,
+            // TODO see if we can use non-strings for non-UTF-8 HTML
+            html: String,
+            mf2: serde_json::Value,
+            webmention: Option<hyper::Uri>
+        }
+
+        {
+            // TODO: Make a stream to fetch all these posts and convert them to MF2
+            let post_contexts = {
+                let http = &http;
+                tokio_stream::iter(contextually_significant_posts
+                                   .into_iter()
+                ).then(move |url: url::Url| async move {
+                    let html: String = todo!("Fetch the post using {:?}", http);
+                    // We just need to get the following here:
+                    // 1. We need to preserve the URL
+                    // 2. We need to get the HTML for MF2 processing
+                    // 3. We need to get the webmention endpoint address
+                    // All of that can be done in one go.
+
+                    // XXX stub!
+
+                    dbg!(FetchedPostContext {
+                        url, html,
+                        mf2: serde_json::to_value(microformats::from_html(&html, url).unwrap()).unwrap(),
+                        webmention: None
+                    })
+                })
+                    .collect::<Vec<FetchedPostContext>>()
+                    .await
+            };
 
-        todo!()
+            drop(post_contexts);
+        }
+        // At this point we can start syndicating the post.
+        // Currently we don't really support any syndication endpoints, but still!
+        if let Some(syndicate_to) = mf2["properties"]["mp-syndicate-to"].as_array() {
+            let http = &http;
+            tokio_stream::iter(syndicate_to)
+                .filter_map(|i| futures::future::ready(i.as_str()))
+                .for_each_concurrent(3, |s: &str| async move {
+                    #[allow(clippy::match_single_binding)]
+                    match s {
+                        _ => {
+                            todo!("Syndicate to generic webmention-aware service {}", s);
+                        }
+                        // TODO special handling for non-webmention-aware services like the birdsite
+                    }
+                })
+                .await;
+        }
     });
     
     Ok(reply)
@@ -378,11 +433,10 @@ async fn check_auth(host: warp::host::Authority, user: User) -> Result<User, war
 }
 
 #[cfg(any(not(debug_assertions), test))]
-fn ensure_same_user_as_host<T>(
+fn ensure_same_user_as_host(
     token_endpoint: String,
-    http: hyper::Client<T, hyper::Body>
-) -> impl Filter<Extract = (User,), Error = warp::Rejection> + Clone
-where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static {
+    http: reqwest::Client
+) -> impl Filter<Extract = (User,), Error = warp::Rejection> + Clone {
     crate::util::require_host()
         .and(crate::indieauth::require_token(token_endpoint, http))
         .and_then(check_auth)
@@ -443,12 +497,11 @@ async fn dispatch_post_body(
 }
 
 #[cfg_attr(all(debug_assertions, not(test)), allow(unused_variables))]
-pub fn post<D: 'static + Storage, T>(
+pub fn post<D: 'static + Storage>(
     db: D,
     token_endpoint: String,
-    http: hyper::Client<T, hyper::Body>
-) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone
-where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static {
+    http: reqwest::Client
+) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
     let inject_db = warp::any().map(move || db.clone());
     #[cfg(all(debug_assertions, not(test)))]
     let ensure_same_user = warp::any().map(|| crate::indieauth::User::new(
@@ -467,7 +520,7 @@ where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static {
         .and(inject_db)
         .and(warp::any().map(move || http.clone()))
         .and(ensure_same_user)
-        .and_then(|body: Either<MicropubAction, serde_json::Value>, db: D, http: hyper::Client<T, hyper::Body>, user: crate::indieauth::User| async move {
+        .and_then(|body: Either<MicropubAction, serde_json::Value>, db: D, http: reqwest::Client, user: crate::indieauth::User| async move {
             (match body {
                 Either::Left(action) => {
                     post_action(action, db, user).await.map(|p| Box::new(p) as Box<dyn warp::Reply>)
@@ -588,8 +641,11 @@ async fn _query<D: Storage>(
     }
 }
 
-pub fn query<D: Storage, T>(db: D, token_endpoint: String, http: hyper::Client<T, hyper::Body>) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone
-where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static {
+pub fn query<D: Storage>(
+    db: D,
+    token_endpoint: String,
+    http: reqwest::Client
+) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
     warp::get()
         .map(move || db.clone())
         .and(warp::query::<MicropubQuery>())
@@ -626,8 +682,11 @@ pub async fn recover(err: Rejection) -> Result<impl warp::Reply, Infallible> {
     Ok(warp::reply::with_status(warp::reply::json(&error), error.into()))
 }
 
-pub fn micropub<D: 'static + Storage, T>(db: D, token_endpoint: String, http: hyper::Client<T, hyper::Body>) -> impl Filter<Extract = (impl warp::Reply,), Error = Infallible> + Clone
-where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static {
+pub fn micropub<D: 'static + Storage>(
+    db: D,
+    token_endpoint: String,
+    http: reqwest::Client
+) -> impl Filter<Extract = (impl warp::Reply,), Error = Infallible> + Clone {
     query(db.clone(), token_endpoint.clone(), http.clone())
         .or(post(db, token_endpoint, http))
         .or(options())
@@ -685,7 +744,7 @@ mod tests {
                         let (uid, mf2) = super::post::normalize_mf2(post, &user);
 
                         super::_post(
-                            user, uid, mf2, db, hyper::Client::new()
+                            user, uid, mf2, db, reqwest::Client::new()
                         ).await.map_err(warp::reject::custom)
                     })
             )
@@ -730,7 +789,7 @@ mod tests {
                         let (uid, mf2) = super::post::normalize_mf2(post, &user);
 
                         super::_post(
-                            user, uid, mf2, db, hyper::Client::new()
+                            user, uid, mf2, db, reqwest::Client::new()
                         ).await.map_err(warp::reject::custom)
                     })
             )