about summary refs log tree commit diff
path: root/kittybox-rs/src/micropub/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'kittybox-rs/src/micropub/mod.rs')
-rw-r--r--kittybox-rs/src/micropub/mod.rs964
1 files changed, 964 insertions, 0 deletions
diff --git a/kittybox-rs/src/micropub/mod.rs b/kittybox-rs/src/micropub/mod.rs
new file mode 100644
index 0000000..f426c77
--- /dev/null
+++ b/kittybox-rs/src/micropub/mod.rs
@@ -0,0 +1,964 @@
+use std::convert::Infallible;
+use std::fmt::Display;
+use either::Either;
+use log::{info, warn, error};
+use warp::http::StatusCode;
+use warp::{Filter, Rejection, reject::InvalidQuery};
+use serde_json::json;
+use serde::{Serialize, Deserialize};
+use crate::database::{MicropubChannel, Storage, StorageError};
+use crate::indieauth::User;
+use crate::micropub::util::form_to_mf2_json;
+
+#[derive(Serialize, Deserialize, Debug, PartialEq)]
+#[serde(rename_all = "kebab-case")]
+enum QueryType {
+    Source,
+    Config,
+    Channel,
+    SyndicateTo
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+struct MicropubQuery {
+    q: QueryType,
+    url: Option<String>
+}
+
+#[derive(Serialize, Deserialize, PartialEq, Debug)]
+#[serde(rename_all = "snake_case")]
+enum ErrorType {
+    AlreadyExists,
+    Forbidden,
+    InternalServerError,
+    InvalidRequest,
+    InvalidScope,
+    NotAuthorized,
+    NotFound,
+    UnsupportedMediaType
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub(crate) struct MicropubError {
+    error: ErrorType,
+    error_description: String
+}
+
+impl From<StorageError> for MicropubError {
+    fn from(err: StorageError) -> Self {
+        Self {
+            error: match err.kind() {
+                crate::database::ErrorKind::NotFound => ErrorType::NotFound,
+                _ => ErrorType::InternalServerError
+            },
+            error_description: format!("Backend error: {}", err)
+        }
+    }
+}
+
+impl std::error::Error for MicropubError {}
+
+impl Display for MicropubError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_str("Micropub error: ")?;
+        f.write_str(&self.error_description)
+    }
+}
+
+impl From<&MicropubError> for StatusCode {
+    fn from(err: &MicropubError) -> Self {
+        use ErrorType::*;
+        match err.error {
+            AlreadyExists => StatusCode::CONFLICT,
+            Forbidden => StatusCode::FORBIDDEN,
+            InternalServerError => StatusCode::INTERNAL_SERVER_ERROR,
+            InvalidRequest => StatusCode::BAD_REQUEST,
+            InvalidScope => StatusCode::UNAUTHORIZED,
+            NotAuthorized => StatusCode::UNAUTHORIZED,
+            NotFound => StatusCode::NOT_FOUND,
+            UnsupportedMediaType => StatusCode::UNSUPPORTED_MEDIA_TYPE,
+        }
+    }
+}
+impl From<MicropubError> for StatusCode {
+    fn from(err: MicropubError) -> Self {
+        (&err).into()
+    }
+}
+
+impl From<serde_json::Error> for MicropubError {
+    fn from(err: serde_json::Error) -> Self {
+        use ErrorType::*;
+        Self {
+            error: InvalidRequest,
+            error_description: err.to_string()
+        }
+    }
+}
+
+impl MicropubError {
+    fn new(error: ErrorType, error_description: &str) -> Self {
+        Self {
+            error,
+            error_description: error_description.to_owned()
+        }
+    }
+}
+
+impl warp::reject::Reject for MicropubError {}
+
+mod post;
+pub(crate) use post::normalize_mf2;
+
+mod util {
+    use serde_json::json;
+
+    pub(crate) fn form_to_mf2_json(form: Vec<(String, String)>) -> serde_json::Value {
+        let mut mf2 = json!({"type": [], "properties": {}});
+        for (k, v) in form {
+            if k == "h" {
+                mf2["type"]
+                    .as_array_mut()
+                    .unwrap()
+                    .push(json!("h-".to_string() + &v));
+            } else if k != "access_token" {
+                let key = k.strip_suffix("[]").unwrap_or(&k);
+                match mf2["properties"][key].as_array_mut() {
+                    Some(prop) => prop.push(json!(v)),
+                    None => mf2["properties"][key] = json!([v]),
+                }
+            }
+        }
+        if mf2["type"].as_array().unwrap().is_empty() {
+            mf2["type"].as_array_mut().unwrap().push(json!("h-entry"));
+        }
+        mf2
+    }
+
+    #[cfg(test)]
+    mod tests {
+        use serde_json::json;
+        #[test]
+        fn test_form_to_mf2() {
+            assert_eq!(
+                super::form_to_mf2_json(
+                    serde_urlencoded::from_str(
+                        "h=entry&content=something%20interesting"
+                    ).unwrap()
+                ),
+                json!({
+                    "type": ["h-entry"],
+                    "properties": {
+                        "content": ["something interesting"]
+                    }
+                })
+            )
+        }
+    }
+}
+
+#[derive(Debug)]
+struct FetchedPostContext {
+    url: url::Url,
+    mf2: serde_json::Value,
+    webmention: Option<url::Url>
+}
+
+fn populate_reply_context(mf2: &serde_json::Value, prop: &str, ctxs: &[FetchedPostContext]) -> Option<serde_json::Value> {
+    if mf2["properties"][prop].is_array() {
+        Some(json!(
+            mf2["properties"][prop]
+                .as_array()
+            // Safe to unwrap because we checked its existence and type
+            // And it's not like we can make it disappear without unsafe code
+                .unwrap()
+                .iter()
+            // This seems to be O(n^2) and I don't like it.
+            // Nevertheless, I lack required knowledge to optimize it. Also, it works, so...
+                .map(|i| ctxs.iter()
+                     .find(|ctx| Some(ctx.url.as_str()) == i.as_str())
+                     .and_then(|ctx| ctx.mf2["items"].get(0))
+                     .or(Some(i))
+                     .unwrap())
+                .collect::<Vec<&serde_json::Value>>()
+        ))
+    } else {
+        None
+    }
+}
+
+// TODO actually save the post to the database and schedule post-processing
+pub(crate) async fn _post<D: 'static + Storage>(
+    user: crate::indieauth::User,
+    uid: String,
+    mf2: serde_json::Value,
+    db: D,
+    http: reqwest::Client
+) -> Result<impl warp::Reply, MicropubError> {
+    // Here, we have the following guarantees:
+    // - The user is the same user for this host (guaranteed by ensure_same_user)
+    // - The MF2-JSON document is normalized (guaranteed by normalize_mf2)\
+    //   - The MF2-JSON document contains a UID
+    //   - The MF2-JSON document's URL list contains its UID
+    //   - The MF2-JSON document's "content" field contains an HTML blob, if present
+    //   - The MF2-JSON document's publishing datetime is present
+    //   - The MF2-JSON document's target channels are set
+    //   - The MF2-JSON document's author is set
+
+    // Security check! Do we have an oAuth2 scope to proceed?
+    if !user.check_scope("create") {
+        return Err(MicropubError {
+            error: ErrorType::InvalidScope,
+            error_description: "Not enough privileges - try acquiring the \"create\" scope.".to_owned()
+        });
+    }
+
+    // Security check #2! Are we posting to our own website?
+    if !uid.starts_with(user.me.as_str()) || mf2["properties"]["channel"]
+        .as_array()
+        .unwrap_or(&vec![])
+        .iter()
+        .any(|url| !url.as_str().unwrap().starts_with(user.me.as_str()))
+    {
+        return Err(MicropubError {
+            error: ErrorType::Forbidden,
+            error_description: "You're posting to a website that's not yours.".to_owned()
+        });
+    }
+
+    // Security check #3! Are we overwriting an existing document?
+    if db.post_exists(&uid).await? {
+        return Err(MicropubError {
+            error: ErrorType::AlreadyExists,
+            error_description: "UID clash was detected, operation aborted.".to_owned()
+        });
+    }
+
+    // Save the post
+    db.put_post(&mf2, user.me.as_str()).await?;
+
+    let mut channels = mf2["properties"]["channel"]
+        .as_array()
+        .unwrap()
+        .iter()
+        .map(|i| i.as_str().unwrap_or(""))
+        .filter(|i| !i.is_empty());
+
+    let default_channel = user.me.join(post::DEFAULT_CHANNEL_PATH).unwrap().to_string();
+    let vcards_channel = user.me.join(post::CONTACTS_CHANNEL_PATH).unwrap().to_string();
+    let food_channel = user.me.join(post::FOOD_CHANNEL_PATH).unwrap().to_string();
+    let default_channels = vec![default_channel, vcards_channel, food_channel];
+
+    for chan in &mut channels {
+        if db.post_exists(chan).await? {
+            db.update_post(chan, json!({"add": {"children": [uid]}})).await?;
+        } else if default_channels.iter().any(|i| chan == i) {
+            post::create_feed(&db, &uid, chan, &user).await?;
+        } else {
+            warn!("Ignoring non-existent channel: {}", chan);
+        }
+    }
+
+    let reply = warp::reply::with_status(
+        warp::reply::with_header(
+            warp::reply::json(&json!({"location": &uid})),
+            "Location", &uid
+        ),
+        StatusCode::ACCEPTED
+    );
+    
+    // TODO: Post-processing the post (aka second write pass)
+    // - [x] Download rich reply contexts
+    // - [ ] Syndicate the post if requested, add links to the syndicated copies
+    // - [ ] Send WebSub notifications to the hub (if we happen to have one)
+    // - [x] Send webmentions
+    tokio::task::spawn(async move {
+        use futures_util::StreamExt;
+
+        let uid: &str = mf2["properties"]["uid"][0].as_str().unwrap();
+
+        let context_props = ["in-reply-to", "like-of", "repost-of", "bookmark-of"];
+        let mut context_urls: Vec<url::Url> = vec![];
+        for prop in &context_props {
+            if let Some(array) = mf2["properties"][prop].as_array() {
+                context_urls.extend(
+                    array
+                        .iter()
+                        .filter_map(|v| v.as_str())
+                        .filter_map(|v| v.parse::<url::Url>().ok()),
+                );
+            }
+        }
+        // TODO parse HTML in e-content and add links found here
+        context_urls.sort_unstable_by_key(|u| u.to_string());
+        context_urls.dedup();
+
+        // TODO: Make a stream to fetch all these posts and convert them to MF2
+        let post_contexts = {
+            let http = &http;
+            tokio_stream::iter(context_urls.into_iter())
+                .then(move |url: url::Url| http.get(url).send())
+                .filter_map(|response| futures::future::ready(response.ok()))
+                .filter(|response| futures::future::ready(response.status() == 200))
+                .filter_map(|response: reqwest::Response| async move {
+                    // 1. We need to preserve the URL
+                    // 2. We need to get the HTML for MF2 processing
+                    // 3. We need to get the webmention endpoint address
+                    // All of that can be done in one go.
+                    let url = response.url().clone();
+                    // TODO parse link headers
+                    let links = response
+                        .headers()
+                        .get_all(hyper::http::header::LINK)
+                        .iter()
+                        .cloned()
+                        .collect::<Vec<hyper::http::HeaderValue>>();
+                    let html = response.text().await;
+                    if html.is_err() {
+                        return None;
+                    }
+                    let html = html.unwrap();
+                    let mf2 = microformats::from_html(&html, url.clone()).unwrap();
+                    // TODO use first Link: header if available
+                    let webmention: Option<url::Url> = mf2.rels.by_rels().get("webmention")
+                        .and_then(|i| i.first().cloned());
+
+                    dbg!(Some(FetchedPostContext {
+                        url, mf2: serde_json::to_value(mf2).unwrap(), webmention
+                    }))
+                })
+                .collect::<Vec<FetchedPostContext>>()
+                .await
+        };
+
+        let mut update = json!({ "replace": {} });
+        for prop in &context_props {
+            if let Some(json) = populate_reply_context(&mf2, prop, &post_contexts) {
+                update["replace"][prop] = json;
+            }
+        }
+        if !update["replace"].as_object().unwrap().is_empty() {
+            if let Err(err) = db.update_post(uid, update).await {
+                error!("Failed to update post with rich reply contexts: {}", err);
+            }
+        }
+
+        // At this point we can start syndicating the post.
+        // Currently we don't really support any syndication endpoints, but still!
+        /*if let Some(syndicate_to) = mf2["properties"]["mp-syndicate-to"].as_array() {
+            let http = &http;
+            tokio_stream::iter(syndicate_to)
+                .filter_map(|i| futures::future::ready(i.as_str()))
+                .for_each_concurrent(3, |s: &str| async move {
+                    #[allow(clippy::match_single_binding)]
+                    match s {
+                        _ => {
+                            todo!("Syndicate to generic webmention-aware service {}", s);
+                        }
+                        // TODO special handling for non-webmention-aware services like the birdsite
+                    }
+                })
+                .await;
+        }*/
+
+        {
+            let http = &http;
+            tokio_stream::iter(
+                post_contexts.into_iter()
+                    .filter(|ctx| ctx.webmention.is_some()))
+                .for_each_concurrent(2, |ctx| async move {
+                    let mut map = std::collections::HashMap::new();
+                    map.insert("source", uid);
+                    map.insert("target", ctx.url.as_str());
+
+                    match http.post(ctx.webmention.unwrap().clone())
+                        .form(&map)
+                        .send()
+                        .await
+                    {
+                        Ok(res) => {
+                            if !res.status().is_success() {
+                                warn!(
+                                    "Failed to send a webmention for {}: got HTTP {}",
+                                    ctx.url, res.status()
+                                );
+                            } else {
+                                info!("Sent a webmention to {}, got HTTP {}", ctx.url, res.status())
+                            }
+                        },
+                        Err(err) => warn!("Failed to send a webmention for {}: {}", ctx.url, err)
+                    }
+                })
+                .await;
+        }
+    });
+    
+    Ok(reply)
+}
+
+#[derive(Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+enum ActionType {
+    Delete,
+    Update
+}
+
+#[derive(Serialize, Deserialize)]
+struct MicropubFormAction {
+    action: ActionType,
+    url: String
+}
+
+#[derive(Serialize, Deserialize)]
+struct MicropubAction {
+    action: ActionType,
+    url: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    replace: Option<serde_json::Value>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    add: Option<serde_json::Value>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    delete: Option<serde_json::Value>
+}
+
+impl From<MicropubFormAction> for MicropubAction {
+    fn from(a: MicropubFormAction) -> Self {
+        Self {
+            action: a.action,
+            url: a.url,
+            replace: None, add: None, delete: None
+        }
+    }
+}
+
+// TODO perform the requested actions synchronously
+async fn post_action<D: Storage>(
+    action: MicropubAction,
+    db: D,
+    user: User    
+) -> Result<impl warp::Reply, MicropubError> {
+
+    let uri = if let Ok(uri) = action.url.parse::<hyper::Uri>() {
+        uri
+    } else {
+        return Err(MicropubError {
+            error: ErrorType::InvalidRequest,
+            error_description: "Your URL doesn't parse properly.".to_owned()
+        });
+    };
+
+    if uri.authority().unwrap() != user.me.as_str().parse::<hyper::Uri>().unwrap().authority().unwrap() {
+        return Err(MicropubError {
+            error: ErrorType::Forbidden,
+            error_description: "Don't tamper with others' posts!".to_owned()
+        });
+    }
+
+    match action.action {
+        ActionType::Delete => {
+            if !user.check_scope("delete") {
+                return Err(MicropubError {
+                    error: ErrorType::InvalidScope,
+                    error_description: "You need a \"delete\" scope for this.".to_owned()
+                });
+            }
+
+            db.delete_post(&action.url).await?
+        },
+        ActionType::Update => {
+            if !user.check_scope("update") {
+                return Err(MicropubError {
+                    error: ErrorType::InvalidScope,
+                    error_description: "You need an \"update\" scope for this.".to_owned()
+                });
+            }
+
+            db.update_post(
+                &action.url,
+                // Here, unwrapping is safe, because this value
+                // was recently deserialized from JSON already.
+                serde_json::to_value(&action).unwrap()
+            ).await?
+        },
+    }
+
+    Ok(warp::reply::reply())
+}
+
+async fn check_auth(host: warp::host::Authority, user: User) -> Result<User, warp::Rejection> {
+    let user_authority = warp::http::Uri::try_from(user.me.as_str())
+        .unwrap()
+        .authority()
+        .unwrap()
+        .clone();
+    // TODO compare with potential list of allowed websites
+    // to allow one user to edit several websites with one token
+    if host != user_authority {
+        Err(warp::reject::custom(MicropubError::new(
+            ErrorType::NotAuthorized,
+            "This user is not authorized to use Micropub on this website."
+        )))
+    } else {
+        Ok(user)
+    }
+}
+
+#[cfg(any(not(debug_assertions), test))]
+fn ensure_same_user_as_host(
+    token_endpoint: String,
+    http: reqwest::Client
+) -> impl Filter<Extract = (User,), Error = warp::Rejection> + Clone {
+    crate::util::require_host()
+        .and(crate::indieauth::require_token(token_endpoint, http))
+        .and_then(check_auth)
+}
+
+async fn dispatch_post_body(
+    mut body: impl bytes::Buf,
+    mimetype: http_types::Mime
+) -> Result<Either<MicropubAction, serde_json::Value>, warp::Rejection> {
+    // Since hyper::common::buf::BufList doesn't implement Clone, we can't use Clone in here
+    // We have to copy the body. Ugh!!!
+    // so much for zero-copy buffers
+    let body = {
+        let mut _body: Vec<u8> = Vec::default();
+        while body.has_remaining() {
+            _body.extend(body.chunk());
+            body.advance(body.chunk().len());
+        }
+        _body
+    };
+    match mimetype.essence() {
+        "application/json" => {
+            if let Ok(body) = serde_json::from_slice::<MicropubAction>(&body) {
+                Ok(Either::Left(body))
+            } else if let Ok(body) = serde_json::from_slice::<serde_json::Value>(&body) {
+                // quick sanity check
+                if !body.is_object() || !body["type"].is_array() {
+                    return Err(MicropubError {
+                        error: ErrorType::InvalidRequest,
+                        error_description: "Invalid MF2-JSON detected: `.` should be an object, `.type` should be an array of MF2 types".to_owned()
+                    }.into())
+                }
+                Ok(Either::Right(body))
+            } else {
+                Err(MicropubError {
+                    error: ErrorType::InvalidRequest,
+                    error_description: "Invalid JSON object passed.".to_owned()
+                }.into())
+            }
+        },
+        "application/x-www-form-urlencoded" => {
+            if let Ok(body) = serde_urlencoded::from_bytes::<MicropubFormAction>(&body) {
+                Ok(Either::Left(body.into()))
+            } else if let Ok(body) = serde_urlencoded::from_bytes::<Vec<(String, String)>>(&body) {
+                Ok(Either::Right(form_to_mf2_json(body)))
+            } else {
+                Err(MicropubError {
+                    error: ErrorType::InvalidRequest,
+                    error_description: "Invalid form-encoded data. Try h=entry&content=Hello!".to_owned()
+                }.into())
+            }
+        },
+        other => Err(MicropubError {
+            error: ErrorType::UnsupportedMediaType,
+            error_description: format!("Unsupported media type: {}. Try application/json?", other)
+        }.into())
+    }
+}
+
+#[cfg_attr(all(debug_assertions, not(test)), allow(unused_variables))]
+pub fn post<D: 'static + Storage>(
+    db: D,
+    token_endpoint: String,
+    http: reqwest::Client
+) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
+    let inject_db = warp::any().map(move || db.clone());
+    #[cfg(all(debug_assertions, not(test)))]
+    let ensure_same_user = warp::any().map(|| crate::indieauth::User::new(
+        "http://localhost:8080/",
+        "https://quill.p3k.io/",
+        "create update delete media"
+    ));
+    #[cfg(any(not(debug_assertions), test))]
+    let ensure_same_user = ensure_same_user_as_host(token_endpoint, http.clone());
+
+    warp::post()
+        .and(warp::body::content_length_limit(1024 * 512)
+             .and(warp::body::aggregate())
+             .and(warp::header::<http_types::Mime>("Content-Type"))
+             .and_then(dispatch_post_body))
+        .and(inject_db)
+        .and(warp::any().map(move || http.clone()))
+        .and(ensure_same_user)
+        .and_then(|body: Either<MicropubAction, serde_json::Value>, db: D, http: reqwest::Client, user: crate::indieauth::User| async move {
+            (match body {
+                Either::Left(action) => {
+                    post_action(action, db, user).await.map(|p| Box::new(p) as Box<dyn warp::Reply>)
+                },
+                Either::Right(post) => {
+                    let (uid, mf2) = post::normalize_mf2(post, &user);
+                    _post(user, uid, mf2, db, http).await.map(|p| Box::new(p) as Box<dyn warp::Reply>)
+                }
+            }).map_err(warp::reject::custom)
+        })
+}
+
+pub fn options() -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
+    warp::options()
+        // TODO make it reply with a basic description of Micropub spec
+        .map(|| warp::reply::json::<Option<()>>(&None))
+        .with(warp::reply::with::header("Allow", "GET, POST"))
+}
+
+async fn _query<D: Storage>(
+    db: D,
+    query: MicropubQuery,
+    user: crate::indieauth::User
+) -> Box<dyn warp::Reply> {
+    let user_authority = warp::http::Uri::try_from(user.me.as_str())
+        .unwrap()
+        .authority()
+        .unwrap()
+        .clone();
+
+    match query.q {
+        QueryType::Config => {
+            let channels: Vec<MicropubChannel> = match db.get_channels(user_authority.as_str()).await {
+                Ok(chans) => chans,
+                Err(err) => return Box::new(warp::reply::with_status(
+                    warp::reply::json(&MicropubError::new(
+                        ErrorType::InternalServerError,
+                        &format!("Error fetching channels: {}", err)
+                    )),
+                    StatusCode::INTERNAL_SERVER_ERROR
+                ))
+            };
+
+            Box::new(warp::reply::json(json!({
+                "q": [
+                    QueryType::Source,
+                    QueryType::Config,
+                    QueryType::Channel,
+                    QueryType::SyndicateTo
+                ],
+                "channels": channels,
+                "_kittybox_authority": user_authority.as_str(),
+                "syndicate-to": []
+            }).as_object().unwrap()))
+        },
+        QueryType::Source => {
+            match query.url {
+                Some(url) => {
+                    if warp::http::Uri::try_from(&url).unwrap().authority().unwrap() != &user_authority {
+                        return Box::new(warp::reply::with_status(
+                            warp::reply::json(&MicropubError::new(
+                                ErrorType::NotAuthorized,
+                                "You are requesting a post from a website that doesn't belong to you."
+                            )),
+                            StatusCode::UNAUTHORIZED
+                        ))
+                    }
+                    match db.get_post(&url).await {
+                        Ok(some) => match some {
+                            Some(post) => Box::new(warp::reply::json(&post)),
+                            None => Box::new(warp::reply::with_status(
+                                warp::reply::json(&MicropubError::new(
+                                    ErrorType::NotFound,
+                                    "The specified MF2 object was not found in database."
+                                )),
+                                StatusCode::NOT_FOUND
+                            ))
+                        },
+                        Err(err) => {
+                            Box::new(warp::reply::json(&MicropubError::new(
+                                ErrorType::InternalServerError,
+                                &format!("Backend error: {}", err)
+                            )))
+                        }
+                    }
+                },
+                None => {
+                    // Here, one should probably attempt to query at least the main feed and collect posts
+                    // Using a pre-made query function can't be done because it does unneeded filtering
+                    // Don't implement for now, this is optional
+                    Box::new(warp::reply::with_status(
+                        warp::reply::json(&MicropubError::new(
+                            ErrorType::InvalidRequest,
+                            "Querying for post list is not implemented yet."
+                        )),
+                        StatusCode::BAD_REQUEST
+                    ))
+                }
+            }
+        },
+        QueryType::Channel => {
+            let channels: Vec<MicropubChannel> = match db.get_channels(user_authority.as_str()).await {
+                Ok(chans) => chans,
+                Err(err) => return Box::new(warp::reply::with_status(
+                    warp::reply::json(&MicropubError::new(
+                        ErrorType::InternalServerError,
+                        &format!("Error fetching channels: {}", err)
+                    )),
+                    StatusCode::INTERNAL_SERVER_ERROR
+                ))
+            };
+
+            Box::new(warp::reply::json(&json!({ "channels": channels })))
+        },
+        QueryType::SyndicateTo => {
+            Box::new(warp::reply::json(&json!({ "syndicate-to": [] })))
+        }
+    }
+}
+
+pub fn query<D: Storage>(
+    db: D,
+    token_endpoint: String,
+    http: reqwest::Client
+) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
+    warp::get()
+        .map(move || db.clone())
+        .and(warp::query::<MicropubQuery>())
+        .and(crate::util::require_host()
+             .and(crate::indieauth::require_token(token_endpoint, http))
+             .and_then(check_auth))
+        .then(_query)
+        .recover(|e: warp::Rejection| async move {
+            if let Some(err) = e.find::<MicropubError>() {
+                Ok(warp::reply::json(err))
+            } else {
+                Err(e)
+            }
+        })
+}
+
+pub async fn recover(err: Rejection) -> Result<impl warp::Reply, Infallible> {
+    if let Some(error) = err.find::<MicropubError>() {
+        return Ok(warp::reply::with_status(warp::reply::json(&error), error.into()))
+    }
+    let error = if err.find::<InvalidQuery>().is_some() {
+        MicropubError::new(
+            ErrorType::InvalidRequest,
+            "Invalid query parameters sent. Try ?q=config to see what you can do."
+        )
+    } else {
+        log::error!("Unhandled rejection: {:?}", err);
+        MicropubError::new(
+            ErrorType::InternalServerError,
+            &format!("Unknown error: {:?}", err)
+        )
+    };
+
+    Ok(warp::reply::with_status(warp::reply::json(&error), error.into()))
+}
+
+pub fn micropub<D: 'static + Storage>(
+    db: D,
+    token_endpoint: String,
+    http: reqwest::Client
+) -> impl Filter<Extract = (impl warp::Reply,), Error = Infallible> + Clone {
+    query(db.clone(), token_endpoint.clone(), http.clone())
+        .or(post(db, token_endpoint, http))
+        .or(options())
+        .recover(recover)
+}
+#[cfg(test)]
+#[allow(dead_code)]
+impl MicropubQuery {
+    fn config() -> Self {
+        Self {
+            q: QueryType::Config,
+            url: None
+        }
+    }
+
+    fn source(url: &str) -> Self {
+        Self {
+            q: QueryType::Source,
+            url: Some(url.to_owned())
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use hyper::body::HttpBody;
+    use crate::{database::Storage, micropub::MicropubError};
+    use warp::{Filter, Reply};
+    use serde_json::json;
+
+    use super::FetchedPostContext;
+
+    #[test]
+    fn test_populate_reply_context() {
+        let already_expanded_reply_ctx = json!({
+            "type": ["h-entry"],
+            "properties": {
+                "content": ["Hello world!"]
+            }
+        });
+        let mf2 = json!({
+            "type": ["h-entry"],
+            "properties": {
+                "like-of": [
+                    "https://fireburn.ru/posts/example",
+                    already_expanded_reply_ctx,
+                    "https://fireburn.ru/posts/non-existent"
+                ]
+            }
+        });
+        let test_ctx = json!({
+            "type": ["h-entry"],
+            "properties": {
+                "content": ["This is a post which was reacted to."]
+            }
+        });
+        let reply_contexts = vec![
+            FetchedPostContext {
+                url: "https://fireburn.ru/posts/example".parse().unwrap(),
+                mf2: json!({
+                    "items": [
+                        test_ctx
+                    ]
+                }),
+                webmention: None
+            }
+        ];
+
+        let like_of = super::populate_reply_context(&mf2, "like-of", &reply_contexts).unwrap();
+
+        assert_eq!(like_of[0], test_ctx);
+        assert_eq!(like_of[1], already_expanded_reply_ctx);
+        assert_eq!(like_of[2], "https://fireburn.ru/posts/non-existent");
+    }
+
+    #[tokio::test]
+    async fn check_post_reject_scope() {
+        let inject_db = {
+            let db = crate::database::MemoryStorage::new();
+
+            move || db.clone()
+        };
+        let db = inject_db();
+
+        let res = warp::test::request()
+            .filter(&warp::any()
+                    .map(inject_db)
+                    .and_then(move |db| async move {
+                        let post = json!({
+                            "type": ["h-entry"],
+                            "properties": {
+                                "content": ["Hello world!"]
+                            }
+                        });
+                        let user = crate::indieauth::User::new(
+                            "https://localhost:8080/",
+                            "https://kittybox.fireburn.ru/",
+                            "profile"
+                        );
+                        let (uid, mf2) = super::post::normalize_mf2(post, &user);
+
+                        super::_post(
+                            user, uid, mf2, db, reqwest::Client::new()
+                        ).await.map_err(warp::reject::custom)
+                    })
+            )
+            .await
+            .map(|_| panic!("Tried to do something with a reply!"))
+            .unwrap_err();
+
+        if let Some(err) = res.find::<MicropubError>() {
+            assert_eq!(err.error, super::ErrorType::InvalidScope);
+        } else {
+            panic!("Did not return MicropubError");
+        }
+
+        let hashmap = db.mapping.read().await;
+        assert!(hashmap.is_empty());
+    }
+    
+    #[tokio::test]
+    async fn check_post_mf2() {
+        let inject_db = {
+            let db = crate::database::MemoryStorage::new();
+
+            move || db.clone()
+        };
+        let db = inject_db();
+        
+        let res = warp::test::request()
+            .filter(&warp::any()
+                    .map(inject_db)
+                    .and_then(move |db| async move {
+                        let post = json!({
+                            "type": ["h-entry"],
+                            "properties": {
+                                "content": ["Hello world!"]
+                            }
+                        });
+                        let user = crate::indieauth::User::new(
+                            "https://localhost:8080/",
+                            "https://kittybox.fireburn.ru/",
+                            "create"
+                        );
+                        let (uid, mf2) = super::post::normalize_mf2(post, &user);
+
+                        super::_post(
+                            user, uid, mf2, db, reqwest::Client::new()
+                        ).await.map_err(warp::reject::custom)
+                    })
+            )
+            .await
+            .unwrap()
+            .into_response();
+
+        assert!(res.headers().contains_key("Location"));
+        let location = res.headers().get("Location").unwrap();
+        assert!(db.post_exists(location.to_str().unwrap()).await.unwrap());
+        assert!(db.post_exists("https://localhost:8080/feeds/main").await.unwrap());
+    }
+
+    #[tokio::test]
+    async fn test_check_auth() {
+        let err = warp::test::request()
+            .filter(&warp::any()
+                    .map(|| (
+                        warp::host::Authority::from_static("aaronparecki.com"),
+                        crate::indieauth::User::new(
+                            "https://fireburn.ru/",
+                            "https://quill.p3k.io/",
+                            "create update media"
+                        )))
+                    .untuple_one()
+                    .and_then(super::check_auth))
+            .await
+            .unwrap_err();
+
+        let json: &MicropubError = err.find::<MicropubError>().unwrap();
+        assert_eq!(json.error, super::ErrorType::NotAuthorized);
+    }
+
+    #[tokio::test]
+    async fn test_query_foreign_url() {
+        let mut res = warp::test::request()
+            .filter(&warp::any().then(|| super::_query(
+                crate::database::MemoryStorage::new(),
+                super::MicropubQuery::source("https://aaronparecki.com/feeds/main"),
+                crate::indieauth::User::new(
+                    "https://fireburn.ru/",
+                    "https://quill.p3k.io/",
+                    "create update media"
+                )
+            )))
+            .await
+            .unwrap()
+            .into_response();
+
+        assert_eq!(res.status(), 401);
+        let body = res.body_mut().data().await.unwrap().unwrap();
+        let json: MicropubError = serde_json::from_slice(&body as &[u8]).unwrap();
+        assert_eq!(json.error, super::ErrorType::NotAuthorized);
+    }
+}
+