From 0617663b249f9ca488e5de652108b17d67fbaf45 Mon Sep 17 00:00:00 2001 From: Vika Date: Sat, 29 Jul 2023 21:59:56 +0300 Subject: Moved the entire Kittybox tree into the root --- src/micropub/mod.rs | 846 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 846 insertions(+) create mode 100644 src/micropub/mod.rs (limited to 'src/micropub/mod.rs') diff --git a/src/micropub/mod.rs b/src/micropub/mod.rs new file mode 100644 index 0000000..02eee6e --- /dev/null +++ b/src/micropub/mod.rs @@ -0,0 +1,846 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use crate::database::{MicropubChannel, Storage, StorageError}; +use crate::indieauth::backend::AuthBackend; +use crate::indieauth::User; +use crate::micropub::util::form_to_mf2_json; +use axum::extract::{BodyStream, Query, Host}; +use axum::headers::ContentType; +use axum::response::{IntoResponse, Response}; +use axum::TypedHeader; +use axum::{http::StatusCode, Extension}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use tokio::sync::Mutex; +use tokio::task::JoinSet; +use tracing::{debug, error, info, warn}; +use kittybox_indieauth::{Scope, TokenData}; +use kittybox_util::{MicropubError, ErrorType}; + +#[derive(Serialize, Deserialize, Debug, PartialEq)] +#[serde(rename_all = "kebab-case")] +enum QueryType { + Source, + Config, + Channel, + SyndicateTo, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct MicropubQuery { + q: QueryType, + url: Option, +} + +impl From for MicropubError { + fn from(err: StorageError) -> Self { + Self { + error: match err.kind() { + crate::database::ErrorKind::NotFound => ErrorType::NotFound, + _ => ErrorType::InternalServerError, + }, + error_description: format!("Backend error: {}", err), + } + } +} + +mod util; +pub(crate) use util::normalize_mf2; + +#[derive(Debug)] +struct FetchedPostContext { + url: url::Url, + mf2: serde_json::Value, + webmention: Option, +} + +fn populate_reply_context( + mf2: &serde_json::Value, + prop: &str, + ctxs: &[FetchedPostContext], +) -> Option> { + mf2["properties"][prop].as_array().map(|array| { + array + .iter() + // TODO: This seems to be O(n^2) and I don't like it. + // Switching `ctxs` to a hashmap might speed it up to O(n) + // The key would be the URL/UID + .map(|i| ctxs + .iter() + .find(|ctx| Some(ctx.url.as_str()) == i.as_str()) + .and_then(|ctx| ctx.mf2["items"].get(0)) + .unwrap_or(i)) + .cloned() + .collect::>() + }) +} + +#[tracing::instrument(skip(db))] +async fn background_processing( + db: D, + mf2: serde_json::Value, + http: reqwest::Client, +) -> () { + // TODO: Post-processing the post (aka second write pass) + // - [x] Download rich reply contexts + // - [ ] Syndicate the post if requested, add links to the syndicated copies + // - [ ] Send WebSub notifications to the hub (if we happen to have one) + // - [x] Send webmentions + + use futures_util::StreamExt; + + let uid: &str = mf2["properties"]["uid"][0].as_str().unwrap(); + + let context_props = ["in-reply-to", "like-of", "repost-of", "bookmark-of"]; + let mut context_urls: Vec = vec![]; + for prop in &context_props { + if let Some(array) = mf2["properties"][prop].as_array() { + context_urls.extend( + array + .iter() + .filter_map(|v| v.as_str()) + .filter_map(|v| v.parse::().ok()), + ); + } + } + // TODO parse HTML in e-content and add links found here + context_urls.sort_unstable_by_key(|u| u.to_string()); + context_urls.dedup(); + + // TODO: Make a stream to fetch all these posts and convert them to MF2 + let post_contexts = { + let http = &http; + tokio_stream::iter(context_urls.into_iter()) + .then(move |url: url::Url| http.get(url).send()) + .filter_map(|response| futures::future::ready(response.ok())) + .filter(|response| futures::future::ready(response.status() == 200)) + .filter_map(|response: reqwest::Response| async move { + // 1. We need to preserve the URL + // 2. We need to get the HTML for MF2 processing + // 3. We need to get the webmention endpoint address + // All of that can be done in one go. + let url = response.url().clone(); + // TODO parse link headers + let links = response + .headers() + .get_all(hyper::http::header::LINK) + .iter() + .cloned() + .collect::>(); + let html = response.text().await; + if html.is_err() { + return None; + } + let html = html.unwrap(); + let mf2 = microformats::from_html(&html, url.clone()).unwrap(); + // TODO use first Link: header if available + let webmention: Option = mf2 + .rels + .by_rels() + .get("webmention") + .and_then(|i| i.first().cloned()); + + dbg!(Some(FetchedPostContext { + url, + mf2: serde_json::to_value(mf2).unwrap(), + webmention + })) + }) + .collect::>() + .await + }; + + let mut update = MicropubUpdate { + replace: Some(Default::default()), + ..Default::default() + }; + for prop in context_props { + if let Some(json) = populate_reply_context(&mf2, prop, &post_contexts) { + update.replace.as_mut().unwrap().insert(prop.to_owned(), json); + } + } + if !update.replace.as_ref().unwrap().is_empty() { + if let Err(err) = db.update_post(uid, update).await { + error!("Failed to update post with rich reply contexts: {}", err); + } + } + + // At this point we can start syndicating the post. + // Currently we don't really support any syndication endpoints, but still! + /*if let Some(syndicate_to) = mf2["properties"]["mp-syndicate-to"].as_array() { + let http = &http; + tokio_stream::iter(syndicate_to) + .filter_map(|i| futures::future::ready(i.as_str())) + .for_each_concurrent(3, |s: &str| async move { + #[allow(clippy::match_single_binding)] + match s { + _ => { + todo!("Syndicate to generic webmention-aware service {}", s); + } + // TODO special handling for non-webmention-aware services like the birdsite + } + }) + .await; + }*/ + + { + let http = &http; + tokio_stream::iter( + post_contexts + .into_iter() + .filter(|ctx| ctx.webmention.is_some()), + ) + .for_each_concurrent(2, |ctx| async move { + let mut map = std::collections::HashMap::new(); + map.insert("source", uid); + map.insert("target", ctx.url.as_str()); + + match http + .post(ctx.webmention.unwrap().clone()) + .form(&map) + .send() + .await + { + Ok(res) => { + if !res.status().is_success() { + warn!( + "Failed to send a webmention for {}: got HTTP {}", + ctx.url, + res.status() + ); + } else { + info!( + "Sent a webmention to {}, got HTTP {}", + ctx.url, + res.status() + ) + } + } + Err(err) => warn!("Failed to send a webmention for {}: {}", ctx.url, err), + } + }) + .await; + } +} + +// TODO actually save the post to the database and schedule post-processing +pub(crate) async fn _post( + user: &TokenData, + uid: String, + mf2: serde_json::Value, + db: D, + http: reqwest::Client, + jobset: Arc>>, +) -> Result { + // Here, we have the following guarantees: + // - The MF2-JSON document is normalized (guaranteed by normalize_mf2) + // - The MF2-JSON document contains a UID + // - The MF2-JSON document's URL list contains its UID + // - The MF2-JSON document's "content" field contains an HTML blob, if present + // - The MF2-JSON document's publishing datetime is present + // - The MF2-JSON document's target channels are set + // - The MF2-JSON document's author is set + + // Security check! Do we have an OAuth2 scope to proceed? + if !user.check_scope(&Scope::Create) { + return Err(MicropubError { + error: ErrorType::InvalidScope, + error_description: "Not enough privileges - try acquiring the \"create\" scope." + .to_owned(), + }); + } + + // Security check #2! Are we posting to our own website? + if !uid.starts_with(user.me.as_str()) + || mf2["properties"]["channel"] + .as_array() + .unwrap_or(&vec![]) + .iter() + .any(|url| !url.as_str().unwrap().starts_with(user.me.as_str())) + { + return Err(MicropubError { + error: ErrorType::Forbidden, + error_description: "You're posting to a website that's not yours.".to_owned(), + }); + } + + // Security check #3! Are we overwriting an existing document? + if db.post_exists(&uid).await? { + return Err(MicropubError { + error: ErrorType::AlreadyExists, + error_description: "UID clash was detected, operation aborted.".to_owned(), + }); + } + let user_domain = format!( + "{}{}", + user.me.host_str().unwrap(), + user.me.port() + .map(|port| format!(":{}", port)) + .unwrap_or_default() + ); + // Save the post + tracing::debug!("Saving post to database..."); + db.put_post(&mf2, &user_domain).await?; + + let mut channels = mf2["properties"]["channel"] + .as_array() + .unwrap() + .iter() + .map(|i| i.as_str().unwrap_or("")) + .filter(|i| !i.is_empty()); + + let default_channel = user + .me + .join(util::DEFAULT_CHANNEL_PATH) + .unwrap() + .to_string(); + let vcards_channel = user + .me + .join(util::CONTACTS_CHANNEL_PATH) + .unwrap() + .to_string(); + let food_channel = user.me.join(util::FOOD_CHANNEL_PATH).unwrap().to_string(); + let default_channels = vec![default_channel, vcards_channel, food_channel]; + + for chan in &mut channels { + debug!("Adding post {} to channel {}", uid, chan); + if db.post_exists(chan).await? { + db.add_to_feed(chan, &uid).await?; + } else if default_channels.iter().any(|i| chan == i) { + util::create_feed(&db, &uid, chan, user).await?; + } else { + warn!("Ignoring non-existent channel: {}", chan); + } + } + + let reply = + IntoResponse::into_response((StatusCode::ACCEPTED, [("Location", uid.as_str())])); + + #[cfg(not(tokio_unstable))] + jobset.lock().await.spawn(background_processing(db, mf2, http)); + #[cfg(tokio_unstable)] + jobset.lock().await.build_task() + .name(format!("Kittybox background processing for post {}", uid.as_str()).as_str()) + .spawn(background_processing(db, mf2, http)); + + Ok(reply) +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "snake_case")] +enum ActionType { + Delete, + Update, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(untagged)] +pub enum MicropubPropertyDeletion { + Properties(Vec), + Values(HashMap>) +} +#[derive(Serialize, Deserialize)] +struct MicropubFormAction { + action: ActionType, + url: String, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct MicropubAction { + action: ActionType, + url: String, + #[serde(flatten)] + #[serde(skip_serializing_if = "Option::is_none")] + update: Option +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct MicropubUpdate { + #[serde(skip_serializing_if = "Option::is_none")] + pub replace: Option>>, + #[serde(skip_serializing_if = "Option::is_none")] + pub add: Option>>, + #[serde(skip_serializing_if = "Option::is_none")] + pub delete: Option, + +} + +impl From for MicropubAction { + fn from(a: MicropubFormAction) -> Self { + debug_assert!(matches!(a.action, ActionType::Delete)); + Self { + action: a.action, + url: a.url, + update: None + } + } +} + +#[tracing::instrument(skip(db))] +async fn post_action( + action: MicropubAction, + db: D, + user: User, +) -> Result<(), MicropubError> { + let uri = if let Ok(uri) = action.url.parse::() { + uri + } else { + return Err(MicropubError { + error: ErrorType::InvalidRequest, + error_description: "Your URL doesn't parse properly.".to_owned(), + }); + }; + + if uri.authority().unwrap() + != user + .me + .as_str() + .parse::() + .unwrap() + .authority() + .unwrap() + { + return Err(MicropubError { + error: ErrorType::Forbidden, + error_description: "Don't tamper with others' posts!".to_owned(), + }); + } + + match action.action { + ActionType::Delete => { + if !user.check_scope(&Scope::Delete) { + return Err(MicropubError { + error: ErrorType::InvalidScope, + error_description: "You need a \"delete\" scope for this.".to_owned(), + }); + } + + db.delete_post(&action.url).await? + } + ActionType::Update => { + if !user.check_scope(&Scope::Update) { + return Err(MicropubError { + error: ErrorType::InvalidScope, + error_description: "You need an \"update\" scope for this.".to_owned(), + }); + } + + db.update_post( + &action.url, + action.update.ok_or(MicropubError { + error: ErrorType::InvalidRequest, + error_description: "Update request is not set.".to_owned(), + })? + ) + .await? + } + } + + Ok(()) +} + +enum PostBody { + Action(MicropubAction), + MF2(serde_json::Value), +} + +#[tracing::instrument] +async fn dispatch_body( + mut body: BodyStream, + content_type: ContentType, +) -> Result { + let body: Vec = { + debug!("Buffering body..."); + use tokio_stream::StreamExt; + let mut buf = Vec::default(); + + while let Some(chunk) = body.next().await { + buf.extend_from_slice(&chunk.unwrap()) + } + + buf + }; + + debug!("Content-Type: {:?}", content_type); + if content_type == ContentType::json() { + if let Ok(action) = serde_json::from_slice::(&body) { + Ok(PostBody::Action(action)) + } else if let Ok(body) = serde_json::from_slice::(&body) { + // quick sanity check + if !body.is_object() || !body["type"].is_array() { + return Err(MicropubError { + error: ErrorType::InvalidRequest, + error_description: "Invalid MF2-JSON detected: `.` should be an object, `.type` should be an array of MF2 types".to_owned() + }); + } + + Ok(PostBody::MF2(body)) + } else { + Err(MicropubError { + error: ErrorType::InvalidRequest, + error_description: "Invalid JSON object passed.".to_owned(), + }) + } + } else if content_type == ContentType::form_url_encoded() { + if let Ok(body) = serde_urlencoded::from_bytes::(&body) { + Ok(PostBody::Action(body.into())) + } else if let Ok(body) = serde_urlencoded::from_bytes::>(&body) { + Ok(PostBody::MF2(form_to_mf2_json(body))) + } else { + Err(MicropubError { + error: ErrorType::InvalidRequest, + error_description: "Invalid form-encoded data. Try h=entry&content=Hello!" + .to_owned(), + }) + } + } else { + Err(MicropubError::new( + ErrorType::UnsupportedMediaType, + "This Content-Type is not recognized. Try application/json instead?", + )) + } +} + +#[tracing::instrument(skip(db, http))] +pub(crate) async fn post( + Extension(db): Extension, + Extension(http): Extension, + Extension(jobset): Extension>>>, + TypedHeader(content_type): TypedHeader, + user: User, + body: BodyStream, +) -> axum::response::Response { + match dispatch_body(body, content_type).await { + Ok(PostBody::Action(action)) => match post_action(action, db, user).await { + Ok(()) => Response::default(), + Err(err) => err.into_response(), + }, + Ok(PostBody::MF2(mf2)) => { + let (uid, mf2) = normalize_mf2(mf2, &user); + match _post(&user, uid, mf2, db, http, jobset).await { + Ok(response) => response, + Err(err) => err.into_response(), + } + } + Err(err) => err.into_response(), + } +} + +#[tracing::instrument(skip(db))] +pub(crate) async fn query( + Extension(db): Extension, + query: Option>, + Host(host): Host, + user: User, +) -> axum::response::Response { + // We handle the invalid query case manually to return a + // MicropubError instead of HTTP 422 + let query = if let Some(Query(query)) = query { + query + } else { + return MicropubError::new( + ErrorType::InvalidRequest, + "Invalid query provided. Try ?q=config to see what you can do." + ).into_response(); + }; + + if axum::http::Uri::try_from(user.me.as_str()) + .unwrap() + .authority() + .unwrap() + != &host + { + return MicropubError::new( + ErrorType::NotAuthorized, + "This website doesn't belong to you.", + ) + .into_response(); + } + + let user_domain = format!( + "{}{}", + user.me.host_str().unwrap(), + user.me.port() + .map(|port| format!(":{}", port)) + .unwrap_or_default() + ); + match query.q { + QueryType::Config => { + let channels: Vec = match db.get_channels(user.me.as_str()).await { + Ok(chans) => chans, + Err(err) => { + return MicropubError::new( + ErrorType::InternalServerError, + &format!("Error fetching channels: {}", err), + ) + .into_response() + } + }; + + axum::response::Json(json!({ + "q": [ + QueryType::Source, + QueryType::Config, + QueryType::Channel, + QueryType::SyndicateTo + ], + "channels": channels, + "_kittybox_authority": user.me.as_str(), + "syndicate-to": [], + "media-endpoint": user.me.join("/.kittybox/media").unwrap().as_str() + })) + .into_response() + } + QueryType::Source => { + match query.url { + Some(url) => { + match db.get_post(&url).await { + Ok(some) => match some { + Some(post) => axum::response::Json(&post).into_response(), + None => MicropubError::new( + ErrorType::NotFound, + "The specified MF2 object was not found in database.", + ) + .into_response(), + }, + Err(err) => MicropubError::new( + ErrorType::InternalServerError, + &format!("Backend error: {}", err), + ) + .into_response(), + } + } + None => { + // Here, one should probably attempt to query at least the main feed and collect posts + // Using a pre-made query function can't be done because it does unneeded filtering + // Don't implement for now, this is optional + MicropubError::new( + ErrorType::InvalidRequest, + "Querying for post list is not implemented yet.", + ) + .into_response() + } + } + } + QueryType::Channel => match db.get_channels(&user_domain).await { + Ok(chans) => axum::response::Json(json!({ "channels": chans })).into_response(), + Err(err) => MicropubError::new( + ErrorType::InternalServerError, + &format!("Error fetching channels: {}", err), + ) + .into_response(), + }, + QueryType::SyndicateTo => { + axum::response::Json(json!({ "syndicate-to": [] })).into_response() + } + } +} + +#[must_use] +pub fn router( + storage: S, + http: reqwest::Client, + auth: A, + jobset: Arc>> +) -> axum::routing::MethodRouter +where + S: Storage + 'static, + A: AuthBackend +{ + axum::routing::get(query::) + .post(post::) + .layer::<_, _, std::convert::Infallible>(tower_http::cors::CorsLayer::new() + .allow_methods([ + axum::http::Method::GET, + axum::http::Method::POST, + ]) + .allow_origin(tower_http::cors::Any)) + .layer::<_, _, std::convert::Infallible>(axum::Extension(storage)) + .layer::<_, _, std::convert::Infallible>(axum::Extension(http)) + .layer::<_, _, std::convert::Infallible>(axum::Extension(auth)) + .layer::<_, _, std::convert::Infallible>(axum::Extension(jobset)) +} + +#[cfg(test)] +#[allow(dead_code)] +impl MicropubQuery { + fn config() -> Self { + Self { + q: QueryType::Config, + url: None, + } + } + + fn source(url: &str) -> Self { + Self { + q: QueryType::Source, + url: Some(url.to_owned()), + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use crate::{database::Storage, micropub::MicropubError}; + use hyper::body::HttpBody; + use serde_json::json; + use tokio::sync::Mutex; + + use super::FetchedPostContext; + use kittybox_indieauth::{Scopes, Scope, TokenData}; + use axum::extract::Host; + + #[test] + fn test_populate_reply_context() { + let already_expanded_reply_ctx = json!({ + "type": ["h-entry"], + "properties": { + "content": ["Hello world!"] + } + }); + let mf2 = json!({ + "type": ["h-entry"], + "properties": { + "like-of": [ + "https://fireburn.ru/posts/example", + already_expanded_reply_ctx, + "https://fireburn.ru/posts/non-existent" + ] + } + }); + let test_ctx = json!({ + "type": ["h-entry"], + "properties": { + "content": ["This is a post which was reacted to."] + } + }); + let reply_contexts = vec![FetchedPostContext { + url: "https://fireburn.ru/posts/example".parse().unwrap(), + mf2: json!({ "items": [test_ctx] }), + webmention: None, + }]; + + let like_of = super::populate_reply_context(&mf2, "like-of", &reply_contexts).unwrap(); + + assert_eq!(like_of[0], test_ctx); + assert_eq!(like_of[1], already_expanded_reply_ctx); + assert_eq!(like_of[2], "https://fireburn.ru/posts/non-existent"); + } + + #[tokio::test] + async fn test_post_reject_scope() { + let db = crate::database::MemoryStorage::new(); + + let post = json!({ + "type": ["h-entry"], + "properties": { + "content": ["Hello world!"] + } + }); + let user = TokenData { + me: "https://localhost:8080/".parse().unwrap(), + client_id: "https://kittybox.fireburn.ru/".parse().unwrap(), + scope: Scopes::new(vec![Scope::Profile]), + iat: None, exp: None + }; + let (uid, mf2) = super::normalize_mf2(post, &user); + + let err = super::_post(&user, uid, mf2, db.clone(), reqwest::Client::new(), Arc::new(Mutex::new(tokio::task::JoinSet::new()))) + .await + .unwrap_err(); + + assert_eq!(err.error, super::ErrorType::InvalidScope); + + let hashmap = db.mapping.read().await; + assert!(hashmap.is_empty()); + } + + #[tokio::test] + async fn test_post_reject_different_user() { + let db = crate::database::MemoryStorage::new(); + + let post = json!({ + "type": ["h-entry"], + "properties": { + "content": ["Hello world!"], + "uid": ["https://fireburn.ru/posts/hello"], + "url": ["https://fireburn.ru/posts/hello"] + } + }); + let user = TokenData { + me: "https://aaronparecki.com/".parse().unwrap(), + client_id: "https://kittybox.fireburn.ru/".parse().unwrap(), + scope: Scopes::new(vec![Scope::Profile, Scope::Create, Scope::Update, Scope::Media]), + iat: None, exp: None + }; + let (uid, mf2) = super::normalize_mf2(post, &user); + + let err = super::_post(&user, uid, mf2, db.clone(), reqwest::Client::new(), Arc::new(Mutex::new(tokio::task::JoinSet::new()))) + .await + .unwrap_err(); + + assert_eq!(err.error, super::ErrorType::Forbidden); + + let hashmap = db.mapping.read().await; + assert!(hashmap.is_empty()); + } + + #[tokio::test] + async fn test_post_mf2() { + let db = crate::database::MemoryStorage::new(); + + let post = json!({ + "type": ["h-entry"], + "properties": { + "content": ["Hello world!"] + } + }); + let user = TokenData { + me: "https://localhost:8080/".parse().unwrap(), + client_id: "https://kittybox.fireburn.ru/".parse().unwrap(), + scope: Scopes::new(vec![Scope::Profile, Scope::Create]), + iat: None, exp: None + }; + let (uid, mf2) = super::normalize_mf2(post, &user); + + let res = super::_post(&user, uid, mf2, db.clone(), reqwest::Client::new(), Arc::new(Mutex::new(tokio::task::JoinSet::new()))) + .await + .unwrap(); + + assert!(res.headers().contains_key("Location")); + let location = res.headers().get("Location").unwrap(); + assert!(db.post_exists(location.to_str().unwrap()).await.unwrap()); + assert!(db + .post_exists("https://localhost:8080/feeds/main") + .await + .unwrap()); + } + + #[tokio::test] + async fn test_query_foreign_url() { + let mut res = super::query( + axum::Extension(crate::database::MemoryStorage::new()), + Some(axum::extract::Query(super::MicropubQuery::source( + "https://aaronparecki.com/feeds/main", + ))), + Host("aaronparecki.com".to_owned()), + crate::indieauth::User::( + TokenData { + me: "https://fireburn.ru/".parse().unwrap(), + client_id: "https://kittybox.fireburn.ru/".parse().unwrap(), + scope: Scopes::new(vec![Scope::Profile, Scope::Create, Scope::Update, Scope::Media]), + iat: None, exp: None + }, std::marker::PhantomData + ) + ) + .await; + + assert_eq!(res.status(), 401); + let body = res.body_mut().data().await.unwrap().unwrap(); + let json: MicropubError = serde_json::from_slice(&body as &[u8]).unwrap(); + assert_eq!(json.error, super::ErrorType::NotAuthorized); + } +} -- cgit 1.4.1