use std::collections::HashMap; use url::Url; use std::sync::Arc; use crate::database::{MicropubChannel, Storage, StorageError}; use crate::indieauth::backend::AuthBackend; use crate::indieauth::User; use crate::micropub::util::form_to_mf2_json; use axum::extract::{FromRef, Host, Query, State}; use axum::body::Body as BodyStream; use axum_extra::headers::ContentType; use axum::response::{IntoResponse, Response}; use axum_extra::TypedHeader; use axum::http::StatusCode; use serde::{Deserialize, Serialize}; use serde_json::json; use tokio::sync::Mutex; use tokio::task::JoinSet; use tracing::{debug, error, info, warn}; use kittybox_indieauth::{Scope, TokenData}; use kittybox_util::{MicropubError, ErrorType}; #[derive(Serialize, Deserialize, Debug, PartialEq)] #[serde(rename_all = "kebab-case")] enum QueryType { Source, Config, Channel, SyndicateTo, Category } #[derive(Serialize, Deserialize, Debug)] pub struct MicropubQuery { q: QueryType, url: Option<String>, } impl From<StorageError> for MicropubError { fn from(err: StorageError) -> Self { Self { error: match err.kind() { crate::database::ErrorKind::NotFound => ErrorType::NotFound, _ => ErrorType::InternalServerError, }, error_description: format!("Backend error: {}", err), } } } mod util; pub(crate) use util::normalize_mf2; #[derive(Debug)] struct FetchedPostContext { url: url::Url, mf2: serde_json::Value, webmention: Option<url::Url>, } fn populate_reply_context( mf2: &serde_json::Value, prop: &str, ctxs: &HashMap<Url, FetchedPostContext>, ) -> Option<Vec<serde_json::Value>> { mf2["properties"][prop].as_array().map(|array| { array .iter() .map(|i| { let mut item = i.as_str() .and_then(|i| i.parse::<Url>().ok()) .and_then(|url| ctxs.get(&url)) .and_then(|ctx| ctx.mf2["items"].get(0)) .unwrap_or(i) .clone(); if item.is_object() && (i != &item) { if let Some(props) = item["properties"].as_object_mut() { // Fixup the item: if it lacks a URL, add one. if !props.get("url").and_then(serde_json::Value::as_array).map(|a| a.len() > 0).unwrap_or(false) { props.insert("url".to_owned(), json!([i.as_str()])); } } } item }) .collect::<Vec<serde_json::Value>>() }) } #[tracing::instrument(skip(db))] async fn background_processing<D: 'static + Storage>( db: D, mf2: serde_json::Value, http: reqwest::Client, ) -> () { // TODO: Post-processing the post (aka second write pass) // - [x] Download rich reply contexts // - [ ] Syndicate the post if requested, add links to the syndicated copies // - [ ] Send WebSub notifications to the hub (if we happen to have one) // - [x] Send webmentions use futures_util::StreamExt; let uid: &str = mf2["properties"]["uid"][0].as_str().unwrap(); let context_props = ["in-reply-to", "like-of", "repost-of", "bookmark-of"]; let mut context_urls: Vec<url::Url> = vec![]; for prop in &context_props { if let Some(array) = mf2["properties"][prop].as_array() { context_urls.extend( array .iter() .filter_map(|v| v.as_str()) .filter_map(|v| v.parse::<url::Url>().ok()), ); } } // TODO parse HTML in e-content and add links found here context_urls.sort_unstable_by_key(|u| u.to_string()); context_urls.dedup(); // TODO: Make a stream to fetch all these posts and convert them to MF2 let post_contexts = { let http = &http; tokio_stream::iter(context_urls.into_iter()) .then(move |url: url::Url| http.get(url).send()) .filter_map(|response| futures::future::ready(response.ok())) .filter(|response| futures::future::ready(response.status() == 200)) .filter_map(|response: reqwest::Response| async move { // 1. We need to preserve the URL // 2. We need to get the HTML for MF2 processing // 3. We need to get the webmention endpoint address // All of that can be done in one go. let url = response.url().clone(); // TODO parse link headers let links = response .headers() .get_all(reqwest::header::LINK) .iter() .cloned() .collect::<Vec<reqwest::header::HeaderValue>>(); let html = response.text().await; if html.is_err() { return None; } let html = html.unwrap(); let mf2 = microformats::from_html(&html, url.clone()).unwrap(); // TODO use first Link: header if available let webmention: Option<url::Url> = mf2 .rels .by_rels() .get("webmention") .and_then(|i| i.first().cloned()); dbg!(Some((url.clone(), FetchedPostContext { url, mf2: serde_json::to_value(mf2).unwrap(), webmention }))) }) .collect::<HashMap<Url, FetchedPostContext>>() .await }; let mut update = MicropubUpdate { replace: Some(Default::default()), ..Default::default() }; for prop in context_props { if let Some(json) = populate_reply_context(&mf2, prop, &post_contexts) { update.replace.as_mut().unwrap().insert(prop.to_owned(), json); } } if !update.replace.as_ref().unwrap().is_empty() { if let Err(err) = db.update_post(uid, update).await { error!("Failed to update post with rich reply contexts: {}", err); } } // At this point we can start syndicating the post. // Currently we don't really support any syndication endpoints, but still! /*if let Some(syndicate_to) = mf2["properties"]["mp-syndicate-to"].as_array() { let http = &http; tokio_stream::iter(syndicate_to) .filter_map(|i| futures::future::ready(i.as_str())) .for_each_concurrent(3, |s: &str| async move { #[allow(clippy::match_single_binding)] match s { _ => { todo!("Syndicate to generic webmention-aware service {}", s); } // TODO special handling for non-webmention-aware services like the birdsite } }) .await; }*/ { let http = &http; tokio_stream::iter( post_contexts .into_iter() .filter(|(url, ctx)| ctx.webmention.is_some()), ) .for_each_concurrent(2, |(url, ctx)| async move { let mut map = std::collections::HashMap::new(); map.insert("source", uid); map.insert("target", ctx.url.as_str()); match http .post(ctx.webmention.unwrap().clone()) .form(&map) .send() .await { Ok(res) => { if !res.status().is_success() { warn!( "Failed to send a webmention for {}: got HTTP {}", ctx.url, res.status() ); } else { info!( "Sent a webmention to {}, got HTTP {}", ctx.url, res.status() ) } } Err(err) => warn!("Failed to send a webmention for {}: {}", ctx.url, err), } }) .await; } } // TODO actually save the post to the database and schedule post-processing pub(crate) async fn _post<D: 'static + Storage>( user: &TokenData, uid: String, mf2: serde_json::Value, db: D, http: reqwest::Client, jobset: Arc<Mutex<JoinSet<()>>>, ) -> Result<Response, MicropubError> { // Here, we have the following guarantees: // - The MF2-JSON document is normalized (guaranteed by normalize_mf2) // - The MF2-JSON document contains a UID // - The MF2-JSON document's URL list contains its UID // - The MF2-JSON document's "content" field contains an HTML blob, if present // - The MF2-JSON document's publishing datetime is present // - The MF2-JSON document's target channels are set // - The MF2-JSON document's author is set // Security check! Do we have an OAuth2 scope to proceed? if !user.check_scope(&Scope::Create) { return Err(MicropubError { error: ErrorType::InvalidScope, error_description: "Not enough privileges - try acquiring the \"create\" scope." .to_owned(), }); } // Security check #2! Are we posting to our own website? if !uid.starts_with(user.me.as_str()) || mf2["properties"]["channel"] .as_array() .unwrap_or(&vec![]) .iter() .any(|url| !url.as_str().unwrap().starts_with(user.me.as_str())) { return Err(MicropubError { error: ErrorType::Forbidden, error_description: "You're posting to a website that's not yours.".to_owned(), }); } // Security check #3! Are we overwriting an existing document? if db.post_exists(&uid).await? { return Err(MicropubError { error: ErrorType::AlreadyExists, error_description: "UID clash was detected, operation aborted.".to_owned(), }); } let user_domain = format!( "{}{}", user.me.host_str().unwrap(), user.me.port() .map(|port| format!(":{}", port)) .unwrap_or_default() ); // Save the post tracing::debug!("Saving post to database..."); db.put_post(&mf2, &user.me).await?; let mut channels = mf2["properties"]["channel"] .as_array() .unwrap() .iter() .map(|i| i.as_str().unwrap_or("")) .filter(|i| !i.is_empty()); let default_channel = user .me .join(util::DEFAULT_CHANNEL_PATH) .unwrap() .to_string(); let vcards_channel = user .me .join(util::CONTACTS_CHANNEL_PATH) .unwrap() .to_string(); let food_channel = user.me.join(util::FOOD_CHANNEL_PATH).unwrap().to_string(); let default_channels = vec![default_channel, vcards_channel, food_channel]; for chan in &mut channels { debug!("Adding post {} to channel {}", uid, chan); if db.post_exists(chan).await? { db.add_to_feed(chan, &uid).await?; } else if default_channels.iter().any(|i| chan == i) { util::create_feed(&db, &uid, chan, user).await?; } else { warn!("Ignoring non-existent channel: {}", chan); } } let reply = IntoResponse::into_response((StatusCode::ACCEPTED, [("Location", uid.as_str())])); #[cfg(not(tokio_unstable))] let _ = jobset.lock().await.spawn(background_processing(db, mf2, http)); #[cfg(tokio_unstable)] let _ = jobset.lock().await.build_task() .name(format!("Kittybox background processing for post {}", uid.as_str()).as_str()) .spawn(background_processing(db, mf2, http)); Ok(reply) } #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "snake_case")] enum ActionType { Delete, Update, } #[derive(Serialize, Deserialize, Debug)] #[serde(untagged)] pub enum MicropubPropertyDeletion { Properties(Vec<String>), Values(HashMap<String, Vec<serde_json::Value>>) } #[derive(Serialize, Deserialize)] struct MicropubFormAction { action: ActionType, url: String, } #[derive(Serialize, Deserialize, Debug)] pub struct MicropubAction { action: ActionType, url: String, #[serde(flatten)] #[serde(skip_serializing_if = "Option::is_none")] update: Option<MicropubUpdate> } #[derive(Serialize, Deserialize, Debug, Default)] pub struct MicropubUpdate { #[serde(skip_serializing_if = "Option::is_none")] pub replace: Option<HashMap<String, Vec<serde_json::Value>>>, #[serde(skip_serializing_if = "Option::is_none")] pub add: Option<HashMap<String, Vec<serde_json::Value>>>, #[serde(skip_serializing_if = "Option::is_none")] pub delete: Option<MicropubPropertyDeletion>, } impl From<MicropubFormAction> for MicropubAction { fn from(a: MicropubFormAction) -> Self { debug_assert!(matches!(a.action, ActionType::Delete)); Self { action: a.action, url: a.url, update: None } } } #[tracing::instrument(skip(db))] async fn post_action<D: Storage, A: AuthBackend>( action: MicropubAction, db: D, user: User<A>, ) -> Result<(), MicropubError> { let uri = if let Ok(uri) = action.url.parse::<hyper::Uri>() { uri } else { return Err(MicropubError { error: ErrorType::InvalidRequest, error_description: "Your URL doesn't parse properly.".to_owned(), }); }; if uri.authority().unwrap() != user .me .as_str() .parse::<hyper::Uri>() .unwrap() .authority() .unwrap() { return Err(MicropubError { error: ErrorType::Forbidden, error_description: "Don't tamper with others' posts!".to_owned(), }); } match action.action { ActionType::Delete => { if !user.check_scope(&Scope::Delete) { return Err(MicropubError { error: ErrorType::InvalidScope, error_description: "You need a \"delete\" scope for this.".to_owned(), }); } db.delete_post(&action.url).await? } ActionType::Update => { if !user.check_scope(&Scope::Update) { return Err(MicropubError { error: ErrorType::InvalidScope, error_description: "You need an \"update\" scope for this.".to_owned(), }); } db.update_post( &action.url, action.update.ok_or(MicropubError { error: ErrorType::InvalidRequest, error_description: "Update request is not set.".to_owned(), })? ) .await? } } Ok(()) } enum PostBody { Action(MicropubAction), MF2(serde_json::Value), } #[tracing::instrument] async fn dispatch_body( body: BodyStream, content_type: ContentType, ) -> Result<PostBody, MicropubError> { let body: Vec<u8> = { debug!("Buffering body..."); use tokio_stream::StreamExt; let mut buf = Vec::default(); let mut body = body.into_data_stream(); while let Some(chunk) = body.next().await { buf.extend_from_slice(&chunk.unwrap()) } buf }; debug!("Content-Type: {:?}", content_type); if content_type == ContentType::json() { if let Ok(action) = serde_json::from_slice::<MicropubAction>(&body) { Ok(PostBody::Action(action)) } else if let Ok(body) = serde_json::from_slice::<serde_json::Value>(&body) { // quick sanity check if !body.is_object() || !body["type"].is_array() { return Err(MicropubError { error: ErrorType::InvalidRequest, error_description: "Invalid MF2-JSON detected: `.` should be an object, `.type` should be an array of MF2 types".to_owned() }); } Ok(PostBody::MF2(body)) } else { Err(MicropubError { error: ErrorType::InvalidRequest, error_description: "Invalid JSON object passed.".to_owned(), }) } } else if content_type == ContentType::form_url_encoded() { if let Ok(body) = serde_urlencoded::from_bytes::<MicropubFormAction>(&body) { Ok(PostBody::Action(body.into())) } else if let Ok(body) = serde_urlencoded::from_bytes::<Vec<(String, String)>>(&body) { Ok(PostBody::MF2(form_to_mf2_json(body))) } else { Err(MicropubError { error: ErrorType::InvalidRequest, error_description: "Invalid form-encoded data. Try h=entry&content=Hello!" .to_owned(), }) } } else { Err(MicropubError::new( ErrorType::UnsupportedMediaType, "This Content-Type is not recognized. Try application/json instead?", )) } } #[tracing::instrument(skip(db, http))] pub(crate) async fn post<D: Storage + 'static, A: AuthBackend>( State(db): State<D>, State(http): State<reqwest::Client>, State(jobset): State<Arc<Mutex<JoinSet<()>>>>, TypedHeader(content_type): TypedHeader<ContentType>, user: User<A>, body: BodyStream, ) -> axum::response::Response { match dispatch_body(body, content_type).await { Ok(PostBody::Action(action)) => match post_action(action, db, user).await { Ok(()) => Response::default(), Err(err) => err.into_response(), }, Ok(PostBody::MF2(mf2)) => { let (uid, mf2) = normalize_mf2(mf2, &user); match _post(&user, uid, mf2, db, http, jobset).await { Ok(response) => response, Err(err) => err.into_response(), } } Err(err) => err.into_response(), } } #[tracing::instrument(skip(db))] pub(crate) async fn query<D: Storage, A: AuthBackend>( State(db): State<D>, query: Option<Query<MicropubQuery>>, Host(host): Host, user: User<A>, ) -> axum::response::Response { // We handle the invalid query case manually to return a // MicropubError instead of HTTP 422 let query = if let Some(Query(query)) = query { query } else { return MicropubError::new( ErrorType::InvalidRequest, "Invalid query provided. Try ?q=config to see what you can do." ).into_response(); }; if axum::http::Uri::try_from(user.me.as_str()) .unwrap() .authority() .unwrap() != &host { return MicropubError::new( ErrorType::NotAuthorized, "This website doesn't belong to you.", ) .into_response(); } // TODO: consider replacing by `user.me.authority()`? let user_domain = format!( "{}{}", user.me.host_str().unwrap(), user.me.port() .map(|port| format!(":{}", port)) .unwrap_or_default() ); match query.q { QueryType::Config => { let channels: Vec<MicropubChannel> = match db.get_channels(&user.me).await { Ok(chans) => chans, Err(err) => { return MicropubError::new( ErrorType::InternalServerError, &format!("Error fetching channels: {}", err), ) .into_response() } }; axum::response::Json(json!({ "q": [ QueryType::Source, QueryType::Config, QueryType::Channel, QueryType::SyndicateTo, QueryType::Category ], "channels": channels, "_kittybox_authority": user.me.as_str(), "syndicate-to": [], "media-endpoint": user.me.join("/.kittybox/media").unwrap().as_str() })) .into_response() } QueryType::Source => { match query.url { Some(url) => { match db.get_post(&url).await { Ok(some) => match some { Some(post) => axum::response::Json(&post).into_response(), None => MicropubError::new( ErrorType::NotFound, "The specified MF2 object was not found in database.", ) .into_response(), }, Err(err) => MicropubError::new( ErrorType::InternalServerError, &format!("Backend error: {}", err), ) .into_response(), } } None => { // Here, one should probably attempt to query at least the main feed and collect posts // Using a pre-made query function can't be done because it does unneeded filtering // Don't implement for now, this is optional MicropubError::new( ErrorType::InvalidRequest, "Querying for post list is not implemented yet.", ) .into_response() } } } QueryType::Channel => match db.get_channels(&user.me).await { Ok(chans) => axum::response::Json(json!({ "channels": chans })).into_response(), Err(err) => MicropubError::new( ErrorType::InternalServerError, &format!("Error fetching channels: {}", err), ) .into_response(), }, QueryType::SyndicateTo => { axum::response::Json(json!({ "syndicate-to": [] })).into_response() }, QueryType::Category => { let categories = match db.categories(&user_domain).await { Ok(categories) => categories, Err(err) => { return MicropubError::new( ErrorType::InternalServerError, &format!("Error fetching categories: {}", err) ).into_response() } }; axum::response::Json(json!({ "categories": categories })).into_response() } } } pub fn router<A, S, St: Send + Sync + Clone + 'static>() -> axum::routing::MethodRouter<St> where S: Storage + FromRef<St> + 'static, A: AuthBackend + FromRef<St>, reqwest::Client: FromRef<St>, Arc<Mutex<JoinSet<()>>>: FromRef<St> { axum::routing::get(query::<S, A>) .post(post::<S, A>) .layer::<_, _>(tower_http::cors::CorsLayer::new() .allow_methods([ axum::http::Method::GET, axum::http::Method::POST, ]) .allow_origin(tower_http::cors::Any)) } #[cfg(test)] #[allow(dead_code)] impl MicropubQuery { fn config() -> Self { Self { q: QueryType::Config, url: None, } } fn source(url: &str) -> Self { Self { q: QueryType::Source, url: Some(url.to_owned()), } } } #[cfg(test)] mod tests { use std::sync::Arc; use crate::{database::Storage, micropub::MicropubError}; use bytes::Bytes; use futures::StreamExt; use serde_json::json; use tokio::sync::Mutex; use super::FetchedPostContext; use kittybox_indieauth::{Scopes, Scope, TokenData}; use axum::extract::{Host, State}; #[test] fn test_populate_reply_context() { let already_expanded_reply_ctx = json!({ "type": ["h-entry"], "properties": { "content": ["Hello world!"] } }); let mf2 = json!({ "type": ["h-entry"], "properties": { "like-of": [ "https://fireburn.ru/posts/example", already_expanded_reply_ctx, "https://fireburn.ru/posts/non-existent" ] } }); let test_ctx = json!({ "type": ["h-entry"], "properties": { "content": ["This is a post which was reacted to."] } }); let fetched_ctx_url: url::Url = "https://fireburn.ru/posts/example".parse().unwrap(); let reply_contexts = vec![(fetched_ctx_url.clone(), FetchedPostContext { url: fetched_ctx_url.clone(), mf2: json!({ "items": [test_ctx] }), webmention: None, })].into_iter().collect(); let like_of = super::populate_reply_context(&mf2, "like-of", &reply_contexts).unwrap(); assert_eq!(like_of[0]["properties"]["content"], test_ctx["properties"]["content"]); assert_eq!(like_of[0]["properties"]["url"][0].as_str().unwrap(), reply_contexts[&fetched_ctx_url].url.as_str()); assert_eq!(like_of[1], already_expanded_reply_ctx); assert_eq!(like_of[2], "https://fireburn.ru/posts/non-existent"); } #[tokio::test] async fn test_post_reject_scope() { let db = crate::database::MemoryStorage::default(); let post = json!({ "type": ["h-entry"], "properties": { "content": ["Hello world!"] } }); let user = TokenData { me: "https://localhost:8080/".parse().unwrap(), client_id: "https://kittybox.fireburn.ru/".parse().unwrap(), scope: Scopes::new(vec![Scope::Profile]), iat: None, exp: None }; let (uid, mf2) = super::normalize_mf2(post, &user); let err = super::_post(&user, uid, mf2, db.clone(), reqwest::Client::new(), Arc::new(Mutex::new(tokio::task::JoinSet::new()))) .await .unwrap_err(); assert_eq!(err.error, super::ErrorType::InvalidScope); let hashmap = db.mapping.read().await; assert!(hashmap.is_empty()); } #[tokio::test] async fn test_post_reject_different_user() { let db = crate::database::MemoryStorage::default(); let post = json!({ "type": ["h-entry"], "properties": { "content": ["Hello world!"], "uid": ["https://fireburn.ru/posts/hello"], "url": ["https://fireburn.ru/posts/hello"] } }); let user = TokenData { me: "https://aaronparecki.com/".parse().unwrap(), client_id: "https://kittybox.fireburn.ru/".parse().unwrap(), scope: Scopes::new(vec![Scope::Profile, Scope::Create, Scope::Update, Scope::Media]), iat: None, exp: None }; let (uid, mf2) = super::normalize_mf2(post, &user); let err = super::_post(&user, uid, mf2, db.clone(), reqwest::Client::new(), Arc::new(Mutex::new(tokio::task::JoinSet::new()))) .await .unwrap_err(); assert_eq!(err.error, super::ErrorType::Forbidden); let hashmap = db.mapping.read().await; assert!(hashmap.is_empty()); } #[tokio::test] async fn test_post_mf2() { let db = crate::database::MemoryStorage::default(); let post = json!({ "type": ["h-entry"], "properties": { "content": ["Hello world!"] } }); let user = TokenData { me: "https://localhost:8080/".parse().unwrap(), client_id: "https://kittybox.fireburn.ru/".parse().unwrap(), scope: Scopes::new(vec![Scope::Profile, Scope::Create]), iat: None, exp: None }; let (uid, mf2) = super::normalize_mf2(post, &user); let res = super::_post(&user, uid, mf2, db.clone(), reqwest::Client::new(), Arc::new(Mutex::new(tokio::task::JoinSet::new()))) .await .unwrap(); assert!(res.headers().contains_key("Location")); let location = res.headers().get("Location").unwrap(); assert!(db.post_exists(location.to_str().unwrap()).await.unwrap()); assert!(db .post_exists("https://localhost:8080/feeds/main") .await .unwrap()); } #[tokio::test] async fn test_query_foreign_url() { let mut res = super::query( State(crate::database::MemoryStorage::default()), Some(axum::extract::Query(super::MicropubQuery::source( "https://aaronparecki.com/feeds/main", ))), Host("aaronparecki.com".to_owned()), crate::indieauth::User::<crate::indieauth::backend::fs::FileBackend>( TokenData { me: "https://fireburn.ru/".parse().unwrap(), client_id: "https://kittybox.fireburn.ru/".parse().unwrap(), scope: Scopes::new(vec![Scope::Profile, Scope::Create, Scope::Update, Scope::Media]), iat: None, exp: None }, std::marker::PhantomData ) ) .await; assert_eq!(res.status(), 401); let body = res .into_body() .into_data_stream() .collect::<Vec<Result<Bytes, axum::Error>>>() .await .into_iter() .map(Result::unwrap) .by_ref() .fold(Vec::new(), |mut a, i| { a.extend(i); a}); let json: MicropubError = serde_json::from_slice(&body as &[u8]).unwrap(); assert_eq!(json.error, super::ErrorType::NotAuthorized); } }