diff options
author | Vika <vika@fireburn.ru> | 2022-07-07 00:32:33 +0300 |
---|---|---|
committer | Vika <vika@fireburn.ru> | 2022-07-07 00:36:39 +0300 |
commit | 7f23ec84bc05c236c1bf40c2f0d72412af711516 (patch) | |
tree | f0ba64804fffce29a8f04e5b6c76f9863de81dd2 /kittybox-rs/src | |
parent | 5cfac54aa4fb3c207ea2cbbeccd4571fa204a09b (diff) | |
download | kittybox-7f23ec84bc05c236c1bf40c2f0d72412af711516.tar.zst |
treewide: rewrite using Axum
Axum has streaming bodies and allows to write simpler code. It also helps enforce stronger types and looks much more neat. This allows me to progress on the media endpoint and add streaming reads and writes to the MediaStore trait. Metrics are temporarily not implemented. Everything else was preserved, and the tests still pass, after adjusting for new calling conventions. TODO: create method routers for protocol endpoints
Diffstat (limited to 'kittybox-rs/src')
-rw-r--r-- | kittybox-rs/src/bin/pyindieblog_to_kittybox.rs | 68 | ||||
-rw-r--r-- | kittybox-rs/src/database/file/mod.rs | 108 | ||||
-rw-r--r-- | kittybox-rs/src/database/memory.rs | 138 | ||||
-rw-r--r-- | kittybox-rs/src/database/mod.rs | 102 | ||||
-rw-r--r-- | kittybox-rs/src/frontend/mod.rs | 536 | ||||
-rw-r--r-- | kittybox-rs/src/frontend/onboarding.rs | 142 | ||||
-rw-r--r-- | kittybox-rs/src/indieauth.rs | 381 | ||||
-rw-r--r-- | kittybox-rs/src/lib.rs | 98 | ||||
-rw-r--r-- | kittybox-rs/src/main.rs | 166 | ||||
-rw-r--r-- | kittybox-rs/src/media/mod.rs | 50 | ||||
-rw-r--r-- | kittybox-rs/src/media/storage/file.rs | 61 | ||||
-rw-r--r-- | kittybox-rs/src/media/storage/mod.rs | 53 | ||||
-rw-r--r-- | kittybox-rs/src/micropub/mod.rs | 1040 | ||||
-rw-r--r-- | kittybox-rs/src/micropub/post.rs | 944 | ||||
-rw-r--r-- | kittybox-rs/src/micropub/util.rs | 457 |
15 files changed, 1950 insertions, 2394 deletions
diff --git a/kittybox-rs/src/bin/pyindieblog_to_kittybox.rs b/kittybox-rs/src/bin/pyindieblog_to_kittybox.rs deleted file mode 100644 index 38590c3..0000000 --- a/kittybox-rs/src/bin/pyindieblog_to_kittybox.rs +++ /dev/null @@ -1,68 +0,0 @@ -use anyhow::{anyhow, Context, Result}; - -use redis::AsyncCommands; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::fs::File; - -#[derive(Default, Serialize, Deserialize)] -struct PyindieblogData { - posts: Vec<serde_json::Value>, - cards: Vec<serde_json::Value>, -} - -#[async_std::main] -async fn main() -> Result<()> { - let mut args = std::env::args(); - args.next(); // skip argv[0] which is the name - let redis_uri = args - .next() - .ok_or_else(|| anyhow!("No Redis URI provided"))?; - let client = redis::Client::open(redis_uri.as_str()) - .with_context(|| format!("Failed to construct Redis client on {}", redis_uri))?; - - let filename = args - .next() - .ok_or_else(|| anyhow!("No filename provided for export"))?; - - let mut data: Vec<serde_json::Value>; - - let file = File::create(filename)?; - - let mut conn = client - .get_async_std_connection() - .await - .with_context(|| "Failed to connect to the Redis server")?; - - data = conn - .hgetall::<&str, HashMap<String, String>>("posts") - .await? - .values() - .map(|s| { - serde_json::from_str::<serde_json::Value>(s) - .with_context(|| format!("Failed to parse the following entry: {:?}", s)) - }) - .collect::<std::result::Result<Vec<serde_json::Value>, anyhow::Error>>() - .with_context(|| "Failed to export h-entries from pyindieblog")?; - data.extend( - conn.hgetall::<&str, HashMap<String, String>>("hcards") - .await? - .values() - .map(|s| { - serde_json::from_str::<serde_json::Value>(s) - .with_context(|| format!("Failed to parse the following card: {:?}", s)) - }) - .collect::<std::result::Result<Vec<serde_json::Value>, anyhow::Error>>() - .with_context(|| "Failed to export h-cards from pyindieblog")?, - ); - - data.sort_by_key(|v| { - v["properties"]["published"][0] - .as_str() - .map(|s| s.to_string()) - }); - - serde_json::to_writer(file, &data)?; - - Ok(()) -} diff --git a/kittybox-rs/src/database/file/mod.rs b/kittybox-rs/src/database/file/mod.rs index 1e7aa96..fb18dc4 100644 --- a/kittybox-rs/src/database/file/mod.rs +++ b/kittybox-rs/src/database/file/mod.rs @@ -1,15 +1,15 @@ //#![warn(clippy::unwrap_used)] -use crate::database::{filter_post, ErrorKind, Result, Storage, StorageError, Settings}; -use std::io::ErrorKind as IOErrorKind; -use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::task::spawn_blocking; +use crate::database::{filter_post, ErrorKind, Result, Settings, Storage, StorageError}; use async_trait::async_trait; use futures::{stream, StreamExt, TryStreamExt}; -use log::debug; use serde_json::json; use std::collections::HashMap; +use std::io::ErrorKind as IOErrorKind; use std::path::{Path, PathBuf}; +use tokio::fs::{File, OpenOptions}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::task::spawn_blocking; +use tracing::debug; impl From<std::io::Error> for StorageError { fn from(source: std::io::Error) -> Self { @@ -30,7 +30,7 @@ impl From<tokio::time::error::Elapsed> for StorageError { Self::with_source( ErrorKind::Backend, "timeout on I/O operation", - Box::new(source) + Box::new(source), ) } } @@ -107,7 +107,7 @@ fn url_to_path(root: &Path, url: &str) -> PathBuf { } fn url_to_relative_path(url: &str) -> relative_path::RelativePathBuf { - let url = warp::http::Uri::try_from(url).expect("Couldn't parse a URL"); + let url = axum::http::Uri::try_from(url).expect("Couldn't parse a URL"); let mut path = relative_path::RelativePathBuf::new(); path.push(url.authority().unwrap().to_string() + url.path() + ".json"); @@ -160,7 +160,10 @@ fn modify_post(post: &serde_json::Value, update: &serde_json::Value) -> Result<s if let Some(v) = v.as_array() { add_keys.insert(k.to_string(), v.clone()); } else { - return Err(StorageError::new(ErrorKind::BadRequest, "Malformed update object")); + return Err(StorageError::new( + ErrorKind::BadRequest, + "Malformed update object", + )); } } } @@ -194,9 +197,7 @@ fn modify_post(post: &serde_json::Value, update: &serde_json::Value) -> Result<s let k = &k; if let Some(prop) = props[k].as_array_mut() { if k == "children" { - v.into_iter() - .rev() - .for_each(|v| prop.insert(0, v)); + v.into_iter().rev().for_each(|v| prop.insert(0, v)); } else { prop.extend(v.into_iter()); } @@ -262,7 +263,7 @@ async fn hydrate_author<S: Storage>( if let Some(props) = feed["properties"].as_object_mut() { props["author"] = json!(author_list); } else { - feed["properties"] = json!({"author": author_list}); + feed["properties"] = json!({ "author": author_list }); } } } @@ -270,6 +271,7 @@ async fn hydrate_author<S: Storage>( #[async_trait] impl Storage for FileStorage { + #[tracing::instrument] async fn post_exists(&self, url: &str) -> Result<bool> { let path = url_to_path(&self.root_dir, url); debug!("Checking if {:?} exists...", path); @@ -289,6 +291,7 @@ impl Storage for FileStorage { Ok(spawn_blocking(move || path.is_file()).await.unwrap()) } + #[tracing::instrument] async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> { let path = url_to_path(&self.root_dir, url); // TODO: check that the path actually belongs to the dir of user who requested it @@ -302,9 +305,13 @@ impl Storage for FileStorage { // Typechecks because OS magic acts on references // to FDs as if they were behind a mutex AsyncReadExt::read_to_string(&mut file, &mut content).await?; - debug!("Read {} bytes successfully from {:?}", content.as_bytes().len(), &path); + debug!( + "Read {} bytes successfully from {:?}", + content.as_bytes().len(), + &path + ); Ok(Some(serde_json::from_str(&content)?)) - }, + } Err(err) => { if err.kind() == IOErrorKind::NotFound { Ok(None) @@ -315,6 +322,7 @@ impl Storage for FileStorage { } } + #[tracing::instrument] async fn put_post(&self, post: &'_ serde_json::Value, user: &'_ str) -> Result<()> { let key = post["properties"]["uid"][0] .as_str() @@ -323,7 +331,10 @@ impl Storage for FileStorage { let tempfile = (&path).with_extension("tmp"); debug!("Creating {:?}", path); - let parent = path.parent().expect("Parent for this directory should always exist").to_owned(); + let parent = path + .parent() + .expect("Parent for this directory should always exist") + .to_owned(); if !parent.is_dir() { tokio::fs::create_dir_all(parent).await?; } @@ -331,7 +342,8 @@ impl Storage for FileStorage { let mut file = tokio::fs::OpenOptions::new() .write(true) .create_new(true) - .open(&tempfile).await?; + .open(&tempfile) + .await?; file.write_all(post.to_string().as_bytes()).await?; file.flush().await?; @@ -339,10 +351,7 @@ impl Storage for FileStorage { tokio::fs::rename(&tempfile, &path).await?; if let Some(urls) = post["properties"]["url"].as_array() { - for url in urls - .iter() - .map(|i| i.as_str().unwrap()) - { + for url in urls.iter().map(|i| i.as_str().unwrap()) { if url != key && url.starts_with(user) { let link = url_to_path(&self.root_dir, url); debug!("Creating a symlink at {:?}", link); @@ -370,7 +379,13 @@ impl Storage for FileStorage { println!("Adding to channel list..."); // Add the h-feed to the channel list let mut path = relative_path::RelativePathBuf::new(); - path.push(warp::http::Uri::try_from(user.to_string()).unwrap().authority().unwrap().to_string()); + path.push( + axum::http::Uri::try_from(user.to_string()) + .unwrap() + .authority() + .unwrap() + .to_string(), + ); path.push("channels"); let path = path.to_path(&self.root_dir); @@ -384,13 +399,15 @@ impl Storage for FileStorage { let mut tempfile = OpenOptions::new() .write(true) .create_new(true) - .open(&tempfilename).await?; + .open(&tempfilename) + .await?; let mut file = OpenOptions::new() .read(true) .write(true) .truncate(false) .create(true) - .open(&path).await?; + .open(&path) + .await?; let mut content = String::new(); file.read_to_string(&mut content).await?; @@ -406,7 +423,9 @@ impl Storage for FileStorage { name: channel_name, }); - tempfile.write_all(serde_json::to_string(&channels)?.as_bytes()).await?; + tempfile + .write_all(serde_json::to_string(&channels)?.as_bytes()) + .await?; tempfile.flush().await?; drop(tempfile); tokio::fs::rename(tempfilename, path).await?; @@ -414,6 +433,7 @@ impl Storage for FileStorage { Ok(()) } + #[tracing::instrument] async fn update_post(&self, url: &'_ str, update: serde_json::Value) -> Result<()> { let path = url_to_path(&self.root_dir, url); let tempfilename = path.with_extension("tmp"); @@ -424,10 +444,7 @@ impl Storage for FileStorage { .create_new(true) .open(&tempfilename) .await?; - let mut file = OpenOptions::new() - .read(true) - .open(&path) - .await?; + let mut file = OpenOptions::new().read(true).open(&path).await?; let mut content = String::new(); file.read_to_string(&mut content).await?; @@ -447,9 +464,16 @@ impl Storage for FileStorage { Ok(()) } + #[tracing::instrument] async fn get_channels(&self, user: &'_ str) -> Result<Vec<super::MicropubChannel>> { let mut path = relative_path::RelativePathBuf::new(); - path.push(warp::http::Uri::try_from(user.to_string()).unwrap().authority().unwrap().to_string()); + path.push( + axum::http::Uri::try_from(user.to_string()) + .unwrap() + .authority() + .unwrap() + .to_string(), + ); path.push("channels"); let path = path.to_path(&self.root_dir); @@ -474,6 +498,7 @@ impl Storage for FileStorage { } } + #[tracing::instrument] async fn read_feed_with_limit( &self, url: &'_ str, @@ -498,7 +523,7 @@ impl Storage for FileStorage { if let Some(after) = after { for s in posts_iter.by_ref() { if &s == after { - break + break; } } }; @@ -539,6 +564,7 @@ impl Storage for FileStorage { } } + #[tracing::instrument] async fn delete_post(&self, url: &'_ str) -> Result<()> { let path = url_to_path(&self.root_dir, url); if let Err(e) = tokio::fs::remove_file(path).await { @@ -549,9 +575,10 @@ impl Storage for FileStorage { } } + #[tracing::instrument] async fn get_setting(&self, setting: Settings, user: &'_ str) -> Result<String> { log::debug!("User for getting settings: {}", user); - let url = warp::http::Uri::try_from(user).expect("Couldn't parse a URL"); + let url = axum::http::Uri::try_from(user).expect("Couldn't parse a URL"); let mut path = relative_path::RelativePathBuf::new(); path.push(url.authority().unwrap().to_string()); path.push("settings"); @@ -572,8 +599,9 @@ impl Storage for FileStorage { .ok_or_else(|| StorageError::new(ErrorKind::Backend, "Setting not set")) } + #[tracing::instrument] async fn set_setting(&self, setting: Settings, user: &'_ str, value: &'_ str) -> Result<()> { - let url = warp::http::Uri::try_from(user).expect("Couldn't parse a URL"); + let url = axum::http::Uri::try_from(user).expect("Couldn't parse a URL"); let mut path = relative_path::RelativePathBuf::new(); path.push(url.authority().unwrap().to_string()); path.push("settings"); @@ -604,14 +632,18 @@ impl Storage for FileStorage { serde_json::from_str(&content)? } } - Err(err) => if err.kind() == IOErrorKind::NotFound { - HashMap::default() - } else { - return Err(err.into()) + Err(err) => { + if err.kind() == IOErrorKind::NotFound { + HashMap::default() + } else { + return Err(err.into()); + } } }; settings.insert(setting, value); - tempfile.write_all(serde_json::to_string(&settings)?.as_bytes()).await?; + tempfile + .write_all(serde_json::to_string(&settings)?.as_bytes()) + .await?; drop(tempfile); tokio::fs::rename(temppath, path).await?; Ok(()) diff --git a/kittybox-rs/src/database/memory.rs b/kittybox-rs/src/database/memory.rs index 786466c..c8cc125 100644 --- a/kittybox-rs/src/database/memory.rs +++ b/kittybox-rs/src/database/memory.rs @@ -1,26 +1,26 @@ #![allow(clippy::todo)] use async_trait::async_trait; +use futures_util::FutureExt; +use serde_json::json; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; -use futures_util::FutureExt; -use serde_json::json; -use crate::database::{Storage, Result, StorageError, ErrorKind, MicropubChannel, Settings}; +use crate::database::{ErrorKind, MicropubChannel, Result, Settings, Storage, StorageError}; #[derive(Clone, Debug)] pub struct MemoryStorage { pub mapping: Arc<RwLock<HashMap<String, serde_json::Value>>>, - pub channels: Arc<RwLock<HashMap<String, Vec<String>>>> + pub channels: Arc<RwLock<HashMap<String, Vec<String>>>>, } #[async_trait] impl Storage for MemoryStorage { async fn post_exists(&self, url: &str) -> Result<bool> { - return Ok(self.mapping.read().await.contains_key(url)) + return Ok(self.mapping.read().await.contains_key(url)); } - async fn get_post(&self, url: &str) ->Result<Option<serde_json::Value>> { + async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> { let mapping = self.mapping.read().await; match mapping.get(url) { Some(val) => { @@ -36,8 +36,8 @@ impl Storage for MemoryStorage { } else { Ok(Some(val.clone())) } - }, - _ => Ok(None) + } + _ => Ok(None), } } @@ -45,20 +45,45 @@ impl Storage for MemoryStorage { let mapping = &mut self.mapping.write().await; let key: &str = match post["properties"]["uid"][0].as_str() { Some(uid) => uid, - None => return Err(StorageError::new(ErrorKind::Other, "post doesn't have a UID")) + None => { + return Err(StorageError::new( + ErrorKind::Other, + "post doesn't have a UID", + )) + } }; mapping.insert(key.to_string(), post.clone()); if post["properties"]["url"].is_array() { - for url in post["properties"]["url"].as_array().unwrap().iter().map(|i| i.as_str().unwrap().to_string()) { + for url in post["properties"]["url"] + .as_array() + .unwrap() + .iter() + .map(|i| i.as_str().unwrap().to_string()) + { if url != key { - mapping.insert(url, json!({"see_other": key})); + mapping.insert(url, json!({ "see_other": key })); } } } - if post["type"].as_array().unwrap().iter().any(|i| i == "h-feed") { + if post["type"] + .as_array() + .unwrap() + .iter() + .any(|i| i == "h-feed") + { // This is a feed. Add it to the channels array if it's not already there. println!("{:#}", post); - self.channels.write().await.entry(post["properties"]["author"][0].as_str().unwrap().to_string()).or_insert_with(Vec::new).push(key.to_string()) + self.channels + .write() + .await + .entry( + post["properties"]["author"][0] + .as_str() + .unwrap() + .to_string(), + ) + .or_insert_with(Vec::new) + .push(key.to_string()) } Ok(()) } @@ -69,13 +94,24 @@ impl Storage for MemoryStorage { let mut remove_values: HashMap<String, Vec<serde_json::Value>> = HashMap::new(); if let Some(delete) = update["delete"].as_array() { - remove_keys.extend(delete.iter().filter_map(|v| v.as_str()).map(|v| v.to_string())); + remove_keys.extend( + delete + .iter() + .filter_map(|v| v.as_str()) + .map(|v| v.to_string()), + ); } else if let Some(delete) = update["delete"].as_object() { for (k, v) in delete { if let Some(v) = v.as_array() { - remove_values.entry(k.to_string()).or_default().extend(v.clone()); + remove_values + .entry(k.to_string()) + .or_default() + .extend(v.clone()); } else { - return Err(StorageError::new(ErrorKind::BadRequest, "Malformed update object")); + return Err(StorageError::new( + ErrorKind::BadRequest, + "Malformed update object", + )); } } } @@ -84,7 +120,10 @@ impl Storage for MemoryStorage { if v.is_array() { add_keys.insert(k.to_string(), v.clone()); } else { - return Err(StorageError::new(ErrorKind::BadRequest, "Malformed update object")); + return Err(StorageError::new( + ErrorKind::BadRequest, + "Malformed update object", + )); } } } @@ -100,7 +139,10 @@ impl Storage for MemoryStorage { if let Some(new_post) = mapping.get(url) { post = new_post } else { - return Err(StorageError::new(ErrorKind::NotFound, "The post you have requested is not found in the database.")); + return Err(StorageError::new( + ErrorKind::NotFound, + "The post you have requested is not found in the database.", + )); } } let mut post = post.clone(); @@ -131,7 +173,12 @@ impl Storage for MemoryStorage { let k = &k; if let Some(prop) = props[k].as_array_mut() { if k == "children" { - v.as_array().unwrap().iter().cloned().rev().for_each(|v| prop.insert(0, v)); + v.as_array() + .unwrap() + .iter() + .cloned() + .rev() + .for_each(|v| prop.insert(0, v)); } else { prop.extend(v.as_array().unwrap().iter().cloned()); } @@ -139,32 +186,55 @@ impl Storage for MemoryStorage { post["properties"][k] = v } } - mapping.insert(post["properties"]["uid"][0].as_str().unwrap().to_string(), post); + mapping.insert( + post["properties"]["uid"][0].as_str().unwrap().to_string(), + post, + ); } else { - return Err(StorageError::new(ErrorKind::NotFound, "The designated post wasn't found in the database.")); + return Err(StorageError::new( + ErrorKind::NotFound, + "The designated post wasn't found in the database.", + )); } Ok(()) } async fn get_channels(&self, user: &'_ str) -> Result<Vec<MicropubChannel>> { match self.channels.read().await.get(user) { - Some(channels) => Ok(futures_util::future::join_all(channels.iter() - .map(|channel| self.get_post(channel) - .map(|result| result.unwrap()) - .map(|post: Option<serde_json::Value>| { - post.map(|post| MicropubChannel { - uid: post["properties"]["uid"][0].as_str().unwrap().to_string(), - name: post["properties"]["name"][0].as_str().unwrap().to_string() - }) + Some(channels) => Ok(futures_util::future::join_all( + channels + .iter() + .map(|channel| { + self.get_post(channel).map(|result| result.unwrap()).map( + |post: Option<serde_json::Value>| { + post.map(|post| MicropubChannel { + uid: post["properties"]["uid"][0].as_str().unwrap().to_string(), + name: post["properties"]["name"][0] + .as_str() + .unwrap() + .to_string(), + }) + }, + ) }) - ).collect::<Vec<_>>()).await.into_iter().flatten().collect::<Vec<_>>()), - None => Ok(vec![]) + .collect::<Vec<_>>(), + ) + .await + .into_iter() + .flatten() + .collect::<Vec<_>>()), + None => Ok(vec![]), } - } #[allow(unused_variables)] - async fn read_feed_with_limit(&self, url: &'_ str, after: &'_ Option<String>, limit: usize, user: &'_ Option<String>) -> Result<Option<serde_json::Value>> { + async fn read_feed_with_limit( + &self, + url: &'_ str, + after: &'_ Option<String>, + limit: usize, + user: &'_ Option<String>, + ) -> Result<Option<serde_json::Value>> { todo!() } @@ -194,7 +264,7 @@ impl MemoryStorage { pub fn new() -> Self { Self { mapping: Arc::new(RwLock::new(HashMap::new())), - channels: Arc::new(RwLock::new(HashMap::new())) + channels: Arc::new(RwLock::new(HashMap::new())), } } } diff --git a/kittybox-rs/src/database/mod.rs b/kittybox-rs/src/database/mod.rs index 6bf5409..bd25d8d 100644 --- a/kittybox-rs/src/database/mod.rs +++ b/kittybox-rs/src/database/mod.rs @@ -55,8 +55,6 @@ pub struct StorageError { kind: ErrorKind, } -impl warp::reject::Reject for StorageError {} - impl std::error::Error for StorageError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { self.source @@ -75,18 +73,20 @@ impl From<serde_json::Error> for StorageError { } impl std::fmt::Display for StorageError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match match self.kind { - ErrorKind::Backend => write!(f, "backend error: "), - ErrorKind::JsonParsing => write!(f, "error while parsing JSON: "), - ErrorKind::PermissionDenied => write!(f, "permission denied: "), - ErrorKind::NotFound => write!(f, "not found: "), - ErrorKind::BadRequest => write!(f, "bad request: "), - ErrorKind::Conflict => write!(f, "conflict with an in-flight request or existing data: "), - ErrorKind::Other => write!(f, "generic storage layer error: "), - } { - Ok(_) => write!(f, "{}", self.msg), - Err(err) => Err(err), - } + write!( + f, + "{}: {}", + match self.kind { + ErrorKind::Backend => "backend error", + ErrorKind::JsonParsing => "JSON parsing error", + ErrorKind::PermissionDenied => "permission denied", + ErrorKind::NotFound => "not found", + ErrorKind::BadRequest => "bad request", + ErrorKind::Conflict => "conflict with an in-flight request or existing data", + ErrorKind::Other => "generic storage layer error", + }, + self.msg + ) } } impl serde::Serialize for StorageError { @@ -377,9 +377,11 @@ mod tests { returned_post["properties"]["category"].as_array().unwrap(), &vec![json!("testing")] ); - }, + } something_else => { - something_else.expect("Shouldn't error").expect("Should have the post"); + something_else + .expect("Shouldn't error") + .expect("Should have the post"); } } } @@ -411,7 +413,11 @@ mod tests { async fn test_settings<Backend: Storage>(backend: Backend) { backend - .set_setting(crate::database::Settings::SiteName, "https://fireburn.ru/", "Vika's Hideout") + .set_setting( + crate::database::Settings::SiteName, + "https://fireburn.ru/", + "Vika's Hideout", + ) .await .unwrap(); assert_eq!( @@ -428,7 +434,9 @@ mod tests { let uid = format!( "https://{domain}/posts/{}-{}-{}", - rand::random::<Word>(), rand::random::<Word>(), rand::random::<Word>() + rand::random::<Word>(), + rand::random::<Word>(), + rand::random::<Word>() ); let post = json!({ @@ -467,12 +475,16 @@ mod tests { .unwrap(); println!("---"); for (i, post) in posts.iter().enumerate() { - backend.put_post(post, "https://fireburn.ru/").await.unwrap(); + backend + .put_post(post, "https://fireburn.ru/") + .await + .unwrap(); println!("posts[{}] = {}", i, post["properties"]["uid"][0]); } println!("---"); let limit: usize = 10; - let result = backend.read_feed_with_limit(key, &None, limit, &None) + let result = backend + .read_feed_with_limit(key, &None, limit, &None) .await .unwrap() .unwrap(); @@ -482,31 +494,38 @@ mod tests { println!("---"); assert_eq!(result["children"].as_array().unwrap()[0..10], posts[0..10]); - let result2 = backend.read_feed_with_limit( - key, - &result["children"] - .as_array() - .unwrap() - .last() - .unwrap() - ["properties"]["uid"][0] - .as_str() - .map(|i| i.to_owned()), - limit, &None - ).await.unwrap().unwrap(); + let result2 = backend + .read_feed_with_limit( + key, + &result["children"].as_array().unwrap().last().unwrap()["properties"]["uid"][0] + .as_str() + .map(|i| i.to_owned()), + limit, + &None, + ) + .await + .unwrap() + .unwrap(); for (i, post) in result2["children"].as_array().unwrap().iter().enumerate() { println!("feed[1][{}] = {}", i, post["properties"]["uid"][0]); } println!("---"); - assert_eq!(result2["children"].as_array().unwrap()[0..10], posts[10..20]); + assert_eq!( + result2["children"].as_array().unwrap()[0..10], + posts[10..20] + ); // Regression test for #4 let nonsense_after = Some("1010101010".to_owned()); let result3 = tokio::time::timeout(tokio::time::Duration::from_secs(10), async move { - backend.read_feed_with_limit( - key, &nonsense_after, limit, &None - ).await.unwrap().unwrap() - }).await.expect("Operation should not hang: see https://gitlab.com/kittybox/kittybox/-/issues/4"); + backend + .read_feed_with_limit(key, &nonsense_after, limit, &None) + .await + .unwrap() + .unwrap() + }) + .await + .expect("Operation should not hang: see https://gitlab.com/kittybox/kittybox/-/issues/4"); assert!(result3["children"].as_array().unwrap().is_empty()); } @@ -520,20 +539,21 @@ mod tests { $func_name!(test_update); $func_name!(test_feed_pagination); } - } + }; } macro_rules! file_test { ($func_name:ident) => { #[tokio::test] - async fn $func_name () { + async fn $func_name() { test_logger::ensure_env_logger_initialized(); let tempdir = tempdir::TempDir::new("file").expect("Failed to create tempdir"); - let backend = super::super::FileStorage::new(tempdir.into_path()).await.unwrap(); + let backend = super::super::FileStorage::new(tempdir.into_path()) + .await + .unwrap(); super::$func_name(backend).await } }; } test_all!(file_test, file); - } diff --git a/kittybox-rs/src/frontend/mod.rs b/kittybox-rs/src/frontend/mod.rs index b87f9c6..51db2e1 100644 --- a/kittybox-rs/src/frontend/mod.rs +++ b/kittybox-rs/src/frontend/mod.rs @@ -1,18 +1,25 @@ -use std::convert::TryInto; -use crate::database::Storage; +use crate::database::{Storage, StorageError}; +use axum::{ + extract::{Host, Path, Query}, + http::{StatusCode, Uri}, + response::IntoResponse, + Extension, +}; +use futures_util::FutureExt; use serde::Deserialize; -use futures_util::TryFutureExt; -use warp::{http::StatusCode, Filter, host::Authority, path::FullPath}; - +use std::convert::TryInto; +use tracing::{debug, error}; //pub mod login; +pub mod onboarding; -#[allow(unused_imports)] -use kittybox_templates::{ErrorPage, MainPage, OnboardingPage, Template, POSTS_PER_PAGE}; +use kittybox_templates::{ + Entry, ErrorPage, Feed, MainPage, Template, VCard, POSTS_PER_PAGE, +}; pub use kittybox_util::IndiewebEndpoints; -#[derive(Deserialize)] -struct QueryParams { +#[derive(Debug, Deserialize)] +pub struct QueryParams { after: Option<String>, } @@ -42,8 +49,8 @@ impl FrontendError { } } -impl From<crate::database::StorageError> for FrontendError { - fn from(err: crate::database::StorageError) -> Self { +impl From<StorageError> for FrontendError { + fn from(err: StorageError) -> Self { Self { msg: "Database error".to_string(), source: Some(Box::new(err)), @@ -66,8 +73,6 @@ impl std::fmt::Display for FrontendError { } } -impl warp::reject::Reject for FrontendError {} - async fn get_post_from_database<S: Storage>( db: &S, url: &str, @@ -105,309 +110,169 @@ async fn get_post_from_database<S: Storage>( } } -#[allow(dead_code)] -#[derive(Deserialize)] -struct OnboardingFeed { - slug: String, - name: String, -} - -#[allow(dead_code)] -#[derive(Deserialize)] -struct OnboardingData { - user: serde_json::Value, - first_post: serde_json::Value, - #[serde(default = "OnboardingData::default_blog_name")] - blog_name: String, - feeds: Vec<OnboardingFeed>, -} - -impl OnboardingData { - fn default_blog_name() -> String { - "Kitty Box!".to_owned() - } -} - -/*pub async fn onboarding_receiver<S: Storage>(mut req: Request<ApplicationState<S>>) -> Result { - use serde_json::json; - - log::debug!("Entering onboarding receiver..."); - - // This cannot error out as the URL must be valid. Or there is something horribly wrong - // and we shouldn't serve this request anyway. - <dyn AsMut<tide::http::Request>>::as_mut(&mut req) - .url_mut() - .set_scheme("https") - .unwrap(); - - log::debug!("Parsing the body..."); - let body = req.body_json::<OnboardingData>().await?; - log::debug!("Body parsed!"); - let backend = &req.state().storage; - - #[cfg(any(not(debug_assertions), test))] - let me = req.url(); - #[cfg(all(debug_assertions, not(test)))] - let me = url::Url::parse("https://localhost:8080/").unwrap(); - - log::debug!("me value: {:?}", me); - - if get_post_from_database(backend, me.as_str(), None, &None) - .await - .is_ok() - { - return Err(FrontendError::with_code( - StatusCode::Forbidden, - "Onboarding is over. Are you trying to take over somebody's website?!", - ) - .into()); - } - info!("Onboarding new user: {}", me); - - let user = crate::indieauth::User::new(me.as_str(), "https://kittybox.fireburn.ru/", "create"); - - log::debug!("Setting the site name to {}", &body.blog_name); - backend - .set_setting("site_name", user.me.as_str(), &body.blog_name) - .await?; - - if body.user["type"][0] != "h-card" || body.first_post["type"][0] != "h-entry" { - return Err(FrontendError::with_code( - StatusCode::BadRequest, - "user and first_post should be h-card and h-entry", - ) - .into()); - } - info!("Validated body.user and body.first_post as microformats2"); - - let mut hcard = body.user; - let hentry = body.first_post; - - // Ensure the h-card's UID is set to the main page, so it will be fetchable. - hcard["properties"]["uid"] = json!([me.as_str()]); - // Normalize the h-card - note that it should preserve the UID we set here. - let (_, hcard) = crate::micropub::normalize_mf2(hcard, &user); - // The h-card is written directly - all the stuff in the Micropub's - // post function is just to ensure that the posts will be syndicated - // and inserted into proper feeds. Here, we don't have a need for this, - // since the h-card is DIRECTLY accessible via its own URL. - log::debug!("Saving the h-card..."); - backend.put_post(&hcard, me.as_str()).await?; - - log::debug!("Creating feeds..."); - for feed in body.feeds { - if feed.name.is_empty() || feed.slug.is_empty() { - continue; - }; - log::debug!("Creating feed {} with slug {}", &feed.name, &feed.slug); - let (_, feed) = crate::micropub::normalize_mf2( - json!({ - "type": ["h-feed"], - "properties": {"name": [feed.name], "mp-slug": [feed.slug]} - }), - &user, - ); - - backend.put_post(&feed, me.as_str()).await?; - } - log::debug!("Saving the h-entry..."); - // This basically puts the h-entry post through the normal creation process. - // We need to insert it into feeds and optionally send a notification to everywhere. - req.set_ext(user); - crate::micropub::post::new_post(req, hentry).await?; - - Ok(Response::builder(201).header("Location", "/").build()) -} -*/ - -fn request_uri() -> impl Filter<Extract = (String,), Error = warp::Rejection> + Copy { - crate::util::require_host() - .and(warp::path::full()) - .map(|host: Authority, path: FullPath| "https://".to_owned() + host.as_str() + path.as_str()) -} - -#[forbid(clippy::unwrap_used)] -pub fn homepage<D: Storage>(db: D, endpoints: IndiewebEndpoints) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone { - let inject_db = move || db.clone(); - warp::any() - .map(inject_db.clone()) - .and(crate::util::require_host()) - .and(warp::query()) - .and_then(|db: D, host: Authority, q: QueryParams| async move { - let path = format!("https://{}/", host); - let feed_path = format!("https://{}/feeds/main", host); - - match tokio::try_join!( - get_post_from_database(&db, &path, None, &None), - get_post_from_database(&db, &feed_path, q.after, &None) - ) { - Ok((hcard, hfeed)) => Ok(( - Some(hcard), - Some(hfeed), - StatusCode::OK - )), - Err(err) => { - if err.code == StatusCode::NOT_FOUND { - // signal for onboarding flow - Ok((None, None, err.code)) - } else { - Err(warp::reject::custom(err)) +#[tracing::instrument(skip(db))] +pub async fn homepage<D: Storage>( + Host(host): Host, + Query(query): Query<QueryParams>, + Extension(db): Extension<D>, +) -> impl IntoResponse { + let user = None; // TODO authentication + let path = format!("https://{}/", host); + let feed_path = format!("https://{}/feeds/main", host); + + match tokio::try_join!( + get_post_from_database(&db, &path, None, &user), + get_post_from_database(&db, &feed_path, query.after, &user) + ) { + Ok((hcard, hfeed)) => { + // Here, we know those operations can't really fail + // (or it'll be a transient failure that will show up on + // other requests anyway if it's serious...) + // + // btw is it more efficient to fetch these in parallel? + let (blogname, channels) = tokio::join!( + db.get_setting(crate::database::Settings::SiteName, &path) + .map(|i| i.unwrap_or_else(|_| "Kittybox".to_owned())), + db.get_channels(&path).map(|i| i.unwrap_or_default()) + ); + // Render the homepage + ( + StatusCode::OK, + [( + axum::http::header::CONTENT_TYPE, + r#"text/html; charset="utf-8""#, + )], + Template { + title: &blogname, + blog_name: &blogname, + endpoints: None, // XXX this will be deprecated soon anyway + feeds: channels, + user, + content: MainPage { + feed: &hfeed, + card: &hcard, } + .to_string(), } + .to_string(), + ) + } + Err(err) => { + if err.code == StatusCode::NOT_FOUND { + debug!("Transferring to onboarding..."); + // Transfer to onboarding + ( + StatusCode::FOUND, + [(axum::http::header::LOCATION, "/.kittybox/onboarding")], + String::default(), + ) + } else { + error!("Error while fetching h-card and/or h-feed: {}", err); + // Return the error + let (blogname, channels) = tokio::join!( + db.get_setting(crate::database::Settings::SiteName, &path) + .map(|i| i.unwrap_or_else(|_| "Kittybox".to_owned())), + db.get_channels(&path).map(|i| i.unwrap_or_default()) + ); + + ( + err.code(), + [( + axum::http::header::CONTENT_TYPE, + r#"text/html; charset="utf-8""#, + )], + Template { + title: &blogname, + blog_name: &blogname, + endpoints: None, // XXX this will be deprecated soon anyway + feeds: channels, + user, + content: ErrorPage { + code: err.code(), + msg: Some(err.msg().to_string()), + } + .to_string(), + } + .to_string(), + ) } - }) - .and(warp::any().map(move || endpoints.clone())) - .and(crate::util::require_host()) - .and(warp::any().map(inject_db)) - .then(|content: (Option<serde_json::Value>, Option<serde_json::Value>, StatusCode), endpoints: IndiewebEndpoints, host: Authority, db: D| async move { - let owner = format!("https://{}/", host.as_str()); - let blog_name = db.get_setting(crate::database::Settings::SiteName, &owner).await - .unwrap_or_else(|_| "Kitty Box!".to_string()); - let feeds = db.get_channels(&owner).await.unwrap_or_default(); - match content { - (Some(card), Some(feed), StatusCode::OK) => { - Box::new(warp::reply::html(Template { - title: &blog_name, - blog_name: &blog_name, - endpoints: Some(endpoints), - feeds, - user: None, // TODO - content: MainPage { feed: &feed, card: &card }.to_string() - }.to_string())) as Box<dyn warp::Reply> - }, - (None, None, StatusCode::NOT_FOUND) => { - // TODO Onboarding - Box::new(warp::redirect::found( - hyper::Uri::from_static("/onboarding") - )) as Box<dyn warp::Reply> - } - _ => unreachable!() - } - }) + } + } } -pub fn onboarding<D: 'static + Storage>( - db: D, - endpoints: IndiewebEndpoints, - http: reqwest::Client -) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone { - let inject_db = move || db.clone(); - warp::get() - .map(move || warp::reply::html(Template { - title: "Kittybox - Onboarding", - blog_name: "Kittybox", - endpoints: Some(endpoints.clone()), - feeds: vec![], - user: None, - content: OnboardingPage {}.to_string() - }.to_string())) - .or(warp::post() - .and(crate::util::require_host()) - .and(warp::any().map(inject_db)) - .and(warp::body::json::<OnboardingData>()) - .and(warp::any().map(move || http.clone())) - .and_then(|host: warp::host::Authority, db: D, body: OnboardingData, http: reqwest::Client| async move { - let user_uid = format!("https://{}/", host.as_str()); - if db.post_exists(&user_uid).await.map_err(FrontendError::from)? { - - return Ok(warp::redirect(hyper::Uri::from_static("/"))); - } - let user = crate::indieauth::User::new(&user_uid, "https://kittybox.fireburn.ru/", "create"); - if body.user["type"][0] != "h-card" || body.first_post["type"][0] != "h-entry" { - return Err(FrontendError::with_code(StatusCode::BAD_REQUEST, "user and first_post should be an h-card and an h-entry").into()); - } - db.set_setting(crate::database::Settings::SiteName, user.me.as_str(), &body.blog_name) - .await - .map_err(FrontendError::from)?; +#[tracing::instrument(skip(db))] +pub async fn catchall<D: Storage>( + Extension(db): Extension<D>, + Host(host): Host, + Query(query): Query<QueryParams>, + uri: Uri, +) -> impl IntoResponse { + let user = None; // TODO authentication + let path = url::Url::parse(&format!("https://{}/", host)) + .unwrap() + .join(uri.path()) + .unwrap(); - let (_, hcard) = { - let mut hcard = body.user; - hcard["properties"]["uid"] = serde_json::json!([&user_uid]); - crate::micropub::normalize_mf2(hcard, &user) - }; - db.put_post(&hcard, &user_uid).await.map_err(FrontendError::from)?; - let (uid, post) = crate::micropub::normalize_mf2(body.first_post, &user); - crate::micropub::_post(user, uid, post, db, http).await.map_err(|e| { - FrontendError { - msg: "Error while posting the first post".to_string(), - source: Some(Box::new(e)), - code: StatusCode::INTERNAL_SERVER_ERROR + match get_post_from_database(&db, path.as_str(), query.after, &user).await { + Ok(post) => { + let (blogname, channels) = tokio::join!( + db.get_setting(crate::database::Settings::SiteName, &host) + .map(|i| i.unwrap_or_else(|_| "Kittybox".to_owned())), + db.get_channels(&host).map(|i| i.unwrap_or_default()) + ); + // Render the homepage + ( + StatusCode::OK, + [( + axum::http::header::CONTENT_TYPE, + r#"text/html; charset="utf-8""#, + )], + Template { + title: &blogname, + blog_name: &blogname, + endpoints: None, // XXX this will be deprecated soon anyway + feeds: channels, + user, + content: match post.pointer("/type/0").and_then(|i| i.as_str()) { + Some("h-entry") => Entry { post: &post }.to_string(), + Some("h-feed") => Feed { feed: &post }.to_string(), + Some("h-card") => VCard { card: &post }.to_string(), + unknown => { + unimplemented!("Template for MF2-JSON type {:?}", unknown) + } + }, + } + .to_string(), + ) + } + Err(err) => { + let (blogname, channels) = tokio::join!( + db.get_setting(crate::database::Settings::SiteName, &host) + .map(|i| i.unwrap_or_else(|_| "Kittybox".to_owned())), + db.get_channels(&host).map(|i| i.unwrap_or_default()) + ); + ( + err.code(), + [( + axum::http::header::CONTENT_TYPE, + r#"text/html; charset="utf-8""#, + )], + Template { + title: &blogname, + blog_name: &blogname, + endpoints: None, + feeds: channels, + user, + content: ErrorPage { + code: err.code(), + msg: Some(err.msg().to_owned()), } - })?; - Ok::<_, warp::Rejection>(warp::redirect(hyper::Uri::from_static("/"))) - })) - -} - -#[forbid(clippy::unwrap_used)] -pub fn catchall<D: Storage>(db: D, endpoints: IndiewebEndpoints) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone { - let inject_db = move || db.clone(); - warp::any() - .map(inject_db.clone()) - .and(request_uri()) - .and(warp::query()) - .and_then(|db: D, path: String, query: QueryParams| async move { - get_post_from_database(&db, &path, query.after, &None).map_err(warp::reject::custom).await - }) - // Rendering pipeline - .and_then(|post: serde_json::Value| async move { - let post_name = &post["properties"]["name"][0].as_str().to_owned(); - match post["type"][0] - .as_str() - { - Some("h-entry") => Ok(( - post_name.unwrap_or("Note").to_string(), - kittybox_templates::Entry { post: &post }.to_string(), - StatusCode::OK - )), - Some("h-card") => Ok(( - post_name.unwrap_or("Contact card").to_string(), - kittybox_templates::VCard { card: &post }.to_string(), - StatusCode::OK - )), - Some("h-feed") => Ok(( - post_name.unwrap_or("Feed").to_string(), - kittybox_templates::Feed { feed: &post }.to_string(), - StatusCode::OK - )), - _ => Err(warp::reject::custom(FrontendError::with_code( - StatusCode::INTERNAL_SERVER_ERROR, - &format!("Couldn't render an unknown type: {}", post["type"][0]), - ))) - } - }) - .recover(|err: warp::Rejection| { - use warp::Rejection; - use futures_util::future; - if let Some(err) = err.find::<FrontendError>() { - return future::ok::<(String, String, StatusCode), Rejection>(( - format!("Error: HTTP {}", err.code().as_u16()), - ErrorPage { code: err.code(), msg: Some(err.msg().to_string()) }.to_string(), - err.code() - )); - } - future::err::<(String, String, StatusCode), Rejection>(err) - }) - .unify() - .and(warp::any().map(move || endpoints.clone())) - .and(crate::util::require_host()) - .and(warp::any().map(inject_db)) - .then(|content: (String, String, StatusCode), endpoints: IndiewebEndpoints, host: Authority, db: D| async move { - let owner = format!("https://{}/", host.as_str()); - let blog_name = db.get_setting(crate::database::Settings::SiteName, &owner).await - .unwrap_or_else(|_| "Kitty Box!".to_string()); - let feeds = db.get_channels(&owner).await.unwrap_or_default(); - let (title, content, code) = content; - warp::reply::with_status(warp::reply::html(Template { - title: &title, - blog_name: &blog_name, - endpoints: Some(endpoints), - feeds, - user: None, // TODO - content, - }.to_string()), code) - }) - + .to_string(), + } + .to_string(), + ) + } + } } static STYLE_CSS: &[u8] = include_bytes!("./style.css"); @@ -416,44 +281,19 @@ static ONBOARDING_CSS: &[u8] = include_bytes!("./onboarding.css"); static MIME_JS: &str = "application/javascript"; static MIME_CSS: &str = "text/css"; - -fn _dispatch_static(name: &str) -> Option<(&'static [u8], &'static str)> { - match name { - "style.css" => Some((STYLE_CSS, MIME_CSS)), - "onboarding.js" => Some((ONBOARDING_JS, MIME_JS)), - "onboarding.css" => Some((ONBOARDING_CSS, MIME_CSS)), - _ => None +static MIME_PLAIN: &str = "text/plain"; + +pub async fn statics(Path(name): Path<String>) -> impl IntoResponse { + use axum::http::header::CONTENT_TYPE; + + match name.as_str() { + "style.css" => (StatusCode::OK, [(CONTENT_TYPE, MIME_CSS)], STYLE_CSS), + "onboarding.js" => (StatusCode::OK, [(CONTENT_TYPE, MIME_JS)], ONBOARDING_JS), + "onboarding.css" => (StatusCode::OK, [(CONTENT_TYPE, MIME_CSS)], ONBOARDING_CSS), + _ => ( + StatusCode::NOT_FOUND, + [(CONTENT_TYPE, MIME_PLAIN)], + "not found".as_bytes(), + ), } } - -pub fn static_files() -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Copy { - use futures_util::future; - - warp::get() - .and(warp::path::param() - .and_then(|filename: String| { - match _dispatch_static(&filename) { - Some((buf, content_type)) => future::ok( - warp::reply::with_header( - buf, "Content-Type", content_type - ) - ), - None => future::err(warp::reject()) - } - })) - .or(warp::head() - .and(warp::path::param() - .and_then(|filename: String| { - match _dispatch_static(&filename) { - Some((buf, content_type)) => future::ok( - warp::reply::with_header( - warp::reply::with_header( - warp::reply(), "Content-Type", content_type - ), - "Content-Length", buf.len() - ) - ), - None => future::err(warp::reject()) - } - }))) -} diff --git a/kittybox-rs/src/frontend/onboarding.rs b/kittybox-rs/src/frontend/onboarding.rs new file mode 100644 index 0000000..18def1d --- /dev/null +++ b/kittybox-rs/src/frontend/onboarding.rs @@ -0,0 +1,142 @@ +use kittybox_templates::{ErrorPage, Template, OnboardingPage}; +use crate::database::{Storage, Settings}; +use axum::{ + Json, + extract::{Host, Extension}, + http::StatusCode, + response::{Html, IntoResponse}, +}; +use serde::Deserialize; +use tracing::{debug, error}; + +use super::FrontendError; + +pub async fn get() -> Html<String> { + Html(Template { + title: "Kittybox - Onboarding", + blog_name: "Kittybox", + feeds: vec![], + endpoints: None, + user: None, + content: OnboardingPage {}.to_string() + }.to_string()) +} + +#[derive(Deserialize, Debug)] +struct OnboardingFeed { + slug: String, + name: String, +} + +#[derive(Deserialize, Debug)] +pub struct OnboardingData { + user: serde_json::Value, + first_post: serde_json::Value, + #[serde(default = "OnboardingData::default_blog_name")] + blog_name: String, + feeds: Vec<OnboardingFeed>, +} + +impl OnboardingData { + fn default_blog_name() -> String { + "Kitty Box!".to_owned() + } +} + +#[tracing::instrument(skip(db, http))] +async fn onboard<D: Storage + 'static>( + db: D, user_uid: url::Url, data: OnboardingData, http: reqwest::Client +) -> Result<(), FrontendError> { + // Create a user to pass to the backend + // At this point the site belongs to nobody, so it is safe to do + let user = crate::indieauth::User::new( + user_uid.as_str(), + "https://kittybox.fireburn.ru/", + "create" + ); + + if data.user["type"][0] != "h-card" || data.first_post["type"][0] != "h-entry" { + return Err(FrontendError::with_code( + StatusCode::BAD_REQUEST, + "user and first_post should be an h-card and an h-entry" + )) + } + + db.set_setting(Settings::SiteName, user.me.as_str(), &data.blog_name) + .await + .map_err(FrontendError::from)?; + + let (_, hcard) = { + let mut hcard = data.user; + hcard["properties"]["uid"] = serde_json::json!([&user_uid]); + crate::micropub::normalize_mf2(hcard, &user) + }; + db.put_post(&hcard, user_uid.as_str()).await.map_err(FrontendError::from)?; + + debug!("Creating feeds..."); + for feed in data.feeds { + if feed.name.is_empty() || feed.slug.is_empty() { + continue; + }; + log::debug!("Creating feed {} with slug {}", &feed.name, &feed.slug); + let (_, feed) = crate::micropub::normalize_mf2( + serde_json::json!({ + "type": ["h-feed"], + "properties": {"name": [feed.name], "mp-slug": [feed.slug]} + }), + &user, + ); + + db.put_post(&feed, user_uid.as_str()).await.map_err(FrontendError::from)?; + } + let (uid, post) = crate::micropub::normalize_mf2(data.first_post, &user); + crate::micropub::_post(user, uid, post, db, http).await.map_err(|e| { + FrontendError { + msg: "Error while posting the first post".to_string(), + source: Some(Box::new(e)), + code: StatusCode::INTERNAL_SERVER_ERROR + } + })?; + + Ok(()) +} + +pub async fn post<D: Storage + 'static>( + Extension(db): Extension<D>, + Host(host): Host, + Json(data): Json<OnboardingData>, + Extension(http): Extension<reqwest::Client> +) -> axum::response::Response { + let user_uid = format!("https://{}/", host.as_str()); + + if db.post_exists(&user_uid).await.unwrap() { + IntoResponse::into_response(( + StatusCode::FOUND, + [("Location", "/")] + )) + } else { + match onboard(db, user_uid.parse().unwrap(), data, http).await { + Ok(()) => IntoResponse::into_response(( + StatusCode::FOUND, + [("Location", "/")] + )), + Err(err) => { + error!("Onboarding error: {}", err); + IntoResponse::into_response(( + err.code(), + Html(Template { + title: "Kittybox - Onboarding", + blog_name: "Kittybox", + feeds: vec![], + endpoints: None, + user: None, + content: ErrorPage { + code: err.code(), + msg: Some(err.msg().to_string()), + }.to_string(), + }.to_string()) + )) + } + } + } +} diff --git a/kittybox-rs/src/indieauth.rs b/kittybox-rs/src/indieauth.rs index 57c0301..63de859 100644 --- a/kittybox-rs/src/indieauth.rs +++ b/kittybox-rs/src/indieauth.rs @@ -1,6 +1,5 @@ +use serde::{Deserialize, Serialize}; use url::Url; -use serde::{Serialize, Deserialize}; -use warp::{Filter, Rejection, reject::MissingHeader}; #[derive(Deserialize, Serialize, Debug, PartialEq, Clone)] pub struct User { @@ -15,40 +14,46 @@ pub enum ErrorKind { NotAuthorized, TokenEndpointError, JsonParsing, - Other + InvalidHeader, + Other, } #[derive(Deserialize, Serialize, Debug, Clone)] pub struct TokenEndpointError { error: String, - error_description: String + error_description: String, } #[derive(Debug)] pub struct IndieAuthError { source: Option<Box<dyn std::error::Error + Send + Sync>>, kind: ErrorKind, - msg: String + msg: String, } impl std::error::Error for IndieAuthError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - self.source.as_ref().map(|e| e.as_ref() as &dyn std::error::Error) + self.source + .as_ref() + .map(|e| e.as_ref() as &dyn std::error::Error) } } impl std::fmt::Display for IndieAuthError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match match self.kind { - ErrorKind::TokenEndpointError => write!(f, "token endpoint returned an error: "), - ErrorKind::JsonParsing => write!(f, "error while parsing token endpoint response: "), - ErrorKind::NotAuthorized => write!(f, "token endpoint did not recognize the token: "), - ErrorKind::PermissionDenied => write!(f, "token endpoint rejected the token: "), - ErrorKind::Other => write!(f, "token endpoint communication error: "), - } { - Ok(_) => write!(f, "{}", self.msg), - Err(err) => Err(err) - } + write!( + f, + "{}: {}", + match self.kind { + ErrorKind::TokenEndpointError => "token endpoint returned an error: ", + ErrorKind::JsonParsing => "error while parsing token endpoint response: ", + ErrorKind::NotAuthorized => "token endpoint did not recognize the token: ", + ErrorKind::PermissionDenied => "token endpoint rejected the token: ", + ErrorKind::InvalidHeader => "authorization header parsing error: ", + ErrorKind::Other => "token endpoint communication error: ", + }, + self.msg + ) } } @@ -72,7 +77,42 @@ impl From<reqwest::Error> for IndieAuthError { } } -impl warp::reject::Reject for IndieAuthError {} +impl From<axum::extract::rejection::TypedHeaderRejection> for IndieAuthError { + fn from(err: axum::extract::rejection::TypedHeaderRejection) -> Self { + Self { + msg: format!("{:?}", err.reason()), + source: Some(Box::new(err)), + kind: ErrorKind::InvalidHeader, + } + } +} + +impl axum::response::IntoResponse for IndieAuthError { + fn into_response(self) -> axum::response::Response { + let status_code: StatusCode = match self.kind { + ErrorKind::PermissionDenied => StatusCode::FORBIDDEN, + ErrorKind::NotAuthorized => StatusCode::UNAUTHORIZED, + ErrorKind::TokenEndpointError => StatusCode::INTERNAL_SERVER_ERROR, + ErrorKind::JsonParsing => StatusCode::BAD_REQUEST, + ErrorKind::InvalidHeader => StatusCode::UNAUTHORIZED, + ErrorKind::Other => StatusCode::INTERNAL_SERVER_ERROR, + }; + + let body = serde_json::json!({ + "error": match self.kind { + ErrorKind::PermissionDenied => "forbidden", + ErrorKind::NotAuthorized => "unauthorized", + ErrorKind::TokenEndpointError => "token_endpoint_error", + ErrorKind::JsonParsing => "invalid_request", + ErrorKind::InvalidHeader => "unauthorized", + ErrorKind::Other => "unknown_error", + }, + "error_description": self.msg + }); + + (status_code, axum::response::Json(body)).into_response() + } +} impl User { pub fn check_scope(&self, scope: &str) -> bool { @@ -90,89 +130,112 @@ impl User { } } -pub fn require_token(token_endpoint: String, http: reqwest::Client) -> impl Filter<Extract = (User,), Error = Rejection> + Clone { - // It might be OK to panic here, because we're still inside the initialisation sequence for now. - // Proper error handling on the top of this should be used though. - let token_endpoint_uri = url::Url::parse(&token_endpoint) - .expect("Couldn't parse the token endpoint URI!"); - warp::any() - .map(move || token_endpoint_uri.clone()) - .and(warp::any().map(move || http.clone())) - .and(warp::header::<String>("Authorization").recover(|err: Rejection| async move { - if err.find::<MissingHeader>().is_some() { - Err(IndieAuthError { - source: None, - msg: "No Authorization header provided.".to_string(), - kind: ErrorKind::NotAuthorized - }.into()) - } else { - Err(err) - } - }).unify()) - .and_then(|token_endpoint, http: reqwest::Client, token| async move { - use hyper::StatusCode; - - match http - .get(token_endpoint) - .header("Authorization", token) - .header("Accept", "application/json") - .send() +use axum::{ + extract::{Extension, FromRequest, RequestParts, TypedHeader}, + headers::{ + authorization::{Bearer, Credentials}, + Authorization, + }, + http::StatusCode, +}; + +// this newtype is required due to axum::Extension retrieving items by type +// it's based on compiler magic matching extensions by their type's hashes +#[derive(Debug, Clone)] +pub struct TokenEndpoint(pub url::Url); + +#[async_trait::async_trait] +impl<B> FromRequest<B> for User +where + B: Send, +{ + type Rejection = IndieAuthError; + + #[cfg_attr(all(debug_assertions, not(test)), allow(unreachable_code, unused_variables))] + async fn from_request(req: &mut RequestParts<B>) -> Result<Self, Self::Rejection> { + // Return a fake user if we're running a debug build + // I don't wanna bother with authentication + #[cfg(all(debug_assertions, not(test)))] + return Ok(User::new( + "http://localhost:8080/", + "https://quill.p3k.io/", + "create update delete media" + )); + + let TypedHeader(Authorization(token)) = + TypedHeader::<Authorization<Bearer>>::from_request(req) .await - { - Ok(res) => match res.status() { - StatusCode::OK => match res.json::<serde_json::Value>().await { - Ok(json) => match serde_json::from_value::<User>(json.clone()) { - Ok(user) => Ok(user), - Err(err) => { - if let Some(false) = json["active"].as_bool() { - Err(IndieAuthError { - source: None, - kind: ErrorKind::NotAuthorized, - msg: "The token is not active for this user.".to_owned() - }.into()) - } else { - Err(IndieAuthError::from(err).into()) - } + .map_err(IndieAuthError::from)?; + + let Extension(TokenEndpoint(token_endpoint)): Extension<TokenEndpoint> = + Extension::from_request(req).await.unwrap(); + + let Extension(http): Extension<reqwest::Client> = + Extension::from_request(req).await.unwrap(); + + match http + .get(token_endpoint) + .header("Authorization", token.encode()) + .header("Accept", "application/json") + .send() + .await + { + Ok(res) => match res.status() { + StatusCode::OK => match res.json::<serde_json::Value>().await { + Ok(json) => match serde_json::from_value::<User>(json.clone()) { + Ok(user) => Ok(user), + Err(err) => { + if let Some(false) = json["active"].as_bool() { + Err(IndieAuthError { + source: None, + kind: ErrorKind::NotAuthorized, + msg: "The token is not active for this user.".to_owned(), + }) + } else { + Err(IndieAuthError::from(err)) } } - Err(err) => Err(IndieAuthError::from(err).into()) }, - StatusCode::BAD_REQUEST => { - match res.json::<TokenEndpointError>().await { - Ok(err) => { - if err.error == "unauthorized" { - Err(IndieAuthError { - source: None, - kind: ErrorKind::NotAuthorized, - msg: err.error_description - }.into()) - } else { - Err(IndieAuthError { - source: None, - kind: ErrorKind::TokenEndpointError, - msg: err.error_description - }.into()) - } - }, - Err(err) => Err(IndieAuthError::from(err).into()) + Err(err) => Err(IndieAuthError::from(err)), + }, + StatusCode::BAD_REQUEST => match res.json::<TokenEndpointError>().await { + Ok(err) => { + if err.error == "unauthorized" { + Err(IndieAuthError { + source: None, + kind: ErrorKind::NotAuthorized, + msg: err.error_description, + }) + } else { + Err(IndieAuthError { + source: None, + kind: ErrorKind::TokenEndpointError, + msg: err.error_description, + }) } - }, - _ => Err(IndieAuthError { - source: None, - msg: format!("Token endpoint returned {}", res.status()), - kind: ErrorKind::TokenEndpointError - }.into()) + } + Err(err) => Err(IndieAuthError::from(err)), }, - Err(err) => Err(warp::reject::custom(IndieAuthError::from(err))) - } - }) + _ => Err(IndieAuthError { + source: None, + msg: format!("Token endpoint returned {}", res.status()), + kind: ErrorKind::TokenEndpointError, + }), + }, + Err(err) => Err(IndieAuthError::from(err)), + } + } } #[cfg(test)] mod tests { - use super::{User, IndieAuthError, require_token}; + use super::User; + use axum::{ + extract::FromRequest, + http::{Method, Request} + }; use httpmock::prelude::*; - + #[test] fn user_scopes_are_checkable() { let user = User::new( @@ -189,76 +252,88 @@ mod tests { fn get_http_client() -> reqwest::Client { reqwest::Client::new() } - + + fn request<A: Into<Option<&'static str>>, T: TryInto<url::Url> + std::fmt::Debug>( + auth: A, + endpoint: T, + ) -> Request<()> + where + <T as std::convert::TryInto<url::Url>>::Error: std::fmt::Debug, + { + let request = Request::builder().method(Method::GET); + + match auth.into() { + Some(auth) => request.header("Authorization", auth), + None => request, + } + .extension(super::TokenEndpoint(endpoint.try_into().unwrap())) + .extension(get_http_client()) + .body(()) + .unwrap() + } + #[tokio::test] async fn test_require_token_with_token() { let server = MockServer::start_async().await; - server.mock_async(|when, then| { - when.path("/token") - .header("Authorization", "Bearer token"); - - then.status(200) - .header("Content-Type", "application/json") - .json_body(serde_json::to_value(User::new( - "https://fireburn.ru/", - "https://quill.p3k.io/", - "create update media", - )).unwrap()); - }).await; - - let filter = require_token(server.url("/token"), get_http_client()); - - let res: User = warp::test::request() - .path("/") - .header("Authorization", "Bearer token") - .filter(&filter) - .await - .unwrap(); - - assert_eq!(res.me.as_str(), "https://fireburn.ru/") + server + .mock_async(|when, then| { + when.path("/token").header("Authorization", "Bearer token"); + + then.status(200) + .header("Content-Type", "application/json") + .json_body( + serde_json::to_value(User::new( + "https://fireburn.ru/", + "https://quill.p3k.io/", + "create update media", + )) + .unwrap(), + ); + }) + .await; + + let request = request("Bearer token", server.url("/token").as_str()); + let mut parts = axum::extract::RequestParts::new(request); + let user = User::from_request(&mut parts).await.unwrap(); + + assert_eq!(user.me.as_str(), "https://fireburn.ru/") } #[tokio::test] async fn test_require_token_fake_token() { let server = MockServer::start_async().await; - server.mock_async(|when, then| { - when.path("/refuse_token"); - - then.status(200) - .json_body(serde_json::json!({"active": false})); - }).await; + server + .mock_async(|when, then| { + when.path("/refuse_token"); - let filter = require_token(server.url("/refuse_token"), get_http_client()); + then.status(200) + .json_body(serde_json::json!({"active": false})); + }) + .await; - let res = warp::test::request() - .path("/") - .header("Authorization", "Bearer token") - .filter(&filter) - .await - .unwrap_err(); + let request = request("Bearer token", server.url("/refuse_token").as_str()); + let mut parts = axum::extract::RequestParts::new(request); + let err = User::from_request(&mut parts).await.unwrap_err(); - let err: &IndieAuthError = res.find().unwrap(); - assert_eq!(err.kind, super::ErrorKind::NotAuthorized); + assert_eq!(err.kind, super::ErrorKind::NotAuthorized) } #[tokio::test] async fn test_require_token_no_token() { let server = MockServer::start_async().await; - let mock = server.mock_async(|when, then| { - when.path("/should_never_be_called"); + let mock = server + .mock_async(|when, then| { + when.path("/should_never_be_called"); - then.status(500); - }).await; - let filter = require_token(server.url("/should_never_be_called"), get_http_client()); + then.status(500); + }) + .await; - let res = warp::test::request() - .path("/") - .filter(&filter) - .await - .unwrap_err(); + let request = request(None, server.url("/should_never_be_called").as_str()); + let mut parts = axum::extract::RequestParts::new(request); + let err = User::from_request(&mut parts).await.unwrap_err(); - let err: &IndieAuthError = res.find().unwrap(); - assert_eq!(err.kind, super::ErrorKind::NotAuthorized); + assert_eq!(err.kind, super::ErrorKind::InvalidHeader); mock.assert_hits_async(0).await; } @@ -266,26 +341,24 @@ mod tests { #[tokio::test] async fn test_require_token_400_error_unauthorized() { let server = MockServer::start_async().await; - server.mock_async(|when, then| { - when.path("/refuse_token_with_400"); + server + .mock_async(|when, then| { + when.path("/refuse_token_with_400"); - then.status(400) - .json_body(serde_json::json!({ + then.status(400).json_body(serde_json::json!({ "error": "unauthorized", "error_description": "The token provided was malformed" })); - }).await; - - let filter = require_token(server.url("/refuse_token_with_400"), get_http_client()); + }) + .await; - let res = warp::test::request() - .path("/") - .header("Authorization", "Bearer token") - .filter(&filter) - .await - .unwrap_err(); + let request = request( + "Bearer token", + server.url("/refuse_token_with_400").as_str(), + ); + let mut parts = axum::extract::RequestParts::new(request); + let err = User::from_request(&mut parts).await.unwrap_err(); - let err: &IndieAuthError = res.find().unwrap(); assert_eq!(err.kind, super::ErrorKind::NotAuthorized); } } diff --git a/kittybox-rs/src/lib.rs b/kittybox-rs/src/lib.rs index 1800b5b..84e9d60 100644 --- a/kittybox-rs/src/lib.rs +++ b/kittybox-rs/src/lib.rs @@ -1,103 +1,11 @@ #![forbid(unsafe_code)] #![warn(clippy::todo)] -pub mod metrics; /// Database abstraction layer for Kittybox, allowing the CMS to work with any kind of database. pub mod database; -pub mod micropub; -pub mod media; -pub mod indieauth; pub mod frontend; - -pub(crate) mod rejections { - #[derive(Debug)] - pub(crate) struct UnacceptableContentType; - impl warp::reject::Reject for UnacceptableContentType {} - - #[derive(Debug)] - pub(crate) struct HostHeaderUnset; - impl warp::reject::Reject for HostHeaderUnset {} -} +pub mod indieauth; +pub mod media; +pub mod micropub; pub static MICROPUB_CLIENT: &[u8] = include_bytes!("./index.html"); - -pub mod util { - use warp::{Filter, host::Authority}; - use super::rejections; - - pub fn require_host() -> impl Filter<Extract = (Authority,), Error = warp::Rejection> + Copy { - warp::host::optional() - .and_then(|authority: Option<Authority>| async move { - authority.ok_or_else(|| warp::reject::custom(rejections::HostHeaderUnset)) - }) - } - - pub fn parse_accept() -> impl Filter<Extract = (http_types::Mime,), Error = warp::Rejection> + Copy { - warp::header::value("Accept").and_then(|accept: warp::http::HeaderValue| async move { - let mut accept: http_types::content::Accept = { - // This is unneccesarily complicated because I want to reuse some http-types parsing - // and http-types has constructor for Headers private so I need to construct - // a mock Request to reason about headers... this is so dumb wtf - // so much for zero-cost abstractions, huh - let bytes: &[u8] = accept.as_bytes(); - let value = http_types::headers::HeaderValue::from_bytes(bytes.to_vec()).unwrap(); - let values: http_types::headers::HeaderValues = vec![value].into(); - let mut request = http_types::Request::new(http_types::Method::Get, "http://example.com/"); - request.append_header("Accept".parse::<http_types::headers::HeaderName>().unwrap(), &values); - http_types::content::Accept::from_headers(&request).unwrap().unwrap() - }; - - // This code is INCREDIBLY dumb, honestly... - // why did I even try to use it? - // TODO vendor this stuff in so I can customize it - match accept.negotiate(&[ - "text/html; encoding=\"utf-8\"".into(), - "application/json; encoding=\"utf-8\"".into(), - "text/html".into(), - "application/json".into(), - - ]) { - Ok(mime) => { - Ok(http_types::Mime::from(mime.value().as_str())) - }, - Err(err) => { - log::error!("Content-Type negotiation error: {:?}, accepting: {:?}", err, accept); - Err(warp::reject::custom(rejections::UnacceptableContentType)) - } - } - }) - } - - mod tests { - #[tokio::test] - async fn test_require_host_with_host() { - use super::require_host; - - let filter = require_host(); - - let res = warp::test::request() - .path("/") - .header("Host", "localhost:8080") - .filter(&filter) - .await - .unwrap(); - - assert_eq!(res, "localhost:8080"); - - } - - #[tokio::test] - async fn test_require_host_no_host() { - use super::require_host; - - let filter = require_host(); - - let res = warp::test::request() - .path("/") - .filter(&filter) - .await; - - assert!(res.is_err()); - } - } -} diff --git a/kittybox-rs/src/main.rs b/kittybox-rs/src/main.rs index fd1875c..ef051ba 100644 --- a/kittybox-rs/src/main.rs +++ b/kittybox-rs/src/main.rs @@ -1,17 +1,16 @@ -use log::{debug, error, info}; -use std::{convert::Infallible, env, time::Duration}; +use kittybox::database::FileStorage; +use std::{env, time::Duration}; +use tracing::{debug, error, info}; use url::Url; -use warp::{Filter, host::Authority}; #[tokio::main] async fn main() { - // TODO turn into a feature so I can enable and disable it - #[cfg(debug_assertions)] - console_subscriber::init(); - // TODO use tracing instead of log - let logger_env = env_logger::Env::new().filter_or("RUST_LOG", "info"); - env_logger::init_from_env(logger_env); + use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Registry}; + Registry::default() + .with(EnvFilter::from_default_env()) + .with(tracing_subscriber::fmt::layer().json()) + .init(); info!("Starting the kittybox server..."); @@ -63,24 +62,22 @@ async fn main() { let listen_at = match env::var("SERVE_AT") .ok() .unwrap_or_else(|| "[::]:8080".to_string()) - .parse::<std::net::SocketAddr>() { - Ok(addr) => addr, - Err(e) => { - error!("Cannot parse SERVE_AT: {}", e); - std::process::exit(1); - } - }; + .parse::<std::net::SocketAddr>() + { + Ok(addr) => addr, + Err(e) => { + error!("Cannot parse SERVE_AT: {}", e); + std::process::exit(1); + } + }; - // This thing handles redirects automatically but is type-incompatible with hyper::Client - // Bonus: less generics to be aware of, this thing hides its complexity let http: reqwest::Client = { #[allow(unused_mut)] - let mut builder = reqwest::Client::builder() - .user_agent(concat!( - env!("CARGO_PKG_NAME"), - "/", - env!("CARGO_PKG_VERSION") - )); + let mut builder = reqwest::Client::builder().user_agent(concat!( + env!("CARGO_PKG_NAME"), + "/", + env!("CARGO_PKG_VERSION") + )); // TODO: add a root certificate if there's an environment variable pointing at it //builder = builder.add_root_certificate(reqwest::Certificate::from_pem(todo!())); @@ -109,12 +106,8 @@ async fn main() { webmention: None, microsub: None, }; - - let homepage = warp::get() - .and(warp::path::end()) - .and(kittybox::frontend::homepage(database.clone(), endpoints.clone())); - - let micropub = warp::path("micropub") + + /*let micropub = warp::path("micropub") .and(warp::path::end() .and(kittybox::micropub::micropub( database.clone(), @@ -169,11 +162,8 @@ async fn main() { // TODO prettier error response let coffee = warp::path("coffee") .map(|| warp::reply::with_status("I'm a teapot!", warp::http::StatusCode::IM_A_TEAPOT)); - - let catchall = kittybox::frontend::catchall( - database.clone(), - endpoints.clone() - ); + + et catchall = ; let app = homepage .or(technical) @@ -186,29 +176,103 @@ async fn main() { ; let svc = warp::service(app); + */ - // A little dance to turn a potential file descriptor into an async network socket - let mut listenfd = listenfd::ListenFd::from_env(); - let tcp_listener: std::net::TcpListener = if let Ok(Some(listener)) = listenfd.take_tcp_listener(0) { - listener - } else { - std::net::TcpListener::bind(listen_at).unwrap() - }; - // Set the socket to non-blocking so tokio can work with it properly - // This is the async magic - tcp_listener.set_nonblocking(true).unwrap(); + let svc = axum::Router::new() + .route( + "/", + axum::routing::get(kittybox::frontend::homepage::<FileStorage>), + ) + .route( + "/.kittybox/coffee", + axum::routing::get(|| async { + use axum::http::{header, StatusCode}; + ( + StatusCode::IM_A_TEAPOT, + [(header::CONTENT_TYPE, "text/plain")], + "Sorry, can't brew coffee yet!", + ) + }), + ) + .route( + "/.kittybox/onboarding", + axum::routing::get(kittybox::frontend::onboarding::get) + .post(kittybox::frontend::onboarding::post::<FileStorage>) + ) + .route( + "/.kittybox/micropub", + axum::routing::get(kittybox::micropub::query::<FileStorage>) + .post(kittybox::micropub::post::<FileStorage>) + .layer(tower_http::cors::CorsLayer::new() + .allow_methods([axum::http::Method::GET, axum::http::Method::POST]) + .allow_origin(tower_http::cors::Any)), + ) + .route( + "/.kittybox/micropub/client", + axum::routing::get(|| { + std::future::ready(axum::response::Html(kittybox::MICROPUB_CLIENT)) + }), + ) + .route( + "/.kittybox/health", + axum::routing::get(|| async { + // TODO health-check the database + "OK" + }), + ) + .route( + "/.kittybox/metrics", + axum::routing::get(|| async { todo!() }), + ) + .nest( + "/.kittybox/media", + axum::Router::new() + .route( + "/", + axum::routing::get(|| async { todo!() }).post(|| async { todo!() }), + ) + .route("/:filename", axum::routing::get(|| async { todo!() })), + ) + .route( + "/.kittybox/static/:path", + axum::routing::get(kittybox::frontend::statics), + ) + .fallback(axum::routing::get( + kittybox::frontend::catchall::<FileStorage>, + )) + .layer(axum::Extension(database)) + .layer(axum::Extension(http)) + .layer(axum::Extension(kittybox::indieauth::TokenEndpoint( + token_endpoint, + ))) + .layer( + tower::ServiceBuilder::new() + .layer(tower_http::trace::TraceLayer::new_for_http()) + .into_inner(), + ); + + // A little dance to turn a potential file descriptor into a guaranteed async network socket + let tcp_listener: std::net::TcpListener = { + let mut listenfd = listenfd::ListenFd::from_env(); + let tcp_listener = if let Ok(Some(listener)) = listenfd.take_tcp_listener(0) { + listener + } else { + std::net::TcpListener::bind(listen_at).unwrap() + }; + // Set the socket to non-blocking so tokio can work with it properly + // This is the async magic + tcp_listener.set_nonblocking(true).unwrap(); + + tcp_listener + }; info!("Listening on {}", tcp_listener.local_addr().unwrap()); + let server = hyper::server::Server::from_tcp(tcp_listener) .unwrap() // Otherwise Chrome keeps connections open for too long .tcp_keepalive(Some(Duration::from_secs(30 * 60))) - .serve(hyper::service::make_service_fn(move |_| { - let service = svc.clone(); - async move { - Ok::<_, Infallible>(service) - } - })) + .serve(svc.into_make_service()) .with_graceful_shutdown(async move { // Defer to C-c handler whenever we're not on Unix // TODO consider using a diverging future here diff --git a/kittybox-rs/src/media/mod.rs b/kittybox-rs/src/media/mod.rs index 0d46e0c..d18cf34 100644 --- a/kittybox-rs/src/media/mod.rs +++ b/kittybox-rs/src/media/mod.rs @@ -1,27 +1,25 @@ -use futures_util::StreamExt; use bytes::buf::Buf; -use warp::{Filter, Rejection, Reply, multipart::{FormData, Part}}; - -pub fn query() -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone { - warp::get() - .and(crate::util::require_host()) - .map(|host| "media endpoint query...") -} +use futures_util::StreamExt; +use axum::{ + extract::{Host, Extension, Multipart}, + response::{Response, IntoResponse, Json} +}; -pub fn options() -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone { - warp::options() - .map(|| warp::reply::json::<Option<()>>(&None)) - .with(warp::reply::with::header("Allow", "GET, POST")) -} +pub mod storage; +use storage::{MediaStore, MediaStoreError}; -pub fn upload() -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone { +/*pub fn upload() -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone { warp::post() .and(crate::util::require_host()) - .and(warp::multipart::form().max_length(1024*1024*150/*mb*/)) + .and(warp::multipart::form().max_length(1024 * 1024 * 150 /*mb*/)) .and_then(|host, mut form: FormData| async move { // TODO get rid of the double unwrap() here let file: Part = form.next().await.unwrap().unwrap(); - log::debug!("Uploaded: {:?}, type: {:?}", file.filename(), file.content_type()); + log::debug!( + "Uploaded: {:?}, type: {:?}", + file.filename(), + file.content_type() + ); let mut data = file.stream(); while let Some(buf) = data.next().await { @@ -29,18 +27,16 @@ pub fn upload() -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clo log::debug!("buffer length: {:?}", buf.map(|b| b.remaining())); } Ok::<_, warp::Rejection>(warp::reply::with_header( - warp::reply::with_status( - "", - warp::http::StatusCode::CREATED - ), + warp::reply::with_status("", warp::http::StatusCode::CREATED), "Location", - "./awoo.png" + "./awoo.png", )) }) -} - -pub fn media() -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone { - upload() - .or(query()) - .or(options()) +}*/ +pub async fn upload<S: MediaStore>( + Host(host): Host, + upload: Multipart, + Extension(db): Extension<S> +) -> Response { + todo!() } diff --git a/kittybox-rs/src/media/storage/file.rs b/kittybox-rs/src/media/storage/file.rs new file mode 100644 index 0000000..8c0ddf0 --- /dev/null +++ b/kittybox-rs/src/media/storage/file.rs @@ -0,0 +1,61 @@ +use super::{ErrorKind, MediaStore, MediaStoreError, Result}; +use async_trait::async_trait; +use std::path::PathBuf; +use tokio::fs::OpenOptions; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +#[derive(Clone)] +pub struct FileStore { + base: PathBuf, +} + +impl From<tokio::io::Error> for MediaStoreError { + fn from(source: tokio::io::Error) -> Self { + Self { + source: Some(Box::new(source)), + msg: "file I/O error".to_owned(), + kind: ErrorKind::Backend, + } + } +} + +#[async_trait] +impl MediaStore for FileStore { + async fn write_streaming( + &self, domain: url::Host, filename: &str, + content: axum::extract::multipart::Field<'_> + ) -> Result<()> { + todo!() + } + + async fn read_streaming(&self, domain: url::Host, filename: &str) -> Result<tokio_util::io::ReaderStream<Box<dyn tokio::io::AsyncRead>>> { + todo!() + } + + async fn write(&self, domain: url::Host, filename: &str, content: &[u8]) -> Result<()> { + let path = self.base.join(format!("{}/{}", domain, filename)); + + let mut file = OpenOptions::new() + .create_new(true) + .write(true) + .open(path) + .await?; + + Ok(file.write_all(content).await?) + } + + async fn read(&self, domain: url::Host, filename: &str) -> Result<Vec<u8>> { + let path = self.base.join(format!("{}/{}", domain, filename)); + + let mut file = OpenOptions::new().read(true).open(path).await?; + + let mut buf: Vec<u8> = Vec::default(); + file.read_to_end(&mut buf).await?; + + Ok(buf) + } + + async fn delete(&self, domain: url::Host, filename: &str) -> Result<()> { + todo!() + } +} diff --git a/kittybox-rs/src/media/storage/mod.rs b/kittybox-rs/src/media/storage/mod.rs new file mode 100644 index 0000000..e9b01f9 --- /dev/null +++ b/kittybox-rs/src/media/storage/mod.rs @@ -0,0 +1,53 @@ +use async_trait::async_trait; + +pub mod file; + +#[derive(Debug, Clone, Copy)] +pub enum ErrorKind { + Backend, + Permission, + Conflict, + Other, +} + +#[derive(Debug)] +pub struct MediaStoreError { + kind: ErrorKind, + source: Option<Box<dyn std::error::Error + Send + Sync>>, + msg: String, +} + +impl std::error::Error for MediaStoreError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + self.source + .as_ref() + .map(|i| i.as_ref() as &dyn std::error::Error) + } +} + +impl std::fmt::Display for MediaStoreError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}: {}", + match self.kind { + ErrorKind::Backend => "media storage backend error", + ErrorKind::Permission => "permission denied", + ErrorKind::Conflict => "conflict with existing data", + ErrorKind::Other => "unknown media storage error", + }, + self.msg + ) + } +} + +pub type Result<T> = std::result::Result<T, MediaStoreError>; + +#[async_trait] +pub trait MediaStore: 'static + Send + Sync + Clone { + async fn write_streaming(&self, domain: url::Host, filename: &str, content: axum::extract::multipart::Field<'_>) -> Result<()>; + async fn write(&self, domain: url::Host, filename: &str, content: &[u8]) -> Result<()>; + async fn read_streaming(&self, domain: url::Host, filename: &str) -> Result<tokio_util::io::ReaderStream<Box<dyn tokio::io::AsyncRead>>>; + async fn read(&self, domain: url::Host, filename: &str) -> Result<Vec<u8>>; + async fn delete(&self, domain: url::Host, filename: &str) -> Result<()>; +} diff --git a/kittybox-rs/src/micropub/mod.rs b/kittybox-rs/src/micropub/mod.rs index f426c77..d7be785 100644 --- a/kittybox-rs/src/micropub/mod.rs +++ b/kittybox-rs/src/micropub/mod.rs @@ -1,14 +1,15 @@ -use std::convert::Infallible; -use std::fmt::Display; -use either::Either; -use log::{info, warn, error}; -use warp::http::StatusCode; -use warp::{Filter, Rejection, reject::InvalidQuery}; -use serde_json::json; -use serde::{Serialize, Deserialize}; use crate::database::{MicropubChannel, Storage, StorageError}; use crate::indieauth::User; use crate::micropub::util::form_to_mf2_json; +use axum::TypedHeader; +use axum::extract::{BodyStream, Query}; +use axum::headers::ContentType; +use axum::response::{IntoResponse, Response}; +use axum::{http::StatusCode, Extension}; +use tracing::{error, info, warn, debug}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use std::fmt::Display; #[derive(Serialize, Deserialize, Debug, PartialEq)] #[serde(rename_all = "kebab-case")] @@ -16,13 +17,13 @@ enum QueryType { Source, Config, Channel, - SyndicateTo + SyndicateTo, } #[derive(Serialize, Deserialize, Debug)] -struct MicropubQuery { +pub struct MicropubQuery { q: QueryType, - url: Option<String> + url: Option<String>, } #[derive(Serialize, Deserialize, PartialEq, Debug)] @@ -35,13 +36,13 @@ enum ErrorType { InvalidScope, NotAuthorized, NotFound, - UnsupportedMediaType + UnsupportedMediaType, } #[derive(Serialize, Deserialize, Debug)] pub(crate) struct MicropubError { error: ErrorType, - error_description: String + error_description: String, } impl From<StorageError> for MicropubError { @@ -49,9 +50,9 @@ impl From<StorageError> for MicropubError { Self { error: match err.kind() { crate::database::ErrorKind::NotFound => ErrorType::NotFound, - _ => ErrorType::InternalServerError + _ => ErrorType::InternalServerError, }, - error_description: format!("Backend error: {}", err) + error_description: format!("Backend error: {}", err), } } } @@ -86,12 +87,21 @@ impl From<MicropubError> for StatusCode { } } +impl axum::response::IntoResponse for MicropubError { + fn into_response(self) -> axum::response::Response { + axum::response::IntoResponse::into_response(( + StatusCode::from(&self), + axum::response::Json(self) + )) + } +} + impl From<serde_json::Error> for MicropubError { fn from(err: serde_json::Error) -> Self { use ErrorType::*; Self { error: InvalidRequest, - error_description: err.to_string() + error_description: err.to_string(), } } } @@ -100,90 +110,184 @@ impl MicropubError { fn new(error: ErrorType, error_description: &str) -> Self { Self { error, - error_description: error_description.to_owned() + error_description: error_description.to_owned(), } } } -impl warp::reject::Reject for MicropubError {} +mod util; +pub(crate) use util::normalize_mf2; -mod post; -pub(crate) use post::normalize_mf2; +#[derive(Debug)] +struct FetchedPostContext { + url: url::Url, + mf2: serde_json::Value, + webmention: Option<url::Url>, +} -mod util { - use serde_json::json; +fn populate_reply_context( + mf2: &serde_json::Value, + prop: &str, + ctxs: &[FetchedPostContext], +) -> Option<serde_json::Value> { + mf2["properties"][prop].as_array().map(|array| { + json!(array + .iter() + // TODO: This seems to be O(n^2) and I don't like it. + // Switching `ctxs` to a hashmap might speed it up to O(n) + // The key would be the URL/UID + .map(|i| ctxs + .iter() + .find(|ctx| Some(ctx.url.as_str()) == i.as_str()) + .and_then(|ctx| ctx.mf2["items"].get(0)) + .or(Some(i)) + .unwrap()) + .collect::<Vec<&serde_json::Value>>()) + }) +} - pub(crate) fn form_to_mf2_json(form: Vec<(String, String)>) -> serde_json::Value { - let mut mf2 = json!({"type": [], "properties": {}}); - for (k, v) in form { - if k == "h" { - mf2["type"] - .as_array_mut() - .unwrap() - .push(json!("h-".to_string() + &v)); - } else if k != "access_token" { - let key = k.strip_suffix("[]").unwrap_or(&k); - match mf2["properties"][key].as_array_mut() { - Some(prop) => prop.push(json!(v)), - None => mf2["properties"][key] = json!([v]), - } - } - } - if mf2["type"].as_array().unwrap().is_empty() { - mf2["type"].as_array_mut().unwrap().push(json!("h-entry")); +#[tracing::instrument(skip(db))] +async fn background_processing<D: 'static + Storage>( + db: D, + mf2: serde_json::Value, + http: reqwest::Client, +) -> () { + // TODO: Post-processing the post (aka second write pass) + // - [x] Download rich reply contexts + // - [ ] Syndicate the post if requested, add links to the syndicated copies + // - [ ] Send WebSub notifications to the hub (if we happen to have one) + // - [x] Send webmentions + + use futures_util::StreamExt; + + let uid: &str = mf2["properties"]["uid"][0].as_str().unwrap(); + + let context_props = ["in-reply-to", "like-of", "repost-of", "bookmark-of"]; + let mut context_urls: Vec<url::Url> = vec![]; + for prop in &context_props { + if let Some(array) = mf2["properties"][prop].as_array() { + context_urls.extend( + array + .iter() + .filter_map(|v| v.as_str()) + .filter_map(|v| v.parse::<url::Url>().ok()), + ); } - mf2 } + // TODO parse HTML in e-content and add links found here + context_urls.sort_unstable_by_key(|u| u.to_string()); + context_urls.dedup(); + + // TODO: Make a stream to fetch all these posts and convert them to MF2 + let post_contexts = { + let http = &http; + tokio_stream::iter(context_urls.into_iter()) + .then(move |url: url::Url| http.get(url).send()) + .filter_map(|response| futures::future::ready(response.ok())) + .filter(|response| futures::future::ready(response.status() == 200)) + .filter_map(|response: reqwest::Response| async move { + // 1. We need to preserve the URL + // 2. We need to get the HTML for MF2 processing + // 3. We need to get the webmention endpoint address + // All of that can be done in one go. + let url = response.url().clone(); + // TODO parse link headers + let links = response + .headers() + .get_all(hyper::http::header::LINK) + .iter() + .cloned() + .collect::<Vec<hyper::http::HeaderValue>>(); + let html = response.text().await; + if html.is_err() { + return None; + } + let html = html.unwrap(); + let mf2 = microformats::from_html(&html, url.clone()).unwrap(); + // TODO use first Link: header if available + let webmention: Option<url::Url> = mf2 + .rels + .by_rels() + .get("webmention") + .and_then(|i| i.first().cloned()); + + dbg!(Some(FetchedPostContext { + url, + mf2: serde_json::to_value(mf2).unwrap(), + webmention + })) + }) + .collect::<Vec<FetchedPostContext>>() + .await + }; - #[cfg(test)] - mod tests { - use serde_json::json; - #[test] - fn test_form_to_mf2() { - assert_eq!( - super::form_to_mf2_json( - serde_urlencoded::from_str( - "h=entry&content=something%20interesting" - ).unwrap() - ), - json!({ - "type": ["h-entry"], - "properties": { - "content": ["something interesting"] - } - }) - ) + let mut update = json!({ "replace": {} }); + for prop in &context_props { + if let Some(json) = populate_reply_context(&mf2, prop, &post_contexts) { + update["replace"][prop] = json; + } + } + if !update["replace"].as_object().unwrap().is_empty() { + if let Err(err) = db.update_post(uid, update).await { + error!("Failed to update post with rich reply contexts: {}", err); } } -} -#[derive(Debug)] -struct FetchedPostContext { - url: url::Url, - mf2: serde_json::Value, - webmention: Option<url::Url> -} + // At this point we can start syndicating the post. + // Currently we don't really support any syndication endpoints, but still! + /*if let Some(syndicate_to) = mf2["properties"]["mp-syndicate-to"].as_array() { + let http = &http; + tokio_stream::iter(syndicate_to) + .filter_map(|i| futures::future::ready(i.as_str())) + .for_each_concurrent(3, |s: &str| async move { + #[allow(clippy::match_single_binding)] + match s { + _ => { + todo!("Syndicate to generic webmention-aware service {}", s); + } + // TODO special handling for non-webmention-aware services like the birdsite + } + }) + .await; + }*/ -fn populate_reply_context(mf2: &serde_json::Value, prop: &str, ctxs: &[FetchedPostContext]) -> Option<serde_json::Value> { - if mf2["properties"][prop].is_array() { - Some(json!( - mf2["properties"][prop] - .as_array() - // Safe to unwrap because we checked its existence and type - // And it's not like we can make it disappear without unsafe code - .unwrap() - .iter() - // This seems to be O(n^2) and I don't like it. - // Nevertheless, I lack required knowledge to optimize it. Also, it works, so... - .map(|i| ctxs.iter() - .find(|ctx| Some(ctx.url.as_str()) == i.as_str()) - .and_then(|ctx| ctx.mf2["items"].get(0)) - .or(Some(i)) - .unwrap()) - .collect::<Vec<&serde_json::Value>>() - )) - } else { - None + { + let http = &http; + tokio_stream::iter( + post_contexts + .into_iter() + .filter(|ctx| ctx.webmention.is_some()), + ) + .for_each_concurrent(2, |ctx| async move { + let mut map = std::collections::HashMap::new(); + map.insert("source", uid); + map.insert("target", ctx.url.as_str()); + + match http + .post(ctx.webmention.unwrap().clone()) + .form(&map) + .send() + .await + { + Ok(res) => { + if !res.status().is_success() { + warn!( + "Failed to send a webmention for {}: got HTTP {}", + ctx.url, + res.status() + ); + } else { + info!( + "Sent a webmention to {}, got HTTP {}", + ctx.url, + res.status() + ) + } + } + Err(err) => warn!("Failed to send a webmention for {}: {}", ctx.url, err), + } + }) + .await; } } @@ -193,8 +297,8 @@ pub(crate) async fn _post<D: 'static + Storage>( uid: String, mf2: serde_json::Value, db: D, - http: reqwest::Client -) -> Result<impl warp::Reply, MicropubError> { + http: reqwest::Client, +) -> Result<Response, MicropubError> { // Here, we have the following guarantees: // - The user is the same user for this host (guaranteed by ensure_same_user) // - The MF2-JSON document is normalized (guaranteed by normalize_mf2)\ @@ -205,24 +309,26 @@ pub(crate) async fn _post<D: 'static + Storage>( // - The MF2-JSON document's target channels are set // - The MF2-JSON document's author is set - // Security check! Do we have an oAuth2 scope to proceed? + // Security check! Do we have an OAuth2 scope to proceed? if !user.check_scope("create") { return Err(MicropubError { error: ErrorType::InvalidScope, - error_description: "Not enough privileges - try acquiring the \"create\" scope.".to_owned() + error_description: "Not enough privileges - try acquiring the \"create\" scope." + .to_owned(), }); } // Security check #2! Are we posting to our own website? - if !uid.starts_with(user.me.as_str()) || mf2["properties"]["channel"] - .as_array() - .unwrap_or(&vec![]) - .iter() - .any(|url| !url.as_str().unwrap().starts_with(user.me.as_str())) + if !uid.starts_with(user.me.as_str()) + || mf2["properties"]["channel"] + .as_array() + .unwrap_or(&vec![]) + .iter() + .any(|url| !url.as_str().unwrap().starts_with(user.me.as_str())) { return Err(MicropubError { error: ErrorType::Forbidden, - error_description: "You're posting to a website that's not yours.".to_owned() + error_description: "You're posting to a website that's not yours.".to_owned(), }); } @@ -230,7 +336,7 @@ pub(crate) async fn _post<D: 'static + Storage>( if db.post_exists(&uid).await? { return Err(MicropubError { error: ErrorType::AlreadyExists, - error_description: "UID clash was detected, operation aborted.".to_owned() + error_description: "UID clash was detected, operation aborted.".to_owned(), }); } @@ -244,172 +350,55 @@ pub(crate) async fn _post<D: 'static + Storage>( .map(|i| i.as_str().unwrap_or("")) .filter(|i| !i.is_empty()); - let default_channel = user.me.join(post::DEFAULT_CHANNEL_PATH).unwrap().to_string(); - let vcards_channel = user.me.join(post::CONTACTS_CHANNEL_PATH).unwrap().to_string(); - let food_channel = user.me.join(post::FOOD_CHANNEL_PATH).unwrap().to_string(); + let default_channel = user + .me + .join(util::DEFAULT_CHANNEL_PATH) + .unwrap() + .to_string(); + let vcards_channel = user + .me + .join(util::CONTACTS_CHANNEL_PATH) + .unwrap() + .to_string(); + let food_channel = user.me.join(util::FOOD_CHANNEL_PATH).unwrap().to_string(); let default_channels = vec![default_channel, vcards_channel, food_channel]; for chan in &mut channels { if db.post_exists(chan).await? { - db.update_post(chan, json!({"add": {"children": [uid]}})).await?; + db.update_post(chan, json!({"add": {"children": [uid]}})) + .await?; } else if default_channels.iter().any(|i| chan == i) { - post::create_feed(&db, &uid, chan, &user).await?; + util::create_feed(&db, &uid, chan, &user).await?; } else { warn!("Ignoring non-existent channel: {}", chan); } } - let reply = warp::reply::with_status( - warp::reply::with_header( - warp::reply::json(&json!({"location": &uid})), - "Location", &uid - ), - StatusCode::ACCEPTED - ); - - // TODO: Post-processing the post (aka second write pass) - // - [x] Download rich reply contexts - // - [ ] Syndicate the post if requested, add links to the syndicated copies - // - [ ] Send WebSub notifications to the hub (if we happen to have one) - // - [x] Send webmentions - tokio::task::spawn(async move { - use futures_util::StreamExt; - - let uid: &str = mf2["properties"]["uid"][0].as_str().unwrap(); - - let context_props = ["in-reply-to", "like-of", "repost-of", "bookmark-of"]; - let mut context_urls: Vec<url::Url> = vec![]; - for prop in &context_props { - if let Some(array) = mf2["properties"][prop].as_array() { - context_urls.extend( - array - .iter() - .filter_map(|v| v.as_str()) - .filter_map(|v| v.parse::<url::Url>().ok()), - ); - } - } - // TODO parse HTML in e-content and add links found here - context_urls.sort_unstable_by_key(|u| u.to_string()); - context_urls.dedup(); - - // TODO: Make a stream to fetch all these posts and convert them to MF2 - let post_contexts = { - let http = &http; - tokio_stream::iter(context_urls.into_iter()) - .then(move |url: url::Url| http.get(url).send()) - .filter_map(|response| futures::future::ready(response.ok())) - .filter(|response| futures::future::ready(response.status() == 200)) - .filter_map(|response: reqwest::Response| async move { - // 1. We need to preserve the URL - // 2. We need to get the HTML for MF2 processing - // 3. We need to get the webmention endpoint address - // All of that can be done in one go. - let url = response.url().clone(); - // TODO parse link headers - let links = response - .headers() - .get_all(hyper::http::header::LINK) - .iter() - .cloned() - .collect::<Vec<hyper::http::HeaderValue>>(); - let html = response.text().await; - if html.is_err() { - return None; - } - let html = html.unwrap(); - let mf2 = microformats::from_html(&html, url.clone()).unwrap(); - // TODO use first Link: header if available - let webmention: Option<url::Url> = mf2.rels.by_rels().get("webmention") - .and_then(|i| i.first().cloned()); - - dbg!(Some(FetchedPostContext { - url, mf2: serde_json::to_value(mf2).unwrap(), webmention - })) - }) - .collect::<Vec<FetchedPostContext>>() - .await - }; + let reply = IntoResponse::into_response(( + StatusCode::ACCEPTED, + [("Location", uid.as_str())], + () + )); - let mut update = json!({ "replace": {} }); - for prop in &context_props { - if let Some(json) = populate_reply_context(&mf2, prop, &post_contexts) { - update["replace"][prop] = json; - } - } - if !update["replace"].as_object().unwrap().is_empty() { - if let Err(err) = db.update_post(uid, update).await { - error!("Failed to update post with rich reply contexts: {}", err); - } - } + tokio::task::spawn(background_processing(db, mf2, http)); - // At this point we can start syndicating the post. - // Currently we don't really support any syndication endpoints, but still! - /*if let Some(syndicate_to) = mf2["properties"]["mp-syndicate-to"].as_array() { - let http = &http; - tokio_stream::iter(syndicate_to) - .filter_map(|i| futures::future::ready(i.as_str())) - .for_each_concurrent(3, |s: &str| async move { - #[allow(clippy::match_single_binding)] - match s { - _ => { - todo!("Syndicate to generic webmention-aware service {}", s); - } - // TODO special handling for non-webmention-aware services like the birdsite - } - }) - .await; - }*/ - - { - let http = &http; - tokio_stream::iter( - post_contexts.into_iter() - .filter(|ctx| ctx.webmention.is_some())) - .for_each_concurrent(2, |ctx| async move { - let mut map = std::collections::HashMap::new(); - map.insert("source", uid); - map.insert("target", ctx.url.as_str()); - - match http.post(ctx.webmention.unwrap().clone()) - .form(&map) - .send() - .await - { - Ok(res) => { - if !res.status().is_success() { - warn!( - "Failed to send a webmention for {}: got HTTP {}", - ctx.url, res.status() - ); - } else { - info!("Sent a webmention to {}, got HTTP {}", ctx.url, res.status()) - } - }, - Err(err) => warn!("Failed to send a webmention for {}: {}", ctx.url, err) - } - }) - .await; - } - }); - Ok(reply) } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "snake_case")] enum ActionType { Delete, - Update + Update, } #[derive(Serialize, Deserialize)] struct MicropubFormAction { action: ActionType, - url: String + url: String, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] struct MicropubAction { action: ActionType, url: String, @@ -418,7 +407,7 @@ struct MicropubAction { #[serde(skip_serializing_if = "Option::is_none")] add: Option<serde_json::Value>, #[serde(skip_serializing_if = "Option::is_none")] - delete: Option<serde_json::Value> + delete: Option<serde_json::Value>, } impl From<MicropubFormAction> for MicropubAction { @@ -426,31 +415,40 @@ impl From<MicropubFormAction> for MicropubAction { Self { action: a.action, url: a.url, - replace: None, add: None, delete: None + replace: None, + add: None, + delete: None, } } } -// TODO perform the requested actions synchronously +#[tracing::instrument(skip(db))] async fn post_action<D: Storage>( action: MicropubAction, db: D, - user: User -) -> Result<impl warp::Reply, MicropubError> { - + user: User, +) -> Result<(), MicropubError> { let uri = if let Ok(uri) = action.url.parse::<hyper::Uri>() { uri } else { return Err(MicropubError { error: ErrorType::InvalidRequest, - error_description: "Your URL doesn't parse properly.".to_owned() + error_description: "Your URL doesn't parse properly.".to_owned(), }); }; - if uri.authority().unwrap() != user.me.as_str().parse::<hyper::Uri>().unwrap().authority().unwrap() { + if uri.authority().unwrap() + != user + .me + .as_str() + .parse::<hyper::Uri>() + .unwrap() + .authority() + .unwrap() + { return Err(MicropubError { error: ErrorType::Forbidden, - error_description: "Don't tamper with others' posts!".to_owned() + error_description: "Don't tamper with others' posts!".to_owned(), }); } @@ -459,17 +457,17 @@ async fn post_action<D: Storage>( if !user.check_scope("delete") { return Err(MicropubError { error: ErrorType::InvalidScope, - error_description: "You need a \"delete\" scope for this.".to_owned() + error_description: "You need a \"delete\" scope for this.".to_owned(), }); } db.delete_post(&action.url).await? - }, + } ActionType::Update => { if !user.check_scope("update") { return Err(MicropubError { error: ErrorType::InvalidScope, - error_description: "You need an \"update\" scope for this.".to_owned() + error_description: "You need an \"update\" scope for this.".to_owned(), }); } @@ -477,146 +475,104 @@ async fn post_action<D: Storage>( &action.url, // Here, unwrapping is safe, because this value // was recently deserialized from JSON already. - serde_json::to_value(&action).unwrap() - ).await? - }, + serde_json::to_value(&action).unwrap(), + ) + .await? + } } - Ok(warp::reply::reply()) + Ok(()) } -async fn check_auth(host: warp::host::Authority, user: User) -> Result<User, warp::Rejection> { - let user_authority = warp::http::Uri::try_from(user.me.as_str()) - .unwrap() - .authority() - .unwrap() - .clone(); - // TODO compare with potential list of allowed websites - // to allow one user to edit several websites with one token - if host != user_authority { - Err(warp::reject::custom(MicropubError::new( - ErrorType::NotAuthorized, - "This user is not authorized to use Micropub on this website." - ))) - } else { - Ok(user) - } +enum PostBody { + Action(MicropubAction), + MF2(serde_json::Value) } -#[cfg(any(not(debug_assertions), test))] -fn ensure_same_user_as_host( - token_endpoint: String, - http: reqwest::Client -) -> impl Filter<Extract = (User,), Error = warp::Rejection> + Clone { - crate::util::require_host() - .and(crate::indieauth::require_token(token_endpoint, http)) - .and_then(check_auth) -} +#[tracing::instrument] +async fn dispatch_body(mut body: BodyStream, content_type: ContentType) -> Result<PostBody, MicropubError> { + let body: Vec<u8> = { + debug!("Buffering body..."); + use tokio_stream::StreamExt; + let mut buf = Vec::default(); -async fn dispatch_post_body( - mut body: impl bytes::Buf, - mimetype: http_types::Mime -) -> Result<Either<MicropubAction, serde_json::Value>, warp::Rejection> { - // Since hyper::common::buf::BufList doesn't implement Clone, we can't use Clone in here - // We have to copy the body. Ugh!!! - // so much for zero-copy buffers - let body = { - let mut _body: Vec<u8> = Vec::default(); - while body.has_remaining() { - _body.extend(body.chunk()); - body.advance(body.chunk().len()); + while let Some(chunk) = body.next().await { + buf.extend_from_slice(&chunk.unwrap()) } - _body + + buf }; - match mimetype.essence() { - "application/json" => { - if let Ok(body) = serde_json::from_slice::<MicropubAction>(&body) { - Ok(Either::Left(body)) - } else if let Ok(body) = serde_json::from_slice::<serde_json::Value>(&body) { - // quick sanity check - if !body.is_object() || !body["type"].is_array() { - return Err(MicropubError { - error: ErrorType::InvalidRequest, - error_description: "Invalid MF2-JSON detected: `.` should be an object, `.type` should be an array of MF2 types".to_owned() - }.into()) - } - Ok(Either::Right(body)) - } else { - Err(MicropubError { + + debug!("Content-Type: {:?}", content_type); + if content_type == ContentType::json() { + if let Ok(action) = serde_json::from_slice::<MicropubAction>(&body) { + Ok(PostBody::Action(action)) + } else if let Ok(body) = serde_json::from_slice::<serde_json::Value>(&body) { + // quick sanity check + if !body.is_object() || !body["type"].is_array() { + return Err(MicropubError { error: ErrorType::InvalidRequest, - error_description: "Invalid JSON object passed.".to_owned() - }.into()) + error_description: "Invalid MF2-JSON detected: `.` should be an object, `.type` should be an array of MF2 types".to_owned() + }); } + + Ok(PostBody::MF2(body)) + } else { + Err(MicropubError { + error: ErrorType::InvalidRequest, + error_description: "Invalid JSON object passed.".to_owned(), + }) + } + } else if content_type == ContentType::form_url_encoded() { + if let Ok(body) = serde_urlencoded::from_bytes::<MicropubFormAction>(&body) { + Ok(PostBody::Action(body.into())) + } else if let Ok(body) = serde_urlencoded::from_bytes::<Vec<(String, String)>>(&body) { + Ok(PostBody::MF2(form_to_mf2_json(body))) + } else { + Err(MicropubError { + error: ErrorType::InvalidRequest, + error_description: "Invalid form-encoded data. Try h=entry&content=Hello!" + .to_owned(), + }) + } + } else { + Err(MicropubError::new( + ErrorType::UnsupportedMediaType, + "This Content-Type is not recognized. Try application/json instead?" + )) + } +} + +#[tracing::instrument(skip(db, http))] +pub async fn post<D: Storage + 'static>( + Extension(db): Extension<D>, + Extension(http): Extension<reqwest::Client>, + user: User, + body: BodyStream, + TypedHeader(content_type): TypedHeader<ContentType> +) -> axum::response::Response { + match dispatch_body(body, content_type).await { + Ok(PostBody::Action(action)) => match post_action(action, db, user).await { + Ok(()) => Response::default(), + Err(err) => err.into_response() }, - "application/x-www-form-urlencoded" => { - if let Ok(body) = serde_urlencoded::from_bytes::<MicropubFormAction>(&body) { - Ok(Either::Left(body.into())) - } else if let Ok(body) = serde_urlencoded::from_bytes::<Vec<(String, String)>>(&body) { - Ok(Either::Right(form_to_mf2_json(body))) - } else { - Err(MicropubError { - error: ErrorType::InvalidRequest, - error_description: "Invalid form-encoded data. Try h=entry&content=Hello!".to_owned() - }.into()) + Ok(PostBody::MF2(mf2)) => { + let (uid, mf2) = normalize_mf2(mf2, &user); + match _post(user, uid, mf2, db, http).await { + Ok(response) => response, + Err(err) => err.into_response() } }, - other => Err(MicropubError { - error: ErrorType::UnsupportedMediaType, - error_description: format!("Unsupported media type: {}. Try application/json?", other) - }.into()) + Err(err) => err.into_response() } } -#[cfg_attr(all(debug_assertions, not(test)), allow(unused_variables))] -pub fn post<D: 'static + Storage>( - db: D, - token_endpoint: String, - http: reqwest::Client -) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone { - let inject_db = warp::any().map(move || db.clone()); - #[cfg(all(debug_assertions, not(test)))] - let ensure_same_user = warp::any().map(|| crate::indieauth::User::new( - "http://localhost:8080/", - "https://quill.p3k.io/", - "create update delete media" - )); - #[cfg(any(not(debug_assertions), test))] - let ensure_same_user = ensure_same_user_as_host(token_endpoint, http.clone()); - - warp::post() - .and(warp::body::content_length_limit(1024 * 512) - .and(warp::body::aggregate()) - .and(warp::header::<http_types::Mime>("Content-Type")) - .and_then(dispatch_post_body)) - .and(inject_db) - .and(warp::any().map(move || http.clone())) - .and(ensure_same_user) - .and_then(|body: Either<MicropubAction, serde_json::Value>, db: D, http: reqwest::Client, user: crate::indieauth::User| async move { - (match body { - Either::Left(action) => { - post_action(action, db, user).await.map(|p| Box::new(p) as Box<dyn warp::Reply>) - }, - Either::Right(post) => { - let (uid, mf2) = post::normalize_mf2(post, &user); - _post(user, uid, mf2, db, http).await.map(|p| Box::new(p) as Box<dyn warp::Reply>) - } - }).map_err(warp::reject::custom) - }) -} - -pub fn options() -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone { - warp::options() - // TODO make it reply with a basic description of Micropub spec - .map(|| warp::reply::json::<Option<()>>(&None)) - .with(warp::reply::with::header("Allow", "GET, POST")) -} - -async fn _query<D: Storage>( - db: D, - query: MicropubQuery, - user: crate::indieauth::User -) -> Box<dyn warp::Reply> { - let user_authority = warp::http::Uri::try_from(user.me.as_str()) +pub async fn query<D: Storage>( + Extension(db): Extension<D>, + Query(query): Query<MicropubQuery>, + user: User +) -> axum::response::Response { + let host = axum::http::Uri::try_from(user.me.as_str()) .unwrap() .authority() .unwrap() @@ -624,18 +580,15 @@ async fn _query<D: Storage>( match query.q { QueryType::Config => { - let channels: Vec<MicropubChannel> = match db.get_channels(user_authority.as_str()).await { + let channels: Vec<MicropubChannel> = match db.get_channels(host.as_str()).await { Ok(chans) => chans, - Err(err) => return Box::new(warp::reply::with_status( - warp::reply::json(&MicropubError::new( - ErrorType::InternalServerError, - &format!("Error fetching channels: {}", err) - )), - StatusCode::INTERNAL_SERVER_ERROR - )) + Err(err) => return MicropubError::new( + ErrorType::InternalServerError, + &format!("Error fetching channels: {}", err) + ).into_response(), }; - Box::new(warp::reply::json(json!({ + axum::response::Json(json!({ "q": [ QueryType::Source, QueryType::Config, @@ -643,149 +596,81 @@ async fn _query<D: Storage>( QueryType::SyndicateTo ], "channels": channels, - "_kittybox_authority": user_authority.as_str(), + "_kittybox_authority": host.as_str(), "syndicate-to": [] - }).as_object().unwrap())) + })).into_response() }, QueryType::Source => { match query.url { Some(url) => { - if warp::http::Uri::try_from(&url).unwrap().authority().unwrap() != &user_authority { - return Box::new(warp::reply::with_status( - warp::reply::json(&MicropubError::new( - ErrorType::NotAuthorized, - "You are requesting a post from a website that doesn't belong to you." - )), - StatusCode::UNAUTHORIZED - )) + if axum::http::Uri::try_from(&url).unwrap().authority().unwrap() != &host { + return MicropubError::new( + ErrorType::NotAuthorized, + "You are requesting a post from a website that doesn't belong to you." + ).into_response() } match db.get_post(&url).await { Ok(some) => match some { - Some(post) => Box::new(warp::reply::json(&post)), - None => Box::new(warp::reply::with_status( - warp::reply::json(&MicropubError::new( - ErrorType::NotFound, - "The specified MF2 object was not found in database." - )), - StatusCode::NOT_FOUND - )) + Some(post) => axum::response::Json(&post).into_response(), + None => MicropubError::new( + ErrorType::NotFound, + "The specified MF2 object was not found in database." + ).into_response() }, - Err(err) => { - Box::new(warp::reply::json(&MicropubError::new( - ErrorType::InternalServerError, - &format!("Backend error: {}", err) - ))) - } + Err(err) => MicropubError::new( + ErrorType::InternalServerError, + &format!("Backend error: {}", err) + ).into_response() } }, None => { // Here, one should probably attempt to query at least the main feed and collect posts // Using a pre-made query function can't be done because it does unneeded filtering // Don't implement for now, this is optional - Box::new(warp::reply::with_status( - warp::reply::json(&MicropubError::new( - ErrorType::InvalidRequest, - "Querying for post list is not implemented yet." - )), - StatusCode::BAD_REQUEST - )) + MicropubError::new( + ErrorType::InvalidRequest, + "Querying for post list is not implemented yet." + ).into_response() } } }, QueryType::Channel => { - let channels: Vec<MicropubChannel> = match db.get_channels(user_authority.as_str()).await { - Ok(chans) => chans, - Err(err) => return Box::new(warp::reply::with_status( - warp::reply::json(&MicropubError::new( - ErrorType::InternalServerError, - &format!("Error fetching channels: {}", err) - )), - StatusCode::INTERNAL_SERVER_ERROR - )) - }; - - Box::new(warp::reply::json(&json!({ "channels": channels }))) + match db.get_channels(host.as_str()).await { + Ok(chans) => axum::response::Json(json!({"channels": chans})).into_response(), + Err(err) => MicropubError::new( + ErrorType::InternalServerError, + &format!("Error fetching channels: {}", err) + ).into_response() + } }, QueryType::SyndicateTo => { - Box::new(warp::reply::json(&json!({ "syndicate-to": [] }))) + axum::response::Json(json!({ "syndicate-to": [] })).into_response() } } } -pub fn query<D: Storage>( - db: D, - token_endpoint: String, - http: reqwest::Client -) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone { - warp::get() - .map(move || db.clone()) - .and(warp::query::<MicropubQuery>()) - .and(crate::util::require_host() - .and(crate::indieauth::require_token(token_endpoint, http)) - .and_then(check_auth)) - .then(_query) - .recover(|e: warp::Rejection| async move { - if let Some(err) = e.find::<MicropubError>() { - Ok(warp::reply::json(err)) - } else { - Err(e) - } - }) -} - -pub async fn recover(err: Rejection) -> Result<impl warp::Reply, Infallible> { - if let Some(error) = err.find::<MicropubError>() { - return Ok(warp::reply::with_status(warp::reply::json(&error), error.into())) - } - let error = if err.find::<InvalidQuery>().is_some() { - MicropubError::new( - ErrorType::InvalidRequest, - "Invalid query parameters sent. Try ?q=config to see what you can do." - ) - } else { - log::error!("Unhandled rejection: {:?}", err); - MicropubError::new( - ErrorType::InternalServerError, - &format!("Unknown error: {:?}", err) - ) - }; - - Ok(warp::reply::with_status(warp::reply::json(&error), error.into())) -} - -pub fn micropub<D: 'static + Storage>( - db: D, - token_endpoint: String, - http: reqwest::Client -) -> impl Filter<Extract = (impl warp::Reply,), Error = Infallible> + Clone { - query(db.clone(), token_endpoint.clone(), http.clone()) - .or(post(db, token_endpoint, http)) - .or(options()) - .recover(recover) -} #[cfg(test)] #[allow(dead_code)] impl MicropubQuery { fn config() -> Self { Self { q: QueryType::Config, - url: None + url: None, } } fn source(url: &str) -> Self { Self { q: QueryType::Source, - url: Some(url.to_owned()) + url: Some(url.to_owned()), } } } #[cfg(test)] mod tests { - use hyper::body::HttpBody; use crate::{database::Storage, micropub::MicropubError}; - use warp::{Filter, Reply}; + use hyper::body::HttpBody; use serde_json::json; use super::FetchedPostContext; @@ -814,17 +699,11 @@ mod tests { "content": ["This is a post which was reacted to."] } }); - let reply_contexts = vec![ - FetchedPostContext { - url: "https://fireburn.ru/posts/example".parse().unwrap(), - mf2: json!({ - "items": [ - test_ctx - ] - }), - webmention: None - } - ]; + let reply_contexts = vec![FetchedPostContext { + url: "https://fireburn.ru/posts/example".parse().unwrap(), + mf2: json!({ "items": [test_ctx] }), + webmention: None, + }]; let like_of = super::populate_reply_context(&mf2, "like-of", &reply_contexts).unwrap(); @@ -834,84 +713,82 @@ mod tests { } #[tokio::test] - async fn check_post_reject_scope() { - let inject_db = { - let db = crate::database::MemoryStorage::new(); - - move || db.clone() - }; - let db = inject_db(); - - let res = warp::test::request() - .filter(&warp::any() - .map(inject_db) - .and_then(move |db| async move { - let post = json!({ - "type": ["h-entry"], - "properties": { - "content": ["Hello world!"] - } - }); - let user = crate::indieauth::User::new( - "https://localhost:8080/", - "https://kittybox.fireburn.ru/", - "profile" - ); - let (uid, mf2) = super::post::normalize_mf2(post, &user); + async fn test_post_reject_scope() { + let db = crate::database::MemoryStorage::new(); - super::_post( - user, uid, mf2, db, reqwest::Client::new() - ).await.map_err(warp::reject::custom) - }) - ) - .await - .map(|_| panic!("Tried to do something with a reply!")) - .unwrap_err(); + let post = json!({ + "type": ["h-entry"], + "properties": { + "content": ["Hello world!"] + } + }); + let user = crate::indieauth::User::new( + "https://localhost:8080/", + "https://kittybox.fireburn.ru/", + "profile" + ); + let (uid, mf2) = super::normalize_mf2(post, &user); + + let err = super::_post( + user, uid, mf2, db.clone(), reqwest::Client::new() + ).await.unwrap_err(); - if let Some(err) = res.find::<MicropubError>() { - assert_eq!(err.error, super::ErrorType::InvalidScope); - } else { - panic!("Did not return MicropubError"); - } + assert_eq!(err.error, super::ErrorType::InvalidScope); + + let hashmap = db.mapping.read().await; + assert!(hashmap.is_empty()); + } + + #[tokio::test] + async fn test_post_reject_different_user() { + let db = crate::database::MemoryStorage::new(); + + let post = json!({ + "type": ["h-entry"], + "properties": { + "content": ["Hello world!"], + "uid": ["https://fireburn.ru/posts/hello"], + "url": ["https://fireburn.ru/posts/hello"] + } + }); + let user = crate::indieauth::User::new( + "https://aaronparecki.com/", + "https://kittybox.fireburn.ru/", + "create update media" + ); + let (uid, mf2) = super::normalize_mf2(post, &user); + + let err = super::_post( + user, uid, mf2, db.clone(), reqwest::Client::new() + ).await.unwrap_err(); + assert_eq!(err.error, super::ErrorType::Forbidden); + let hashmap = db.mapping.read().await; assert!(hashmap.is_empty()); } + #[tokio::test] - async fn check_post_mf2() { - let inject_db = { - let db = crate::database::MemoryStorage::new(); + async fn test_post_mf2() { + let db = crate::database::MemoryStorage::new(); - move || db.clone() - }; - let db = inject_db(); - - let res = warp::test::request() - .filter(&warp::any() - .map(inject_db) - .and_then(move |db| async move { - let post = json!({ - "type": ["h-entry"], - "properties": { - "content": ["Hello world!"] - } - }); - let user = crate::indieauth::User::new( - "https://localhost:8080/", - "https://kittybox.fireburn.ru/", - "create" - ); - let (uid, mf2) = super::post::normalize_mf2(post, &user); + let post = json!({ + "type": ["h-entry"], + "properties": { + "content": ["Hello world!"] + } + }); + let user = crate::indieauth::User::new( + "https://localhost:8080/", + "https://kittybox.fireburn.ru/", + "create" + ); + let (uid, mf2) = super::normalize_mf2(post, &user); - super::_post( - user, uid, mf2, db, reqwest::Client::new() - ).await.map_err(warp::reject::custom) - }) - ) - .await - .unwrap() - .into_response(); + let res = super::_post( + user, uid, mf2, db.clone(), reqwest::Client::new() + ).await.unwrap(); assert!(res.headers().contains_key("Location")); let location = res.headers().get("Location").unwrap(); @@ -920,40 +797,16 @@ mod tests { } #[tokio::test] - async fn test_check_auth() { - let err = warp::test::request() - .filter(&warp::any() - .map(|| ( - warp::host::Authority::from_static("aaronparecki.com"), - crate::indieauth::User::new( - "https://fireburn.ru/", - "https://quill.p3k.io/", - "create update media" - ))) - .untuple_one() - .and_then(super::check_auth)) - .await - .unwrap_err(); - - let json: &MicropubError = err.find::<MicropubError>().unwrap(); - assert_eq!(json.error, super::ErrorType::NotAuthorized); - } - - #[tokio::test] async fn test_query_foreign_url() { - let mut res = warp::test::request() - .filter(&warp::any().then(|| super::_query( - crate::database::MemoryStorage::new(), - super::MicropubQuery::source("https://aaronparecki.com/feeds/main"), - crate::indieauth::User::new( - "https://fireburn.ru/", - "https://quill.p3k.io/", - "create update media" - ) - ))) - .await - .unwrap() - .into_response(); + let mut res = super::query( + axum::Extension(crate::database::MemoryStorage::new()), + axum::extract::Query(super::MicropubQuery::source("https://aaronparecki.com/feeds/main")), + crate::indieauth::User::new( + "https://fireburn.ru/", + "https://quill.p3k.io/", + "create update media" + ) + ).await; assert_eq!(res.status(), 401); let body = res.body_mut().data().await.unwrap().unwrap(); @@ -961,4 +814,3 @@ mod tests { assert_eq!(json.error, super::ErrorType::NotAuthorized); } } - diff --git a/kittybox-rs/src/micropub/post.rs b/kittybox-rs/src/micropub/post.rs deleted file mode 100644 index cf9f3d9..0000000 --- a/kittybox-rs/src/micropub/post.rs +++ /dev/null @@ -1,944 +0,0 @@ -use crate::database::Storage; -use crate::indieauth::User; -use chrono::prelude::*; -use core::iter::Iterator; -use newbase60::num_to_sxg; -use std::convert::TryInto; -use serde_json::json; - -pub(crate) static DEFAULT_CHANNEL_PATH: &str = "/feeds/main"; -static DEFAULT_CHANNEL_NAME: &str = "Main feed"; -pub(crate) static CONTACTS_CHANNEL_PATH: &str = "/feeds/vcards"; -static CONTACTS_CHANNEL_NAME: &str = "My address book"; -pub(crate) static FOOD_CHANNEL_PATH: &str = "/feeds/food"; -static FOOD_CHANNEL_NAME: &str = "My recipe book"; - -fn get_folder_from_type(post_type: &str) -> String { - (match post_type { - "h-feed" => "feeds/", - "h-card" => "vcards/", - "h-event" => "events/", - "h-food" => "food/", - _ => "posts/", - }) - .to_string() -} - -/// Reset the datetime to a proper datetime. -/// Do not attempt to recover the information. -/// Do not pass GO. Do not collect $200. -fn reset_dt(post: &mut serde_json::Value) -> DateTime<FixedOffset> { - let curtime: DateTime<Local> = Local::now(); - post["properties"]["published"] = json!([curtime.to_rfc3339()]); - chrono::DateTime::from(curtime) -} - -pub fn normalize_mf2(mut body: serde_json::Value, user: &User) -> (String, serde_json::Value) { - // Normalize the MF2 object here. - let me = &user.me; - let folder = get_folder_from_type(body["type"][0].as_str().unwrap()); - let published: DateTime<FixedOffset> = if let Some(dt) = body["properties"]["published"][0].as_str() { - // Check if the datetime is parsable. - match DateTime::parse_from_rfc3339(dt) { - Ok(dt) => dt, - Err(_) => reset_dt(&mut body) - } - } else { - // Set the datetime. - // Note: this code block duplicates functionality with the above failsafe. - // Consider refactoring it to a helper function? - reset_dt(&mut body) - }; - match body["properties"]["uid"][0].as_str() { - None => { - let uid = serde_json::Value::String( - me.join( - &(folder.clone() - + &num_to_sxg(published.timestamp_millis().try_into().unwrap())), - ) - .unwrap() - .to_string(), - ); - body["properties"]["uid"] = serde_json::Value::Array(vec![uid.clone()]); - match body["properties"]["url"].as_array_mut() { - Some(array) => array.push(uid), - None => body["properties"]["url"] = body["properties"]["uid"].clone(), - } - } - Some(uid_str) => { - let uid = uid_str.to_string(); - match body["properties"]["url"].as_array_mut() { - Some(array) => { - if !array.iter().any(|i| i.as_str().unwrap_or("") == uid) { - array.push(serde_json::Value::String(uid)) - } - } - None => body["properties"]["url"] = body["properties"]["uid"].clone(), - } - } - } - if let Some(slugs) = body["properties"]["mp-slug"].as_array() { - let new_urls = slugs - .iter() - .map(|i| i.as_str().unwrap_or("")) - .filter(|i| i != &"") - .map(|i| me.join(&((&folder).clone() + i)).unwrap().to_string()) - .collect::<Vec<String>>(); - let urls = body["properties"]["url"].as_array_mut().unwrap(); - new_urls.iter().for_each(|i| urls.push(json!(i))); - } - let props = body["properties"].as_object_mut().unwrap(); - props.remove("mp-slug"); - - if body["properties"]["content"][0].is_string() { - // Convert the content to HTML using the `markdown` crate - body["properties"]["content"] = json!([{ - "html": markdown::to_html(body["properties"]["content"][0].as_str().unwrap()), - "value": body["properties"]["content"][0] - }]) - } - // TODO: apply this normalization to editing too - if body["properties"]["mp-channel"].is_array() { - let mut additional_channels = body["properties"]["mp-channel"].as_array().unwrap().clone(); - if let Some(array) = body["properties"]["channel"].as_array_mut() { - array.append(&mut additional_channels); - } else { - body["properties"]["channel"] = json!(additional_channels) - } - body["properties"].as_object_mut().unwrap().remove("mp-channel"); - } else if body["properties"]["mp-channel"].is_string() { - let chan = body["properties"]["mp-channel"].as_str().unwrap().to_owned(); - if let Some(array) = body["properties"]["channel"].as_array_mut() { - array.push(json!(chan)) - } else { - body["properties"]["channel"] = json!([chan]); - } - body["properties"].as_object_mut().unwrap().remove("mp-channel"); - } - if body["properties"]["channel"][0].as_str().is_none() { - match body["type"][0].as_str() { - Some("h-entry") => { - // Set the channel to the main channel... - let default_channel = me.join(DEFAULT_CHANNEL_PATH).unwrap().to_string(); - - body["properties"]["channel"] = json!([default_channel]); - } - Some("h-card") => { - let default_channel = me.join(CONTACTS_CHANNEL_PATH).unwrap().to_string(); - - body["properties"]["channel"] = json!([default_channel]); - } - Some("h-food") => { - let default_channel = me.join(FOOD_CHANNEL_PATH).unwrap().to_string(); - - body["properties"]["channel"] = json!([default_channel]); - } - // TODO h-event - /*"h-event" => { - let default_channel - },*/ - _ => { - body["properties"]["channel"] = json!([]); - } - } - } - body["properties"]["posted-with"] = json!([user.client_id]); - if body["properties"]["author"][0].as_str().is_none() { - body["properties"]["author"] = json!([me.as_str()]) - } - // TODO: maybe highlight #hashtags? - // Find other processing to do and insert it here - return ( - body["properties"]["uid"][0].as_str().unwrap().to_string(), - body, - ); -} - -/*pub async fn new_post<S: Storage>( - req: Request<ApplicationState<S>>, - body: serde_json::Value, -) -> Result { - // First, check for rights. - let user = req.ext::<User>().unwrap(); - let storage = &req.state().storage; - if !user.check_scope("create") { - return error_json!( - 401, - "invalid_scope", - "Not enough privileges to post. Try a token with a \"create\" scope instead." - ); - } - let (uid, post) = normalize_mf2(body, user); - - // Security check! - // This software might also be used in a multi-user setting - // where several users or identities share one Micropub server - // (maybe a family website or a shitpost sideblog?) - if !post["properties"]["uid"][0] - .as_str() - .unwrap() - .starts_with(user.me.as_str()) - || post["properties"]["channel"] - .as_array() - .unwrap() - .iter() - .any(|url| !url.as_str().unwrap().starts_with(user.me.as_str())) - { - return error_json!( - 403, - "forbidden", - "You're trying to post to someone else's website..." - ); - } - - match storage.post_exists(&uid).await { - Ok(exists) => { - if exists { - return error_json!( - 409, - "already_exists", - format!( - "A post with the exact same UID already exists in the database: {}", - uid - ) - ); - } - } - Err(err) => return Ok(err.into()), - } - - if let Err(err) = storage.put_post(&post, user.me.as_str()).await { - return error_json!(500, "database_error", format!("{}", err)); - } - - // It makes sense to use a loop here, because you wouldn't post to a hundred channels at once - // Mostly one or two, and even those ones will be the ones picked for you by software - for channel in post["properties"]["channel"] - .as_array() - .unwrap() - .iter() - .map(|i| i.as_str().unwrap_or("").to_string()) - .filter(|i| !i.is_empty()) - .collect::<Vec<_>>() - { - let default_channel = user.me.join(DEFAULT_CHANNEL_PATH).unwrap().to_string(); - let vcards_channel = user.me.join(CONTACTS_CHANNEL_PATH).unwrap().to_string(); - let food_channel = user.me.join(FOOD_CHANNEL_PATH).unwrap().to_string(); - match storage.post_exists(&channel).await { - Ok(exists) => { - if exists { - if let Err(err) = storage - .update_post(&channel, json!({"add": {"children": [uid]}})) - .await - { - return error_json!( - 500, - "database_error", - format!( - "Couldn't insert post into the channel due to a database error: {}", - err - ) - ); - } - } else if channel == default_channel - || channel == vcards_channel - || channel == food_channel - { - if let Err(err) = create_feed(storage, &uid, &channel, user).await { - return error_json!( - 500, - "database_error", - format!("Couldn't save feed: {}", err) - ); - } - } else { - warn!( - "Ignoring request to post to a non-existent feed: {}", - channel - ); - } - } - Err(err) => return error_json!(500, "database_error", err), - } - } - // END WRITE BOUNDARY - - // do background processing on the post - async_std::task::spawn(post_process_new_post(req, post)); - - Ok(Response::builder(202) - .header("Location", &uid) - .body(json!({"status": "accepted", "location": &uid})) - .build()) -}*/ - -pub(crate) async fn create_feed( - storage: &impl Storage, - uid: &str, - channel: &str, - user: &User, -) -> crate::database::Result<()> { - let path = url::Url::parse(channel).unwrap().path().to_string(); - - // Note to Future Vika: DO NOT CONVERT THIS TO A MATCH BLOCK - // It will get treated as a binding instead of a const - // See `rustc --explain E0530` for more info - let name = if path == DEFAULT_CHANNEL_PATH { - DEFAULT_CHANNEL_NAME - } else if path == CONTACTS_CHANNEL_PATH { - CONTACTS_CHANNEL_NAME - } else if path == FOOD_CHANNEL_PATH { - FOOD_CHANNEL_NAME - } else { - panic!("Tried to create an unknown default feed!") - }; - - let (_, feed) = normalize_mf2( - json!({ - "type": ["h-feed"], - "properties": { - "name": [name], - "uid": [channel] - }, - "children": [uid] - }), - user, - ); - storage.put_post(&feed, user.me.as_str()).await -} - -/*async fn post_process_new_post<S: Storage>( - req: Request<ApplicationState<S>>, - post: serde_json::Value, -) { - // TODO: Post-processing the post (aka second write pass) - // - [-] Download rich reply contexts - // - [-] Syndicate the post if requested, add links to the syndicated copies - // - [ ] Send WebSub notifications to the hub (if we happen to have one) - // - [x] Send webmentions - let http = &req.state().http_client; - let uid = post["properties"]["uid"][0].as_str().unwrap().to_string(); - // 1. Download rich reply contexts - // This needs to be done first, because at this step we can also determine webmention endpoints - // and save them for later use. Additionally, the richer our content is, the better. - // This needs to be done asynchronously, so the posting experience for the author will be as fast - // as possible without making them wait for potentially slow downstream websites to load - // 1.1. Collect the list of contextually-significant post to load context from. - // This will include reply-tos, liked, reposted and bookmarked content - // - // TODO: Fetch links mentioned in a post, since we need to send webmentions to those as mentions - let mut contextually_significant_posts: Vec<surf::Url> = vec![]; - for prop in &["in-reply-to", "like-of", "repost-of", "bookmark-of"] { - if let Some(array) = post["properties"][prop].as_array() { - contextually_significant_posts.extend( - array - .iter() - .filter_map(|v| v.as_str().and_then(|v| surf::Url::parse(v).ok())), - ); - } - } - // 1.2. Deduplicate the list - contextually_significant_posts.sort_unstable(); - contextually_significant_posts.dedup(); - - // 1.3. Fetch the posts with their bodies and save them in a new Vec<(surf::Url, String)> - let posts_with_bodies: Vec<(surf::Url, surf::Response, String)> = - stream::iter(contextually_significant_posts.into_iter()) - .filter_map(|v: surf::Url| async move { - if let Ok(res) = http.get(&v).send().await { - if res.status() != 200 { - None - } else { - Some((v, res)) - } - } else { - None - } - }) - .filter_map(|(v, mut res): (surf::Url, surf::Response)| async move { - if let Ok(body) = res.body_string().await { - Some((v, res, body)) - } else { - None - } - }) - .collect() - .await; - // 1.4. Parse the bodies and include them in relevant places on the MF2 struct - // This requires an MF2 parser, and there are none for Rust at the moment. - // - // TODO: integrate https://gitlab.com/vikanezrimaya/mf2-parser when it's ready - - // 2. Syndicate the post - let syndicated_copies: Vec<serde_json::Value>; - if let Some(syndication_targets) = post["properties"]["syndicate-to"].as_array() { - syndicated_copies = stream::iter( - syndication_targets - .iter() - .filter_map(|v| v.as_str()) - .filter_map(|t| surf::Url::parse(t).ok()) - .collect::<Vec<_>>() - .into_iter() - .map(|_t: surf::Url| async move { - // TODO: Define supported syndication methods - // and syndicate the endpoint there - // Possible ideas: - // - indieweb.xyz (might need a lot of space for the buttons though, investigate proposing grouping syndication targets) - // - news.indieweb.org (IndieNews - needs a category linking to #indienews) - // - Twitter via brid.gy (do I really need Twitter syndication tho?) - if false { - Some("") - } else { - None - } - }), - ) - .buffer_unordered(3) - .filter_map(|v| async move { v }) - .map(|v| serde_json::Value::String(v.to_string())) - .collect::<Vec<_>>() - .await; - } else { - syndicated_copies = vec![] - } - // Save the post a second time here after syndication - // We use update_post here to prevent race conditions since its required to be atomic - let mut update = json!({ - "action": "update", - "url": &uid - }); - if !syndicated_copies.is_empty() { - update["add"] = json!({}); - update["add"]["syndication"] = serde_json::Value::Array(syndicated_copies); - } - if !posts_with_bodies.is_empty() { - error!("Replacing context links with parsed MF2-JSON data is not yet implemented (but it's ok! it'll just be less pretty)") - /* TODO: Replace context links with parsed MF2-JSON data * / - update["replace"] = {} - update["replace"]["like-of"] = [] - update["replace"]["in-reply-to"] = [] - update["replace"]["bookmark-of"] = [] - update["replace"]["repost-of"] = [] - // */ - } - // We don't need the original copy of the post anymore... I hope! - // This will act as a safeguard so I can't read stale data by accident anymore... - drop(post); - if let Err(err) = req.state().storage.update_post(&uid, update).await { - error!("Encountered error while post-processing a post: {}", err) - // At this point, we can still continue, we just won't have rich data for the post - // I wonder why could it even happen except in case of a database disconnection? - } - // 3. Send WebSub notifications - // TODO WebSub support - - // 4. Send webmentions - // We'll need the bodies here to get their endpoints - let source = &uid; - stream::iter(posts_with_bodies.into_iter()) - .filter_map( - |(url, response, body): (surf::Url, surf::Response, String)| async move { - // Check Link headers first - // the first webmention endpoint will be returned - if let Some(values) = response.header("Link") { - let iter = values.iter().flat_map(|i| i.as_str().split(',')); - - // Honestly I don't like this parser. It's very crude. - // But it should do the job. But I don't like it. - for link in iter { - let mut split = link.split(';'); - - match split.next() { - Some(uri) => { - if let Some(uri) = uri.strip_prefix('<') { - if let Some(uri) = uri.strip_suffix('>') { - for prop in split { - let lowercased = prop.to_ascii_lowercase(); - if &lowercased == "rel=\"webmention\"" - || &lowercased == "rel=webmention" - { - if let Ok(endpoint) = url.join(uri) { - return Some((url, endpoint)); - } - } - } - } - } - } - None => continue, - } - } - } - // TODO: Replace this function once the MF2 parser is ready - // A compliant parser's output format includes rels, - // we could just find a Webmention one in there - let pattern = - easy_scraper::Pattern::new(r#"<link href="{{url}}" rel="webmention">"#) - .expect("Pattern for webmentions couldn't be parsed"); - let matches = pattern.matches(&body); - if matches.is_empty() { - return None; - } - let endpoint = &matches[0]["url"]; - if let Ok(endpoint) = url.join(endpoint) { - Some((url, endpoint)) - } else { - None - } - }, - ) - .map(|(target, endpoint)| async move { - info!( - "Sending webmention to {} about {}", - source, - &target.to_string() - ); - let response = http - .post(&endpoint) - .content_type("application/x-www-form-urlencoded") - .body( - serde_urlencoded::to_string(vec![ - ("source", source), - ("target", &target.to_string()), - ]) - .expect("Couldn't construct webmention form"), - ) - .send() - .await; - match response { - Ok(response) => { - if response.status() == 200 - || response.status() == 201 - || response.status() == 202 - { - info!("Sent webmention for {} to {}", target, endpoint); - Ok(()) - } else { - error!( - "Sending webmention for {} to {} failed: Endpoint replied with HTTP {}", - target, - endpoint, - response.status() - ); - Err(()) - } - } - Err(err) => { - error!( - "Sending webmention for {} to {} failed: {}", - target, endpoint, err - ); - Err(()) - } - } - }) - .buffer_unordered(3) - .collect::<Vec<_>>() - .await; -}*/ - -/*async fn process_json<S: Storage>( - req: Request<ApplicationState<S>>, - body: serde_json::Value, -) -> Result { - let is_action = body["action"].is_string() && body["url"].is_string(); - if is_action { - // This could be an update, a deletion or an undeletion request. - // Process it separately. - let action = body["action"].as_str().unwrap(); - let url = body["url"].as_str().unwrap(); - let user = req.ext::<User>().unwrap(); - match action { - "delete" => { - if !user.check_scope("delete") { - return error_json!( - 401, - "insufficient_scope", - "You need a `delete` scope to delete posts." - ); - } - // This special scope is not available through a token endpoint, since the - // authorization endpoint is supposed to reject any auth request trying to get this - // scope. It is intended for TRUSTED external services that need to modify the - // database while ignoring any access controls - if (url::Url::parse(url)?.origin().ascii_serialization() + "/") != user.me.as_str() - && !user.check_scope("kittybox_internal:do_what_thou_wilt") - { - return error_json!( - 403, - "forbidden", - "You're not allowed to delete someone else's posts." - ); - } - if let Err(error) = req.state().storage.delete_post(url).await { - return Ok(error.into()); - } - Ok(Response::builder(200).build()) - } - "update" => { - if !user.check_scope("update") { - return error_json!( - 401, - "insufficient_scope", - "You need an `update` scope to update posts." - ); - } - if (url::Url::parse(url)?.origin().ascii_serialization() + "/") != user.me.as_str() - && !user.check_scope("kittybox_internal:do_what_thou_wilt") - { - return error_json!( - 403, - "forbidden", - "You're not allowed to delete someone else's posts." - ); - } - if let Err(error) = req.state().storage.update_post(url, body.clone()).await { - Ok(error.into()) - } else { - Ok(Response::builder(204).build()) - } - } - _ => return error_json!(400, "invalid_request", "This action is not supported."), - } - } else if body["type"][0].is_string() { - // This is definitely an h-entry or something similar. Check if it has properties? - if body["properties"].is_object() { - // Ok, this is definitely a new h-entry. Let's save it. - return new_post(req, body).await; - } else { - return error_json!( - 400, - "invalid_request", - "This MF2-JSON object has a type, but not properties. This makes no sense to post." - ); - } - } else { - return error_json!( - 400, - "invalid_request", - "Try sending MF2-structured data or an object with an \"action\" and \"url\" keys." - ); - } -}*/ - -/*async fn process_form<S: Storage>( - req: Request<ApplicationState<S>>, - form: Vec<(String, String)>, -) -> Result { - if let Some((_, v)) = form.iter().find(|(k, _)| k == "action") { - if v == "delete" { - let user = req.ext::<User>().unwrap(); - if !user.check_scope("delete") { - return error_json!( - 401, - "insufficient_scope", - "You cannot delete posts without a `delete` scope." - ); - } - match form.iter().find(|(k, _)| k == "url") { - Some((_, url)) => { - if (url::Url::parse(url)?.origin().ascii_serialization() + "/") - != user.me.as_str() - && !user.check_scope("kittybox_internal:do_what_thou_wilt") - { - return error_json!( - 403, - "forbidden", - "You're not allowed to delete someone else's posts." - ); - } - if let Err(error) = req.state().storage.delete_post(url).await { - return error_json!(500, "database_error", error); - } - return Ok(Response::builder(200).build()); - } - None => { - return error_json!( - 400, - "invalid_request", - "Please provide an `url` to delete." - ) - } - } - } else { - return error_json!(400, "invalid_request", "This action is not supported in form-encoded mode. (JSON requests support more actions, use JSON!)"); - } - } - - let mf2 = convert_form_to_mf2_json(form); - - if mf2["properties"].as_object().unwrap().keys().len() > 0 { - return new_post(req, mf2).await; - } - return error_json!( - 400, - "invalid_request", - "Try sending h=entry&content=something%20interesting" - ); -}*/ - -/*pub async fn post_handler<S: Storage>(mut req: Request<ApplicationState<S>>) -> Result { - match req.content_type() { - Some(value) => { - if value == Mime::from_str("application/json").unwrap() { - match req.body_json::<serde_json::Value>().await { - Ok(parsed) => return process_json(req, parsed).await, - Err(err) => { - return error_json!( - 400, - "invalid_request", - format!("Parsing JSON failed: {:?}", err) - ) - } - } - } else if value == Mime::from_str("application/x-www-form-urlencoded").unwrap() { - match req.body_form::<Vec<(String, String)>>().await { - Ok(parsed) => return process_form(req, parsed).await, - Err(err) => { - return error_json!( - 400, - "invalid_request", - format!("Parsing form failed: {:?}", err) - ) - } - } - } else { - return error_json!( - 415, "unsupported_media_type", - "What's this? Try sending JSON instead. (urlencoded form also works but is less cute)" - ); - } - } - _ => { - return error_json!( - 415, "unsupported_media_type", - "You didn't send a Content-Type header, so we don't know how to parse your request." - ); - } - } -}*/ - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_no_replace_uid() { - let mf2 = json!({ - "type": ["h-card"], - "properties": { - "uid": ["https://fireburn.ru/"], - "name": ["Vika Nezrimaya"], - "note": ["A crazy programmer girl who wants some hugs"] - } - }); - - let (uid, normalized) = normalize_mf2( - mf2.clone(), - &User::new( - "https://fireburn.ru/", - "https://quill.p3k.io/", - "create update media", - ), - ); - assert_eq!( - normalized["properties"]["uid"][0], mf2["properties"]["uid"][0], - "UID was replaced" - ); - assert_eq!( - normalized["properties"]["uid"][0], uid, - "Returned post location doesn't match UID" - ); - } - - #[test] - fn test_mp_channel() { - let mf2 = json!({ - "type": ["h-entry"], - "properties": { - "uid": ["https://fireburn.ru/posts/test"], - "content": [{"html": "<p>Hello world!</p>"}], - "mp-channel": ["https://fireburn.ru/feeds/test"] - } - }); - - let (_, normalized) = normalize_mf2( - mf2.clone(), - &User::new( - "https://fireburn.ru/", - "https://quill.p3k.io/", - "create update media", - ) - ); - - assert_eq!( - normalized["properties"]["channel"], - mf2["properties"]["mp-channel"] - ); - } - - #[test] - fn test_mp_channel_as_string() { - let mf2 = json!({ - "type": ["h-entry"], - "properties": { - "uid": ["https://fireburn.ru/posts/test"], - "content": [{"html": "<p>Hello world!</p>"}], - "mp-channel": "https://fireburn.ru/feeds/test" - } - }); - - let (_, normalized) = normalize_mf2( - mf2.clone(), - &User::new( - "https://fireburn.ru/", - "https://quill.p3k.io/", - "create update media", - ) - ); - - assert_eq!( - normalized["properties"]["channel"][0], - mf2["properties"]["mp-channel"] - ); - } - - #[test] - fn test_normalize_mf2() { - let mf2 = json!({ - "type": ["h-entry"], - "properties": { - "content": ["This is content!"] - } - }); - - let (uid, post) = normalize_mf2( - mf2, - &User::new( - "https://fireburn.ru/", - "https://quill.p3k.io/", - "create update media", - ), - ); - assert_eq!( - post["properties"]["published"] - .as_array() - .expect("post['published'] is undefined") - .len(), - 1, - "Post doesn't have a published time" - ); - DateTime::parse_from_rfc3339(post["properties"]["published"][0].as_str().unwrap()) - .expect("Couldn't parse date from rfc3339"); - assert!( - !post["properties"]["url"] - .as_array() - .expect("post['url'] is undefined") - .is_empty(), - "Post doesn't have any URLs" - ); - assert_eq!( - post["properties"]["uid"] - .as_array() - .expect("post['uid'] is undefined") - .len(), - 1, - "Post doesn't have a single UID" - ); - assert_eq!( - post["properties"]["uid"][0], uid, - "UID of a post and its supposed location don't match" - ); - assert!( - uid.starts_with("https://fireburn.ru/posts/"), - "The post namespace is incorrect" - ); - assert_eq!( - post["properties"]["content"][0]["html"] - .as_str() - .expect("Post doesn't have a rich content object") - .trim(), - "<p>This is content!</p>", - "Parsed Markdown content doesn't match expected HTML" - ); - assert_eq!( - post["properties"]["channel"][0], "https://fireburn.ru/feeds/main", - "Post isn't posted to the main channel" - ); - assert_eq!( - post["properties"]["author"][0], "https://fireburn.ru/", - "Post author is unknown" - ); - } - - #[test] - fn test_mp_slug() { - let mf2 = json!({ - "type": ["h-entry"], - "properties": { - "content": ["This is content!"], - "mp-slug": ["hello-post"] - }, - }); - - let (_, post) = normalize_mf2( - mf2, - &User::new( - "https://fireburn.ru/", - "https://quill.p3k.io/", - "create update media", - ), - ); - assert!( - post["properties"]["url"] - .as_array() - .unwrap() - .iter() - .map(|i| i.as_str().unwrap()) - .any(|i| i == "https://fireburn.ru/posts/hello-post"), - "Didn't found an URL pointing to the location expected by the mp-slug semantics" - ); - assert!( - post["properties"]["mp-slug"].as_array().is_none(), - "mp-slug wasn't deleted from the array!" - ) - } - - #[test] - fn test_normalize_feed() { - let mf2 = json!({ - "type": ["h-feed"], - "properties": { - "name": "Main feed", - "mp-slug": ["main"] - } - }); - - let (uid, post) = normalize_mf2( - mf2, - &User::new( - "https://fireburn.ru/", - "https://quill.p3k.io/", - "create update media", - ), - ); - assert_eq!( - post["properties"]["uid"][0], uid, - "UID of a post and its supposed location don't match" - ); - assert_eq!(post["properties"]["author"][0], "https://fireburn.ru/"); - assert!( - post["properties"]["url"] - .as_array() - .unwrap() - .iter() - .map(|i| i.as_str().unwrap()) - .any(|i| i == "https://fireburn.ru/feeds/main"), - "Didn't found an URL pointing to the location expected by the mp-slug semantics" - ); - assert!( - post["properties"]["mp-slug"].as_array().is_none(), - "mp-slug wasn't deleted from the array!" - ) - } -} diff --git a/kittybox-rs/src/micropub/util.rs b/kittybox-rs/src/micropub/util.rs new file mode 100644 index 0000000..97ec09a --- /dev/null +++ b/kittybox-rs/src/micropub/util.rs @@ -0,0 +1,457 @@ +use crate::database::Storage; +use crate::indieauth::User; +use chrono::prelude::*; +use core::iter::Iterator; +use newbase60::num_to_sxg; +use serde_json::json; +use std::convert::TryInto; + +pub(crate) const DEFAULT_CHANNEL_PATH: &str = "/feeds/main"; +const DEFAULT_CHANNEL_NAME: &str = "Main feed"; +pub(crate) const CONTACTS_CHANNEL_PATH: &str = "/feeds/vcards"; +const CONTACTS_CHANNEL_NAME: &str = "My address book"; +pub(crate) const FOOD_CHANNEL_PATH: &str = "/feeds/food"; +const FOOD_CHANNEL_NAME: &str = "My recipe book"; + +fn get_folder_from_type(post_type: &str) -> String { + (match post_type { + "h-feed" => "feeds/", + "h-card" => "vcards/", + "h-event" => "events/", + "h-food" => "food/", + _ => "posts/", + }) + .to_string() +} + +/// Reset the datetime to a proper datetime. +/// Do not attempt to recover the information. +/// Do not pass GO. Do not collect $200. +fn reset_dt(post: &mut serde_json::Value) -> DateTime<FixedOffset> { + let curtime: DateTime<Local> = Local::now(); + post["properties"]["published"] = json!([curtime.to_rfc3339()]); + chrono::DateTime::from(curtime) +} + +pub fn normalize_mf2(mut body: serde_json::Value, user: &User) -> (String, serde_json::Value) { + // Normalize the MF2 object here. + let me = &user.me; + let folder = get_folder_from_type(body["type"][0].as_str().unwrap()); + let published: DateTime<FixedOffset> = + if let Some(dt) = body["properties"]["published"][0].as_str() { + // Check if the datetime is parsable. + match DateTime::parse_from_rfc3339(dt) { + Ok(dt) => dt, + Err(_) => reset_dt(&mut body), + } + } else { + // Set the datetime. + // Note: this code block duplicates functionality with the above failsafe. + // Consider refactoring it to a helper function? + reset_dt(&mut body) + }; + match body["properties"]["uid"][0].as_str() { + None => { + let uid = serde_json::Value::String( + me.join( + &(folder.clone() + + &num_to_sxg(published.timestamp_millis().try_into().unwrap())), + ) + .unwrap() + .to_string(), + ); + body["properties"]["uid"] = serde_json::Value::Array(vec![uid.clone()]); + match body["properties"]["url"].as_array_mut() { + Some(array) => array.push(uid), + None => body["properties"]["url"] = body["properties"]["uid"].clone(), + } + } + Some(uid_str) => { + let uid = uid_str.to_string(); + match body["properties"]["url"].as_array_mut() { + Some(array) => { + if !array.iter().any(|i| i.as_str().unwrap_or("") == uid) { + array.push(serde_json::Value::String(uid)) + } + } + None => body["properties"]["url"] = body["properties"]["uid"].clone(), + } + } + } + if let Some(slugs) = body["properties"]["mp-slug"].as_array() { + let new_urls = slugs + .iter() + .map(|i| i.as_str().unwrap_or("")) + .filter(|i| i != &"") + .map(|i| me.join(&((&folder).clone() + i)).unwrap().to_string()) + .collect::<Vec<String>>(); + let urls = body["properties"]["url"].as_array_mut().unwrap(); + new_urls.iter().for_each(|i| urls.push(json!(i))); + } + let props = body["properties"].as_object_mut().unwrap(); + props.remove("mp-slug"); + + if body["properties"]["content"][0].is_string() { + // Convert the content to HTML using the `markdown` crate + body["properties"]["content"] = json!([{ + "html": markdown::to_html(body["properties"]["content"][0].as_str().unwrap()), + "value": body["properties"]["content"][0] + }]) + } + // TODO: apply this normalization to editing too + if body["properties"]["mp-channel"].is_array() { + let mut additional_channels = body["properties"]["mp-channel"].as_array().unwrap().clone(); + if let Some(array) = body["properties"]["channel"].as_array_mut() { + array.append(&mut additional_channels); + } else { + body["properties"]["channel"] = json!(additional_channels) + } + body["properties"] + .as_object_mut() + .unwrap() + .remove("mp-channel"); + } else if body["properties"]["mp-channel"].is_string() { + let chan = body["properties"]["mp-channel"] + .as_str() + .unwrap() + .to_owned(); + if let Some(array) = body["properties"]["channel"].as_array_mut() { + array.push(json!(chan)) + } else { + body["properties"]["channel"] = json!([chan]); + } + body["properties"] + .as_object_mut() + .unwrap() + .remove("mp-channel"); + } + if body["properties"]["channel"][0].as_str().is_none() { + match body["type"][0].as_str() { + Some("h-entry") => { + // Set the channel to the main channel... + let default_channel = me.join(DEFAULT_CHANNEL_PATH).unwrap().to_string(); + + body["properties"]["channel"] = json!([default_channel]); + } + Some("h-card") => { + let default_channel = me.join(CONTACTS_CHANNEL_PATH).unwrap().to_string(); + + body["properties"]["channel"] = json!([default_channel]); + } + Some("h-food") => { + let default_channel = me.join(FOOD_CHANNEL_PATH).unwrap().to_string(); + + body["properties"]["channel"] = json!([default_channel]); + } + // TODO h-event + /*"h-event" => { + let default_channel + },*/ + _ => { + body["properties"]["channel"] = json!([]); + } + } + } + body["properties"]["posted-with"] = json!([user.client_id]); + if body["properties"]["author"][0].as_str().is_none() { + body["properties"]["author"] = json!([me.as_str()]) + } + // TODO: maybe highlight #hashtags? + // Find other processing to do and insert it here + return ( + body["properties"]["uid"][0].as_str().unwrap().to_string(), + body, + ); +} + +pub(crate) fn form_to_mf2_json(form: Vec<(String, String)>) -> serde_json::Value { + let mut mf2 = json!({"type": [], "properties": {}}); + for (k, v) in form { + if k == "h" { + mf2["type"] + .as_array_mut() + .unwrap() + .push(json!("h-".to_string() + &v)); + } else if k != "access_token" { + let key = k.strip_suffix("[]").unwrap_or(&k); + match mf2["properties"][key].as_array_mut() { + Some(prop) => prop.push(json!(v)), + None => mf2["properties"][key] = json!([v]), + } + } + } + if mf2["type"].as_array().unwrap().is_empty() { + mf2["type"].as_array_mut().unwrap().push(json!("h-entry")); + } + mf2 +} + +pub(crate) async fn create_feed( + storage: &impl Storage, + uid: &str, + channel: &str, + user: &User, +) -> crate::database::Result<()> { + let path = url::Url::parse(channel).unwrap().path().to_string(); + + let name = match path.as_str() { + DEFAULT_CHANNEL_PATH => DEFAULT_CHANNEL_NAME, + CONTACTS_CHANNEL_PATH => CONTACTS_CHANNEL_NAME, + FOOD_CHANNEL_PATH => FOOD_CHANNEL_NAME, + _ => panic!("Tried to create an unknown default feed!"), + }; + + let (_, feed) = normalize_mf2( + json!({ + "type": ["h-feed"], + "properties": { + "name": [name], + "uid": [channel] + }, + "children": [uid] + }), + user, + ); + storage.put_post(&feed, user.me.as_str()).await +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_form_to_mf2() { + assert_eq!( + super::form_to_mf2_json( + serde_urlencoded::from_str("h=entry&content=something%20interesting").unwrap() + ), + json!({ + "type": ["h-entry"], + "properties": { + "content": ["something interesting"] + } + }) + ) + } + + #[test] + fn test_no_replace_uid() { + let mf2 = json!({ + "type": ["h-card"], + "properties": { + "uid": ["https://fireburn.ru/"], + "name": ["Vika Nezrimaya"], + "note": ["A crazy programmer girl who wants some hugs"] + } + }); + + let (uid, normalized) = normalize_mf2( + mf2.clone(), + &User::new( + "https://fireburn.ru/", + "https://quill.p3k.io/", + "create update media", + ), + ); + assert_eq!( + normalized["properties"]["uid"][0], mf2["properties"]["uid"][0], + "UID was replaced" + ); + assert_eq!( + normalized["properties"]["uid"][0], uid, + "Returned post location doesn't match UID" + ); + } + + #[test] + fn test_mp_channel() { + let mf2 = json!({ + "type": ["h-entry"], + "properties": { + "uid": ["https://fireburn.ru/posts/test"], + "content": [{"html": "<p>Hello world!</p>"}], + "mp-channel": ["https://fireburn.ru/feeds/test"] + } + }); + + let (_, normalized) = normalize_mf2( + mf2.clone(), + &User::new( + "https://fireburn.ru/", + "https://quill.p3k.io/", + "create update media", + ), + ); + + assert_eq!( + normalized["properties"]["channel"], + mf2["properties"]["mp-channel"] + ); + } + + #[test] + fn test_mp_channel_as_string() { + let mf2 = json!({ + "type": ["h-entry"], + "properties": { + "uid": ["https://fireburn.ru/posts/test"], + "content": [{"html": "<p>Hello world!</p>"}], + "mp-channel": "https://fireburn.ru/feeds/test" + } + }); + + let (_, normalized) = normalize_mf2( + mf2.clone(), + &User::new( + "https://fireburn.ru/", + "https://quill.p3k.io/", + "create update media", + ), + ); + + assert_eq!( + normalized["properties"]["channel"][0], + mf2["properties"]["mp-channel"] + ); + } + + #[test] + fn test_normalize_mf2() { + let mf2 = json!({ + "type": ["h-entry"], + "properties": { + "content": ["This is content!"] + } + }); + + let (uid, post) = normalize_mf2( + mf2, + &User::new( + "https://fireburn.ru/", + "https://quill.p3k.io/", + "create update media", + ), + ); + assert_eq!( + post["properties"]["published"] + .as_array() + .expect("post['published'] is undefined") + .len(), + 1, + "Post doesn't have a published time" + ); + DateTime::parse_from_rfc3339(post["properties"]["published"][0].as_str().unwrap()) + .expect("Couldn't parse date from rfc3339"); + assert!( + !post["properties"]["url"] + .as_array() + .expect("post['url'] is undefined") + .is_empty(), + "Post doesn't have any URLs" + ); + assert_eq!( + post["properties"]["uid"] + .as_array() + .expect("post['uid'] is undefined") + .len(), + 1, + "Post doesn't have a single UID" + ); + assert_eq!( + post["properties"]["uid"][0], uid, + "UID of a post and its supposed location don't match" + ); + assert!( + uid.starts_with("https://fireburn.ru/posts/"), + "The post namespace is incorrect" + ); + assert_eq!( + post["properties"]["content"][0]["html"] + .as_str() + .expect("Post doesn't have a rich content object") + .trim(), + "<p>This is content!</p>", + "Parsed Markdown content doesn't match expected HTML" + ); + assert_eq!( + post["properties"]["channel"][0], "https://fireburn.ru/feeds/main", + "Post isn't posted to the main channel" + ); + assert_eq!( + post["properties"]["author"][0], "https://fireburn.ru/", + "Post author is unknown" + ); + } + + #[test] + fn test_mp_slug() { + let mf2 = json!({ + "type": ["h-entry"], + "properties": { + "content": ["This is content!"], + "mp-slug": ["hello-post"] + }, + }); + + let (_, post) = normalize_mf2( + mf2, + &User::new( + "https://fireburn.ru/", + "https://quill.p3k.io/", + "create update media", + ), + ); + assert!( + post["properties"]["url"] + .as_array() + .unwrap() + .iter() + .map(|i| i.as_str().unwrap()) + .any(|i| i == "https://fireburn.ru/posts/hello-post"), + "Didn't found an URL pointing to the location expected by the mp-slug semantics" + ); + assert!( + post["properties"]["mp-slug"].as_array().is_none(), + "mp-slug wasn't deleted from the array!" + ) + } + + #[test] + fn test_normalize_feed() { + let mf2 = json!({ + "type": ["h-feed"], + "properties": { + "name": "Main feed", + "mp-slug": ["main"] + } + }); + + let (uid, post) = normalize_mf2( + mf2, + &User::new( + "https://fireburn.ru/", + "https://quill.p3k.io/", + "create update media", + ), + ); + assert_eq!( + post["properties"]["uid"][0], uid, + "UID of a post and its supposed location don't match" + ); + assert_eq!(post["properties"]["author"][0], "https://fireburn.ru/"); + assert!( + post["properties"]["url"] + .as_array() + .unwrap() + .iter() + .map(|i| i.as_str().unwrap()) + .any(|i| i == "https://fireburn.ru/feeds/main"), + "Didn't found an URL pointing to the location expected by the mp-slug semantics" + ); + assert!( + post["properties"]["mp-slug"].as_array().is_none(), + "mp-slug wasn't deleted from the array!" + ) + } +} |