diff options
Diffstat (limited to 'kittybox-rs/src/micropub/mod.rs')
-rw-r--r-- | kittybox-rs/src/micropub/mod.rs | 1040 |
1 files changed, 446 insertions, 594 deletions
diff --git a/kittybox-rs/src/micropub/mod.rs b/kittybox-rs/src/micropub/mod.rs index f426c77..d7be785 100644 --- a/kittybox-rs/src/micropub/mod.rs +++ b/kittybox-rs/src/micropub/mod.rs @@ -1,14 +1,15 @@ -use std::convert::Infallible; -use std::fmt::Display; -use either::Either; -use log::{info, warn, error}; -use warp::http::StatusCode; -use warp::{Filter, Rejection, reject::InvalidQuery}; -use serde_json::json; -use serde::{Serialize, Deserialize}; use crate::database::{MicropubChannel, Storage, StorageError}; use crate::indieauth::User; use crate::micropub::util::form_to_mf2_json; +use axum::TypedHeader; +use axum::extract::{BodyStream, Query}; +use axum::headers::ContentType; +use axum::response::{IntoResponse, Response}; +use axum::{http::StatusCode, Extension}; +use tracing::{error, info, warn, debug}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use std::fmt::Display; #[derive(Serialize, Deserialize, Debug, PartialEq)] #[serde(rename_all = "kebab-case")] @@ -16,13 +17,13 @@ enum QueryType { Source, Config, Channel, - SyndicateTo + SyndicateTo, } #[derive(Serialize, Deserialize, Debug)] -struct MicropubQuery { +pub struct MicropubQuery { q: QueryType, - url: Option<String> + url: Option<String>, } #[derive(Serialize, Deserialize, PartialEq, Debug)] @@ -35,13 +36,13 @@ enum ErrorType { InvalidScope, NotAuthorized, NotFound, - UnsupportedMediaType + UnsupportedMediaType, } #[derive(Serialize, Deserialize, Debug)] pub(crate) struct MicropubError { error: ErrorType, - error_description: String + error_description: String, } impl From<StorageError> for MicropubError { @@ -49,9 +50,9 @@ impl From<StorageError> for MicropubError { Self { error: match err.kind() { crate::database::ErrorKind::NotFound => ErrorType::NotFound, - _ => ErrorType::InternalServerError + _ => ErrorType::InternalServerError, }, - error_description: format!("Backend error: {}", err) + error_description: format!("Backend error: {}", err), } } } @@ -86,12 +87,21 @@ impl From<MicropubError> for StatusCode { } } +impl axum::response::IntoResponse for MicropubError { + fn into_response(self) -> axum::response::Response { + axum::response::IntoResponse::into_response(( + StatusCode::from(&self), + axum::response::Json(self) + )) + } +} + impl From<serde_json::Error> for MicropubError { fn from(err: serde_json::Error) -> Self { use ErrorType::*; Self { error: InvalidRequest, - error_description: err.to_string() + error_description: err.to_string(), } } } @@ -100,90 +110,184 @@ impl MicropubError { fn new(error: ErrorType, error_description: &str) -> Self { Self { error, - error_description: error_description.to_owned() + error_description: error_description.to_owned(), } } } -impl warp::reject::Reject for MicropubError {} +mod util; +pub(crate) use util::normalize_mf2; -mod post; -pub(crate) use post::normalize_mf2; +#[derive(Debug)] +struct FetchedPostContext { + url: url::Url, + mf2: serde_json::Value, + webmention: Option<url::Url>, +} -mod util { - use serde_json::json; +fn populate_reply_context( + mf2: &serde_json::Value, + prop: &str, + ctxs: &[FetchedPostContext], +) -> Option<serde_json::Value> { + mf2["properties"][prop].as_array().map(|array| { + json!(array + .iter() + // TODO: This seems to be O(n^2) and I don't like it. + // Switching `ctxs` to a hashmap might speed it up to O(n) + // The key would be the URL/UID + .map(|i| ctxs + .iter() + .find(|ctx| Some(ctx.url.as_str()) == i.as_str()) + .and_then(|ctx| ctx.mf2["items"].get(0)) + .or(Some(i)) + .unwrap()) + .collect::<Vec<&serde_json::Value>>()) + }) +} - pub(crate) fn form_to_mf2_json(form: Vec<(String, String)>) -> serde_json::Value { - let mut mf2 = json!({"type": [], "properties": {}}); - for (k, v) in form { - if k == "h" { - mf2["type"] - .as_array_mut() - .unwrap() - .push(json!("h-".to_string() + &v)); - } else if k != "access_token" { - let key = k.strip_suffix("[]").unwrap_or(&k); - match mf2["properties"][key].as_array_mut() { - Some(prop) => prop.push(json!(v)), - None => mf2["properties"][key] = json!([v]), - } - } - } - if mf2["type"].as_array().unwrap().is_empty() { - mf2["type"].as_array_mut().unwrap().push(json!("h-entry")); +#[tracing::instrument(skip(db))] +async fn background_processing<D: 'static + Storage>( + db: D, + mf2: serde_json::Value, + http: reqwest::Client, +) -> () { + // TODO: Post-processing the post (aka second write pass) + // - [x] Download rich reply contexts + // - [ ] Syndicate the post if requested, add links to the syndicated copies + // - [ ] Send WebSub notifications to the hub (if we happen to have one) + // - [x] Send webmentions + + use futures_util::StreamExt; + + let uid: &str = mf2["properties"]["uid"][0].as_str().unwrap(); + + let context_props = ["in-reply-to", "like-of", "repost-of", "bookmark-of"]; + let mut context_urls: Vec<url::Url> = vec![]; + for prop in &context_props { + if let Some(array) = mf2["properties"][prop].as_array() { + context_urls.extend( + array + .iter() + .filter_map(|v| v.as_str()) + .filter_map(|v| v.parse::<url::Url>().ok()), + ); } - mf2 } + // TODO parse HTML in e-content and add links found here + context_urls.sort_unstable_by_key(|u| u.to_string()); + context_urls.dedup(); + + // TODO: Make a stream to fetch all these posts and convert them to MF2 + let post_contexts = { + let http = &http; + tokio_stream::iter(context_urls.into_iter()) + .then(move |url: url::Url| http.get(url).send()) + .filter_map(|response| futures::future::ready(response.ok())) + .filter(|response| futures::future::ready(response.status() == 200)) + .filter_map(|response: reqwest::Response| async move { + // 1. We need to preserve the URL + // 2. We need to get the HTML for MF2 processing + // 3. We need to get the webmention endpoint address + // All of that can be done in one go. + let url = response.url().clone(); + // TODO parse link headers + let links = response + .headers() + .get_all(hyper::http::header::LINK) + .iter() + .cloned() + .collect::<Vec<hyper::http::HeaderValue>>(); + let html = response.text().await; + if html.is_err() { + return None; + } + let html = html.unwrap(); + let mf2 = microformats::from_html(&html, url.clone()).unwrap(); + // TODO use first Link: header if available + let webmention: Option<url::Url> = mf2 + .rels + .by_rels() + .get("webmention") + .and_then(|i| i.first().cloned()); + + dbg!(Some(FetchedPostContext { + url, + mf2: serde_json::to_value(mf2).unwrap(), + webmention + })) + }) + .collect::<Vec<FetchedPostContext>>() + .await + }; - #[cfg(test)] - mod tests { - use serde_json::json; - #[test] - fn test_form_to_mf2() { - assert_eq!( - super::form_to_mf2_json( - serde_urlencoded::from_str( - "h=entry&content=something%20interesting" - ).unwrap() - ), - json!({ - "type": ["h-entry"], - "properties": { - "content": ["something interesting"] - } - }) - ) + let mut update = json!({ "replace": {} }); + for prop in &context_props { + if let Some(json) = populate_reply_context(&mf2, prop, &post_contexts) { + update["replace"][prop] = json; + } + } + if !update["replace"].as_object().unwrap().is_empty() { + if let Err(err) = db.update_post(uid, update).await { + error!("Failed to update post with rich reply contexts: {}", err); } } -} -#[derive(Debug)] -struct FetchedPostContext { - url: url::Url, - mf2: serde_json::Value, - webmention: Option<url::Url> -} + // At this point we can start syndicating the post. + // Currently we don't really support any syndication endpoints, but still! + /*if let Some(syndicate_to) = mf2["properties"]["mp-syndicate-to"].as_array() { + let http = &http; + tokio_stream::iter(syndicate_to) + .filter_map(|i| futures::future::ready(i.as_str())) + .for_each_concurrent(3, |s: &str| async move { + #[allow(clippy::match_single_binding)] + match s { + _ => { + todo!("Syndicate to generic webmention-aware service {}", s); + } + // TODO special handling for non-webmention-aware services like the birdsite + } + }) + .await; + }*/ -fn populate_reply_context(mf2: &serde_json::Value, prop: &str, ctxs: &[FetchedPostContext]) -> Option<serde_json::Value> { - if mf2["properties"][prop].is_array() { - Some(json!( - mf2["properties"][prop] - .as_array() - // Safe to unwrap because we checked its existence and type - // And it's not like we can make it disappear without unsafe code - .unwrap() - .iter() - // This seems to be O(n^2) and I don't like it. - // Nevertheless, I lack required knowledge to optimize it. Also, it works, so... - .map(|i| ctxs.iter() - .find(|ctx| Some(ctx.url.as_str()) == i.as_str()) - .and_then(|ctx| ctx.mf2["items"].get(0)) - .or(Some(i)) - .unwrap()) - .collect::<Vec<&serde_json::Value>>() - )) - } else { - None + { + let http = &http; + tokio_stream::iter( + post_contexts + .into_iter() + .filter(|ctx| ctx.webmention.is_some()), + ) + .for_each_concurrent(2, |ctx| async move { + let mut map = std::collections::HashMap::new(); + map.insert("source", uid); + map.insert("target", ctx.url.as_str()); + + match http + .post(ctx.webmention.unwrap().clone()) + .form(&map) + .send() + .await + { + Ok(res) => { + if !res.status().is_success() { + warn!( + "Failed to send a webmention for {}: got HTTP {}", + ctx.url, + res.status() + ); + } else { + info!( + "Sent a webmention to {}, got HTTP {}", + ctx.url, + res.status() + ) + } + } + Err(err) => warn!("Failed to send a webmention for {}: {}", ctx.url, err), + } + }) + .await; } } @@ -193,8 +297,8 @@ pub(crate) async fn _post<D: 'static + Storage>( uid: String, mf2: serde_json::Value, db: D, - http: reqwest::Client -) -> Result<impl warp::Reply, MicropubError> { + http: reqwest::Client, +) -> Result<Response, MicropubError> { // Here, we have the following guarantees: // - The user is the same user for this host (guaranteed by ensure_same_user) // - The MF2-JSON document is normalized (guaranteed by normalize_mf2)\ @@ -205,24 +309,26 @@ pub(crate) async fn _post<D: 'static + Storage>( // - The MF2-JSON document's target channels are set // - The MF2-JSON document's author is set - // Security check! Do we have an oAuth2 scope to proceed? + // Security check! Do we have an OAuth2 scope to proceed? if !user.check_scope("create") { return Err(MicropubError { error: ErrorType::InvalidScope, - error_description: "Not enough privileges - try acquiring the \"create\" scope.".to_owned() + error_description: "Not enough privileges - try acquiring the \"create\" scope." + .to_owned(), }); } // Security check #2! Are we posting to our own website? - if !uid.starts_with(user.me.as_str()) || mf2["properties"]["channel"] - .as_array() - .unwrap_or(&vec![]) - .iter() - .any(|url| !url.as_str().unwrap().starts_with(user.me.as_str())) + if !uid.starts_with(user.me.as_str()) + || mf2["properties"]["channel"] + .as_array() + .unwrap_or(&vec![]) + .iter() + .any(|url| !url.as_str().unwrap().starts_with(user.me.as_str())) { return Err(MicropubError { error: ErrorType::Forbidden, - error_description: "You're posting to a website that's not yours.".to_owned() + error_description: "You're posting to a website that's not yours.".to_owned(), }); } @@ -230,7 +336,7 @@ pub(crate) async fn _post<D: 'static + Storage>( if db.post_exists(&uid).await? { return Err(MicropubError { error: ErrorType::AlreadyExists, - error_description: "UID clash was detected, operation aborted.".to_owned() + error_description: "UID clash was detected, operation aborted.".to_owned(), }); } @@ -244,172 +350,55 @@ pub(crate) async fn _post<D: 'static + Storage>( .map(|i| i.as_str().unwrap_or("")) .filter(|i| !i.is_empty()); - let default_channel = user.me.join(post::DEFAULT_CHANNEL_PATH).unwrap().to_string(); - let vcards_channel = user.me.join(post::CONTACTS_CHANNEL_PATH).unwrap().to_string(); - let food_channel = user.me.join(post::FOOD_CHANNEL_PATH).unwrap().to_string(); + let default_channel = user + .me + .join(util::DEFAULT_CHANNEL_PATH) + .unwrap() + .to_string(); + let vcards_channel = user + .me + .join(util::CONTACTS_CHANNEL_PATH) + .unwrap() + .to_string(); + let food_channel = user.me.join(util::FOOD_CHANNEL_PATH).unwrap().to_string(); let default_channels = vec![default_channel, vcards_channel, food_channel]; for chan in &mut channels { if db.post_exists(chan).await? { - db.update_post(chan, json!({"add": {"children": [uid]}})).await?; + db.update_post(chan, json!({"add": {"children": [uid]}})) + .await?; } else if default_channels.iter().any(|i| chan == i) { - post::create_feed(&db, &uid, chan, &user).await?; + util::create_feed(&db, &uid, chan, &user).await?; } else { warn!("Ignoring non-existent channel: {}", chan); } } - let reply = warp::reply::with_status( - warp::reply::with_header( - warp::reply::json(&json!({"location": &uid})), - "Location", &uid - ), - StatusCode::ACCEPTED - ); - - // TODO: Post-processing the post (aka second write pass) - // - [x] Download rich reply contexts - // - [ ] Syndicate the post if requested, add links to the syndicated copies - // - [ ] Send WebSub notifications to the hub (if we happen to have one) - // - [x] Send webmentions - tokio::task::spawn(async move { - use futures_util::StreamExt; - - let uid: &str = mf2["properties"]["uid"][0].as_str().unwrap(); - - let context_props = ["in-reply-to", "like-of", "repost-of", "bookmark-of"]; - let mut context_urls: Vec<url::Url> = vec![]; - for prop in &context_props { - if let Some(array) = mf2["properties"][prop].as_array() { - context_urls.extend( - array - .iter() - .filter_map(|v| v.as_str()) - .filter_map(|v| v.parse::<url::Url>().ok()), - ); - } - } - // TODO parse HTML in e-content and add links found here - context_urls.sort_unstable_by_key(|u| u.to_string()); - context_urls.dedup(); - - // TODO: Make a stream to fetch all these posts and convert them to MF2 - let post_contexts = { - let http = &http; - tokio_stream::iter(context_urls.into_iter()) - .then(move |url: url::Url| http.get(url).send()) - .filter_map(|response| futures::future::ready(response.ok())) - .filter(|response| futures::future::ready(response.status() == 200)) - .filter_map(|response: reqwest::Response| async move { - // 1. We need to preserve the URL - // 2. We need to get the HTML for MF2 processing - // 3. We need to get the webmention endpoint address - // All of that can be done in one go. - let url = response.url().clone(); - // TODO parse link headers - let links = response - .headers() - .get_all(hyper::http::header::LINK) - .iter() - .cloned() - .collect::<Vec<hyper::http::HeaderValue>>(); - let html = response.text().await; - if html.is_err() { - return None; - } - let html = html.unwrap(); - let mf2 = microformats::from_html(&html, url.clone()).unwrap(); - // TODO use first Link: header if available - let webmention: Option<url::Url> = mf2.rels.by_rels().get("webmention") - .and_then(|i| i.first().cloned()); - - dbg!(Some(FetchedPostContext { - url, mf2: serde_json::to_value(mf2).unwrap(), webmention - })) - }) - .collect::<Vec<FetchedPostContext>>() - .await - }; + let reply = IntoResponse::into_response(( + StatusCode::ACCEPTED, + [("Location", uid.as_str())], + () + )); - let mut update = json!({ "replace": {} }); - for prop in &context_props { - if let Some(json) = populate_reply_context(&mf2, prop, &post_contexts) { - update["replace"][prop] = json; - } - } - if !update["replace"].as_object().unwrap().is_empty() { - if let Err(err) = db.update_post(uid, update).await { - error!("Failed to update post with rich reply contexts: {}", err); - } - } + tokio::task::spawn(background_processing(db, mf2, http)); - // At this point we can start syndicating the post. - // Currently we don't really support any syndication endpoints, but still! - /*if let Some(syndicate_to) = mf2["properties"]["mp-syndicate-to"].as_array() { - let http = &http; - tokio_stream::iter(syndicate_to) - .filter_map(|i| futures::future::ready(i.as_str())) - .for_each_concurrent(3, |s: &str| async move { - #[allow(clippy::match_single_binding)] - match s { - _ => { - todo!("Syndicate to generic webmention-aware service {}", s); - } - // TODO special handling for non-webmention-aware services like the birdsite - } - }) - .await; - }*/ - - { - let http = &http; - tokio_stream::iter( - post_contexts.into_iter() - .filter(|ctx| ctx.webmention.is_some())) - .for_each_concurrent(2, |ctx| async move { - let mut map = std::collections::HashMap::new(); - map.insert("source", uid); - map.insert("target", ctx.url.as_str()); - - match http.post(ctx.webmention.unwrap().clone()) - .form(&map) - .send() - .await - { - Ok(res) => { - if !res.status().is_success() { - warn!( - "Failed to send a webmention for {}: got HTTP {}", - ctx.url, res.status() - ); - } else { - info!("Sent a webmention to {}, got HTTP {}", ctx.url, res.status()) - } - }, - Err(err) => warn!("Failed to send a webmention for {}: {}", ctx.url, err) - } - }) - .await; - } - }); - Ok(reply) } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "snake_case")] enum ActionType { Delete, - Update + Update, } #[derive(Serialize, Deserialize)] struct MicropubFormAction { action: ActionType, - url: String + url: String, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] struct MicropubAction { action: ActionType, url: String, @@ -418,7 +407,7 @@ struct MicropubAction { #[serde(skip_serializing_if = "Option::is_none")] add: Option<serde_json::Value>, #[serde(skip_serializing_if = "Option::is_none")] - delete: Option<serde_json::Value> + delete: Option<serde_json::Value>, } impl From<MicropubFormAction> for MicropubAction { @@ -426,31 +415,40 @@ impl From<MicropubFormAction> for MicropubAction { Self { action: a.action, url: a.url, - replace: None, add: None, delete: None + replace: None, + add: None, + delete: None, } } } -// TODO perform the requested actions synchronously +#[tracing::instrument(skip(db))] async fn post_action<D: Storage>( action: MicropubAction, db: D, - user: User -) -> Result<impl warp::Reply, MicropubError> { - + user: User, +) -> Result<(), MicropubError> { let uri = if let Ok(uri) = action.url.parse::<hyper::Uri>() { uri } else { return Err(MicropubError { error: ErrorType::InvalidRequest, - error_description: "Your URL doesn't parse properly.".to_owned() + error_description: "Your URL doesn't parse properly.".to_owned(), }); }; - if uri.authority().unwrap() != user.me.as_str().parse::<hyper::Uri>().unwrap().authority().unwrap() { + if uri.authority().unwrap() + != user + .me + .as_str() + .parse::<hyper::Uri>() + .unwrap() + .authority() + .unwrap() + { return Err(MicropubError { error: ErrorType::Forbidden, - error_description: "Don't tamper with others' posts!".to_owned() + error_description: "Don't tamper with others' posts!".to_owned(), }); } @@ -459,17 +457,17 @@ async fn post_action<D: Storage>( if !user.check_scope("delete") { return Err(MicropubError { error: ErrorType::InvalidScope, - error_description: "You need a \"delete\" scope for this.".to_owned() + error_description: "You need a \"delete\" scope for this.".to_owned(), }); } db.delete_post(&action.url).await? - }, + } ActionType::Update => { if !user.check_scope("update") { return Err(MicropubError { error: ErrorType::InvalidScope, - error_description: "You need an \"update\" scope for this.".to_owned() + error_description: "You need an \"update\" scope for this.".to_owned(), }); } @@ -477,146 +475,104 @@ async fn post_action<D: Storage>( &action.url, // Here, unwrapping is safe, because this value // was recently deserialized from JSON already. - serde_json::to_value(&action).unwrap() - ).await? - }, + serde_json::to_value(&action).unwrap(), + ) + .await? + } } - Ok(warp::reply::reply()) + Ok(()) } -async fn check_auth(host: warp::host::Authority, user: User) -> Result<User, warp::Rejection> { - let user_authority = warp::http::Uri::try_from(user.me.as_str()) - .unwrap() - .authority() - .unwrap() - .clone(); - // TODO compare with potential list of allowed websites - // to allow one user to edit several websites with one token - if host != user_authority { - Err(warp::reject::custom(MicropubError::new( - ErrorType::NotAuthorized, - "This user is not authorized to use Micropub on this website." - ))) - } else { - Ok(user) - } +enum PostBody { + Action(MicropubAction), + MF2(serde_json::Value) } -#[cfg(any(not(debug_assertions), test))] -fn ensure_same_user_as_host( - token_endpoint: String, - http: reqwest::Client -) -> impl Filter<Extract = (User,), Error = warp::Rejection> + Clone { - crate::util::require_host() - .and(crate::indieauth::require_token(token_endpoint, http)) - .and_then(check_auth) -} +#[tracing::instrument] +async fn dispatch_body(mut body: BodyStream, content_type: ContentType) -> Result<PostBody, MicropubError> { + let body: Vec<u8> = { + debug!("Buffering body..."); + use tokio_stream::StreamExt; + let mut buf = Vec::default(); -async fn dispatch_post_body( - mut body: impl bytes::Buf, - mimetype: http_types::Mime -) -> Result<Either<MicropubAction, serde_json::Value>, warp::Rejection> { - // Since hyper::common::buf::BufList doesn't implement Clone, we can't use Clone in here - // We have to copy the body. Ugh!!! - // so much for zero-copy buffers - let body = { - let mut _body: Vec<u8> = Vec::default(); - while body.has_remaining() { - _body.extend(body.chunk()); - body.advance(body.chunk().len()); + while let Some(chunk) = body.next().await { + buf.extend_from_slice(&chunk.unwrap()) } - _body + + buf }; - match mimetype.essence() { - "application/json" => { - if let Ok(body) = serde_json::from_slice::<MicropubAction>(&body) { - Ok(Either::Left(body)) - } else if let Ok(body) = serde_json::from_slice::<serde_json::Value>(&body) { - // quick sanity check - if !body.is_object() || !body["type"].is_array() { - return Err(MicropubError { - error: ErrorType::InvalidRequest, - error_description: "Invalid MF2-JSON detected: `.` should be an object, `.type` should be an array of MF2 types".to_owned() - }.into()) - } - Ok(Either::Right(body)) - } else { - Err(MicropubError { + + debug!("Content-Type: {:?}", content_type); + if content_type == ContentType::json() { + if let Ok(action) = serde_json::from_slice::<MicropubAction>(&body) { + Ok(PostBody::Action(action)) + } else if let Ok(body) = serde_json::from_slice::<serde_json::Value>(&body) { + // quick sanity check + if !body.is_object() || !body["type"].is_array() { + return Err(MicropubError { error: ErrorType::InvalidRequest, - error_description: "Invalid JSON object passed.".to_owned() - }.into()) + error_description: "Invalid MF2-JSON detected: `.` should be an object, `.type` should be an array of MF2 types".to_owned() + }); } + + Ok(PostBody::MF2(body)) + } else { + Err(MicropubError { + error: ErrorType::InvalidRequest, + error_description: "Invalid JSON object passed.".to_owned(), + }) + } + } else if content_type == ContentType::form_url_encoded() { + if let Ok(body) = serde_urlencoded::from_bytes::<MicropubFormAction>(&body) { + Ok(PostBody::Action(body.into())) + } else if let Ok(body) = serde_urlencoded::from_bytes::<Vec<(String, String)>>(&body) { + Ok(PostBody::MF2(form_to_mf2_json(body))) + } else { + Err(MicropubError { + error: ErrorType::InvalidRequest, + error_description: "Invalid form-encoded data. Try h=entry&content=Hello!" + .to_owned(), + }) + } + } else { + Err(MicropubError::new( + ErrorType::UnsupportedMediaType, + "This Content-Type is not recognized. Try application/json instead?" + )) + } +} + +#[tracing::instrument(skip(db, http))] +pub async fn post<D: Storage + 'static>( + Extension(db): Extension<D>, + Extension(http): Extension<reqwest::Client>, + user: User, + body: BodyStream, + TypedHeader(content_type): TypedHeader<ContentType> +) -> axum::response::Response { + match dispatch_body(body, content_type).await { + Ok(PostBody::Action(action)) => match post_action(action, db, user).await { + Ok(()) => Response::default(), + Err(err) => err.into_response() }, - "application/x-www-form-urlencoded" => { - if let Ok(body) = serde_urlencoded::from_bytes::<MicropubFormAction>(&body) { - Ok(Either::Left(body.into())) - } else if let Ok(body) = serde_urlencoded::from_bytes::<Vec<(String, String)>>(&body) { - Ok(Either::Right(form_to_mf2_json(body))) - } else { - Err(MicropubError { - error: ErrorType::InvalidRequest, - error_description: "Invalid form-encoded data. Try h=entry&content=Hello!".to_owned() - }.into()) + Ok(PostBody::MF2(mf2)) => { + let (uid, mf2) = normalize_mf2(mf2, &user); + match _post(user, uid, mf2, db, http).await { + Ok(response) => response, + Err(err) => err.into_response() } }, - other => Err(MicropubError { - error: ErrorType::UnsupportedMediaType, - error_description: format!("Unsupported media type: {}. Try application/json?", other) - }.into()) + Err(err) => err.into_response() } } -#[cfg_attr(all(debug_assertions, not(test)), allow(unused_variables))] -pub fn post<D: 'static + Storage>( - db: D, - token_endpoint: String, - http: reqwest::Client -) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone { - let inject_db = warp::any().map(move || db.clone()); - #[cfg(all(debug_assertions, not(test)))] - let ensure_same_user = warp::any().map(|| crate::indieauth::User::new( - "http://localhost:8080/", - "https://quill.p3k.io/", - "create update delete media" - )); - #[cfg(any(not(debug_assertions), test))] - let ensure_same_user = ensure_same_user_as_host(token_endpoint, http.clone()); - - warp::post() - .and(warp::body::content_length_limit(1024 * 512) - .and(warp::body::aggregate()) - .and(warp::header::<http_types::Mime>("Content-Type")) - .and_then(dispatch_post_body)) - .and(inject_db) - .and(warp::any().map(move || http.clone())) - .and(ensure_same_user) - .and_then(|body: Either<MicropubAction, serde_json::Value>, db: D, http: reqwest::Client, user: crate::indieauth::User| async move { - (match body { - Either::Left(action) => { - post_action(action, db, user).await.map(|p| Box::new(p) as Box<dyn warp::Reply>) - }, - Either::Right(post) => { - let (uid, mf2) = post::normalize_mf2(post, &user); - _post(user, uid, mf2, db, http).await.map(|p| Box::new(p) as Box<dyn warp::Reply>) - } - }).map_err(warp::reject::custom) - }) -} - -pub fn options() -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone { - warp::options() - // TODO make it reply with a basic description of Micropub spec - .map(|| warp::reply::json::<Option<()>>(&None)) - .with(warp::reply::with::header("Allow", "GET, POST")) -} - -async fn _query<D: Storage>( - db: D, - query: MicropubQuery, - user: crate::indieauth::User -) -> Box<dyn warp::Reply> { - let user_authority = warp::http::Uri::try_from(user.me.as_str()) +pub async fn query<D: Storage>( + Extension(db): Extension<D>, + Query(query): Query<MicropubQuery>, + user: User +) -> axum::response::Response { + let host = axum::http::Uri::try_from(user.me.as_str()) .unwrap() .authority() .unwrap() @@ -624,18 +580,15 @@ async fn _query<D: Storage>( match query.q { QueryType::Config => { - let channels: Vec<MicropubChannel> = match db.get_channels(user_authority.as_str()).await { + let channels: Vec<MicropubChannel> = match db.get_channels(host.as_str()).await { Ok(chans) => chans, - Err(err) => return Box::new(warp::reply::with_status( - warp::reply::json(&MicropubError::new( - ErrorType::InternalServerError, - &format!("Error fetching channels: {}", err) - )), - StatusCode::INTERNAL_SERVER_ERROR - )) + Err(err) => return MicropubError::new( + ErrorType::InternalServerError, + &format!("Error fetching channels: {}", err) + ).into_response(), }; - Box::new(warp::reply::json(json!({ + axum::response::Json(json!({ "q": [ QueryType::Source, QueryType::Config, @@ -643,149 +596,81 @@ async fn _query<D: Storage>( QueryType::SyndicateTo ], "channels": channels, - "_kittybox_authority": user_authority.as_str(), + "_kittybox_authority": host.as_str(), "syndicate-to": [] - }).as_object().unwrap())) + })).into_response() }, QueryType::Source => { match query.url { Some(url) => { - if warp::http::Uri::try_from(&url).unwrap().authority().unwrap() != &user_authority { - return Box::new(warp::reply::with_status( - warp::reply::json(&MicropubError::new( - ErrorType::NotAuthorized, - "You are requesting a post from a website that doesn't belong to you." - )), - StatusCode::UNAUTHORIZED - )) + if axum::http::Uri::try_from(&url).unwrap().authority().unwrap() != &host { + return MicropubError::new( + ErrorType::NotAuthorized, + "You are requesting a post from a website that doesn't belong to you." + ).into_response() } match db.get_post(&url).await { Ok(some) => match some { - Some(post) => Box::new(warp::reply::json(&post)), - None => Box::new(warp::reply::with_status( - warp::reply::json(&MicropubError::new( - ErrorType::NotFound, - "The specified MF2 object was not found in database." - )), - StatusCode::NOT_FOUND - )) + Some(post) => axum::response::Json(&post).into_response(), + None => MicropubError::new( + ErrorType::NotFound, + "The specified MF2 object was not found in database." + ).into_response() }, - Err(err) => { - Box::new(warp::reply::json(&MicropubError::new( - ErrorType::InternalServerError, - &format!("Backend error: {}", err) - ))) - } + Err(err) => MicropubError::new( + ErrorType::InternalServerError, + &format!("Backend error: {}", err) + ).into_response() } }, None => { // Here, one should probably attempt to query at least the main feed and collect posts // Using a pre-made query function can't be done because it does unneeded filtering // Don't implement for now, this is optional - Box::new(warp::reply::with_status( - warp::reply::json(&MicropubError::new( - ErrorType::InvalidRequest, - "Querying for post list is not implemented yet." - )), - StatusCode::BAD_REQUEST - )) + MicropubError::new( + ErrorType::InvalidRequest, + "Querying for post list is not implemented yet." + ).into_response() } } }, QueryType::Channel => { - let channels: Vec<MicropubChannel> = match db.get_channels(user_authority.as_str()).await { - Ok(chans) => chans, - Err(err) => return Box::new(warp::reply::with_status( - warp::reply::json(&MicropubError::new( - ErrorType::InternalServerError, - &format!("Error fetching channels: {}", err) - )), - StatusCode::INTERNAL_SERVER_ERROR - )) - }; - - Box::new(warp::reply::json(&json!({ "channels": channels }))) + match db.get_channels(host.as_str()).await { + Ok(chans) => axum::response::Json(json!({"channels": chans})).into_response(), + Err(err) => MicropubError::new( + ErrorType::InternalServerError, + &format!("Error fetching channels: {}", err) + ).into_response() + } }, QueryType::SyndicateTo => { - Box::new(warp::reply::json(&json!({ "syndicate-to": [] }))) + axum::response::Json(json!({ "syndicate-to": [] })).into_response() } } } -pub fn query<D: Storage>( - db: D, - token_endpoint: String, - http: reqwest::Client -) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone { - warp::get() - .map(move || db.clone()) - .and(warp::query::<MicropubQuery>()) - .and(crate::util::require_host() - .and(crate::indieauth::require_token(token_endpoint, http)) - .and_then(check_auth)) - .then(_query) - .recover(|e: warp::Rejection| async move { - if let Some(err) = e.find::<MicropubError>() { - Ok(warp::reply::json(err)) - } else { - Err(e) - } - }) -} - -pub async fn recover(err: Rejection) -> Result<impl warp::Reply, Infallible> { - if let Some(error) = err.find::<MicropubError>() { - return Ok(warp::reply::with_status(warp::reply::json(&error), error.into())) - } - let error = if err.find::<InvalidQuery>().is_some() { - MicropubError::new( - ErrorType::InvalidRequest, - "Invalid query parameters sent. Try ?q=config to see what you can do." - ) - } else { - log::error!("Unhandled rejection: {:?}", err); - MicropubError::new( - ErrorType::InternalServerError, - &format!("Unknown error: {:?}", err) - ) - }; - - Ok(warp::reply::with_status(warp::reply::json(&error), error.into())) -} - -pub fn micropub<D: 'static + Storage>( - db: D, - token_endpoint: String, - http: reqwest::Client -) -> impl Filter<Extract = (impl warp::Reply,), Error = Infallible> + Clone { - query(db.clone(), token_endpoint.clone(), http.clone()) - .or(post(db, token_endpoint, http)) - .or(options()) - .recover(recover) -} #[cfg(test)] #[allow(dead_code)] impl MicropubQuery { fn config() -> Self { Self { q: QueryType::Config, - url: None + url: None, } } fn source(url: &str) -> Self { Self { q: QueryType::Source, - url: Some(url.to_owned()) + url: Some(url.to_owned()), } } } #[cfg(test)] mod tests { - use hyper::body::HttpBody; use crate::{database::Storage, micropub::MicropubError}; - use warp::{Filter, Reply}; + use hyper::body::HttpBody; use serde_json::json; use super::FetchedPostContext; @@ -814,17 +699,11 @@ mod tests { "content": ["This is a post which was reacted to."] } }); - let reply_contexts = vec![ - FetchedPostContext { - url: "https://fireburn.ru/posts/example".parse().unwrap(), - mf2: json!({ - "items": [ - test_ctx - ] - }), - webmention: None - } - ]; + let reply_contexts = vec![FetchedPostContext { + url: "https://fireburn.ru/posts/example".parse().unwrap(), + mf2: json!({ "items": [test_ctx] }), + webmention: None, + }]; let like_of = super::populate_reply_context(&mf2, "like-of", &reply_contexts).unwrap(); @@ -834,84 +713,82 @@ mod tests { } #[tokio::test] - async fn check_post_reject_scope() { - let inject_db = { - let db = crate::database::MemoryStorage::new(); - - move || db.clone() - }; - let db = inject_db(); - - let res = warp::test::request() - .filter(&warp::any() - .map(inject_db) - .and_then(move |db| async move { - let post = json!({ - "type": ["h-entry"], - "properties": { - "content": ["Hello world!"] - } - }); - let user = crate::indieauth::User::new( - "https://localhost:8080/", - "https://kittybox.fireburn.ru/", - "profile" - ); - let (uid, mf2) = super::post::normalize_mf2(post, &user); + async fn test_post_reject_scope() { + let db = crate::database::MemoryStorage::new(); - super::_post( - user, uid, mf2, db, reqwest::Client::new() - ).await.map_err(warp::reject::custom) - }) - ) - .await - .map(|_| panic!("Tried to do something with a reply!")) - .unwrap_err(); + let post = json!({ + "type": ["h-entry"], + "properties": { + "content": ["Hello world!"] + } + }); + let user = crate::indieauth::User::new( + "https://localhost:8080/", + "https://kittybox.fireburn.ru/", + "profile" + ); + let (uid, mf2) = super::normalize_mf2(post, &user); + + let err = super::_post( + user, uid, mf2, db.clone(), reqwest::Client::new() + ).await.unwrap_err(); - if let Some(err) = res.find::<MicropubError>() { - assert_eq!(err.error, super::ErrorType::InvalidScope); - } else { - panic!("Did not return MicropubError"); - } + assert_eq!(err.error, super::ErrorType::InvalidScope); + + let hashmap = db.mapping.read().await; + assert!(hashmap.is_empty()); + } + + #[tokio::test] + async fn test_post_reject_different_user() { + let db = crate::database::MemoryStorage::new(); + + let post = json!({ + "type": ["h-entry"], + "properties": { + "content": ["Hello world!"], + "uid": ["https://fireburn.ru/posts/hello"], + "url": ["https://fireburn.ru/posts/hello"] + } + }); + let user = crate::indieauth::User::new( + "https://aaronparecki.com/", + "https://kittybox.fireburn.ru/", + "create update media" + ); + let (uid, mf2) = super::normalize_mf2(post, &user); + + let err = super::_post( + user, uid, mf2, db.clone(), reqwest::Client::new() + ).await.unwrap_err(); + assert_eq!(err.error, super::ErrorType::Forbidden); + let hashmap = db.mapping.read().await; assert!(hashmap.is_empty()); } + #[tokio::test] - async fn check_post_mf2() { - let inject_db = { - let db = crate::database::MemoryStorage::new(); + async fn test_post_mf2() { + let db = crate::database::MemoryStorage::new(); - move || db.clone() - }; - let db = inject_db(); - - let res = warp::test::request() - .filter(&warp::any() - .map(inject_db) - .and_then(move |db| async move { - let post = json!({ - "type": ["h-entry"], - "properties": { - "content": ["Hello world!"] - } - }); - let user = crate::indieauth::User::new( - "https://localhost:8080/", - "https://kittybox.fireburn.ru/", - "create" - ); - let (uid, mf2) = super::post::normalize_mf2(post, &user); + let post = json!({ + "type": ["h-entry"], + "properties": { + "content": ["Hello world!"] + } + }); + let user = crate::indieauth::User::new( + "https://localhost:8080/", + "https://kittybox.fireburn.ru/", + "create" + ); + let (uid, mf2) = super::normalize_mf2(post, &user); - super::_post( - user, uid, mf2, db, reqwest::Client::new() - ).await.map_err(warp::reject::custom) - }) - ) - .await - .unwrap() - .into_response(); + let res = super::_post( + user, uid, mf2, db.clone(), reqwest::Client::new() + ).await.unwrap(); assert!(res.headers().contains_key("Location")); let location = res.headers().get("Location").unwrap(); @@ -920,40 +797,16 @@ mod tests { } #[tokio::test] - async fn test_check_auth() { - let err = warp::test::request() - .filter(&warp::any() - .map(|| ( - warp::host::Authority::from_static("aaronparecki.com"), - crate::indieauth::User::new( - "https://fireburn.ru/", - "https://quill.p3k.io/", - "create update media" - ))) - .untuple_one() - .and_then(super::check_auth)) - .await - .unwrap_err(); - - let json: &MicropubError = err.find::<MicropubError>().unwrap(); - assert_eq!(json.error, super::ErrorType::NotAuthorized); - } - - #[tokio::test] async fn test_query_foreign_url() { - let mut res = warp::test::request() - .filter(&warp::any().then(|| super::_query( - crate::database::MemoryStorage::new(), - super::MicropubQuery::source("https://aaronparecki.com/feeds/main"), - crate::indieauth::User::new( - "https://fireburn.ru/", - "https://quill.p3k.io/", - "create update media" - ) - ))) - .await - .unwrap() - .into_response(); + let mut res = super::query( + axum::Extension(crate::database::MemoryStorage::new()), + axum::extract::Query(super::MicropubQuery::source("https://aaronparecki.com/feeds/main")), + crate::indieauth::User::new( + "https://fireburn.ru/", + "https://quill.p3k.io/", + "create update media" + ) + ).await; assert_eq!(res.status(), 401); let body = res.body_mut().data().await.unwrap().unwrap(); @@ -961,4 +814,3 @@ mod tests { assert_eq!(json.error, super::ErrorType::NotAuthorized); } } - |