From 998ffd3e3ac85a38aec8fc15c7addd02fa1af9ae Mon Sep 17 00:00:00 2001 From: Vika Date: Sun, 6 Mar 2022 17:14:10 +0300 Subject: Restored most of the functionality (except onboarding and some queries) --- Cargo.lock | 1 + Cargo.toml | 1 + src/database/file/mod.rs | 4 +- src/database/memory.rs | 29 ++- src/frontend/mod.rs | 1 - src/lib.rs | 16 +- src/main.rs | 124 ++++++---- src/micropub/mod.rs | 617 ++++++++++++++++++++++++++++++++++++++++++----- src/micropub/post.rs | 126 +++------- 9 files changed, 689 insertions(+), 230 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1a92ad8..bad84b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1433,6 +1433,7 @@ dependencies = [ "chrono", "data-encoding", "easy-scraper", + "either", "ellipse", "env_logger 0.8.4", "fd-lock", diff --git a/Cargo.toml b/Cargo.toml index e858f91..539f973 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ async-trait = "^0.1.50" # Type erasure for async trait methods bytes = "^1.1.0" data-encoding = "^2.3.2" # Efficient and customizable data-encoding functions like base64, base32, and hex easy-scraper = "^0.2.0" # HTML scraping library focused on ease of use +either = "^1.6.1" # A general purpose sum type with two cases ellipse = "^0.2.0" # Truncate and ellipsize strings in a human-friendly way env_logger = "^0.8.3" # A logging implementation for `log` which is configured via an environment variable fd-lock = "^3.0.0" # Advisory reader-writer locks for files diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs index a33e7c4..841f9c0 100644 --- a/src/database/file/mod.rs +++ b/src/database/file/mod.rs @@ -1,4 +1,4 @@ -#![warn(clippy::unwrap_used)] +//#![warn(clippy::unwrap_used)] use crate::database::{filter_post, ErrorKind, Result, Storage, StorageError}; use std::fs::{File, OpenOptions}; use std::io::{ErrorKind as IOErrorKind, Seek, SeekFrom, Read, Write}; @@ -332,7 +332,7 @@ impl Storage for FileStorage { #[allow(clippy::unwrap_used)] // JoinHandle captures panics, this closure shouldn't panic tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { #[warn(clippy::unwrap_used)] - let parent = post_path.parent().unwrap().to_owned(); + let parent = post_path.parent().expect("Parent for this directory should always exist").to_owned(); if !parent.is_dir() { std::fs::create_dir_all(post_path.parent().unwrap())?; } diff --git a/src/database/memory.rs b/src/database/memory.rs index c83bc8c..df142d3 100644 --- a/src/database/memory.rs +++ b/src/database/memory.rs @@ -1,3 +1,4 @@ +#![allow(clippy::todo)] use async_trait::async_trait; use std::collections::HashMap; use std::sync::Arc; @@ -6,7 +7,6 @@ use futures_util::FutureExt; use serde_json::json; use crate::database::{Storage, Result, StorageError, ErrorKind, MicropubChannel}; -use crate::indieauth::User; #[derive(Clone, Debug)] pub struct MemoryStorage { @@ -41,7 +41,7 @@ impl Storage for MemoryStorage { } } - async fn put_post(&self, post: &'_ serde_json::Value, user: &'_ str) -> Result<()> { + async fn put_post(&self, post: &'_ serde_json::Value, _user: &'_ str) -> Result<()> { let mapping = &mut self.mapping.write().await; let key: &str; match post["properties"]["uid"][0].as_str() { @@ -51,7 +51,7 @@ impl Storage for MemoryStorage { mapping.insert(key.to_string(), post.clone()); if post["properties"]["url"].is_array() { for url in post["properties"]["url"].as_array().unwrap().iter().map(|i| i.as_str().unwrap().to_string()) { - if &url != key { + if url != key { mapping.insert(url, json!({"see_other": key})); } } @@ -59,7 +59,7 @@ impl Storage for MemoryStorage { if post["type"].as_array().unwrap().iter().any(|i| i == "h-feed") { // This is a feed. Add it to the channels array if it's not already there. println!("{:#}", post); - self.channels.write().await.entry(post["properties"]["author"][0].as_str().unwrap().to_string()).or_insert(vec![]).push(key.to_string()) + self.channels.write().await.entry(post["properties"]["author"][0].as_str().unwrap().to_string()).or_insert_with(Vec::new).push(key.to_string()) } Ok(()) } @@ -155,19 +155,18 @@ impl Storage for MemoryStorage { .map(|channel| self.get_post(channel) .map(|result| result.unwrap()) .map(|post: Option| { - if let Some(post) = post { - Some(MicropubChannel { - uid: post["properties"]["uid"][0].as_str().unwrap().to_string(), - name: post["properties"]["name"][0].as_str().unwrap().to_string() - }) - } else { None } + post.map(|post| MicropubChannel { + uid: post["properties"]["uid"][0].as_str().unwrap().to_string(), + name: post["properties"]["name"][0].as_str().unwrap().to_string() + }) }) - ).collect::>()).await.into_iter().filter_map(|chan| chan).collect::>()), + ).collect::>()).await.into_iter().flatten().collect::>()), None => Ok(vec![]) } } + #[allow(unused_variables)] async fn read_feed_with_limit(&self, url: &'_ str, after: &'_ Option, limit: usize, user: &'_ Option) -> Result> { todo!() } @@ -177,15 +176,23 @@ impl Storage for MemoryStorage { Ok(()) } + #[allow(unused_variables)] async fn get_setting(&self, setting: &'_ str, user: &'_ str) -> Result { todo!() } + #[allow(unused_variables)] async fn set_setting(&self, setting: &'_ str, user: &'_ str, value: &'_ str) -> Result<()> { todo!() } } +impl Default for MemoryStorage { + fn default() -> Self { + Self::new() + } +} + impl MemoryStorage { pub fn new() -> Self { Self { diff --git a/src/frontend/mod.rs b/src/frontend/mod.rs index ffeb9de..ad50161 100644 --- a/src/frontend/mod.rs +++ b/src/frontend/mod.rs @@ -1,4 +1,3 @@ -#![warn(clippy::todo)] use std::convert::TryInto; use crate::database::Storage; use serde::{Deserialize, Serialize}; diff --git a/src/lib.rs b/src/lib.rs index 93e4593..ffef443 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,23 +1,20 @@ -#[allow(unused_imports)] -use warp::Filter; +#![deny(unsafe_code)] +#![warn(clippy::todo)] pub mod metrics; /// Database abstraction layer for Kittybox, allowing the CMS to work with any kind of database. pub mod database; pub mod micropub; pub mod indieauth; -//pub mod frontend; +pub mod frontend; -/*use crate::indieauth::IndieAuthMiddleware; -use crate::micropub::CORSMiddleware;*/ - -pub mod rejections { +pub(crate) mod rejections { #[derive(Debug)] - pub struct UnacceptableContentType; + pub(crate) struct UnacceptableContentType; impl warp::reject::Reject for UnacceptableContentType {} #[derive(Debug)] - pub struct HostHeaderUnset; + pub(crate) struct HostHeaderUnset; impl warp::reject::Reject for HostHeaderUnset {} } @@ -49,6 +46,7 @@ pub mod util { // This is unneccesarily complicated because I want to reuse some http-types parsing // and http-types has constructor for Headers private so I need to construct // a mock Request to reason about headers... this is so dumb wtf + // so much for zero-cost abstractions, huh let bytes: &[u8] = accept.as_bytes(); let value = http_types::headers::HeaderValue::from_bytes(bytes.to_vec()).unwrap(); let values: http_types::headers::HeaderValues = vec![value].into(); diff --git a/src/main.rs b/src/main.rs index 261e8a2..3ce32af 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,12 @@ use log::{debug, error, info}; -use std::env; +use std::{convert::Infallible, env, time::Duration}; use http_types::Url; use hyper::client::{HttpConnector,connect::dns::GaiResolver}; use hyper_rustls::HttpsConnector; -use warp::{Filter, host::Authority, path::FullPath}; +use warp::{Filter, host::Authority}; #[tokio::main] -async fn main() -> Result<(), kittybox::database::StorageError> { +async fn main() { // TODO json logging in the future? let logger_env = env_logger::Env::new().filter_or("RUST_LOG", "info"); env_logger::init_from_env(logger_env); @@ -59,19 +59,23 @@ async fn main() -> Result<(), kittybox::database::StorageError> { } } - let media_endpoint: Option = env::var("MEDIA_ENDPOINT").ok(); + //let internal_token: Option = env::var("KITTYBOX_INTERNAL_TOKEN").ok(); - let internal_token: Option = env::var("KITTYBOX_INTERNAL_TOKEN").ok(); - - let cookie_secret: String = match env::var("COOKIE_SECRET").ok() { + /*let cookie_secret: String = match env::var("COOKIE_SECRET").ok() { Some(value) => value, None => { if let Ok(filename) = env::var("COOKIE_SECRET_FILE") { use tokio::io::AsyncReadExt; - let mut file = tokio::fs::File::open(filename).await?; + let mut file = tokio::fs::File::open(filename).await.map_err(|e| { + error!("Couldn't open the cookie secret file: {}", e); + std::process::exit(1); + }).unwrap(); let mut temp_string = String::new(); - file.read_to_string(&mut temp_string).await?; + file.read_to_string(&mut temp_string).await.map_err(|e| { + error!("Couldn't read the cookie secret from file: {}", e); + std::process::exit(1); + }).unwrap(); temp_string } else { @@ -79,12 +83,12 @@ async fn main() -> Result<(), kittybox::database::StorageError> { std::process::exit(1); } } - }; + };*/ - let host: std::net::SocketAddr = match env::var("SERVE_AT") + let listen_at = match env::var("SERVE_AT") .ok() - .unwrap_or_else(|| "0.0.0.0:8080".to_string()) - .parse() { + .unwrap_or_else(|| "[::]:8080".to_string()) + .parse::() { Ok(addr) => addr, Err(e) => { error!("Cannot parse SERVE_AT: {}", e); @@ -111,17 +115,27 @@ async fn main() -> Result<(), kittybox::database::StorageError> { let database = { let folder = backend_uri.strip_prefix("file://").unwrap(); let path = std::path::PathBuf::from(folder); - kittybox::database::FileStorage::new(path).await? + match kittybox::database::FileStorage::new(path).await { + Ok(db) => db, + Err(err) => { + error!("Error creating database: {:?}", err); + std::process::exit(1); + } + } }; + let endpoints = kittybox::frontend::IndiewebEndpoints { + authorization_endpoint: authorization_endpoint.to_string(), + token_endpoint: token_endpoint.to_string(), + webmention: None, + microsub: None, + }; + // TODO interpret HEAD - let homepage = kittybox::util::require_host() - .and(warp::get()) + let homepage = warp::get() .and(warp::path::end()) - // TODO fetch content from the database - // TODO parse content-type and determine appropriate response - .map(|host| format!("front page for {}!", host)); - + .and(kittybox::frontend::homepage(database.clone(), endpoints.clone())); + let micropub = warp::path("micropub") .and(warp::path::end() .and(kittybox::micropub::micropub( @@ -132,14 +146,14 @@ async fn main() -> Result<(), kittybox::database::StorageError> { .or(warp::get() .and(warp::path("client")) .and(warp::path::end()) - .map(|| kittybox::MICROPUB_CLIENT))); + .map(|| warp::reply::html(kittybox::MICROPUB_CLIENT)))); let media = warp::path("media") .and(warp::path::end() .and(kittybox::micropub::media::media()) .or(kittybox::util::require_host() .and(warp::path::param()) - .map(|host: Authority, path: String| format!("media file {}", path)))); + .map(|_host: Authority, path: String| format!("media file {}", path)))); // TODO remember how login logic works because I forgor let login = warp::path("login") @@ -152,40 +166,62 @@ async fn main() -> Result<(), kittybox::database::StorageError> { let coffee = warp::path("coffee") .map(|| warp::reply::with_status("I'm a teapot!", warp::http::StatusCode::IM_A_TEAPOT)); - // TODO interpret HEAD - let static_files = warp::get() - .and(warp::path!("static" / String)) - .map(|path| path); + let static_files = warp::path("static") + .and(kittybox::frontend::static_files()); - // TODO interpret HEAD - let catchall = warp::get() - .and(kittybox::util::require_host()) - .and(warp::path::full()) - .map(|host: Authority, path: FullPath| host.to_string() + path.as_str() + ".json") - // TODO fetch content from the database - // TODO parse content-type and determine appropriate response - ; + let catchall = kittybox::frontend::catchall( + database.clone(), + endpoints.clone() + ); let health = warp::path("health").and(warp::path::end()).map(|| "OK"); - // TODO instrumentation middleware (see metrics.rs for comments) - //let metrics = warp::path("metrics").and(warp::path::end()).map(kittybox::metrics::gather); + let metrics = warp::path("metrics").and(warp::path::end()).map(kittybox::metrics::gather); + let app = homepage - .or(login) + .or(metrics + .or(health)) .or(static_files) + .or(login) .or(coffee) - .or(health) .or(micropub) .or(media) .or(catchall) + .with(warp::log("kittybox")) + .with(kittybox::metrics::metrics(vec![ + "health".to_string(), + "micropub".to_string(), + "static".to_string(), + "media".to_string(), + "metrics".to_string() + ])) ; - let server = warp::serve(app); + let svc = warp::service(app); - // TODO https://github.com/seanmonstar/warp/issues/854 - // TODO move to Hyper to wrap the requests in metrics - info!("Listening on {:?}", host); - server.bind(host).await; - Ok(()) + let tcp_listener: std::net::TcpListener; + let mut listenfd = listenfd::ListenFd::from_env(); + if let Ok(Some(listener)) = listenfd.take_tcp_listener(0) { + tcp_listener = listener; + } else { + tcp_listener = std::net::TcpListener::bind(listen_at).unwrap(); + } + tcp_listener.set_nonblocking(true).unwrap(); + + info!("Listening on {}", tcp_listener.local_addr().unwrap()); + let server: hyper::server::Server<_, _> = hyper::server::Server::from_tcp(tcp_listener) + .unwrap() + .tcp_keepalive(Some(Duration::from_secs(30 * 60))) + .serve(hyper::service::make_service_fn(move |_| { + let service = svc.clone(); + async move { + Ok::<_, Infallible>(service) + } + })); + + if let Err(err) = server.await { + error!("Error serving requests: {}", err); + std::process::exit(1); + } } else { println!("Unknown backend, not starting."); std::process::exit(1); diff --git a/src/micropub/mod.rs b/src/micropub/mod.rs index b81ac17..f3152d7 100644 --- a/src/micropub/mod.rs +++ b/src/micropub/mod.rs @@ -1,10 +1,13 @@ use std::convert::Infallible; - +use either::Either; +use log::warn; use warp::http::StatusCode; use warp::{Filter, Rejection, reject::InvalidQuery}; -use serde_json::{json, Value}; +use serde_json::json; use serde::{Serialize, Deserialize}; -use crate::database::{MicropubChannel, Storage}; +use crate::database::{MicropubChannel, Storage, StorageError}; +use crate::indieauth::User; +use crate::micropub::util::form_to_mf2_json; #[derive(Serialize, Deserialize, Debug, PartialEq)] #[serde(rename_all = "kebab-case")] @@ -24,26 +27,61 @@ struct MicropubQuery { #[derive(Serialize, Deserialize, PartialEq, Debug)] #[serde(rename_all = "snake_case")] enum ErrorType { - InvalidRequest, + AlreadyExists, + Forbidden, InternalServerError, - NotFound, + InvalidRequest, + InvalidScope, NotAuthorized, + NotFound, + UnsupportedMediaType } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] struct MicropubError { error: ErrorType, error_description: String } -impl From for StatusCode { - fn from(err: MicropubError) -> Self { +impl From for MicropubError { + fn from(err: StorageError) -> Self { + Self { + error: match err.kind() { + crate::database::ErrorKind::NotFound => ErrorType::NotFound, + _ => ErrorType::InternalServerError + }, + error_description: format!("Backend error: {}", err.to_string()) + } + } +} + +impl From<&MicropubError> for StatusCode { + fn from(err: &MicropubError) -> Self { use ErrorType::*; match err.error { - InvalidRequest => StatusCode::BAD_REQUEST, + AlreadyExists => StatusCode::CONFLICT, + Forbidden => StatusCode::FORBIDDEN, InternalServerError => StatusCode::INTERNAL_SERVER_ERROR, + InvalidRequest => StatusCode::BAD_REQUEST, + InvalidScope => StatusCode::UNAUTHORIZED, + NotAuthorized => StatusCode::UNAUTHORIZED, NotFound => StatusCode::NOT_FOUND, - NotAuthorized => StatusCode::UNAUTHORIZED + UnsupportedMediaType => StatusCode::UNSUPPORTED_MEDIA_TYPE, + } + } +} +impl From for StatusCode { + fn from(err: MicropubError) -> Self { + (&err).into() + } +} + +impl From for MicropubError { + fn from(err: serde_json::Error) -> Self { + use ErrorType::*; + Self { + error: InvalidRequest, + error_description: err.to_string() } } } @@ -57,8 +95,13 @@ impl MicropubError { } } +impl warp::reject::Reject for MicropubError {} + +mod post; + +#[allow(unused_variables)] pub mod media { - use futures_util::{Stream, StreamExt}; + use futures_util::StreamExt; use bytes::buf::Buf; use warp::{Filter, Rejection, Reply, multipart::{FormData, Part}}; @@ -71,9 +114,7 @@ pub mod media { pub fn options() -> impl Filter + Clone { warp::options() .map(|| warp::reply::json::>(&None)) - // TODO: why doesn't this work? - // .map(warp::reply::with::header("Allow", "GET, POST")) - .map(|reply| warp::reply::with_header(reply, "Allow", "GET, POST")) + .with(warp::reply::with::header("Allow", "GET, POST")) } pub fn upload() -> impl Filter + Clone { @@ -108,47 +149,396 @@ pub mod media { } } -async fn _post(db: D, host: warp::host::Authority, user: crate::indieauth::User) -> impl warp::Reply { - todo!("post to database {:?} for host {:?} using user {:?}", db, host, user); - #[allow(unreachable_code)] - "" +mod util { + use serde_json::json; + + pub(crate) fn form_to_mf2_json(form: Vec<(String, String)>) -> serde_json::Value { + let mut mf2 = json!({"type": [], "properties": {}}); + for (k, v) in form { + if k == "h" { + mf2["type"] + .as_array_mut() + .unwrap() + .push(json!("h-".to_string() + &v)); + } else if k != "access_token" { + let key = k.strip_suffix("[]").unwrap_or(&k); + match mf2["properties"][key].as_array_mut() { + Some(prop) => prop.push(json!(v)), + None => mf2["properties"][key] = json!([v]), + } + } + } + if mf2["type"].as_array().unwrap().is_empty() { + mf2["type"].as_array_mut().unwrap().push(json!("h-entry")); + } + mf2 + } + + #[cfg(test)] + mod tests { + use serde_json::json; + #[test] + fn test_form_to_mf2() { + assert_eq!( + super::form_to_mf2_json( + serde_urlencoded::from_str( + "h=entry&content=something%20interesting" + ).unwrap() + ), + json!({ + "type": ["h-entry"], + "properties": { + "content": ["something interesting"] + } + }) + ) + } + } +} + +// TODO actually save the post to the database and schedule post-processing +async fn _post( + user: crate::indieauth::User, + uid: String, + mf2: serde_json::Value, + db: D, + http: hyper::Client +) -> Result { + // Here, we have the following guarantees: + // - The user is the same user for this host (guaranteed by ensure_same_user) + // - The MF2-JSON document is normalized (guaranteed by normalize_mf2)\ + // - The MF2-JSON document contains a UID + // - The MF2-JSON document's URL list contains its UID + // - The MF2-JSON document's "content" field contains an HTML blob, if present + // - The MF2-JSON document's publishing datetime is present + // - The MF2-JSON document's target channels are set + // - The MF2-JSON document's author is set + + // Security check! Do we have an oAuth2 scope to proceed? + if !user.check_scope("create") { + return Err(MicropubError { + error: ErrorType::InvalidScope, + error_description: "Not enough privileges - try acquiring the \"create\" scope.".to_owned() + }); + } + + // Security check #2! Are we posting to our own website? + if !uid.starts_with(user.me.as_str()) || mf2["properties"]["channel"] + .as_array() + .unwrap_or(&vec![]) + .iter() + .any(|url| !url.as_str().unwrap().starts_with(user.me.as_str())) + { + return Err(MicropubError { + error: ErrorType::Forbidden, + error_description: "You're posting to a website that's not yours.".to_owned() + }); + } + + // Security check #3! Are we overwriting an existing document? + if db.post_exists(&uid).await? { + return Err(MicropubError { + error: ErrorType::AlreadyExists, + error_description: "UID clash was detected, operation aborted.".to_owned() + }); + } + + // Save the post + db.put_post(&mf2, user.me.as_str()).await?; + + let mut channels = mf2["properties"]["channel"] + .as_array() + .unwrap() + .iter() + .map(|i| i.as_str().unwrap_or("")) + .filter(|i| !i.is_empty()); + + let default_channel = user.me.join(post::DEFAULT_CHANNEL_PATH).unwrap().to_string(); + let vcards_channel = user.me.join(post::CONTACTS_CHANNEL_PATH).unwrap().to_string(); + let food_channel = user.me.join(post::FOOD_CHANNEL_PATH).unwrap().to_string(); + let default_channels = vec![default_channel, vcards_channel, food_channel]; + + for chan in &mut channels { + if db.post_exists(chan).await? { + db.update_post(chan, json!({"add": {"children": [uid]}})).await?; + } else if default_channels.iter().any(|i| chan == i) { + post::create_feed(&db, &uid, chan, &user).await?; + } else { + warn!("Ignoring non-existent channel: {}", chan); + } + } + + let reply = warp::reply::with_status( + warp::reply::with_header( + warp::reply::json(&json!({"location": &uid})), + "Location", &uid + ), + StatusCode::ACCEPTED + ); + + // TODO: Post-processing the post (aka second write pass) + // - [-] Download rich reply contexts + // - [-] Syndicate the post if requested, add links to the syndicated copies + // - [ ] Send WebSub notifications to the hub (if we happen to have one) + // - [x] Send webmentions + #[allow(unused_imports)] + tokio::task::spawn(async move { + use hyper::{Uri, Response, Body, body::HttpBody}; + use bytes::{Buf, BufMut}; + use futures_util::StreamExt; + + let mut contextually_significant_posts: Vec = vec![]; + for prop in &["in-reply-to", "like-of", "repost-of", "bookmark-of"] { + if let Some(array) = mf2["properties"][prop].as_array() { + contextually_significant_posts.extend( + array + .iter() + .filter_map(|v| v.as_str().and_then(|v| v.parse::().ok())), + ); + } + } + contextually_significant_posts.sort_unstable_by_key(|u| u.to_string()); + contextually_significant_posts.dedup(); + + // TODO: Make a stream to fetch all these posts and convert them to MF2 + + todo!() + }); + + Ok(reply) +} + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +enum ActionType { + Delete, + Update +} + +#[derive(Serialize, Deserialize)] +struct MicropubFormAction { + action: ActionType, + url: String +} + +#[derive(Serialize, Deserialize)] +struct MicropubAction { + action: ActionType, + url: String, + #[serde(skip_serializing_if = "Option::is_none")] + replace: Option, + #[serde(skip_serializing_if = "Option::is_none")] + add: Option, + #[serde(skip_serializing_if = "Option::is_none")] + delete: Option } -pub fn post(db: D, token_endpoint: String, http: hyper::Client) -> impl Filter + Clone +impl From for MicropubAction { + fn from(a: MicropubFormAction) -> Self { + Self { + action: a.action, + url: a.url, + replace: None, add: None, delete: None + } + } +} + +// TODO perform the requested actions synchronously +async fn post_action( + action: MicropubAction, + db: D, + user: User +) -> Result { + + let uri = if let Ok(uri) = action.url.parse::() { + uri + } else { + return Err(MicropubError { + error: ErrorType::InvalidRequest, + error_description: "Your URL doesn't parse properly.".to_owned() + }); + }; + + if uri.authority().unwrap() != user.me.as_str().parse::().unwrap().authority().unwrap() { + return Err(MicropubError { + error: ErrorType::Forbidden, + error_description: "Don't tamper with others' posts!".to_owned() + }); + } + + match action.action { + ActionType::Delete => { + if !user.check_scope("delete") { + return Err(MicropubError { + error: ErrorType::InvalidScope, + error_description: "You need a \"delete\" scope for this.".to_owned() + }); + } + + db.delete_post(&action.url).await? + }, + ActionType::Update => { + if !user.check_scope("update") { + return Err(MicropubError { + error: ErrorType::InvalidScope, + error_description: "You need an \"update\" scope for this.".to_owned() + }); + } + + db.update_post( + &action.url, + // Here, unwrapping is safe, because this value + // was recently deserialized from JSON already. + serde_json::to_value(&action).unwrap() + ).await? + }, + } + + Ok(warp::reply::reply()) +} + +async fn check_auth(host: warp::host::Authority, user: User) -> Result { + let user_authority = warp::http::Uri::try_from(user.me.as_str()) + .unwrap() + .authority() + .unwrap() + .clone(); + // TODO compare with potential list of allowed websites + // to allow one user to edit several websites with one token + if host != user_authority { + Err(warp::reject::custom(MicropubError::new( + ErrorType::NotAuthorized, + "This user is not authorized to use Micropub on this website." + ))) + } else { + Ok(user) + } +} + +#[cfg(any(not(debug_assertions), test))] +fn ensure_same_user_as_host( + token_endpoint: String, + http: hyper::Client +) -> impl Filter + Clone where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { - warp::post() - .map(move || db.clone()) - .and(crate::util::require_host()) + crate::util::require_host() .and(crate::indieauth::require_token(token_endpoint, http)) - // TODO get body and process it - .then(_post) + .and_then(check_auth) +} + +async fn dispatch_post_body( + mut body: impl bytes::Buf, + mimetype: http_types::Mime +) -> Result, warp::Rejection> { + // Since hyper::common::buf::BufList doesn't implement Clone, we can't use Clone in here + // We have to copy the body. Ugh!!! + // so much for zero-copy buffers + let body = { + let mut _body: Vec = Vec::default(); + while body.has_remaining() { + _body.extend(body.chunk()); + body.advance(body.chunk().len()); + } + _body + }; + match mimetype.essence() { + "application/json" => { + if let Ok(body) = serde_json::from_slice::(&body) { + Ok(Either::Left(body)) + } else if let Ok(body) = serde_json::from_slice::(&body) { + // quick sanity check + if !body.is_object() || !body["type"].is_array() { + return Err(MicropubError { + error: ErrorType::InvalidRequest, + error_description: "Invalid MF2-JSON detected: `.` should be an object, `.type` should be an array of MF2 types".to_owned() + }.into()) + } + Ok(Either::Right(body)) + } else { + Err(MicropubError { + error: ErrorType::InvalidRequest, + error_description: "Invalid JSON object passed.".to_owned() + }.into()) + } + }, + "application/x-www-form-urlencoded" => { + if let Ok(body) = serde_urlencoded::from_bytes::(&body) { + Ok(Either::Left(body.into())) + } else if let Ok(body) = serde_urlencoded::from_bytes::>(&body) { + Ok(Either::Right(form_to_mf2_json(body))) + } else { + Err(MicropubError { + error: ErrorType::InvalidRequest, + error_description: "Invalid form-encoded data. Try h=entry&content=Hello!".to_owned() + }.into()) + } + }, + other => Err(MicropubError { + error: ErrorType::UnsupportedMediaType, + error_description: format!("Unsupported media type: {}. Try application/json?", other) + }.into()) + } +} + +#[cfg_attr(all(debug_assertions, not(test)), allow(unused_variables))] +pub fn post( + db: D, + token_endpoint: String, + http: hyper::Client +) -> impl Filter + Clone +where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { + let inject_db = warp::any().map(move || db.clone()); + #[cfg(all(debug_assertions, not(test)))] + let ensure_same_user = warp::any().map(|| crate::indieauth::User::new( + "http://localhost:8080/", + "https://quill.p3k.io/", + "create update delete media" + )); + #[cfg(any(not(debug_assertions), test))] + let ensure_same_user = ensure_same_user_as_host(token_endpoint, http.clone()); + + warp::post() + .and(warp::body::content_length_limit(1024 * 512) + .and(warp::body::aggregate()) + .and(warp::header::("Content-Type")) + .and_then(dispatch_post_body)) + .and(inject_db) + .and(warp::any().map(move || http.clone())) + .and(ensure_same_user) + .and_then(|body: Either, db: D, http: hyper::Client, user: crate::indieauth::User| async move { + (match body { + Either::Left(action) => { + post_action(action, db, user).await.map(|p| Box::new(p) as Box) + }, + Either::Right(post) => { + let (uid, mf2) = post::normalize_mf2(post, &user); + _post(user, uid, mf2, db, http).await.map(|p| Box::new(p) as Box) + } + }).map_err(warp::reject::custom) + }) } -pub fn options() -> impl Filter + Copy { +pub fn options() -> impl Filter + Clone { warp::options() // TODO make it reply with a basic description of Micropub spec .map(|| warp::reply::json::>(&None)) - // TODO: why doesn't this work? - // .map(warp::reply::with::header("Allow", "GET, POST")) - .map(|reply| warp::reply::with_header(reply, "Allow", "GET, POST")) + .with(warp::reply::with::header("Allow", "GET, POST")) } -async fn _query(db: D, host: warp::host::Authority, query: MicropubQuery, user: crate::indieauth::User) -> impl warp::Reply { - let user_authority = warp::http::Uri::try_from(user.me.as_str()).unwrap().authority().unwrap().clone(); - // TODO compare with potential list of allowed websites - // to allow one user to edit several websites with one token - if host != user_authority { - return Box::new(warp::reply::with_status( - warp::reply::json(&MicropubError::new( - ErrorType::NotAuthorized, - "This user is not authorized to use Micropub on this website." - )), - StatusCode::UNAUTHORIZED - )) as Box - } +async fn _query( + db: D, + query: MicropubQuery, + user: crate::indieauth::User +) -> Box { + let user_authority = warp::http::Uri::try_from(user.me.as_str()) + .unwrap() + .authority() + .unwrap() + .clone(); + match query.q { QueryType::Config => { - let channels: Vec = match db.get_channels(host.as_str()).await { + let channels: Vec = match db.get_channels(user_authority.as_str()).await { Ok(chans) => chans, Err(err) => return Box::new(warp::reply::with_status( warp::reply::json(&MicropubError::new( @@ -167,7 +557,7 @@ async fn _query(db: D, host: warp::host::Authority, query: MicropubQ QueryType::SyndicateTo ], "channels": channels, - "_kittybox_authority": host.as_str() + "_kittybox_authority": user_authority.as_str() }).as_object().unwrap())) }, QueryType::Source => { @@ -204,9 +594,7 @@ async fn _query(db: D, host: warp::host::Authority, query: MicropubQ None => todo!() } }, - _ => { - todo!() - } + _ => todo!() } } @@ -214,13 +602,24 @@ pub fn query(db: D, token_endpoint: String, http: hyper::Client()) - .and(crate::indieauth::require_token(token_endpoint, http)) + .and(crate::util::require_host() + .and(crate::indieauth::require_token(token_endpoint, http)) + .and_then(check_auth)) .then(_query) + .recover(|e: warp::Rejection| async move { + if let Some(err) = e.find::() { + Ok(warp::reply::json(err)) + } else { + Err(e) + } + }) } pub async fn recover(err: Rejection) -> Result { + if let Some(error) = err.find::() { + return Ok(warp::reply::with_status(warp::reply::json(&error), error.into())) + } let error = if err.find::().is_some() { MicropubError::new( ErrorType::InvalidRequest, @@ -237,14 +636,15 @@ pub async fn recover(err: Rejection) -> Result { Ok(warp::reply::with_status(warp::reply::json(&error), error.into())) } -pub fn micropub(db: D, token_endpoint: String, http: hyper::Client) -> impl Filter + Clone +pub fn micropub(db: D, token_endpoint: String, http: hyper::Client) -> impl Filter + Clone where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { query(db.clone(), token_endpoint.clone(), http.clone()) - .or(post(db.clone(), token_endpoint.clone(), http.clone())) + .or(post(db, token_endpoint, http)) .or(options()) .recover(recover) } #[cfg(test)] +#[allow(dead_code)] impl MicropubQuery { fn config() -> Self { Self { @@ -264,29 +664,113 @@ impl MicropubQuery { #[cfg(test)] mod tests { use hyper::body::HttpBody; - use crate::micropub::MicropubError; + use crate::{database::Storage, micropub::MicropubError}; use warp::{Filter, Reply}; + use serde_json::json; #[tokio::test] - async fn test_query_wrong_auth() { - let mut res = warp::test::request() - .filter(&warp::any().then(|| super::_query( - crate::database::MemoryStorage::new(), - warp::host::Authority::from_static("aaronparecki.com"), - super::MicropubQuery::config(), - crate::indieauth::User::new( - "https://fireburn.ru/", - "https://quill.p3k.io/", - "create update media" - ) - ))) + async fn check_post_reject_scope() { + let inject_db = { + let db = crate::database::MemoryStorage::new(); + + move || db.clone() + }; + let db = inject_db(); + + let res = warp::test::request() + .filter(&warp::any() + .map(inject_db) + .and_then(move |db| async move { + let post = json!({ + "type": ["h-entry"], + "properties": { + "content": ["Hello world!"] + } + }); + let user = crate::indieauth::User::new( + "https://localhost:8080/", + "https://kittybox.fireburn.ru/", + "profile" + ); + let (uid, mf2) = super::post::normalize_mf2(post, &user); + + super::_post( + user, uid, mf2, db, hyper::Client::new() + ).await.map_err(warp::reject::custom) + }) + ) + .await + .map(|_| panic!("Tried to do something with a reply!")) + .unwrap_err(); + + if let Some(err) = res.find::() { + assert_eq!(err.error, super::ErrorType::InvalidScope); + } else { + panic!("Did not return MicropubError"); + } + + let hashmap = db.mapping.read().await; + assert!(hashmap.is_empty()); + } + + #[tokio::test] + async fn check_post_mf2() { + let inject_db = { + let db = crate::database::MemoryStorage::new(); + + move || db.clone() + }; + let db = inject_db(); + + let res = warp::test::request() + .filter(&warp::any() + .map(inject_db) + .and_then(move |db| async move { + let post = json!({ + "type": ["h-entry"], + "properties": { + "content": ["Hello world!"] + } + }); + let user = crate::indieauth::User::new( + "https://localhost:8080/", + "https://kittybox.fireburn.ru/", + "create" + ); + let (uid, mf2) = super::post::normalize_mf2(post, &user); + + super::_post( + user, uid, mf2, db, hyper::Client::new() + ).await.map_err(warp::reject::custom) + }) + ) .await .unwrap() .into_response(); - assert_eq!(res.status(), 401); - let body = res.body_mut().data().await.unwrap().unwrap(); - let json: MicropubError = serde_json::from_slice(&body as &[u8]).unwrap(); + assert!(res.headers().contains_key("Location")); + let location = res.headers().get("Location").unwrap(); + assert!(db.post_exists(location.to_str().unwrap()).await.unwrap()); + assert!(db.post_exists("https://localhost:8080/feeds/main").await.unwrap()); + } + + #[tokio::test] + async fn test_check_auth() { + let err = warp::test::request() + .filter(&warp::any() + .map(|| ( + warp::host::Authority::from_static("aaronparecki.com"), + crate::indieauth::User::new( + "https://fireburn.ru/", + "https://quill.p3k.io/", + "create update media" + ))) + .untuple_one() + .and_then(super::check_auth)) + .await + .unwrap_err(); + + let json: &MicropubError = err.find::().unwrap(); assert_eq!(json.error, super::ErrorType::NotAuthorized); } @@ -295,7 +779,6 @@ mod tests { let mut res = warp::test::request() .filter(&warp::any().then(|| super::_query( crate::database::MemoryStorage::new(), - warp::host::Authority::from_static("aaronparecki.com"), super::MicropubQuery::source("https://aaronparecki.com/feeds/main"), crate::indieauth::User::new( "https://fireburn.ru/", diff --git a/src/micropub/post.rs b/src/micropub/post.rs index 30304e5..cf9f3d9 100644 --- a/src/micropub/post.rs +++ b/src/micropub/post.rs @@ -1,43 +1,18 @@ use crate::database::Storage; use crate::indieauth::User; -use crate::ApplicationState; use chrono::prelude::*; use core::iter::Iterator; -use futures::stream; -use futures::StreamExt; -use http_types::Mime; -use log::{error, info, warn}; use newbase60::num_to_sxg; use std::convert::TryInto; -use std::str::FromStr; use serde_json::json; -static DEFAULT_CHANNEL_PATH: &str = "/feeds/main"; +pub(crate) static DEFAULT_CHANNEL_PATH: &str = "/feeds/main"; static DEFAULT_CHANNEL_NAME: &str = "Main feed"; -static CONTACTS_CHANNEL_PATH: &str = "/feeds/vcards"; +pub(crate) static CONTACTS_CHANNEL_PATH: &str = "/feeds/vcards"; static CONTACTS_CHANNEL_NAME: &str = "My address book"; -static FOOD_CHANNEL_PATH: &str = "/feeds/food"; +pub(crate) static FOOD_CHANNEL_PATH: &str = "/feeds/food"; static FOOD_CHANNEL_NAME: &str = "My recipe book"; -macro_rules! response { - ($($code:expr, $json:tt)+) => { - $( - Ok(Response::builder($code).body(json!($json)).build()) - )+ - }; -} - -macro_rules! error_json { - ($($code:expr, $error:expr, $error_desc:expr)+) => { - $( - response!($code, { - "error": $error, - "error_description": $error_desc - }) - )+ - } -} - fn get_folder_from_type(post_type: &str) -> String { (match post_type { "h-feed" => "feeds/", @@ -49,34 +24,31 @@ fn get_folder_from_type(post_type: &str) -> String { .to_string() } +/// Reset the datetime to a proper datetime. +/// Do not attempt to recover the information. +/// Do not pass GO. Do not collect $200. +fn reset_dt(post: &mut serde_json::Value) -> DateTime { + let curtime: DateTime = Local::now(); + post["properties"]["published"] = json!([curtime.to_rfc3339()]); + chrono::DateTime::from(curtime) +} + pub fn normalize_mf2(mut body: serde_json::Value, user: &User) -> (String, serde_json::Value) { // Normalize the MF2 object here. let me = &user.me; - let published: DateTime; let folder = get_folder_from_type(body["type"][0].as_str().unwrap()); - if let Some(dt) = body["properties"]["published"][0].as_str() { + let published: DateTime = if let Some(dt) = body["properties"]["published"][0].as_str() { // Check if the datetime is parsable. match DateTime::parse_from_rfc3339(dt) { - Ok(dt) => { - published = dt; - } - Err(_) => { - // Reset the datetime to a proper datetime. - // Do not attempt to recover the information. - // Do not pass GO. Do not collect $200. - let curtime: DateTime = Local::now(); - body["properties"]["published"] = - serde_json::Value::Array(vec![serde_json::Value::String(curtime.to_rfc3339())]); - published = chrono::DateTime::from(curtime); - } + Ok(dt) => dt, + Err(_) => reset_dt(&mut body) } } else { // Set the datetime. - let curtime: DateTime = Local::now(); - body["properties"]["published"] = - serde_json::Value::Array(vec![serde_json::Value::String(curtime.to_rfc3339())]); - published = chrono::DateTime::from(curtime); - } + // Note: this code block duplicates functionality with the above failsafe. + // Consider refactoring it to a helper function? + reset_dt(&mut body) + }; match body["properties"]["uid"][0].as_str() { None => { let uid = serde_json::Value::String( @@ -182,7 +154,7 @@ pub fn normalize_mf2(mut body: serde_json::Value, user: &User) -> (String, serde ); } -pub async fn new_post( +/*pub async fn new_post( req: Request>, body: serde_json::Value, ) -> Result { @@ -298,9 +270,9 @@ pub async fn new_post( .header("Location", &uid) .body(json!({"status": "accepted", "location": &uid})) .build()) -} +}*/ -async fn create_feed( +pub(crate) async fn create_feed( storage: &impl Storage, uid: &str, channel: &str, @@ -335,7 +307,7 @@ async fn create_feed( storage.put_post(&feed, user.me.as_str()).await } -async fn post_process_new_post( +/*async fn post_process_new_post( req: Request>, post: serde_json::Value, ) { @@ -563,9 +535,9 @@ async fn post_process_new_post( .buffer_unordered(3) .collect::>() .await; -} +}*/ -async fn process_json( +/*async fn process_json( req: Request>, body: serde_json::Value, ) -> Result { @@ -647,31 +619,9 @@ async fn process_json( "Try sending MF2-structured data or an object with an \"action\" and \"url\" keys." ); } -} +}*/ -fn convert_form_to_mf2_json(form: Vec<(String, String)>) -> serde_json::Value { - let mut mf2 = json!({"type": [], "properties": {}}); - for (k, v) in form { - if k == "h" { - mf2["type"] - .as_array_mut() - .unwrap() - .push(json!("h-".to_string() + &v)); - } else if k != "access_token" { - let key = k.strip_suffix("[]").unwrap_or(&k); - match mf2["properties"][key].as_array_mut() { - Some(prop) => prop.push(json!(v)), - None => mf2["properties"][key] = json!([v]), - } - } - } - if mf2["type"].as_array().unwrap().is_empty() { - mf2["type"].as_array_mut().unwrap().push(json!("h-entry")); - } - mf2 -} - -async fn process_form( +/*async fn process_form( req: Request>, form: Vec<(String, String)>, ) -> Result { @@ -725,9 +675,9 @@ async fn process_form( "invalid_request", "Try sending h=entry&content=something%20interesting" ); -} +}*/ -pub async fn post_handler(mut req: Request>) -> Result { +/*pub async fn post_handler(mut req: Request>) -> Result { match req.content_type() { Some(value) => { if value == Mime::from_str("application/json").unwrap() { @@ -766,7 +716,7 @@ pub async fn post_handler(mut req: Request>) -> ); } } -} +}*/ #[cfg(test)] mod tests { @@ -853,22 +803,6 @@ mod tests { ); } - - #[test] - fn test_form_to_mf2() { - use serde_urlencoded::from_str; - - assert_eq!( - convert_form_to_mf2_json(from_str("h=entry&content=something%20interesting").unwrap()), - json!({ - "type": ["h-entry"], - "properties": { - "content": ["something interesting"] - } - }) - ) - } - #[test] fn test_normalize_mf2() { let mf2 = json!({ -- cgit 1.4.1