use std::convert::Infallible; use std::fmt::Display; use either::Either; use log::{info, warn, error}; use warp::http::StatusCode; use warp::{Filter, Rejection, reject::InvalidQuery}; use serde_json::json; use serde::{Serialize, Deserialize}; use crate::database::{MicropubChannel, Storage, StorageError}; use crate::indieauth::User; use crate::micropub::util::form_to_mf2_json; #[derive(Serialize, Deserialize, Debug, PartialEq)] #[serde(rename_all = "kebab-case")] enum QueryType { Source, Config, Channel, SyndicateTo } #[derive(Serialize, Deserialize, Debug)] struct MicropubQuery { q: QueryType, url: Option } #[derive(Serialize, Deserialize, PartialEq, Debug)] #[serde(rename_all = "snake_case")] enum ErrorType { AlreadyExists, Forbidden, InternalServerError, InvalidRequest, InvalidScope, NotAuthorized, NotFound, UnsupportedMediaType } #[derive(Serialize, Deserialize, Debug)] pub(crate) struct MicropubError { error: ErrorType, error_description: String } impl From for MicropubError { fn from(err: StorageError) -> Self { Self { error: match err.kind() { crate::database::ErrorKind::NotFound => ErrorType::NotFound, _ => ErrorType::InternalServerError }, error_description: format!("Backend error: {}", err) } } } impl std::error::Error for MicropubError {} impl Display for MicropubError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_str("Micropub error: ")?; f.write_str(&self.error_description) } } impl From<&MicropubError> for StatusCode { fn from(err: &MicropubError) -> Self { use ErrorType::*; match err.error { AlreadyExists => StatusCode::CONFLICT, Forbidden => StatusCode::FORBIDDEN, InternalServerError => StatusCode::INTERNAL_SERVER_ERROR, InvalidRequest => StatusCode::BAD_REQUEST, InvalidScope => StatusCode::UNAUTHORIZED, NotAuthorized => StatusCode::UNAUTHORIZED, NotFound => StatusCode::NOT_FOUND, UnsupportedMediaType => StatusCode::UNSUPPORTED_MEDIA_TYPE, } } } impl From for StatusCode { fn from(err: MicropubError) -> Self { (&err).into() } } impl From for MicropubError { fn from(err: serde_json::Error) -> Self { use ErrorType::*; Self { error: InvalidRequest, error_description: err.to_string() } } } impl MicropubError { fn new(error: ErrorType, error_description: &str) -> Self { Self { error, error_description: error_description.to_owned() } } } impl warp::reject::Reject for MicropubError {} mod post; pub(crate) use post::normalize_mf2; mod util { use serde_json::json; pub(crate) fn form_to_mf2_json(form: Vec<(String, String)>) -> serde_json::Value { let mut mf2 = json!({"type": [], "properties": {}}); for (k, v) in form { if k == "h" { mf2["type"] .as_array_mut() .unwrap() .push(json!("h-".to_string() + &v)); } else if k != "access_token" { let key = k.strip_suffix("[]").unwrap_or(&k); match mf2["properties"][key].as_array_mut() { Some(prop) => prop.push(json!(v)), None => mf2["properties"][key] = json!([v]), } } } if mf2["type"].as_array().unwrap().is_empty() { mf2["type"].as_array_mut().unwrap().push(json!("h-entry")); } mf2 } #[cfg(test)] mod tests { use serde_json::json; #[test] fn test_form_to_mf2() { assert_eq!( super::form_to_mf2_json( serde_urlencoded::from_str( "h=entry&content=something%20interesting" ).unwrap() ), json!({ "type": ["h-entry"], "properties": { "content": ["something interesting"] } }) ) } } } #[derive(Debug)] struct FetchedPostContext { url: url::Url, mf2: serde_json::Value, webmention: Option } fn populate_reply_context(mf2: &serde_json::Value, prop: &str, ctxs: &[FetchedPostContext]) -> Option { if mf2["properties"][prop].is_array() { Some(json!( mf2["properties"][prop] .as_array() // Safe to unwrap because we checked its existence and type // And it's not like we can make it disappear without unsafe code .unwrap() .iter() // This seems to be O(n^2) and I don't like it. // Nevertheless, I lack required knowledge to optimize it. Also, it works, so... .map(|i| ctxs.iter() .find(|ctx| Some(ctx.url.as_str()) == i.as_str()) .and_then(|ctx| ctx.mf2["items"].get(0)) .or(Some(i)) .unwrap()) .collect::>() )) } else { None } } // TODO actually save the post to the database and schedule post-processing pub(crate) async fn _post( user: crate::indieauth::User, uid: String, mf2: serde_json::Value, db: D, http: reqwest::Client ) -> Result { // Here, we have the following guarantees: // - The user is the same user for this host (guaranteed by ensure_same_user) // - The MF2-JSON document is normalized (guaranteed by normalize_mf2)\ // - The MF2-JSON document contains a UID // - The MF2-JSON document's URL list contains its UID // - The MF2-JSON document's "content" field contains an HTML blob, if present // - The MF2-JSON document's publishing datetime is present // - The MF2-JSON document's target channels are set // - The MF2-JSON document's author is set // Security check! Do we have an oAuth2 scope to proceed? if !user.check_scope("create") { return Err(MicropubError { error: ErrorType::InvalidScope, error_description: "Not enough privileges - try acquiring the \"create\" scope.".to_owned() }); } // Security check #2! Are we posting to our own website? if !uid.starts_with(user.me.as_str()) || mf2["properties"]["channel"] .as_array() .unwrap_or(&vec![]) .iter() .any(|url| !url.as_str().unwrap().starts_with(user.me.as_str())) { return Err(MicropubError { error: ErrorType::Forbidden, error_description: "You're posting to a website that's not yours.".to_owned() }); } // Security check #3! Are we overwriting an existing document? if db.post_exists(&uid).await? { return Err(MicropubError { error: ErrorType::AlreadyExists, error_description: "UID clash was detected, operation aborted.".to_owned() }); } // Save the post db.put_post(&mf2, user.me.as_str()).await?; let mut channels = mf2["properties"]["channel"] .as_array() .unwrap() .iter() .map(|i| i.as_str().unwrap_or("")) .filter(|i| !i.is_empty()); let default_channel = user.me.join(post::DEFAULT_CHANNEL_PATH).unwrap().to_string(); let vcards_channel = user.me.join(post::CONTACTS_CHANNEL_PATH).unwrap().to_string(); let food_channel = user.me.join(post::FOOD_CHANNEL_PATH).unwrap().to_string(); let default_channels = vec![default_channel, vcards_channel, food_channel]; for chan in &mut channels { if db.post_exists(chan).await? { db.update_post(chan, json!({"add": {"children": [uid]}})).await?; } else if default_channels.iter().any(|i| chan == i) { post::create_feed(&db, &uid, chan, &user).await?; } else { warn!("Ignoring non-existent channel: {}", chan); } } let reply = warp::reply::with_status( warp::reply::with_header( warp::reply::json(&json!({"location": &uid})), "Location", &uid ), StatusCode::ACCEPTED ); // TODO: Post-processing the post (aka second write pass) // - [x] Download rich reply contexts // - [ ] Syndicate the post if requested, add links to the syndicated copies // - [ ] Send WebSub notifications to the hub (if we happen to have one) // - [x] Send webmentions tokio::task::spawn(async move { use futures_util::StreamExt; let uid: &str = mf2["properties"]["uid"][0].as_str().unwrap(); let context_props = ["in-reply-to", "like-of", "repost-of", "bookmark-of"]; let mut context_urls: Vec = vec![]; for prop in &context_props { if let Some(array) = mf2["properties"][prop].as_array() { context_urls.extend( array .iter() .filter_map(|v| v.as_str()) .filter_map(|v| v.parse::().ok()), ); } } // TODO parse HTML in e-content and add links found here context_urls.sort_unstable_by_key(|u| u.to_string()); context_urls.dedup(); // TODO: Make a stream to fetch all these posts and convert them to MF2 let post_contexts = { let http = &http; tokio_stream::iter(context_urls.into_iter()) .then(move |url: url::Url| http.get(url).send()) .filter_map(|response| futures::future::ready(response.ok())) .filter(|response| futures::future::ready(response.status() == 200)) .filter_map(|response: reqwest::Response| async move { // 1. We need to preserve the URL // 2. We need to get the HTML for MF2 processing // 3. We need to get the webmention endpoint address // All of that can be done in one go. let url = response.url().clone(); // TODO parse link headers let links = response .headers() .get_all(hyper::http::header::LINK) .iter() .cloned() .collect::>(); let html = response.text().await; if html.is_err() { return None; } let html = html.unwrap(); let mf2 = microformats::from_html(&html, url.clone()).unwrap(); // TODO use first Link: header if available let webmention: Option = mf2.rels.by_rels().get("webmention") .and_then(|i| i.first().cloned()); dbg!(Some(FetchedPostContext { url, mf2: serde_json::to_value(mf2).unwrap(), webmention })) }) .collect::>() .await }; let mut update = json!({ "replace": {} }); for prop in &context_props { if let Some(json) = populate_reply_context(&mf2, prop, &post_contexts) { update["replace"][prop] = json; } } if !update["replace"].as_object().unwrap().is_empty() { if let Err(err) = db.update_post(uid, update).await { error!("Failed to update post with rich reply contexts: {}", err); } } // At this point we can start syndicating the post. // Currently we don't really support any syndication endpoints, but still! /*if let Some(syndicate_to) = mf2["properties"]["mp-syndicate-to"].as_array() { let http = &http; tokio_stream::iter(syndicate_to) .filter_map(|i| futures::future::ready(i.as_str())) .for_each_concurrent(3, |s: &str| async move { #[allow(clippy::match_single_binding)] match s { _ => { todo!("Syndicate to generic webmention-aware service {}", s); } // TODO special handling for non-webmention-aware services like the birdsite } }) .await; }*/ { let http = &http; tokio_stream::iter( post_contexts.into_iter() .filter(|ctx| ctx.webmention.is_some())) .for_each_concurrent(2, |ctx| async move { let mut map = std::collections::HashMap::new(); map.insert("source", uid); map.insert("target", ctx.url.as_str()); match http.post(ctx.webmention.unwrap().clone()) .form(&map) .send() .await { Ok(res) => { if !res.status().is_success() { warn!( "Failed to send a webmention for {}: got HTTP {}", ctx.url, res.status() ); } else { info!("Sent a webmention to {}, got HTTP {}", ctx.url, res.status()) } }, Err(err) => warn!("Failed to send a webmention for {}: {}", ctx.url, err) } }) .await; } }); Ok(reply) } #[derive(Serialize, Deserialize)] #[serde(rename_all = "snake_case")] enum ActionType { Delete, Update } #[derive(Serialize, Deserialize)] struct MicropubFormAction { action: ActionType, url: String } #[derive(Serialize, Deserialize)] struct MicropubAction { action: ActionType, url: String, #[serde(skip_serializing_if = "Option::is_none")] replace: Option, #[serde(skip_serializing_if = "Option::is_none")] add: Option, #[serde(skip_serializing_if = "Option::is_none")] delete: Option } impl From for MicropubAction { fn from(a: MicropubFormAction) -> Self { Self { action: a.action, url: a.url, replace: None, add: None, delete: None } } } // TODO perform the requested actions synchronously async fn post_action( action: MicropubAction, db: D, user: User ) -> Result { let uri = if let Ok(uri) = action.url.parse::() { uri } else { return Err(MicropubError { error: ErrorType::InvalidRequest, error_description: "Your URL doesn't parse properly.".to_owned() }); }; if uri.authority().unwrap() != user.me.as_str().parse::().unwrap().authority().unwrap() { return Err(MicropubError { error: ErrorType::Forbidden, error_description: "Don't tamper with others' posts!".to_owned() }); } match action.action { ActionType::Delete => { if !user.check_scope("delete") { return Err(MicropubError { error: ErrorType::InvalidScope, error_description: "You need a \"delete\" scope for this.".to_owned() }); } db.delete_post(&action.url).await? }, ActionType::Update => { if !user.check_scope("update") { return Err(MicropubError { error: ErrorType::InvalidScope, error_description: "You need an \"update\" scope for this.".to_owned() }); } db.update_post( &action.url, // Here, unwrapping is safe, because this value // was recently deserialized from JSON already. serde_json::to_value(&action).unwrap() ).await? }, } Ok(warp::reply::reply()) } async fn check_auth(host: warp::host::Authority, user: User) -> Result { let user_authority = warp::http::Uri::try_from(user.me.as_str()) .unwrap() .authority() .unwrap() .clone(); // TODO compare with potential list of allowed websites // to allow one user to edit several websites with one token if host != user_authority { Err(warp::reject::custom(MicropubError::new( ErrorType::NotAuthorized, "This user is not authorized to use Micropub on this website." ))) } else { Ok(user) } } #[cfg(any(not(debug_assertions), test))] fn ensure_same_user_as_host( token_endpoint: String, http: reqwest::Client ) -> impl Filter + Clone { crate::util::require_host() .and(crate::indieauth::require_token(token_endpoint, http)) .and_then(check_auth) } async fn dispatch_post_body( mut body: impl bytes::Buf, mimetype: http_types::Mime ) -> Result, warp::Rejection> { // Since hyper::common::buf::BufList doesn't implement Clone, we can't use Clone in here // We have to copy the body. Ugh!!! // so much for zero-copy buffers let body = { let mut _body: Vec = Vec::default(); while body.has_remaining() { _body.extend(body.chunk()); body.advance(body.chunk().len()); } _body }; match mimetype.essence() { "application/json" => { if let Ok(body) = serde_json::from_slice::(&body) { Ok(Either::Left(body)) } else if let Ok(body) = serde_json::from_slice::(&body) { // quick sanity check if !body.is_object() || !body["type"].is_array() { return Err(MicropubError { error: ErrorType::InvalidRequest, error_description: "Invalid MF2-JSON detected: `.` should be an object, `.type` should be an array of MF2 types".to_owned() }.into()) } Ok(Either::Right(body)) } else { Err(MicropubError { error: ErrorType::InvalidRequest, error_description: "Invalid JSON object passed.".to_owned() }.into()) } }, "application/x-www-form-urlencoded" => { if let Ok(body) = serde_urlencoded::from_bytes::(&body) { Ok(Either::Left(body.into())) } else if let Ok(body) = serde_urlencoded::from_bytes::>(&body) { Ok(Either::Right(form_to_mf2_json(body))) } else { Err(MicropubError { error: ErrorType::InvalidRequest, error_description: "Invalid form-encoded data. Try h=entry&content=Hello!".to_owned() }.into()) } }, other => Err(MicropubError { error: ErrorType::UnsupportedMediaType, error_description: format!("Unsupported media type: {}. Try application/json?", other) }.into()) } } #[cfg_attr(all(debug_assertions, not(test)), allow(unused_variables))] pub fn post( db: D, token_endpoint: String, http: reqwest::Client ) -> impl Filter + Clone { let inject_db = warp::any().map(move || db.clone()); #[cfg(all(debug_assertions, not(test)))] let ensure_same_user = warp::any().map(|| crate::indieauth::User::new( "http://localhost:8080/", "https://quill.p3k.io/", "create update delete media" )); #[cfg(any(not(debug_assertions), test))] let ensure_same_user = ensure_same_user_as_host(token_endpoint, http.clone()); warp::post() .and(warp::body::content_length_limit(1024 * 512) .and(warp::body::aggregate()) .and(warp::header::("Content-Type")) .and_then(dispatch_post_body)) .and(inject_db) .and(warp::any().map(move || http.clone())) .and(ensure_same_user) .and_then(|body: Either, db: D, http: reqwest::Client, user: crate::indieauth::User| async move { (match body { Either::Left(action) => { post_action(action, db, user).await.map(|p| Box::new(p) as Box) }, Either::Right(post) => { let (uid, mf2) = post::normalize_mf2(post, &user); _post(user, uid, mf2, db, http).await.map(|p| Box::new(p) as Box) } }).map_err(warp::reject::custom) }) } pub fn options() -> impl Filter + Clone { warp::options() // TODO make it reply with a basic description of Micropub spec .map(|| warp::reply::json::>(&None)) .with(warp::reply::with::header("Allow", "GET, POST")) } async fn _query( db: D, query: MicropubQuery, user: crate::indieauth::User ) -> Box { let user_authority = warp::http::Uri::try_from(user.me.as_str()) .unwrap() .authority() .unwrap() .clone(); match query.q { QueryType::Config => { let channels: Vec = match db.get_channels(user_authority.as_str()).await { Ok(chans) => chans, Err(err) => return Box::new(warp::reply::with_status( warp::reply::json(&MicropubError::new( ErrorType::InternalServerError, &format!("Error fetching channels: {}", err) )), StatusCode::INTERNAL_SERVER_ERROR )) }; Box::new(warp::reply::json(json!({ "q": [ QueryType::Source, QueryType::Config, QueryType::Channel, QueryType::SyndicateTo ], "channels": channels, "_kittybox_authority": user_authority.as_str(), "syndicate-to": [] }).as_object().unwrap())) }, QueryType::Source => { match query.url { Some(url) => { if warp::http::Uri::try_from(&url).unwrap().authority().unwrap() != &user_authority { return Box::new(warp::reply::with_status( warp::reply::json(&MicropubError::new( ErrorType::NotAuthorized, "You are requesting a post from a website that doesn't belong to you." )), StatusCode::UNAUTHORIZED )) } match db.get_post(&url).await { Ok(some) => match some { Some(post) => Box::new(warp::reply::json(&post)), None => Box::new(warp::reply::with_status( warp::reply::json(&MicropubError::new( ErrorType::NotFound, "The specified MF2 object was not found in database." )), StatusCode::NOT_FOUND )) }, Err(err) => { Box::new(warp::reply::json(&MicropubError::new( ErrorType::InternalServerError, &format!("Backend error: {}", err) ))) } } }, None => { // Here, one should probably attempt to query at least the main feed and collect posts // Using a pre-made query function can't be done because it does unneeded filtering // Don't implement for now, this is optional Box::new(warp::reply::with_status( warp::reply::json(&MicropubError::new( ErrorType::InvalidRequest, "Querying for post list is not implemented yet." )), StatusCode::BAD_REQUEST )) } } }, QueryType::Channel => { let channels: Vec = match db.get_channels(user_authority.as_str()).await { Ok(chans) => chans, Err(err) => return Box::new(warp::reply::with_status( warp::reply::json(&MicropubError::new( ErrorType::InternalServerError, &format!("Error fetching channels: {}", err) )), StatusCode::INTERNAL_SERVER_ERROR )) }; Box::new(warp::reply::json(&json!({ "channels": channels }))) }, QueryType::SyndicateTo => { Box::new(warp::reply::json(&json!({ "syndicate-to": [] }))) } } } pub fn query( db: D, token_endpoint: String, http: reqwest::Client ) -> impl Filter + Clone { warp::get() .map(move || db.clone()) .and(warp::query::()) .and(crate::util::require_host() .and(crate::indieauth::require_token(token_endpoint, http)) .and_then(check_auth)) .then(_query) .recover(|e: warp::Rejection| async move { if let Some(err) = e.find::() { Ok(warp::reply::json(err)) } else { Err(e) } }) } pub async fn recover(err: Rejection) -> Result { if let Some(error) = err.find::() { return Ok(warp::reply::with_status(warp::reply::json(&error), error.into())) } let error = if err.find::().is_some() { MicropubError::new( ErrorType::InvalidRequest, "Invalid query parameters sent. Try ?q=config to see what you can do." ) } else { log::error!("Unhandled rejection: {:?}", err); MicropubError::new( ErrorType::InternalServerError, &format!("Unknown error: {:?}", err) ) }; Ok(warp::reply::with_status(warp::reply::json(&error), error.into())) } pub fn micropub( db: D, token_endpoint: String, http: reqwest::Client ) -> impl Filter + Clone { query(db.clone(), token_endpoint.clone(), http.clone()) .or(post(db, token_endpoint, http)) .or(options()) .recover(recover) } #[cfg(test)] #[allow(dead_code)] impl MicropubQuery { fn config() -> Self { Self { q: QueryType::Config, url: None } } fn source(url: &str) -> Self { Self { q: QueryType::Source, url: Some(url.to_owned()) } } } #[cfg(test)] mod tests { use hyper::body::HttpBody; use crate::{database::Storage, micropub::MicropubError}; use warp::{Filter, Reply}; use serde_json::json; use super::FetchedPostContext; #[test] fn test_populate_reply_context() { let already_expanded_reply_ctx = json!({ "type": ["h-entry"], "properties": { "content": ["Hello world!"] } }); let mf2 = json!({ "type": ["h-entry"], "properties": { "like-of": [ "https://fireburn.ru/posts/example", already_expanded_reply_ctx, "https://fireburn.ru/posts/non-existent" ] } }); let test_ctx = json!({ "type": ["h-entry"], "properties": { "content": ["This is a post which was reacted to."] } }); let reply_contexts = vec![ FetchedPostContext { url: "https://fireburn.ru/posts/example".parse().unwrap(), mf2: json!({ "items": [ test_ctx ] }), webmention: None } ]; let like_of = super::populate_reply_context(&mf2, "like-of", &reply_contexts).unwrap(); assert_eq!(like_of[0], test_ctx); assert_eq!(like_of[1], already_expanded_reply_ctx); assert_eq!(like_of[2], "https://fireburn.ru/posts/non-existent"); } #[tokio::test] async fn check_post_reject_scope() { let inject_db = { let db = crate::database::MemoryStorage::new(); move || db.clone() }; let db = inject_db(); let res = warp::test::request() .filter(&warp::any() .map(inject_db) .and_then(move |db| async move { let post = json!({ "type": ["h-entry"], "properties": { "content": ["Hello world!"] } }); let user = crate::indieauth::User::new( "https://localhost:8080/", "https://kittybox.fireburn.ru/", "profile" ); let (uid, mf2) = super::post::normalize_mf2(post, &user); super::_post( user, uid, mf2, db, reqwest::Client::new() ).await.map_err(warp::reject::custom) }) ) .await .map(|_| panic!("Tried to do something with a reply!")) .unwrap_err(); if let Some(err) = res.find::() { assert_eq!(err.error, super::ErrorType::InvalidScope); } else { panic!("Did not return MicropubError"); } let hashmap = db.mapping.read().await; assert!(hashmap.is_empty()); } #[tokio::test] async fn check_post_mf2() { let inject_db = { let db = crate::database::MemoryStorage::new(); move || db.clone() }; let db = inject_db(); let res = warp::test::request() .filter(&warp::any() .map(inject_db) .and_then(move |db| async move { let post = json!({ "type": ["h-entry"], "properties": { "content": ["Hello world!"] } }); let user = crate::indieauth::User::new( "https://localhost:8080/", "https://kittybox.fireburn.ru/", "create" ); let (uid, mf2) = super::post::normalize_mf2(post, &user); super::_post( user, uid, mf2, db, reqwest::Client::new() ).await.map_err(warp::reject::custom) }) ) .await .unwrap() .into_response(); assert!(res.headers().contains_key("Location")); let location = res.headers().get("Location").unwrap(); assert!(db.post_exists(location.to_str().unwrap()).await.unwrap()); assert!(db.post_exists("https://localhost:8080/feeds/main").await.unwrap()); } #[tokio::test] async fn test_check_auth() { let err = warp::test::request() .filter(&warp::any() .map(|| ( warp::host::Authority::from_static("aaronparecki.com"), crate::indieauth::User::new( "https://fireburn.ru/", "https://quill.p3k.io/", "create update media" ))) .untuple_one() .and_then(super::check_auth)) .await .unwrap_err(); let json: &MicropubError = err.find::().unwrap(); assert_eq!(json.error, super::ErrorType::NotAuthorized); } #[tokio::test] async fn test_query_foreign_url() { let mut res = warp::test::request() .filter(&warp::any().then(|| super::_query( crate::database::MemoryStorage::new(), super::MicropubQuery::source("https://aaronparecki.com/feeds/main"), crate::indieauth::User::new( "https://fireburn.ru/", "https://quill.p3k.io/", "create update media" ) ))) .await .unwrap() .into_response(); assert_eq!(res.status(), 401); let body = res.body_mut().data().await.unwrap().unwrap(); let json: MicropubError = serde_json::from_slice(&body as &[u8]).unwrap(); assert_eq!(json.error, super::ErrorType::NotAuthorized); } }