use crate::database::{MicropubChannel, Storage, StorageError};
use crate::indieauth::backend::AuthBackend;
use crate::indieauth::User;
use crate::micropub::util::form_to_mf2_json;
use axum::extract::{BodyStream, Query, Host};
use axum::headers::ContentType;
use axum::response::{IntoResponse, Response};
use axum::TypedHeader;
use axum::{http::StatusCode, Extension};
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::{debug, error, info, warn};
use kittybox_indieauth::{Scope, TokenData};
use kittybox_util::{MicropubError, ErrorType};
/// The kinds of `?q=` queries understood by the Micropub `GET` handler (`query`).
///
/// Variants are (de)serialized in kebab-case: `source`, `config`, `channel`
/// and `syndicate-to`.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(rename_all = "kebab-case")]
enum QueryType {
/// `?q=source`: return the stored MF2-JSON of the post given in `url`.
Source,
/// `?q=config`: return the endpoint configuration (supported queries, channels, ...).
Config,
/// `?q=channel`: list the user's channels.
Channel,
/// `?q=syndicate-to`: list syndication targets (currently always empty).
SyndicateTo,
}
/// Query-string parameters of a Micropub `GET` request.
#[derive(Serialize, Deserialize, Debug)]
pub struct MicropubQuery {
/// The requested query type (`?q=`).
q: QueryType,
/// The target post URL; only consulted by `q=source`.
url: Option<String>,
}
impl From<StorageError> for MicropubError {
fn from(err: StorageError) -> Self {
Self {
error: match err.kind() {
crate::database::ErrorKind::NotFound => ErrorType::NotFound,
_ => ErrorType::InternalServerError,
},
error_description: format!("Backend error: {}", err),
}
}
}
mod util;
pub(crate) use util::normalize_mf2;
/// A remote page fetched by `background_processing` to build reply contexts
/// and to discover where to send webmentions.
#[derive(Debug)]
struct FetchedPostContext {
/// The URL reported by the HTTP response the page was read from.
url: url::Url,
/// The page's parsed microformats2 document serialized to JSON;
/// `populate_reply_context` reads its `items` array.
mf2: serde_json::Value,
/// The first `rel="webmention"` endpoint found in the page's HTML, if any.
webmention: Option<url::Url>,
}
/// Expand the URLs in `mf2["properties"][prop]` into fetched reply contexts.
///
/// Each string entry that equals the URL of one of `ctxs` is replaced with
/// the first entry of that context's `items` array. All other entries are
/// kept as-is: non-matching URLs, already-expanded objects, and matching
/// contexts without items. When several contexts share a URL, the first one
/// wins.
///
/// Returns `None` if the property is absent or not an array.
fn populate_reply_context(
    mf2: &serde_json::Value,
    prop: &str,
    ctxs: &[FetchedPostContext],
) -> Option<serde_json::Value> {
    let entries = mf2["properties"][prop].as_array()?;

    // Index the contexts by URL once, so each entry costs an O(1) lookup
    // instead of a linear scan over `ctxs`. `or_insert` keeps the *first*
    // context for a URL, matching `Iterator::find` semantics.
    let mut by_url: std::collections::HashMap<&str, &FetchedPostContext> =
        std::collections::HashMap::with_capacity(ctxs.len());
    for ctx in ctxs {
        by_url.entry(ctx.url.as_str()).or_insert(ctx);
    }

    let expanded: Vec<&serde_json::Value> = entries
        .iter()
        .map(|entry| {
            entry
                .as_str()
                .and_then(|url| by_url.get(url))
                .and_then(|ctx| ctx.mf2["items"].get(0))
                .unwrap_or(entry)
        })
        .collect();
    Some(json!(expanded))
}
/// Post-process a freshly created post in the background ("second write pass").
///
/// Steps, in order:
/// 1. Fetch every URL found in the post's `in-reply-to`, `like-of`,
///    `repost-of` and `bookmark-of` properties (deduplicated). Pages answering
///    `200` are parsed as microformats2.
/// 2. Rewrite those properties in storage with the fetched contexts, using a
///    `replace` update built by [`populate_reply_context`].
/// 3. Send a webmention (at most two concurrently) to every fetched page
///    that advertises a `rel="webmention"` endpoint in its HTML.
///
/// Remote pages are untrusted. A page that fails to download, read, parse
/// or serialize is logged and skipped, so one bad page cannot abort the task.
///
/// # Panics
/// Panics if `mf2` has no string `properties.uid[0]`. `_post` only spawns
/// this with documents it states are guaranteed to contain a UID.
#[tracing::instrument(skip(db))]
async fn background_processing<D: 'static + Storage>(
    db: D,
    mf2: serde_json::Value,
    http: reqwest::Client,
) {
    // TODO: Post-processing the post (aka second write pass)
    // - [x] Download rich reply contexts
    // - [ ] Syndicate the post if requested (`mp-syndicate-to`), add links to
    //       the syndicated copies. No syndication targets are supported yet.
    // - [ ] Send WebSub notifications to the hub (if we happen to have one)
    // - [x] Send webmentions
    use futures_util::StreamExt;
    let uid: &str = mf2["properties"]["uid"][0].as_str().unwrap();
    let context_props = ["in-reply-to", "like-of", "repost-of", "bookmark-of"];

    let mut context_urls: Vec<url::Url> = vec![];
    for prop in &context_props {
        if let Some(array) = mf2["properties"][prop].as_array() {
            context_urls.extend(
                array
                    .iter()
                    .filter_map(|v| v.as_str())
                    .filter_map(|v| v.parse::<url::Url>().ok()),
            );
        }
    }
    // TODO parse HTML in e-content and add links found here
    context_urls.sort_unstable_by_key(|u| u.to_string());
    context_urls.dedup();

    let post_contexts = {
        let http = &http;
        tokio_stream::iter(context_urls.into_iter())
            .then(move |url: url::Url| http.get(url).send())
            .filter_map(|response| futures::future::ready(response.ok()))
            .filter(|response| futures::future::ready(response.status() == 200))
            .filter_map(|response: reqwest::Response| async move {
                // Remember the URL before `text()` consumes the response.
                let url = response.url().clone();
                // TODO: also discover webmention endpoints from `Link:` headers.
                let html = match response.text().await {
                    Ok(html) => html,
                    Err(err) => {
                        warn!("Failed to read the body of {}: {}", url, err);
                        return None;
                    }
                };
                // The page is untrusted input: a parse failure must not panic
                // and kill the rest of this background task.
                let parsed = match microformats::from_html(&html, url.clone()) {
                    Ok(parsed) => parsed,
                    Err(err) => {
                        warn!("Failed to parse microformats from {}: {:?}", url, err);
                        return None;
                    }
                };
                let webmention: Option<url::Url> = parsed
                    .rels
                    .by_rels()
                    .get("webmention")
                    .and_then(|i| i.first().cloned());
                let mf2 = match serde_json::to_value(parsed) {
                    Ok(mf2) => mf2,
                    Err(err) => {
                        warn!("Failed to serialize microformats of {}: {}", url, err);
                        return None;
                    }
                };
                Some(FetchedPostContext {
                    url,
                    mf2,
                    webmention,
                })
            })
            .collect::<Vec<FetchedPostContext>>()
            .await
    };

    let mut update = json!({ "replace": {} });
    for prop in &context_props {
        if let Some(json) = populate_reply_context(&mf2, prop, &post_contexts) {
            update["replace"][prop] = json;
        }
    }
    if !update["replace"].as_object().unwrap().is_empty() {
        if let Err(err) = db.update_post(uid, update).await {
            error!("Failed to update post with rich reply contexts: {}", err);
        }
    }

    // Notify every fetched page that advertises a webmention endpoint.
    let http = &http;
    tokio_stream::iter(post_contexts.into_iter().filter_map(
        |FetchedPostContext {
             url, webmention, ..
         }| webmention.map(|endpoint| (endpoint, url)),
    ))
    .for_each_concurrent(2, |(endpoint, target)| async move {
        let mut form = std::collections::HashMap::new();
        form.insert("source", uid);
        form.insert("target", target.as_str());
        match http.post(endpoint).form(&form).send().await {
            Ok(res) if res.status().is_success() => {
                info!("Sent a webmention to {}, got HTTP {}", target, res.status())
            }
            Ok(res) => warn!(
                "Failed to send a webmention for {}: got HTTP {}",
                target,
                res.status()
            ),
            Err(err) => warn!("Failed to send a webmention for {}: {}", target, err),
        }
    })
    .await;
}
/// Create a new post from an already-normalized MF2-JSON document.
///
/// In order, this:
/// - checks that the token may create posts on this website;
/// - stores the post;
/// - links the post into its requested channels, creating the default,
///   contacts and food feeds on first use;
/// - spawns [`background_processing`] for the slow second pass.
///
/// Returns `202 Accepted` with a `Location` header set to the post's UID.
///
/// # Errors
/// - `InvalidScope` if the token lacks the `create` scope;
/// - `Forbidden` if the UID or any requested channel is not under `user.me`;
/// - `AlreadyExists` if a post with this UID is already stored;
/// - any storage error, converted through `From<StorageError>`.
pub(crate) async fn _post<D: 'static + Storage>(
    user: &TokenData,
    uid: String,
    mf2: serde_json::Value,
    db: D,
    http: reqwest::Client,
) -> Result<Response, MicropubError> {
    // Here, we have the following guarantees:
    // - The MF2-JSON document is normalized (guaranteed by normalize_mf2)
    // - The MF2-JSON document contains a UID
    // - The MF2-JSON document's URL list contains its UID
    // - The MF2-JSON document's "content" field contains an HTML blob, if present
    // - The MF2-JSON document's publishing datetime is present
    // - The MF2-JSON document's target channels are set
    // - The MF2-JSON document's author is set

    // Security check! Do we have an OAuth2 scope to proceed?
    if !user.check_scope(&Scope::Create) {
        return Err(MicropubError {
            error: ErrorType::InvalidScope,
            error_description: "Not enough privileges - try acquiring the \"create\" scope."
                .to_owned(),
        });
    }

    // Only string entries of `channel` are considered. Non-string entries
    // used to panic the security check below (`as_str().unwrap()`), and they
    // were ignored when linking the post anyway.
    let channels: Vec<String> = mf2["properties"]["channel"]
        .as_array()
        .map(|chans| {
            chans
                .iter()
                .filter_map(|chan| chan.as_str())
                .map(str::to_owned)
                .collect()
        })
        .unwrap_or_default();

    // Security check #2! Are we posting to our own website?
    let me = user.me.as_str();
    if !uid.starts_with(me) || channels.iter().any(|chan| !chan.starts_with(me)) {
        return Err(MicropubError {
            error: ErrorType::Forbidden,
            error_description: "You're posting to a website that's not yours.".to_owned(),
        });
    }

    // Security check #3! Are we overwriting an existing document?
    if db.post_exists(&uid).await? {
        return Err(MicropubError {
            error: ErrorType::AlreadyExists,
            error_description: "UID clash was detected, operation aborted.".to_owned(),
        });
    }

    // Save the post
    db.put_post(&mf2, me).await?;

    let default_channel = user
        .me
        .join(util::DEFAULT_CHANNEL_PATH)
        .unwrap()
        .to_string();
    let vcards_channel = user
        .me
        .join(util::CONTACTS_CHANNEL_PATH)
        .unwrap()
        .to_string();
    let food_channel = user.me.join(util::FOOD_CHANNEL_PATH).unwrap().to_string();
    let default_channels = vec![default_channel, vcards_channel, food_channel];

    // Empty channel URLs are skipped. A channel that doesn't exist yet is only
    // created if it is one of the default feeds above.
    for chan in channels
        .iter()
        .map(String::as_str)
        .filter(|chan| !chan.is_empty())
    {
        if db.post_exists(chan).await? {
            db.update_post(chan, json!({"add": {"children": [uid]}}))
                .await?;
        } else if default_channels.iter().any(|known| known == chan) {
            util::create_feed(&db, &uid, chan, user).await?;
        } else {
            warn!("Ignoring non-existent channel: {}", chan);
        }
    }

    let reply =
        IntoResponse::into_response((StatusCode::ACCEPTED, [("Location", uid.as_str())]));
    tokio::task::spawn(background_processing(db, mf2, http));
    Ok(reply)
}
/// Micropub actions on existing posts, (de)serialized as `delete` / `update`.
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "snake_case")]
enum ActionType {
/// Delete the post at `url`; `post_action` requires the `delete` scope.
Delete,
/// Update the post at `url`; `post_action` requires the `update` scope.
Update,
}
/// An action submitted as `application/x-www-form-urlencoded`
/// (e.g. `action=delete&url=...`). `dispatch_body` converts it into a
/// `MicropubAction` with no update payloads.
#[derive(Serialize, Deserialize)]
struct MicropubFormAction {
action: ActionType,
url: String,
}
/// A Micropub action on an existing post, as submitted in a JSON body.
///
/// For updates, `post_action` serializes the whole struct and passes it to
/// `Storage::update_post`. Absent `replace`/`add`/`delete` payloads are
/// omitted from that JSON.
#[derive(Serialize, Deserialize, Debug)]
struct MicropubAction {
action: ActionType,
/// URL of the post the action targets.
url: String,
#[serde(skip_serializing_if = "Option::is_none")]
replace: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
add: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
delete: Option<serde_json::Value>,
}
impl From<MicropubFormAction> for MicropubAction {
fn from(a: MicropubFormAction) -> Self {
Self {
action: a.action,
url: a.url,
replace: None,
add: None,
delete: None,
}
}
}
/// Apply a Micropub action (`delete` or `update`) to an existing post.
///
/// The target URL must be absolute, and its authority (host and port) must
/// match that of the token's `me` URL.
///
/// # Errors
/// - `InvalidRequest` if the URL can't be parsed or is not absolute;
/// - `Forbidden` if the post belongs to a different website;
/// - `InvalidScope` if the token lacks the `delete` / `update` scope;
/// - any storage error, converted through `From<StorageError>`.
#[tracing::instrument(skip(db))]
async fn post_action<D: Storage, A: AuthBackend>(
    action: MicropubAction,
    db: D,
    user: User<A>,
) -> Result<(), MicropubError> {
    let uri = match action.url.parse::<hyper::Uri>() {
        Ok(uri) => uri,
        Err(_) => {
            return Err(MicropubError {
                error: ErrorType::InvalidRequest,
                error_description: "Your URL doesn't parse properly.".to_owned(),
            })
        }
    };
    // Relative references such as "/posts/foo" parse successfully but carry
    // no authority. They used to panic on `authority().unwrap()`.
    let authority = match uri.authority() {
        Some(authority) => authority,
        None => {
            return Err(MicropubError {
                error: ErrorType::InvalidRequest,
                error_description: "Your URL must be absolute (include a scheme and a host)."
                    .to_owned(),
            })
        }
    };
    let me = user
        .me
        .as_str()
        .parse::<hyper::Uri>()
        .expect("the token's `me` URL should always be a valid URI");
    if Some(authority) != me.authority() {
        return Err(MicropubError {
            error: ErrorType::Forbidden,
            error_description: "Don't tamper with others' posts!".to_owned(),
        });
    }

    match action.action {
        ActionType::Delete => {
            if !user.check_scope(&Scope::Delete) {
                return Err(MicropubError {
                    error: ErrorType::InvalidScope,
                    error_description: "You need a \"delete\" scope for this.".to_owned(),
                });
            }
            db.delete_post(&action.url).await?
        }
        ActionType::Update => {
            if !user.check_scope(&Scope::Update) {
                return Err(MicropubError {
                    error: ErrorType::InvalidScope,
                    error_description: "You need an \"update\" scope for this.".to_owned(),
                });
            }
            db.update_post(
                &action.url,
                // Here, unwrapping is safe, because this value
                // was recently deserialized from JSON already.
                serde_json::to_value(&action).unwrap(),
            )
            .await?
        }
    }
    Ok(())
}
/// A parsed Micropub `POST` body, as produced by `dispatch_body`.
enum PostBody {
/// An action (update/delete) on an existing post.
Action(MicropubAction),
/// A new post as MF2-JSON (form-encoded posts are converted to MF2-JSON first).
MF2(serde_json::Value),
}
/// Buffer a Micropub `POST` body and parse it according to its content type.
///
/// - `application/json`: a [`MicropubAction`] if the body deserializes as
///   one. Otherwise an MF2-JSON document, which must be an object with a
///   `type` array.
/// - `application/x-www-form-urlencoded`: a form action (`action` + `url`)
///   if present. Otherwise the fields are converted to MF2-JSON by
///   `form_to_mf2_json`.
///
/// Only the media type itself is compared, case-insensitively. Parameters
/// such as `; charset=utf-8`, which many clients append, are ignored.
///
/// # Errors
/// - `InvalidRequest` if the body can't be read or doesn't parse;
/// - `UnsupportedMediaType` for any other content type.
#[tracing::instrument]
async fn dispatch_body(
    mut body: BodyStream,
    content_type: ContentType,
) -> Result<PostBody, MicropubError> {
    let body: Vec<u8> = {
        debug!("Buffering body...");
        use tokio_stream::StreamExt;
        let mut buf = Vec::default();
        while let Some(chunk) = body.next().await {
            // A broken or aborted upload is a client problem, not a reason to panic.
            let chunk = chunk.map_err(|err| MicropubError {
                error: ErrorType::InvalidRequest,
                error_description: format!("Failed to read the request body: {}", err),
            })?;
            buf.extend_from_slice(&chunk);
        }
        buf
    };
    debug!("Content-Type: {:?}", content_type);

    // Strip media-type parameters ("application/json; charset=utf-8"
    // -> "application/json") before comparing.
    let media_type = content_type.to_string();
    let media_type = media_type
        .split(';')
        .next()
        .unwrap_or("")
        .trim()
        .to_ascii_lowercase();

    if media_type == "application/json" {
        if let Ok(action) = serde_json::from_slice::<MicropubAction>(&body) {
            Ok(PostBody::Action(action))
        } else if let Ok(body) = serde_json::from_slice::<serde_json::Value>(&body) {
            // quick sanity check
            if !body.is_object() || !body["type"].is_array() {
                return Err(MicropubError {
                    error: ErrorType::InvalidRequest,
                    error_description: "Invalid MF2-JSON detected: `.` should be an object, `.type` should be an array of MF2 types".to_owned()
                });
            }
            Ok(PostBody::MF2(body))
        } else {
            Err(MicropubError {
                error: ErrorType::InvalidRequest,
                error_description: "Invalid JSON object passed.".to_owned(),
            })
        }
    } else if media_type == "application/x-www-form-urlencoded" {
        if let Ok(body) = serde_urlencoded::from_bytes::<MicropubFormAction>(&body) {
            Ok(PostBody::Action(body.into()))
        } else if let Ok(body) = serde_urlencoded::from_bytes::<Vec<(String, String)>>(&body) {
            Ok(PostBody::MF2(form_to_mf2_json(body)))
        } else {
            Err(MicropubError {
                error: ErrorType::InvalidRequest,
                error_description: "Invalid form-encoded data. Try h=entry&content=Hello!"
                    .to_owned(),
            })
        }
    } else {
        Err(MicropubError::new(
            ErrorType::UnsupportedMediaType,
            "This Content-Type is not recognized. Try application/json instead?",
        ))
    }
}
#[tracing::instrument(skip(db, http))]
pub(crate) async fn post<D: Storage + 'static, A: AuthBackend>(
Extension(db): Extension<D>,
Extension(http): Extension<reqwest::Client>,
user: User<A>,
body: BodyStream,
TypedHeader(content_type): TypedHeader<ContentType>,
) -> axum::response::Response {
match dispatch_body(body, content_type).await {
Ok(PostBody::Action(action)) => match post_action(action, db, user).await {
Ok(()) => Response::default(),
Err(err) => err.into_response(),
},
Ok(PostBody::MF2(mf2)) => {
let (uid, mf2) = normalize_mf2(mf2, &user);
match _post(&user, uid, mf2, db, http).await {
Ok(response) => response,
Err(err) => err.into_response(),
}
}
Err(err) => err.into_response(),
}
}
/// Axum handler for `GET` requests (`?q=...`) to the Micropub endpoint.
///
/// The request's `Host` must match the authority of the token's `me` URL,
/// otherwise `NotAuthorized` is returned.
///
/// Supported queries:
/// - `config`: supported queries, channels, syndication targets and the
///   media endpoint;
/// - `source`: the stored MF2-JSON of a post, which must live on the user's
///   own website;
/// - `channel`: the user's channels;
/// - `syndicate-to`: syndication targets (always empty for now).
///
/// A missing or unparsable query yields a Micropub `InvalidRequest` error
/// rather than axum's default 422.
#[tracing::instrument(skip(db))]
pub(crate) async fn query<D: Storage, A: AuthBackend>(
    Extension(db): Extension<D>,
    query: Option<Query<MicropubQuery>>,
    Host(host): Host,
    user: User<A>,
) -> axum::response::Response {
    // We handle the invalid query case manually to return a
    // MicropubError instead of HTTP 422
    let query = if let Some(Query(query)) = query {
        query
    } else {
        return MicropubError::new(
            ErrorType::InvalidRequest,
            "Invalid query provided. Try ?q=config to see what you can do."
        ).into_response();
    };

    let me_uri = axum::http::Uri::try_from(user.me.as_str())
        .expect("the token's `me` URL should always be a valid URI");
    let me_authority = me_uri
        .authority()
        .expect("the token's `me` URL should always have an authority");
    if me_authority != &host {
        return MicropubError::new(
            ErrorType::NotAuthorized,
            "This website doesn't belong to you.",
        )
        .into_response();
    }

    match query.q {
        QueryType::Config => {
            let channels: Vec<MicropubChannel> = match db.get_channels(user.me.as_str()).await {
                Ok(chans) => chans,
                Err(err) => {
                    return MicropubError::new(
                        ErrorType::InternalServerError,
                        &format!("Error fetching channels: {}", err),
                    )
                    .into_response()
                }
            };
            let media_endpoint = user.me.join("/.kittybox/media").unwrap();
            axum::response::Json(json!({
                "q": [
                    QueryType::Source,
                    QueryType::Config,
                    QueryType::Channel,
                    QueryType::SyndicateTo
                ],
                "channels": channels,
                "_kittybox_authority": user.me.as_str(),
                "syndicate-to": [],
                // The Micropub spec names this property `media-endpoint`.
                // `media_endpoint` is kept for clients relying on the old key.
                "media-endpoint": media_endpoint.as_str(),
                "media_endpoint": media_endpoint.as_str()
            }))
            .into_response()
        }
        QueryType::Source => {
            match query.url {
                Some(url) => {
                    // Only expose posts from the user's own website. Otherwise
                    // one user could read another site's posts from a shared
                    // database.
                    let belongs_to_user = url
                        .parse::<axum::http::Uri>()
                        .ok()
                        .and_then(|uri| uri.authority().map(|authority| authority == me_authority))
                        .unwrap_or(false);
                    if !belongs_to_user {
                        return MicropubError::new(
                            ErrorType::NotAuthorized,
                            "You can only query the source of posts on your own website.",
                        )
                        .into_response();
                    }
                    match db.get_post(&url).await {
                        Ok(some) => match some {
                            Some(post) => axum::response::Json(&post).into_response(),
                            None => MicropubError::new(
                                ErrorType::NotFound,
                                "The specified MF2 object was not found in database.",
                            )
                            .into_response(),
                        },
                        Err(err) => MicropubError::new(
                            ErrorType::InternalServerError,
                            &format!("Backend error: {}", err),
                        )
                        .into_response(),
                    }
                }
                None => {
                    // Here, one should probably attempt to query at least the main feed and collect posts
                    // Using a pre-made query function can't be done because it does unneeded filtering
                    // Don't implement for now, this is optional
                    MicropubError::new(
                        ErrorType::InvalidRequest,
                        "Querying for post list is not implemented yet.",
                    )
                    .into_response()
                }
            }
        }
        QueryType::Channel => match db.get_channels(user.me.as_str()).await {
            Ok(chans) => axum::response::Json(json!({ "channels": chans })).into_response(),
            Err(err) => MicropubError::new(
                ErrorType::InternalServerError,
                &format!("Error fetching channels: {}", err),
            )
            .into_response(),
        },
        QueryType::SyndicateTo => {
            axum::response::Json(json!({ "syndicate-to": [] })).into_response()
        }
    }
}
/// Build the method router for the Micropub endpoint.
///
/// `GET` is served by [`query`] and `POST` by [`post`]. CORS allows both
/// methods from any origin. The storage backend, HTTP client and auth
/// backend are provided to the handlers as `Extension`s.
pub fn router<S, A>(
    storage: S,
    http: reqwest::Client,
    auth: A
) -> axum::routing::MethodRouter
where
    S: Storage + 'static,
    A: AuthBackend
{
    use axum::http::Method;

    let cors = tower_http::cors::CorsLayer::new()
        .allow_methods([Method::GET, Method::POST])
        .allow_origin(tower_http::cors::Any);

    let routes = axum::routing::get(query::<S, A>).post(post::<S, A>);
    routes
        .layer(cors)
        .layer(axum::Extension(storage))
        .layer(axum::Extension(http))
        .layer(axum::Extension(auth))
}
#[cfg(test)]
// `config()` is not used by the current tests; keep it around for future ones.
#[allow(dead_code)]
impl MicropubQuery {
    /// Build a `?q=config` query.
    fn config() -> Self {
        MicropubQuery {
            url: None,
            q: QueryType::Config,
        }
    }

    /// Build a `?q=source&url=...` query for the given post URL.
    fn source(url: &str) -> Self {
        MicropubQuery {
            url: Some(String::from(url)),
            q: QueryType::Source,
        }
    }
}
#[cfg(test)]
mod tests {
use crate::{database::Storage, micropub::MicropubError};
use hyper::body::HttpBody;
use serde_json::json;
use super::FetchedPostContext;
use kittybox_indieauth::{Scopes, Scope, TokenData};
use axum::extract::Host;
/// `populate_reply_context` replaces a URL with the first item of the
/// matching fetched context. It keeps already-expanded objects and URLs
/// without a fetched context unchanged, preserving their order.
#[test]
fn test_populate_reply_context() {
let already_expanded_reply_ctx = json!({
"type": ["h-entry"],
"properties": {
"content": ["Hello world!"]
}
});
let mf2 = json!({
"type": ["h-entry"],
"properties": {
"like-of": [
"https://fireburn.ru/posts/example",
already_expanded_reply_ctx,
"https://fireburn.ru/posts/non-existent"
]
}
});
let test_ctx = json!({
"type": ["h-entry"],
"properties": {
"content": ["This is a post which was reacted to."]
}
});
// Only the first `like-of` URL has a fetched context.
let reply_contexts = vec![FetchedPostContext {
url: "https://fireburn.ru/posts/example".parse().unwrap(),
mf2: json!({ "items": [test_ctx] }),
webmention: None,
}];
let like_of = super::populate_reply_context(&mf2, "like-of", &reply_contexts).unwrap();
assert_eq!(like_of[0], test_ctx);
assert_eq!(like_of[1], already_expanded_reply_ctx);
assert_eq!(like_of[2], "https://fireburn.ru/posts/non-existent");
}
/// A token without the `create` scope is rejected with `InvalidScope`,
/// and nothing is written to storage.
#[tokio::test]
async fn test_post_reject_scope() {
let db = crate::database::MemoryStorage::new();
let post = json!({
"type": ["h-entry"],
"properties": {
"content": ["Hello world!"]
}
});
let user = TokenData {
me: "https://localhost:8080/".parse().unwrap(),
client_id: "https://kittybox.fireburn.ru/".parse().unwrap(),
scope: Scopes::new(vec![Scope::Profile]),
iat: None, exp: None
};
let (uid, mf2) = super::normalize_mf2(post, &user);
let err = super::_post(&user, uid, mf2, db.clone(), reqwest::Client::new())
.await
.unwrap_err();
assert_eq!(err.error, super::ErrorType::InvalidScope);
// The in-memory backend must not contain anything.
let hashmap = db.mapping.read().await;
assert!(hashmap.is_empty());
}
/// A post whose UID lives on a website other than the token's `me` is
/// rejected with `Forbidden`, and nothing is written to storage.
#[tokio::test]
async fn test_post_reject_different_user() {
let db = crate::database::MemoryStorage::new();
let post = json!({
"type": ["h-entry"],
"properties": {
"content": ["Hello world!"],
"uid": ["https://fireburn.ru/posts/hello"],
"url": ["https://fireburn.ru/posts/hello"]
}
});
let user = TokenData {
me: "https://aaronparecki.com/".parse().unwrap(),
client_id: "https://kittybox.fireburn.ru/".parse().unwrap(),
scope: Scopes::new(vec![Scope::Profile, Scope::Create, Scope::Update, Scope::Media]),
iat: None, exp: None
};
let (uid, mf2) = super::normalize_mf2(post, &user);
let err = super::_post(&user, uid, mf2, db.clone(), reqwest::Client::new())
.await
.unwrap_err();
assert_eq!(err.error, super::ErrorType::Forbidden);
// The in-memory backend must not contain anything.
let hashmap = db.mapping.read().await;
assert!(hashmap.is_empty());
}
/// A valid post is stored under the UID returned in `Location`, and the
/// main feed exists afterwards.
#[tokio::test]
async fn test_post_mf2() {
let db = crate::database::MemoryStorage::new();
let post = json!({
"type": ["h-entry"],
"properties": {
"content": ["Hello world!"]
}
});
let user = TokenData {
me: "https://localhost:8080/".parse().unwrap(),
client_id: "https://kittybox.fireburn.ru/".parse().unwrap(),
scope: Scopes::new(vec![Scope::Profile, Scope::Create]),
iat: None, exp: None
};
let (uid, mf2) = super::normalize_mf2(post, &user);
let res = super::_post(&user, uid, mf2, db.clone(), reqwest::Client::new())
.await
.unwrap();
assert!(res.headers().contains_key("Location"));
let location = res.headers().get("Location").unwrap();
assert!(db.post_exists(location.to_str().unwrap()).await.unwrap());
// NOTE(review): presumably `normalize_mf2` assigns the main feed as the
// default channel, which `_post` then creates — confirm in `util`.
assert!(db
.post_exists("https://localhost:8080/feeds/main")
.await
.unwrap());
}
/// A query whose `Host` doesn't match the token's `me` yields HTTP 401
/// with a `NotAuthorized` Micropub error body.
#[tokio::test]
async fn test_query_foreign_url() {
let mut res = super::query(
axum::Extension(crate::database::MemoryStorage::new()),
Some(axum::extract::Query(super::MicropubQuery::source(
"https://aaronparecki.com/feeds/main",
))),
Host("aaronparecki.com".to_owned()),
crate::indieauth::User::<crate::indieauth::backend::fs::FileBackend>(
TokenData {
me: "https://fireburn.ru/".parse().unwrap(),
client_id: "https://kittybox.fireburn.ru/".parse().unwrap(),
scope: Scopes::new(vec![Scope::Profile, Scope::Create, Scope::Update, Scope::Media]),
iat: None, exp: None
}, std::marker::PhantomData
)
)
.await;
assert_eq!(res.status(), 401);
// Read the first body chunk and decode it as a Micropub error.
let body = res.body_mut().data().await.unwrap().unwrap();
let json: MicropubError = serde_json::from_slice(&body as &[u8]).unwrap();
assert_eq!(json.error, super::ErrorType::NotAuthorized);
}
}