diff options
Diffstat (limited to 'src/micropub/mod.rs')
-rw-r--r-- | src/micropub/mod.rs | 617 |
1 files changed, 550 insertions, 67 deletions
diff --git a/src/micropub/mod.rs b/src/micropub/mod.rs index b81ac17..f3152d7 100644 --- a/src/micropub/mod.rs +++ b/src/micropub/mod.rs @@ -1,10 +1,13 @@ use std::convert::Infallible; - +use either::Either; +use log::warn; use warp::http::StatusCode; use warp::{Filter, Rejection, reject::InvalidQuery}; -use serde_json::{json, Value}; +use serde_json::json; use serde::{Serialize, Deserialize}; -use crate::database::{MicropubChannel, Storage}; +use crate::database::{MicropubChannel, Storage, StorageError}; +use crate::indieauth::User; +use crate::micropub::util::form_to_mf2_json; #[derive(Serialize, Deserialize, Debug, PartialEq)] #[serde(rename_all = "kebab-case")] @@ -24,26 +27,61 @@ struct MicropubQuery { #[derive(Serialize, Deserialize, PartialEq, Debug)] #[serde(rename_all = "snake_case")] enum ErrorType { - InvalidRequest, + AlreadyExists, + Forbidden, InternalServerError, - NotFound, + InvalidRequest, + InvalidScope, NotAuthorized, + NotFound, + UnsupportedMediaType } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] struct MicropubError { error: ErrorType, error_description: String } -impl From<MicropubError> for StatusCode { - fn from(err: MicropubError) -> Self { +impl From<StorageError> for MicropubError { + fn from(err: StorageError) -> Self { + Self { + error: match err.kind() { + crate::database::ErrorKind::NotFound => ErrorType::NotFound, + _ => ErrorType::InternalServerError + }, + error_description: format!("Backend error: {}", err.to_string()) + } + } +} + +impl From<&MicropubError> for StatusCode { + fn from(err: &MicropubError) -> Self { use ErrorType::*; match err.error { - InvalidRequest => StatusCode::BAD_REQUEST, + AlreadyExists => StatusCode::CONFLICT, + Forbidden => StatusCode::FORBIDDEN, InternalServerError => StatusCode::INTERNAL_SERVER_ERROR, + InvalidRequest => StatusCode::BAD_REQUEST, + InvalidScope => StatusCode::UNAUTHORIZED, + NotAuthorized => StatusCode::UNAUTHORIZED, NotFound => StatusCode::NOT_FOUND, - NotAuthorized => StatusCode::UNAUTHORIZED + UnsupportedMediaType => StatusCode::UNSUPPORTED_MEDIA_TYPE, + } + } +} +impl From<MicropubError> for StatusCode { + fn from(err: MicropubError) -> Self { + (&err).into() + } +} + +impl From<serde_json::Error> for MicropubError { + fn from(err: serde_json::Error) -> Self { + use ErrorType::*; + Self { + error: InvalidRequest, + error_description: err.to_string() } } } @@ -57,8 +95,13 @@ impl MicropubError { } } +impl warp::reject::Reject for MicropubError {} + +mod post; + +#[allow(unused_variables)] pub mod media { - use futures_util::{Stream, StreamExt}; + use futures_util::StreamExt; use bytes::buf::Buf; use warp::{Filter, Rejection, Reply, multipart::{FormData, Part}}; @@ -71,9 +114,7 @@ pub mod media { pub fn options() -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone { warp::options() .map(|| warp::reply::json::<Option<()>>(&None)) - // TODO: why doesn't this work? - // .map(warp::reply::with::header("Allow", "GET, POST")) - .map(|reply| warp::reply::with_header(reply, "Allow", "GET, POST")) + .with(warp::reply::with::header("Allow", "GET, POST")) } pub fn upload() -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone { @@ -108,47 +149,396 @@ pub mod media { } } -async fn _post<D: Storage>(db: D, host: warp::host::Authority, user: crate::indieauth::User) -> impl warp::Reply { - todo!("post to database {:?} for host {:?} using user {:?}", db, host, user); - #[allow(unreachable_code)] - "" +mod util { + use serde_json::json; + + pub(crate) fn form_to_mf2_json(form: Vec<(String, String)>) -> serde_json::Value { + let mut mf2 = json!({"type": [], "properties": {}}); + for (k, v) in form { + if k == "h" { + mf2["type"] + .as_array_mut() + .unwrap() + .push(json!("h-".to_string() + &v)); + } else if k != "access_token" { + let key = k.strip_suffix("[]").unwrap_or(&k); + match mf2["properties"][key].as_array_mut() { + Some(prop) => prop.push(json!(v)), + None => mf2["properties"][key] = json!([v]), + } + } + } + if mf2["type"].as_array().unwrap().is_empty() { + mf2["type"].as_array_mut().unwrap().push(json!("h-entry")); + } + mf2 + } + + #[cfg(test)] + mod tests { + use serde_json::json; + #[test] + fn test_form_to_mf2() { + assert_eq!( + super::form_to_mf2_json( + serde_urlencoded::from_str( + "h=entry&content=something%20interesting" + ).unwrap() + ), + json!({ + "type": ["h-entry"], + "properties": { + "content": ["something interesting"] + } + }) + ) + } + } +} + +// TODO actually save the post to the database and schedule post-processing +async fn _post<D: Storage, T: hyper::client::connect::Connect + Clone + Send + Sync + 'static>( + user: crate::indieauth::User, + uid: String, + mf2: serde_json::Value, + db: D, + http: hyper::Client<T, hyper::Body> +) -> Result<impl warp::Reply, MicropubError> { + // Here, we have the following guarantees: + // - The user is the same user for this host (guaranteed by ensure_same_user) + // - The MF2-JSON document is normalized (guaranteed by normalize_mf2)\ + // - The MF2-JSON document contains a UID + // - The MF2-JSON document's URL list contains its UID + // - The MF2-JSON document's "content" field contains an HTML blob, if present + // - The MF2-JSON document's publishing datetime is present + // - The MF2-JSON document's target channels are set + // - The MF2-JSON document's author is set + + // Security check! Do we have an oAuth2 scope to proceed? + if !user.check_scope("create") { + return Err(MicropubError { + error: ErrorType::InvalidScope, + error_description: "Not enough privileges - try acquiring the \"create\" scope.".to_owned() + }); + } + + // Security check #2! Are we posting to our own website? + if !uid.starts_with(user.me.as_str()) || mf2["properties"]["channel"] + .as_array() + .unwrap_or(&vec![]) + .iter() + .any(|url| !url.as_str().unwrap().starts_with(user.me.as_str())) + { + return Err(MicropubError { + error: ErrorType::Forbidden, + error_description: "You're posting to a website that's not yours.".to_owned() + }); + } + + // Security check #3! Are we overwriting an existing document? + if db.post_exists(&uid).await? { + return Err(MicropubError { + error: ErrorType::AlreadyExists, + error_description: "UID clash was detected, operation aborted.".to_owned() + }); + } + + // Save the post + db.put_post(&mf2, user.me.as_str()).await?; + + let mut channels = mf2["properties"]["channel"] + .as_array() + .unwrap() + .iter() + .map(|i| i.as_str().unwrap_or("")) + .filter(|i| !i.is_empty()); + + let default_channel = user.me.join(post::DEFAULT_CHANNEL_PATH).unwrap().to_string(); + let vcards_channel = user.me.join(post::CONTACTS_CHANNEL_PATH).unwrap().to_string(); + let food_channel = user.me.join(post::FOOD_CHANNEL_PATH).unwrap().to_string(); + let default_channels = vec![default_channel, vcards_channel, food_channel]; + + for chan in &mut channels { + if db.post_exists(chan).await? { + db.update_post(chan, json!({"add": {"children": [uid]}})).await?; + } else if default_channels.iter().any(|i| chan == i) { + post::create_feed(&db, &uid, chan, &user).await?; + } else { + warn!("Ignoring non-existent channel: {}", chan); + } + } + + let reply = warp::reply::with_status( + warp::reply::with_header( + warp::reply::json(&json!({"location": &uid})), + "Location", &uid + ), + StatusCode::ACCEPTED + ); + + // TODO: Post-processing the post (aka second write pass) + // - [-] Download rich reply contexts + // - [-] Syndicate the post if requested, add links to the syndicated copies + // - [ ] Send WebSub notifications to the hub (if we happen to have one) + // - [x] Send webmentions + #[allow(unused_imports)] + tokio::task::spawn(async move { + use hyper::{Uri, Response, Body, body::HttpBody}; + use bytes::{Buf, BufMut}; + use futures_util::StreamExt; + + let mut contextually_significant_posts: Vec<hyper::Uri> = vec![]; + for prop in &["in-reply-to", "like-of", "repost-of", "bookmark-of"] { + if let Some(array) = mf2["properties"][prop].as_array() { + contextually_significant_posts.extend( + array + .iter() + .filter_map(|v| v.as_str().and_then(|v| v.parse::<hyper::Uri>().ok())), + ); + } + } + contextually_significant_posts.sort_unstable_by_key(|u| u.to_string()); + contextually_significant_posts.dedup(); + + // TODO: Make a stream to fetch all these posts and convert them to MF2 + + todo!() + }); + + Ok(reply) +} + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +enum ActionType { + Delete, + Update +} + +#[derive(Serialize, Deserialize)] +struct MicropubFormAction { + action: ActionType, + url: String +} + +#[derive(Serialize, Deserialize)] +struct MicropubAction { + action: ActionType, + url: String, + #[serde(skip_serializing_if = "Option::is_none")] + replace: Option<serde_json::Value>, + #[serde(skip_serializing_if = "Option::is_none")] + add: Option<serde_json::Value>, + #[serde(skip_serializing_if = "Option::is_none")] + delete: Option<serde_json::Value> } -pub fn post<D: Storage, T>(db: D, token_endpoint: String, http: hyper::Client<T, hyper::Body>) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone +impl From<MicropubFormAction> for MicropubAction { + fn from(a: MicropubFormAction) -> Self { + Self { + action: a.action, + url: a.url, + replace: None, add: None, delete: None + } + } +} + +// TODO perform the requested actions synchronously +async fn post_action<D: Storage>( + action: MicropubAction, + db: D, + user: User +) -> Result<impl warp::Reply, MicropubError> { + + let uri = if let Ok(uri) = action.url.parse::<hyper::Uri>() { + uri + } else { + return Err(MicropubError { + error: ErrorType::InvalidRequest, + error_description: "Your URL doesn't parse properly.".to_owned() + }); + }; + + if uri.authority().unwrap() != user.me.as_str().parse::<hyper::Uri>().unwrap().authority().unwrap() { + return Err(MicropubError { + error: ErrorType::Forbidden, + error_description: "Don't tamper with others' posts!".to_owned() + }); + } + + match action.action { + ActionType::Delete => { + if !user.check_scope("delete") { + return Err(MicropubError { + error: ErrorType::InvalidScope, + error_description: "You need a \"delete\" scope for this.".to_owned() + }); + } + + db.delete_post(&action.url).await? + }, + ActionType::Update => { + if !user.check_scope("update") { + return Err(MicropubError { + error: ErrorType::InvalidScope, + error_description: "You need an \"update\" scope for this.".to_owned() + }); + } + + db.update_post( + &action.url, + // Here, unwrapping is safe, because this value + // was recently deserialized from JSON already. + serde_json::to_value(&action).unwrap() + ).await? + }, + } + + Ok(warp::reply::reply()) +} + +async fn check_auth(host: warp::host::Authority, user: User) -> Result<User, warp::Rejection> { + let user_authority = warp::http::Uri::try_from(user.me.as_str()) + .unwrap() + .authority() + .unwrap() + .clone(); + // TODO compare with potential list of allowed websites + // to allow one user to edit several websites with one token + if host != user_authority { + Err(warp::reject::custom(MicropubError::new( + ErrorType::NotAuthorized, + "This user is not authorized to use Micropub on this website." + ))) + } else { + Ok(user) + } +} + +#[cfg(any(not(debug_assertions), test))] +fn ensure_same_user_as_host<T>( + token_endpoint: String, + http: hyper::Client<T, hyper::Body> +) -> impl Filter<Extract = (User,), Error = warp::Rejection> + Clone where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { - warp::post() - .map(move || db.clone()) - .and(crate::util::require_host()) + crate::util::require_host() .and(crate::indieauth::require_token(token_endpoint, http)) - // TODO get body and process it - .then(_post) + .and_then(check_auth) +} + +async fn dispatch_post_body( + mut body: impl bytes::Buf, + mimetype: http_types::Mime +) -> Result<Either<MicropubAction, serde_json::Value>, warp::Rejection> { + // Since hyper::common::buf::BufList doesn't implement Clone, we can't use Clone in here + // We have to copy the body. Ugh!!! + // so much for zero-copy buffers + let body = { + let mut _body: Vec<u8> = Vec::default(); + while body.has_remaining() { + _body.extend(body.chunk()); + body.advance(body.chunk().len()); + } + _body + }; + match mimetype.essence() { + "application/json" => { + if let Ok(body) = serde_json::from_slice::<MicropubAction>(&body) { + Ok(Either::Left(body)) + } else if let Ok(body) = serde_json::from_slice::<serde_json::Value>(&body) { + // quick sanity check + if !body.is_object() || !body["type"].is_array() { + return Err(MicropubError { + error: ErrorType::InvalidRequest, + error_description: "Invalid MF2-JSON detected: `.` should be an object, `.type` should be an array of MF2 types".to_owned() + }.into()) + } + Ok(Either::Right(body)) + } else { + Err(MicropubError { + error: ErrorType::InvalidRequest, + error_description: "Invalid JSON object passed.".to_owned() + }.into()) + } + }, + "application/x-www-form-urlencoded" => { + if let Ok(body) = serde_urlencoded::from_bytes::<MicropubFormAction>(&body) { + Ok(Either::Left(body.into())) + } else if let Ok(body) = serde_urlencoded::from_bytes::<Vec<(String, String)>>(&body) { + Ok(Either::Right(form_to_mf2_json(body))) + } else { + Err(MicropubError { + error: ErrorType::InvalidRequest, + error_description: "Invalid form-encoded data. Try h=entry&content=Hello!".to_owned() + }.into()) + } + }, + other => Err(MicropubError { + error: ErrorType::UnsupportedMediaType, + error_description: format!("Unsupported media type: {}. Try application/json?", other) + }.into()) + } +} + +#[cfg_attr(all(debug_assertions, not(test)), allow(unused_variables))] +pub fn post<D: 'static + Storage, T>( + db: D, + token_endpoint: String, + http: hyper::Client<T, hyper::Body> +) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone +where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { + let inject_db = warp::any().map(move || db.clone()); + #[cfg(all(debug_assertions, not(test)))] + let ensure_same_user = warp::any().map(|| crate::indieauth::User::new( + "http://localhost:8080/", + "https://quill.p3k.io/", + "create update delete media" + )); + #[cfg(any(not(debug_assertions), test))] + let ensure_same_user = ensure_same_user_as_host(token_endpoint, http.clone()); + + warp::post() + .and(warp::body::content_length_limit(1024 * 512) + .and(warp::body::aggregate()) + .and(warp::header::<http_types::Mime>("Content-Type")) + .and_then(dispatch_post_body)) + .and(inject_db) + .and(warp::any().map(move || http.clone())) + .and(ensure_same_user) + .and_then(|body: Either<MicropubAction, serde_json::Value>, db: D, http: hyper::Client<T, hyper::Body>, user: crate::indieauth::User| async move { + (match body { + Either::Left(action) => { + post_action(action, db, user).await.map(|p| Box::new(p) as Box<dyn warp::Reply>) + }, + Either::Right(post) => { + let (uid, mf2) = post::normalize_mf2(post, &user); + _post(user, uid, mf2, db, http).await.map(|p| Box::new(p) as Box<dyn warp::Reply>) + } + }).map_err(warp::reject::custom) + }) } -pub fn options() -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Copy { +pub fn options() -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone { warp::options() // TODO make it reply with a basic description of Micropub spec .map(|| warp::reply::json::<Option<()>>(&None)) - // TODO: why doesn't this work? - // .map(warp::reply::with::header("Allow", "GET, POST")) - .map(|reply| warp::reply::with_header(reply, "Allow", "GET, POST")) + .with(warp::reply::with::header("Allow", "GET, POST")) } -async fn _query<D: Storage>(db: D, host: warp::host::Authority, query: MicropubQuery, user: crate::indieauth::User) -> impl warp::Reply { - let user_authority = warp::http::Uri::try_from(user.me.as_str()).unwrap().authority().unwrap().clone(); - // TODO compare with potential list of allowed websites - // to allow one user to edit several websites with one token - if host != user_authority { - return Box::new(warp::reply::with_status( - warp::reply::json(&MicropubError::new( - ErrorType::NotAuthorized, - "This user is not authorized to use Micropub on this website." - )), - StatusCode::UNAUTHORIZED - )) as Box<dyn warp::Reply> - } +async fn _query<D: Storage>( + db: D, + query: MicropubQuery, + user: crate::indieauth::User +) -> Box<dyn warp::Reply> { + let user_authority = warp::http::Uri::try_from(user.me.as_str()) + .unwrap() + .authority() + .unwrap() + .clone(); + match query.q { QueryType::Config => { - let channels: Vec<MicropubChannel> = match db.get_channels(host.as_str()).await { + let channels: Vec<MicropubChannel> = match db.get_channels(user_authority.as_str()).await { Ok(chans) => chans, Err(err) => return Box::new(warp::reply::with_status( warp::reply::json(&MicropubError::new( @@ -167,7 +557,7 @@ async fn _query<D: Storage>(db: D, host: warp::host::Authority, query: MicropubQ QueryType::SyndicateTo ], "channels": channels, - "_kittybox_authority": host.as_str() + "_kittybox_authority": user_authority.as_str() }).as_object().unwrap())) }, QueryType::Source => { @@ -204,9 +594,7 @@ async fn _query<D: Storage>(db: D, host: warp::host::Authority, query: MicropubQ None => todo!() } }, - _ => { - todo!() - } + _ => todo!() } } @@ -214,13 +602,24 @@ pub fn query<D: Storage, T>(db: D, token_endpoint: String, http: hyper::Client<T where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { warp::get() .map(move || db.clone()) - .and(crate::util::require_host()) .and(warp::query::<MicropubQuery>()) - .and(crate::indieauth::require_token(token_endpoint, http)) + .and(crate::util::require_host() + .and(crate::indieauth::require_token(token_endpoint, http)) + .and_then(check_auth)) .then(_query) + .recover(|e: warp::Rejection| async move { + if let Some(err) = e.find::<MicropubError>() { + Ok(warp::reply::json(err)) + } else { + Err(e) + } + }) } pub async fn recover(err: Rejection) -> Result<impl warp::Reply, Infallible> { + if let Some(error) = err.find::<MicropubError>() { + return Ok(warp::reply::with_status(warp::reply::json(&error), error.into())) + } let error = if err.find::<InvalidQuery>().is_some() { MicropubError::new( ErrorType::InvalidRequest, @@ -237,14 +636,15 @@ pub async fn recover(err: Rejection) -> Result<impl warp::Reply, Infallible> { Ok(warp::reply::with_status(warp::reply::json(&error), error.into())) } -pub fn micropub<D: Storage, T>(db: D, token_endpoint: String, http: hyper::Client<T, hyper::Body>) -> impl Filter<Extract = (impl warp::Reply,), Error = Infallible> + Clone +pub fn micropub<D: 'static + Storage, T>(db: D, token_endpoint: String, http: hyper::Client<T, hyper::Body>) -> impl Filter<Extract = (impl warp::Reply,), Error = Infallible> + Clone where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { query(db.clone(), token_endpoint.clone(), http.clone()) - .or(post(db.clone(), token_endpoint.clone(), http.clone())) + .or(post(db, token_endpoint, http)) .or(options()) .recover(recover) } #[cfg(test)] +#[allow(dead_code)] impl MicropubQuery { fn config() -> Self { Self { @@ -264,29 +664,113 @@ impl MicropubQuery { #[cfg(test)] mod tests { use hyper::body::HttpBody; - use crate::micropub::MicropubError; + use crate::{database::Storage, micropub::MicropubError}; use warp::{Filter, Reply}; + use serde_json::json; #[tokio::test] - async fn test_query_wrong_auth() { - let mut res = warp::test::request() - .filter(&warp::any().then(|| super::_query( - crate::database::MemoryStorage::new(), - warp::host::Authority::from_static("aaronparecki.com"), - super::MicropubQuery::config(), - crate::indieauth::User::new( - "https://fireburn.ru/", - "https://quill.p3k.io/", - "create update media" - ) - ))) + async fn check_post_reject_scope() { + let inject_db = { + let db = crate::database::MemoryStorage::new(); + + move || db.clone() + }; + let db = inject_db(); + + let res = warp::test::request() + .filter(&warp::any() + .map(inject_db) + .and_then(move |db| async move { + let post = json!({ + "type": ["h-entry"], + "properties": { + "content": ["Hello world!"] + } + }); + let user = crate::indieauth::User::new( + "https://localhost:8080/", + "https://kittybox.fireburn.ru/", + "profile" + ); + let (uid, mf2) = super::post::normalize_mf2(post, &user); + + super::_post( + user, uid, mf2, db, hyper::Client::new() + ).await.map_err(warp::reject::custom) + }) + ) + .await + .map(|_| panic!("Tried to do something with a reply!")) + .unwrap_err(); + + if let Some(err) = res.find::<MicropubError>() { + assert_eq!(err.error, super::ErrorType::InvalidScope); + } else { + panic!("Did not return MicropubError"); + } + + let hashmap = db.mapping.read().await; + assert!(hashmap.is_empty()); + } + + #[tokio::test] + async fn check_post_mf2() { + let inject_db = { + let db = crate::database::MemoryStorage::new(); + + move || db.clone() + }; + let db = inject_db(); + + let res = warp::test::request() + .filter(&warp::any() + .map(inject_db) + .and_then(move |db| async move { + let post = json!({ + "type": ["h-entry"], + "properties": { + "content": ["Hello world!"] + } + }); + let user = crate::indieauth::User::new( + "https://localhost:8080/", + "https://kittybox.fireburn.ru/", + "create" + ); + let (uid, mf2) = super::post::normalize_mf2(post, &user); + + super::_post( + user, uid, mf2, db, hyper::Client::new() + ).await.map_err(warp::reject::custom) + }) + ) .await .unwrap() .into_response(); - assert_eq!(res.status(), 401); - let body = res.body_mut().data().await.unwrap().unwrap(); - let json: MicropubError = serde_json::from_slice(&body as &[u8]).unwrap(); + assert!(res.headers().contains_key("Location")); + let location = res.headers().get("Location").unwrap(); + assert!(db.post_exists(location.to_str().unwrap()).await.unwrap()); + assert!(db.post_exists("https://localhost:8080/feeds/main").await.unwrap()); + } + + #[tokio::test] + async fn test_check_auth() { + let err = warp::test::request() + .filter(&warp::any() + .map(|| ( + warp::host::Authority::from_static("aaronparecki.com"), + crate::indieauth::User::new( + "https://fireburn.ru/", + "https://quill.p3k.io/", + "create update media" + ))) + .untuple_one() + .and_then(super::check_auth)) + .await + .unwrap_err(); + + let json: &MicropubError = err.find::<MicropubError>().unwrap(); assert_eq!(json.error, super::ErrorType::NotAuthorized); } @@ -295,7 +779,6 @@ mod tests { let mut res = warp::test::request() .filter(&warp::any().then(|| super::_query( crate::database::MemoryStorage::new(), - warp::host::Authority::from_static("aaronparecki.com"), super::MicropubQuery::source("https://aaronparecki.com/feeds/main"), crate::indieauth::User::new( "https://fireburn.ru/", |