use std::convert::Infallible;
use either::Either;
use log::warn;
use warp::http::StatusCode;
use warp::{Filter, Rejection, reject::InvalidQuery};
use serde_json::json;
use serde::{Serialize, Deserialize};
use crate::database::{MicropubChannel, Storage, StorageError};
use crate::indieauth::User;
use crate::micropub::util::form_to_mf2_json;
/// The kind of query a client may issue via the `q` parameter.
///
/// Variant names are (de)serialized in kebab-case, e.g. `SyndicateTo`
/// becomes `syndicate-to`.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(rename_all = "kebab-case")]
enum QueryType {
    /// `q=source`
    Source,
    /// `q=config`
    Config,
    /// `q=channel`
    Channel,
    /// `q=syndicate-to`
    SyndicateTo,
}
/// Query-string parameters of a Micropub `GET` request.
#[derive(Serialize, Deserialize, Debug)]
struct MicropubQuery {
    /// Which kind of query is being made.
    q: QueryType,
    /// Optional URL the query refers to (read by the `source` query).
    url: Option<String>,
}
/// Machine-readable error code placed in the `error` field of an error
/// response.
///
/// Variant names are (de)serialized in snake_case, e.g. `AlreadyExists`
/// becomes `already_exists`.
#[derive(Serialize, Deserialize, PartialEq, Debug)]
#[serde(rename_all = "snake_case")]
enum ErrorType {
    AlreadyExists,
    Forbidden,
    InternalServerError,
    InvalidRequest,
    InvalidScope,
    NotAuthorized,
    NotFound,
    UnsupportedMediaType,
}
/// Error body returned to clients: an error code plus a human-readable
/// description.
#[derive(Serialize, Deserialize, Debug)]
struct MicropubError {
    /// Machine-readable error code.
    error: ErrorType,
    /// Free-form explanation of what went wrong.
    error_description: String,
}
impl From<StorageError> for MicropubError {
    /// Translates a storage backend failure into a client-facing error.
    ///
    /// A `NotFound` storage error keeps its meaning; every other kind is
    /// reported as an internal server error.
    fn from(err: StorageError) -> Self {
        let error = match err.kind() {
            crate::database::ErrorKind::NotFound => ErrorType::NotFound,
            _ => ErrorType::InternalServerError,
        };
        let error_description = format!("Backend error: {}", err);

        Self { error, error_description }
    }
}
impl From<&MicropubError> for StatusCode {
fn from(err: &MicropubError) -> Self {
use ErrorType::*;
match err.error {
AlreadyExists => StatusCode::CONFLICT,
Forbidden => StatusCode::FORBIDDEN,
InternalServerError => StatusCode::INTERNAL_SERVER_ERROR,
InvalidRequest => StatusCode::BAD_REQUEST,
InvalidScope => StatusCode::UNAUTHORIZED,
NotAuthorized => StatusCode::UNAUTHORIZED,
NotFound => StatusCode::NOT_FOUND,
UnsupportedMediaType => StatusCode::UNSUPPORTED_MEDIA_TYPE,
}
}
}
impl From<MicropubError> for StatusCode {
fn from(err: MicropubError) -> Self {
(&err).into()
}
}
impl From<serde_json::Error> for MicropubError {
fn from(err: serde_json::Error) -> Self {
use ErrorType::*;
Self {
error: InvalidRequest,
error_description: err.to_string()
}
}
}
impl MicropubError {
    /// Builds an error from an error code and a borrowed description,
    /// copying the description into an owned `String`.
    fn new(error: ErrorType, error_description: &str) -> Self {
        let error_description = String::from(error_description);
        Self { error, error_description }
    }
}
// Marker impl: allows a `MicropubError` to be wrapped into a
// `warp::Rejection` (via `warp::reject::custom`) and looked up again with
// `Rejection::find`.
impl warp::reject::Reject for MicropubError {}
mod post;
#[allow(unused_variables)]
pub mod media {
use futures_util::StreamExt;
use bytes::buf::Buf;
use warp::{Filter, Rejection, Reply, multipart::{FormData, Part}};
pub fn query() -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
warp::get()
.and(crate::util::require_host())
.map(|host| "media endpoint query...")
}
pub fn options() -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
warp::options()
.map(|| warp::reply::json::<Option<()>>(&None))
.with(warp::reply::with::header("Allow", "GET, POST"))
}
pub fn upload() -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
warp::post()
.and(crate::util::require_host())
.and(warp::multipart::form().max_length(1024*1024*150/*mb*/))
.and_then(|host, mut form: FormData| async move {
// TODO get rid of the double unwrap() here
let file: Part = form.next().await.unwrap().unwrap();
log::debug!("Uploaded: {:?}, type: {:?}", file.filename(), file.content_type());
let mut data = file.stream();
while let Some(buf) = data.next().await {
// TODO save it into a file
log::debug!("buffer length: {:?}", buf.map(|b| b.remaining()));
}
Ok::<_, warp::Rejection>(warp::reply::with_header(
warp::reply::with_status(
"",
warp::http::StatusCode::CREATED
),
"Location",
"./awoo.png"
))
})
}
pub fn media() -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
upload()
.or(query())
.or(options())
}
}
/// Helpers shared by the Micropub handlers.
mod util {
    use serde_json::json;

    /// Converts decoded form fields into an MF2-JSON object.
    ///
    /// * every `h` field contributes an `h-<value>` entry to `type`;
    /// * `access_token` fields are dropped;
    /// * any other field is appended to the property array named after the
    ///   key, with a trailing `[]` removed from the key;
    /// * when no `h` field is present, `type` defaults to `["h-entry"]`.
    pub(crate) fn form_to_mf2_json(form: Vec<(String, String)>) -> serde_json::Value {
        let mut types: Vec<serde_json::Value> = Vec::new();
        let mut properties = serde_json::Map::new();

        for (name, value) in form {
            match name.as_str() {
                "h" => types.push(json!(format!("h-{}", value))),
                "access_token" => {}
                other => {
                    let key = other.strip_suffix("[]").unwrap_or(other);
                    let slot = properties
                        .entry(key)
                        .or_insert_with(|| serde_json::Value::Array(Vec::new()));
                    // Every slot is created as an array above, so this always matches.
                    if let serde_json::Value::Array(values) = slot {
                        values.push(serde_json::Value::String(value));
                    }
                }
            }
        }

        if types.is_empty() {
            types.push(json!("h-entry"));
        }

        json!({"type": types, "properties": properties})
    }

    #[cfg(test)]
    mod tests {
        use serde_json::json;

        #[test]
        fn test_form_to_mf2() {
            let form = serde_urlencoded::from_str(
                "h=entry&content=something%20interesting"
            ).unwrap();
            let expected = json!({
                "type": ["h-entry"],
                "properties": {
                    "content": ["something interesting"]
                }
            });

            assert_eq!(super::form_to_mf2_json(form), expected)
        }
    }
}
// TODO actually save the post to the database and schedule post-processing
async fn _post<D: Storage, T: hyper::client::connect::Connect + Clone + Send + Sync + 'static>(
user: crate::indieauth::User,
uid: String,
mf2: serde_json::Value,
db: D,
http: hyper::Client<T, hyper::Body>
) -> Result<impl warp::Reply, MicropubError> {
// Here, we have the following guarantees:
// - The user is the same user for this host (guaranteed by ensure_same_user)
// - The MF2-JSON document is normalized (guaranteed by normalize_mf2)\
// - The MF2-JSON document contains a UID
// - The MF2-JSON document's URL list contains its UID
// - The MF2-JSON document's "content" field contains an HTML blob, if present
// - The MF2-JSON document's publishing datetime is present
// - The MF2-JSON document's target channels are set
// - The MF2-JSON document's author is set
// Security check! Do we have an oAuth2 scope to proceed?
if !user.check_scope("create") {
return Err(MicropubError {
error: ErrorType::InvalidScope,
error_description: "Not enough privileges - try acquiring the \"create\" scope.".to_owned()
});
}
// Security check #2! Are we posting to our own website?
if !uid.starts_with(user.me.as_str()) || mf2["properties"]["channel"]
.as_array()
.unwrap_or(&vec![])
.iter()
.any(|url| !url.as_str().unwrap().starts_with(user.me.as_str()))
{
return Err(MicropubError {
error: ErrorType::Forbidden,
error_description: "You're posting to a website that's not yours.".to_owned()
});
}
// Security check #3! Are we overwriting an existing document?
if db.post_exists(&uid).await? {
return Err(MicropubError {
error: ErrorType::AlreadyExists,
error_description: "UID clash was detected, operation aborted.".to_owned()
});
}
// Save the post
db.put_post(&mf2, user.me.as_str()).await?;
let mut channels = mf2["properties"]["channel"]
.as_array()
.unwrap()
.iter()
.map(|i| i.as_str().unwrap_or(""))
.filter(|i| !i.is_empty());
let default_channel = user.me.join(post::DEFAULT_CHANNEL_PATH).unwrap().to_string();
let vcards_channel = user.me.join(post::CONTACTS_CHANNEL_PATH).unwrap().to_string();
let food_channel = user.me.join(post::FOOD_CHANNEL_PATH).unwrap().to_string();
let default_channels = vec![default_channel, vcards_channel, food_channel];
for chan in &mut channels {
if db.post_exists(chan).await? {
db.update_post(chan, json!({"add": {"children": [uid]}})).await?;
} else if default_channels.iter().any(|i| chan == i) {
post::create_feed(&db, &uid, chan, &user).await?;
} else {
warn!("Ignoring non-existent channel: {}", chan);
}
}
let reply = warp::reply::with_status(
warp::reply::with_header(
warp::reply::json(&json!({"location": &uid})),
"Location", &uid
),
StatusCode::ACCEPTED
);
// TODO: Post-processing the post (aka second write pass)
// - [-] Download rich reply contexts
// - [-] Syndicate the post if requested, add links to the syndicated copies
// - [ ] Send WebSub notifications to the hub (if we happen to have one)
// - [x] Send webmentions
#[allow(unused_imports)]
tokio::task::spawn(async move {
use hyper::{Uri, Response, Body, body::HttpBody};
use bytes::{Buf, BufMut};
use futures_util::StreamExt;
let mut contextually_significant_posts: Vec<hyper::Uri> = vec![];
for prop in &["in-reply-to", "like-of", "repost-of", "bookmark-of"] {
if let Some(array) = mf2["properties"][prop].as_array() {
contextually_significant_posts.extend(
array
.iter()
.filter_map(|v| v.as_str().and_then(|v| v.parse::<hyper::Uri>().ok())),
);
}
}
contextually_significant_posts.sort_unstable_by_key(|u| u.to_string());
contextually_significant_posts.dedup();
// TODO: Make a stream to fetch all these posts and convert them to MF2
todo!()
});
Ok(reply)
}
/// The operation requested by a Micropub action, (de)serialized in
/// snake_case (`delete`, `update`).
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
enum ActionType {
    Delete,
    Update,
}
/// An action as it arrives in a form-encoded body: only the operation and
/// the target URL.
#[derive(Serialize, Deserialize)]
struct MicropubFormAction {
    /// Requested operation.
    action: ActionType,
    /// URL of the post the action applies to.
    url: String,
}
/// An action on an existing post.
///
/// The `replace`, `add` and `delete` payloads are optional and are left out
/// of the serialized form when absent.
#[derive(Serialize, Deserialize)]
struct MicropubAction {
    /// Requested operation.
    action: ActionType,
    /// URL of the post the action applies to.
    url: String,
    /// Optional `replace` payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    replace: Option<serde_json::Value>,
    /// Optional `add` payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    add: Option<serde_json::Value>,
    /// Optional `delete` payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    delete: Option<serde_json::Value>,
}
impl From<MicropubFormAction> for MicropubAction {
fn from(a: MicropubFormAction) -> Self {
Self {
action: a.action,
url: a.url,
replace: None, add: None, delete: None
}
}
}
/// Executes a delete or update action against an existing post.
///
/// The target URL must parse, must contain an authority (host), and that
/// authority must match the one in the user's own URL. The token must carry
/// the `delete` or `update` scope, depending on the action.
///
/// # Errors
/// * `InvalidRequest` — the URL does not parse or has no authority;
/// * `Forbidden` — the URL points at a different website;
/// * `InvalidScope` — the scope required by the action is missing;
/// * any storage error, converted through `From<StorageError>`.
// TODO perform the requested actions synchronously
async fn post_action<D: Storage>(
    action: MicropubAction,
    db: D,
    user: User
) -> Result<impl warp::Reply, MicropubError> {
    let uri = match action.url.parse::<hyper::Uri>() {
        Ok(uri) => uri,
        Err(_) => return Err(MicropubError {
            error: ErrorType::InvalidRequest,
            error_description: "Your URL doesn't parse properly.".to_owned()
        })
    };

    // A URL such as `/some/path` parses fine but carries no authority; it is
    // client-controlled, so report it instead of unwrapping.
    let target_authority = match uri.authority() {
        Some(authority) => authority,
        None => return Err(MicropubError {
            error: ErrorType::InvalidRequest,
            error_description: "Your URL should be absolute and include a host.".to_owned()
        })
    };

    // If the user's own URL cannot yield an authority, nothing can match it.
    let own_uri = user.me.as_str().parse::<hyper::Uri>().ok();
    let is_own_site = own_uri
        .as_ref()
        .and_then(|me| me.authority())
        .map_or(false, |own| own == target_authority);
    if !is_own_site {
        return Err(MicropubError {
            error: ErrorType::Forbidden,
            error_description: "Don't tamper with others' posts!".to_owned()
        });
    }

    match action.action {
        ActionType::Delete => {
            if !user.check_scope("delete") {
                return Err(MicropubError {
                    error: ErrorType::InvalidScope,
                    error_description: "You need a \"delete\" scope for this.".to_owned()
                });
            }

            db.delete_post(&action.url).await?
        },
        ActionType::Update => {
            if !user.check_scope("update") {
                return Err(MicropubError {
                    error: ErrorType::InvalidScope,
                    error_description: "You need an \"update\" scope for this.".to_owned()
                });
            }

            db.update_post(
                &action.url,
                // Here, unwrapping is safe, because this value
                // was recently deserialized from JSON already.
                serde_json::to_value(&action).unwrap()
            ).await?
        },
    }

    Ok(warp::reply::reply())
}
/// Verifies that the authenticated user owns the website being addressed.
///
/// Compares the request's host authority with the authority of the user's
/// own URL; hands the user back on a match and rejects with a
/// `NotAuthorized` error otherwise.
async fn check_auth(host: warp::host::Authority, user: User) -> Result<User, warp::Rejection> {
    let me = warp::http::Uri::try_from(user.me.as_str()).unwrap();
    let user_authority = me.authority().unwrap();

    // TODO compare with potential list of allowed websites
    // to allow one user to edit several websites with one token
    if &host == user_authority {
        return Ok(user);
    }

    Err(warp::reject::custom(MicropubError::new(
        ErrorType::NotAuthorized,
        "This user is not authorized to use Micropub on this website."
    )))
}
/// Filter that extracts the authenticated user and makes sure they belong
/// to the requested host.
///
/// Chains the host extractor, the token check against `token_endpoint`, and
/// [`check_auth`]. Only compiled for release builds and tests.
#[cfg(any(not(debug_assertions), test))]
fn ensure_same_user_as_host<T>(
    token_endpoint: String,
    http: hyper::Client<T, hyper::Body>
) -> impl Filter<Extract = (User,), Error = warp::Rejection> + Clone
where
    T: hyper::client::connect::Connect + Clone + Send + Sync + 'static
{
    let authenticated_user = crate::indieauth::require_token(token_endpoint, http);

    crate::util::require_host()
        .and(authenticated_user)
        .and_then(check_auth)
}
async fn dispatch_post_body(
mut body: impl bytes::Buf,
mimetype: http_types::Mime
) -> Result<Either<MicropubAction, serde_json::Value>, warp::Rejection> {
// Since hyper::common::buf::BufList doesn't implement Clone, we can't use Clone in here
// We have to copy the body. Ugh!!!
// so much for zero-copy buffers
let body = {
let mut _body: Vec<u8> = Vec::default();
while body.has_remaining() {
_body.extend(body.chunk());
body.advance(body.chunk().len());
}
_body
};
match mimetype.essence() {
"application/json" => {
if let Ok(body) = serde_json::from_slice::<MicropubAction>(&body) {
Ok(Either::Left(body))
} else if let Ok(body) = serde_json::from_slice::<serde_json::Value>(&body) {
// quick sanity check
if !body.is_object() || !body["type"].is_array() {
return Err(MicropubError {
error: ErrorType::InvalidRequest,
error_description: "Invalid MF2-JSON detected: `.` should be an object, `.type` should be an array of MF2 types".to_owned()
}.into())
}
Ok(Either::Right(body))
} else {
Err(MicropubError {
error: ErrorType::InvalidRequest,
error_description: "Invalid JSON object passed.".to_owned()
}.into())
}
},
"application/x-www-form-urlencoded" => {
if let Ok(body) = serde_urlencoded::from_bytes::<MicropubFormAction>(&body) {
Ok(Either::Left(body.into()))
} else if let Ok(body) = serde_urlencoded::from_bytes::<Vec<(String, String)>>(&body) {
Ok(Either::Right(form_to_mf2_json(body)))
} else {
Err(MicropubError {
error: ErrorType::InvalidRequest,
error_description: "Invalid form-encoded data. Try h=entry&content=Hello!".to_owned()
}.into())
}
},
other => Err(MicropubError {
error: ErrorType::UnsupportedMediaType,
error_description: format!("Unsupported media type: {}. Try application/json?", other)
}.into())
}
}
#[cfg_attr(all(debug_assertions, not(test)), allow(unused_variables))]
pub fn post<D: 'static + Storage, T>(
db: D,
token_endpoint: String,
http: hyper::Client<T, hyper::Body>
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone
where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static {
let inject_db = warp::any().map(move || db.clone());
#[cfg(all(debug_assertions, not(test)))]
let ensure_same_user = warp::any().map(|| crate::indieauth::User::new(
"http://localhost:8080/",
"https://quill.p3k.io/",
"create update delete media"
));
#[cfg(any(not(debug_assertions), test))]
let ensure_same_user = ensure_same_user_as_host(token_endpoint, http.clone());
warp::post()
.and(warp::body::content_length_limit(1024 * 512)
.and(warp::body::aggregate())
.and(warp::header::<http_types::Mime>("Content-Type"))
.and_then(dispatch_post_body))
.and(inject_db)
.and(warp::any().map(move || http.clone()))
.and(ensure_same_user)
.and_then(|body: Either<MicropubAction, serde_json::Value>, db: D, http: hyper::Client<T, hyper::Body>, user: crate::indieauth::User| async move {
(match body {
Either::Left(action) => {
post_action(action, db, user).await.map(|p| Box::new(p) as Box<dyn warp::Reply>)
},
Either::Right(post) => {
let (uid, mf2) = post::normalize_mf2(post, &user);
_post(user, uid, mf2, db, http).await.map(|p| Box::new(p) as Box<dyn warp::Reply>)
}
}).map_err(warp::reject::custom)
})
}
/// Filter answering `OPTIONS` requests with a JSON `null` body and an
/// `Allow: GET, POST` header.
pub fn options() -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
    let allow_header = warp::reply::with::header("Allow", "GET, POST");

    warp::options()
        // TODO make it reply with a basic description of Micropub spec
        .map(|| warp::reply::json::<Option<()>>(&None))
        .with(allow_header)
}
async fn _query<D: Storage>(
db: D,
query: MicropubQuery,
user: crate::indieauth::User
) -> Box<dyn warp::Reply> {
let user_authority = warp::http::Uri::try_from(user.me.as_str())
.unwrap()
.authority()
.unwrap()
.clone();
match query.q {
QueryType::Config => {
let channels: Vec<MicropubChannel> = match db.get_channels(user_authority.as_str()).await {
Ok(chans) => chans,
Err(err) => return Box::new(warp::reply::with_status(
warp::reply::json(&MicropubError::new(
ErrorType::InternalServerError,
&format!("Error fetching channels: {}", err)
)),
StatusCode::INTERNAL_SERVER_ERROR
))
};
Box::new(warp::reply::json(json!({
"q": [
QueryType::Source,
QueryType::Config,
QueryType::Channel,
QueryType::SyndicateTo
],
"channels": channels,
"_kittybox_authority": user_authority.as_str()
}).as_object().unwrap()))
},
QueryType::Source => {
match query.url {
Some(url) => {
if warp::http::Uri::try_from(&url).unwrap().authority().unwrap() != &user_authority {
return Box::new(warp::reply::with_status(
warp::reply::json(&MicropubError::new(
ErrorType::NotAuthorized,
"You are requesting a post from a website that doesn't belong to you."
)),
StatusCode::UNAUTHORIZED
))
}
match db.get_post(&url).await {
Ok(some) => match some {
Some(post) => Box::new(warp::reply::json(&post)),
None => Box::new(warp::reply::with_status(
warp::reply::json(&MicropubError::new(
ErrorType::NotFound,
"The specified MF2 object was not found in database."
)),
StatusCode::NOT_FOUND
))
},
Err(err) => {
return Box::new(warp::reply::json(&MicropubError::new(
ErrorType::InternalServerError,
&format!("Backend error: {}", err)
)))
}
}
},
None => todo!()
}
},
_ => todo!()
}
}
pub fn query<D: Storage, T>(db: D, token_endpoint: String, http: hyper::Client<T, hyper::Body>) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone
where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static {
warp::get()
.map(move || db.clone())
.and(warp::query::<MicropubQuery>())
.and(crate::util::require_host()
.and(crate::indieauth::require_token(token_endpoint, http))
.and_then(check_auth))
.then(_query)
.recover(|e: warp::Rejection| async move {
if let Some(err) = e.find::<MicropubError>() {
Ok(warp::reply::json(err))
} else {
Err(e)
}
})
}
pub async fn recover(err: Rejection) -> Result<impl warp::Reply, Infallible> {
if let Some(error) = err.find::<MicropubError>() {
return Ok(warp::reply::with_status(warp::reply::json(&error), error.into()))
}
let error = if err.find::<InvalidQuery>().is_some() {
MicropubError::new(
ErrorType::InvalidRequest,
"Invalid query parameters sent. Try ?q=config to see what you can do."
)
} else {
log::error!("Unhandled rejection: {:?}", err);
MicropubError::new(
ErrorType::InternalServerError,
&format!("Unknown error: {:?}", err)
)
};
Ok(warp::reply::with_status(warp::reply::json(&error), error.into()))
}
/// The complete Micropub endpoint: queries, then posts, then `OPTIONS`,
/// with every rejection converted into a reply by [`recover`].
pub fn micropub<D: 'static + Storage, T>(db: D, token_endpoint: String, http: hyper::Client<T, hyper::Body>) -> impl Filter<Extract = (impl warp::Reply,), Error = Infallible> + Clone
where
    T: hyper::client::connect::Connect + Clone + Send + Sync + 'static
{
    let read_route = query(db.clone(), token_endpoint.clone(), http.clone());
    let write_route = post(db, token_endpoint, http);

    read_route
        .or(write_route)
        .or(options())
        .recover(recover)
}
/// Test-only constructors for [`MicropubQuery`].
#[cfg(test)]
#[allow(dead_code)]
impl MicropubQuery {
    /// A `q=config` query without a URL.
    fn config() -> Self {
        Self { q: QueryType::Config, url: None }
    }

    /// A `q=source` query for the given URL.
    fn source(url: &str) -> Self {
        let url = Some(String::from(url));
        Self { q: QueryType::Source, url }
    }
}
// Tests for the Micropub handlers, run against the in-memory storage.
#[cfg(test)]
mod tests {
use hyper::body::HttpBody;
use crate::{database::Storage, micropub::MicropubError};
use warp::{Filter, Reply};
use serde_json::json;
// A token that only carries the "profile" scope must be refused with
// `InvalidScope`, and nothing may be written to the database.
#[tokio::test]
async fn check_post_reject_scope() {
// `inject_db` hands out clones of one storage; `db` is kept for the
// assertions after the request.
let inject_db = {
let db = crate::database::MemoryStorage::new();
move || db.clone()
};
let db = inject_db();
let res = warp::test::request()
.filter(&warp::any()
.map(inject_db)
.and_then(move |db| async move {
let post = json!({
"type": ["h-entry"],
"properties": {
"content": ["Hello world!"]
}
});
let user = crate::indieauth::User::new(
"https://localhost:8080/",
"https://kittybox.fireburn.ru/",
"profile"
);
let (uid, mf2) = super::post::normalize_mf2(post, &user);
super::_post(
user, uid, mf2, db, hyper::Client::new()
).await.map_err(warp::reject::custom)
})
)
.await
.map(|_| panic!("Tried to do something with a reply!"))
.unwrap_err();
if let Some(err) = res.find::<MicropubError>() {
assert_eq!(err.error, super::ErrorType::InvalidScope);
} else {
panic!("Did not return MicropubError");
}
// The storage must still be empty.
let hashmap = db.mapping.read().await;
assert!(hashmap.is_empty());
}
// A token with the "create" scope stores the post: the reply carries a
// `Location` header, and both that location and the main feed exist in the
// database afterwards.
#[tokio::test]
async fn check_post_mf2() {
let inject_db = {
let db = crate::database::MemoryStorage::new();
move || db.clone()
};
let db = inject_db();
let res = warp::test::request()
.filter(&warp::any()
.map(inject_db)
.and_then(move |db| async move {
let post = json!({
"type": ["h-entry"],
"properties": {
"content": ["Hello world!"]
}
});
let user = crate::indieauth::User::new(
"https://localhost:8080/",
"https://kittybox.fireburn.ru/",
"create"
);
let (uid, mf2) = super::post::normalize_mf2(post, &user);
super::_post(
user, uid, mf2, db, hyper::Client::new()
).await.map_err(warp::reject::custom)
})
)
.await
.unwrap()
.into_response();
assert!(res.headers().contains_key("Location"));
let location = res.headers().get("Location").unwrap();
assert!(db.post_exists(location.to_str().unwrap()).await.unwrap());
assert!(db.post_exists("https://localhost:8080/feeds/main").await.unwrap());
}
// `check_auth` must reject a user whose URL (fireburn.ru) does not match
// the request host (aaronparecki.com) with `NotAuthorized`.
#[tokio::test]
async fn test_check_auth() {
let err = warp::test::request()
.filter(&warp::any()
.map(|| (
warp::host::Authority::from_static("aaronparecki.com"),
crate::indieauth::User::new(
"https://fireburn.ru/",
"https://quill.p3k.io/",
"create update media"
)))
.untuple_one()
.and_then(super::check_auth))
.await
.unwrap_err();
let json: &MicropubError = err.find::<MicropubError>().unwrap();
assert_eq!(json.error, super::ErrorType::NotAuthorized);
}
// A `q=source` query for a URL on another website must be answered with
// status 401 and a `NotAuthorized` error body.
#[tokio::test]
async fn test_query_foreign_url() {
let mut res = warp::test::request()
.filter(&warp::any().then(|| super::_query(
crate::database::MemoryStorage::new(),
super::MicropubQuery::source("https://aaronparecki.com/feeds/main"),
crate::indieauth::User::new(
"https://fireburn.ru/",
"https://quill.p3k.io/",
"create update media"
)
)))
.await
.unwrap()
.into_response();
assert_eq!(res.status(), 401);
let body = res.body_mut().data().await.unwrap().unwrap();
let json: MicropubError = serde_json::from_slice(&body as &[u8]).unwrap();
assert_eq!(json.error, super::ErrorType::NotAuthorized);
}
}