From 08c09aaa055c05228855eed8cded9fdfe4939c0f Mon Sep 17 00:00:00 2001 From: Vika Date: Tue, 4 May 2021 17:05:51 +0300 Subject: Initial commit Working features: - Sending posts from the database - Reading posts from the database - Responding with MF2-JSON (only in debug mode!) - Not locking the database when not needed - All database actions are atomic (except for a small race where UIDs can clash, but that's not gonna happen often) TODOs: - Send webmentions - Send syndication requests - Send WebSub notifications - Make tombstones for deleted posts (update adding dt-deleted) - Rich reply contexts (possibly on the frontend part?) - Frontend? - Fix UID race Code maintenance TODOs: - Split the database module - Finish implementing the in-memory test database - Make RedisDatabase unit tests launch their own Redis instances (see redis-rs/tests/support/mod.rs for more info) - Write more unit-tests!!! --- src/database.rs | 625 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/edit_post.lua | 93 ++++++++ src/index.html | 172 ++++++++++++++ src/indieauth.rs | 116 ++++++++++ src/lib.rs | 276 +++++++++++++++++++++++ src/main.rs | 48 ++++ src/micropub/get.rs | 86 +++++++ src/micropub/mod.rs | 5 + src/micropub/post.rs | 433 +++++++++++++++++++++++++++++++++++ 9 files changed, 1854 insertions(+) create mode 100644 src/database.rs create mode 100644 src/edit_post.lua create mode 100644 src/index.html create mode 100644 src/indieauth.rs create mode 100644 src/lib.rs create mode 100644 src/main.rs create mode 100644 src/micropub/get.rs create mode 100644 src/micropub/mod.rs create mode 100644 src/micropub/post.rs (limited to 'src') diff --git a/src/database.rs b/src/database.rs new file mode 100644 index 0000000..3a9ac04 --- /dev/null +++ b/src/database.rs @@ -0,0 +1,625 @@ +#![allow(unused_variables)] +use async_trait::async_trait; +use lazy_static::lazy_static; +#[cfg(test)] +use std::collections::HashMap; +#[cfg(test)] +use std::sync::Arc; +#[cfg(test)] +use 
async_std::sync::RwLock; +use log::error; +use futures::stream; +use futures_util::FutureExt; +use futures_util::StreamExt; +use serde::{Serialize,Deserialize}; +use serde_json::json; +use redis; +use redis::AsyncCommands; + +use crate::indieauth::User; + +#[derive(Serialize, Deserialize, PartialEq, Debug)] +pub struct MicropubChannel { + pub uid: String, + pub name: String +} + +#[derive(Debug, Clone, Copy)] +pub enum ErrorKind { + Backend, + PermissionDenied, + JSONParsing, + NotFound, + Other +} + +// TODO get rid of your own errors and use a crate? +#[derive(Debug)] +pub struct StorageError { + msg: String, + source: Option>, + kind: ErrorKind +} +unsafe impl Send for StorageError {} +unsafe impl Sync for StorageError {} +impl std::error::Error for StorageError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + self.source.as_ref().map(|e| e.as_ref()) + } +} +impl From for StorageError { + fn from(err: redis::RedisError) -> Self { + Self { + msg: format!("{}", err), + source: Some(Box::new(err)), + kind: ErrorKind::Backend + } + } +} +impl From for StorageError { + fn from(err: serde_json::Error) -> Self { + Self { + msg: format!("{}", err), + source: Some(Box::new(err)), + kind: ErrorKind::JSONParsing + } + } +} +impl std::fmt::Display for StorageError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match match self.kind { + ErrorKind::Backend => write!(f, "backend error: "), + ErrorKind::JSONParsing => write!(f, "error while parsing JSON: "), + ErrorKind::PermissionDenied => write!(f, "permission denied: "), + ErrorKind::NotFound => write!(f, "not found: "), + ErrorKind::Other => write!(f, "generic storage layer error: ") + } { + Ok(_) => write!(f, "{}", self.msg), + Err(err) => Err(err) + } + } +} +impl serde::Serialize for StorageError { + fn serialize(&self, serializer: S) -> std::result::Result { + serializer.serialize_str(&self.to_string()) + } +} +impl StorageError { + fn new(kind: ErrorKind, msg: &str) -> 
Self { + return StorageError { + msg: msg.to_string(), + source: None, + kind + } + } + pub fn kind(&self) -> ErrorKind { + self.kind + } +} + +pub type Result = std::result::Result; + +/// A storage backend for the Micropub server. +/// +/// Implementations should note that all methods listed on this trait MUST be fully atomic +/// or lock the database so that write conflicts or reading half-written data should not occur. +#[async_trait] +pub trait Storage: Clone + Send + Sync { + /// Check if a post exists in the database. + async fn post_exists(&self, url: &str) -> Result; + + /// Load a post from the database in MF2-JSON format, deserialized from JSON. + async fn get_post(&self, url: &str) -> Result>; + + /// Save a post to the database as an MF2-JSON structure. + /// + /// Note that the `post` object MUST have `post["properties"]["uid"][0]` defined. + async fn put_post<'a>(&self, post: &'a serde_json::Value) -> Result<()>; + + /*/// Save a post and add it to the relevant feeds listed in `post["properties"]["channel"]`. + /// + /// Note that the `post` object MUST have `post["properties"]["uid"][0]` defined + /// and `post["properties"]["channel"]` defined, even if it's empty. + async fn put_and_index_post<'a>(&mut self, post: &'a serde_json::Value) -> Result<()>;*/ + + /// Modify a post using an update object as defined in the Micropub spec. + /// + /// Note to implementors: the update operation MUST be atomic OR MUST lock the database + /// to prevent two clients overwriting each other's changes. + /// + /// You can assume concurrent updates will never contradict each other, since that will be dumb. + /// The last update always wins. + async fn update_post<'a>(&self, url: &'a str, update: serde_json::Value) -> Result<()>; + + /// Get a list of channels available for the user represented by the `user` object to write. 
+ async fn get_channels(&self, user: &User) -> Result>; + /// Fetch a feed at `url` and return an h-feed object containing + /// `limit` posts after a post by url `after`, filtering the content + /// in context of a user specified by `user` (or an anonymous user). + /// + /// Specifically, private posts that don't include the user in the audience + /// will be elided from the feed, and the posts containing location and not + /// specifying post["properties"]["location-visibility"][0] == "public" + /// will have their location data (but not check-in data) stripped. + /// + /// This function is used as an optimization so the client, whatever it is, + /// doesn't have to fetch posts, then realize some of them are private, and + /// fetch more posts. + /// + /// Note for implementors: if you use streams to fetch posts in parallel + /// from the database, preferably make this method use a connection pool + /// to reduce overhead of creating a database connection per post for + /// parallel fetching. + async fn read_feed_with_limit<'a>(&self, url: &'a str, after: &'a Option, limit: usize, user: &'a Option) -> Result>; + + /// Deletes a post from the database irreversibly. 'nuff said. Must be idempotent. 
+ async fn delete_post<'a>(&self, url: &'a str) -> Result<()>; +} + +#[cfg(test)] +#[derive(Clone)] +pub struct MemoryStorage { + pub mapping: Arc>>, + pub channels: Arc>>> +} + +#[cfg(test)] +#[async_trait] +impl Storage for MemoryStorage { + async fn read_feed_with_limit<'a>(&self, url: &'a str, after: &'a Option, limit: usize, user: &'a Option) -> Result> { + todo!() + } + + async fn update_post<'a>(&self, url: &'a str, update: serde_json::Value) -> Result<()> { + todo!() + } + + async fn delete_post<'a>(&self, url: &'a str) -> Result<()> { + self.mapping.write().await.remove(url); + Ok(()) + } + + async fn post_exists(&self, url: &str) -> Result { + return Ok(self.mapping.read().await.contains_key(url)) + } + + async fn get_post(&self, url: &str) ->Result> { + let mapping = self.mapping.read().await; + match mapping.get(url) { + Some(val) => { + if let Some(new_url) = val["see_other"].as_str() { + match mapping.get(new_url) { + Some(val) => Ok(Some(val.clone())), + None => { + drop(mapping); + self.mapping.write().await.remove(url); + Ok(None) + } + } + } else { + Ok(Some(val.clone())) + } + }, + _ => Ok(None) + } + } + + async fn get_channels(&self, user: &User) -> Result> { + match self.channels.read().await.get(&user.me.to_string()) { + Some(channels) => Ok(futures_util::future::join_all(channels.iter() + .map(|channel| self.get_post(channel) + .map(|result| result.unwrap()) + .map(|post: Option| { + if let Some(post) = post { + Some(MicropubChannel { + uid: post["properties"]["uid"][0].as_str().unwrap().to_string(), + name: post["properties"]["name"][0].as_str().unwrap().to_string() + }) + } else { None } + }) + ).collect::>()).await.into_iter().filter_map(|chan| chan).collect::>()), + None => Ok(vec![]) + } + + } + + async fn put_post<'a>(&self, post: &'a serde_json::Value) -> Result<()> { + let mapping = &mut self.mapping.write().await; + let key: &str; + match post["properties"]["uid"][0].as_str() { + Some(uid) => key = uid, + None => return 
Err(StorageError::new(ErrorKind::Other, "post doesn't have a UID")) + } + mapping.insert(key.to_string(), post.clone()); + if post["properties"]["url"].is_array() { + for url in post["properties"]["url"].as_array().unwrap().iter().map(|i| i.as_str().unwrap().to_string()) { + if &url != key { + mapping.insert(url, json!({"see_other": key})); + } + } + } + if post["type"].as_array().unwrap().iter().any(|i| i == "h-feed") { + // This is a feed. Add it to the channels array if it's not already there. + println!("{:#}", post); + self.channels.write().await.entry(post["properties"]["author"][0].as_str().unwrap().to_string()).or_insert(vec![]).push(key.to_string()) + } + Ok(()) + } +} +#[cfg(test)] +impl MemoryStorage { + pub fn new() -> Self { + Self { + mapping: Arc::new(RwLock::new(HashMap::new())), + channels: Arc::new(RwLock::new(HashMap::new())) + } + } +} + +struct RedisScripts { + edit_post: redis::Script +} + +lazy_static! { + static ref SCRIPTS: RedisScripts = RedisScripts { + edit_post: redis::Script::new(include_str!("./edit_post.lua")) + }; +} + +#[derive(Clone)] +pub struct RedisStorage { + // TODO: use mobc crate to create a connection pool and reuse connections for efficiency + redis: redis::Client, +} + +fn filter_post<'a>(mut post: serde_json::Value, user: &'a Option) -> Option { + if post["properties"]["deleted"][0].is_string() { + return Some(json!({ + "type": post["type"], + "properties": { + "deleted": post["properties"]["deleted"] + } + })); + } + let empty_vec: Vec = vec![]; + let author = post["properties"]["author"].as_array().unwrap_or(&empty_vec).iter().map(|i| i.as_str().unwrap().to_string()); + let visibility = post["properties"]["visibility"][0].as_str().unwrap_or("public"); + let mut audience = author.chain(post["properties"]["audience"].as_array().unwrap_or(&empty_vec).iter().map(|i| i.as_str().unwrap().to_string())); + if visibility == "private" { + if !audience.any(|i| Some(i) == *user) { + return None + } + } + if 
post["properties"]["location"].is_array() { + let location_visibility = post["properties"]["location-visibility"][0].as_str().unwrap_or("private"); + let mut author = post["properties"]["author"].as_array().unwrap_or(&empty_vec).iter().map(|i| i.as_str().unwrap().to_string()); + if location_visibility == "private" && !author.any(|i| Some(i) == *user) { + post["properties"].as_object_mut().unwrap().remove("location"); + } + } + Some(post) +} + +#[async_trait] +impl Storage for RedisStorage { + async fn delete_post<'a>(&self, url: &'a str) -> Result<()> { + match self.redis.get_async_std_connection().await { + Ok(mut conn) => if let Err(err) = conn.hdel::<&str, &str, bool>("posts", url).await { + return Err(err.into()); + }, + Err(err) => return Err(err.into()) + } + Ok(()) + } + + async fn post_exists(&self, url: &str) -> Result { + match self.redis.get_async_std_connection().await { + Ok(mut conn) => match conn.hexists::<&str, &str, bool>(&"posts", url).await { + Ok(val) => Ok(val), + Err(err) => Err(err.into()) + }, + Err(err) => Err(err.into()) + } + } + + async fn get_post(&self, url: &str) -> Result> { + match self.redis.get_async_std_connection().await { + Ok(mut conn) => match conn.hget::<&str, &str, Option>(&"posts", url).await { + Ok(val) => match val { + Some(val) => match serde_json::from_str::(&val) { + Ok(parsed) => if let Some(new_url) = parsed["see_other"].as_str() { + match conn.hget::<&str, &str, Option>(&"posts", new_url).await { + Ok(val) => match val { + Some(val) => match serde_json::from_str::(&val) { + Ok(parsed) => Ok(Some(parsed)), + Err(err) => Err(err.into()) + }, + None => Ok(None) + } + Err(err) => { + Ok(None) + } + } + } else { + Ok(Some(parsed)) + }, + Err(err) => Err(err.into()) + }, + None => Ok(None) + }, + Err(err) => Err(err.into()) + }, + Err(err) => Err(err.into()) + } + } + + async fn get_channels(&self, user: &User) -> Result> { + match self.redis.get_async_std_connection().await { + Ok(mut conn) => match 
conn.smembers::>("channels_".to_string() + user.me.as_str()).await { + Ok(channels) => { + Ok(futures_util::future::join_all(channels.iter() + .map(|channel| self.get_post(channel) + .map(|result| result.unwrap()) + .map(|post: Option| { + if let Some(post) = post { + Some(MicropubChannel { + uid: post["properties"]["uid"][0].as_str().unwrap().to_string(), + name: post["properties"]["name"][0].as_str().unwrap().to_string() + }) + } else { None } + }) + ).collect::>()).await.into_iter().filter_map(|chan| chan).collect::>()) + }, + Err(err) => Err(err.into()) + }, + Err(err) => Err(err.into()) + } + } + + async fn put_post<'a>(&self, post: &'a serde_json::Value) -> Result<()> { + match self.redis.get_async_std_connection().await { + Ok(mut conn) => { + let key: &str; + match post["properties"]["uid"][0].as_str() { + Some(uid) => key = uid, + None => return Err(StorageError::new(ErrorKind::Other, "post doesn't have a UID")) + } + match conn.hset::<&str, &str, String, ()>(&"posts", key, post.to_string()).await { + Err(err) => return Err(err.into()), + _ => {} + } + if post["properties"]["url"].is_array() { + for url in post["properties"]["url"].as_array().unwrap().iter().map(|i| i.as_str().unwrap().to_string()) { + if &url != key { + match conn.hset::<&str, &str, String, ()>(&"posts", &url, json!({"see_other": key}).to_string()).await { + Err(err) => return Err(err.into()), + _ => {} + } + } + } + } + if post["type"].as_array().unwrap().iter().any(|i| i == "h-feed") { + // This is a feed. Add it to the channels array if it's not already there. 
+ match conn.sadd::("channels_".to_string() + post["properties"]["author"][0].as_str().unwrap(), key).await { + Err(err) => return Err(err.into()), + _ => {}, + } + } + Ok(()) + }, + Err(err) => Err(err.into()) + } + } + + async fn read_feed_with_limit<'a>(&self, url: &'a str, after: &'a Option, limit: usize, user: &'a Option) -> Result> { + match self.redis.get_async_std_connection().await { + Ok(mut conn) => { + let mut feed; + match conn.hget::<&str, &str, Option>(&"posts", url).await { + Ok(post) => { + match post { + Some(post) => match serde_json::from_str::(&post) { + Ok(post) => feed = post, + Err(err) => return Err(err.into()) + }, + None => return Ok(None) + } + }, + Err(err) => return Err(err.into()) + } + if feed["see_other"].is_string() { + match conn.hget::<&str, &str, Option>(&"posts", feed["see_other"].as_str().unwrap()).await { + Ok(post) => { + match post { + Some(post) => match serde_json::from_str::(&post) { + Ok(post) => feed = post, + Err(err) => return Err(err.into()) + }, + None => return Ok(None) + } + }, + Err(err) => return Err(err.into()) + } + } + if let Some(post) = filter_post(feed, user) { + feed = post + } else { + return Err(StorageError::new(ErrorKind::PermissionDenied, "specified user cannot access this post")) + } + if feed["children"].is_array() { + let children = feed["children"].as_array().unwrap(); + let posts_iter: Box + Send>; + if let Some(after) = after { + posts_iter = Box::new(children.iter().map(|i| i.as_str().unwrap().to_string()).skip_while(move |i| i != after).skip(1)); + } else { + posts_iter = Box::new(children.iter().map(|i| i.as_str().unwrap().to_string())); + } + let posts = stream::iter(posts_iter) + .map(|url| async move { + // Is it rational to use a new connection for every post fetched? 
+ match self.redis.get_async_std_connection().await { + Ok(mut conn) => match conn.hget::<&str, &str, Option>("posts", &url).await { + Ok(post) => match post { + Some(post) => match serde_json::from_str::(&post) { + Ok(post) => Some(post), + Err(err) => { + let err = StorageError::from(err); + error!("{}", err); + panic!("{}", err) + } + }, + // Happens because of a broken link (result of an improper deletion?) + None => None, + }, + Err(err) => { + let err = StorageError::from(err); + error!("{}", err); + panic!("{}", err) + } + }, + Err(err) => { + let err = StorageError::from(err); + error!("{}", err); + panic!("{}", err) + } + } + }) + // TODO: determine the optimal value for this buffer + // It will probably depend on how often can you encounter a private post on the page + // It shouldn't be too large, or we'll start fetching too many posts from the database + // It MUST NOT be larger than the typical page size + .buffered(std::cmp::min(3, limit)) + // Hack to unwrap the Option and sieve out broken links + // Broken links return None, and Stream::filter_map skips all Nones. 
+ .filter_map(|post: Option| async move { post }) + .filter_map(|post| async move { + return filter_post(post, user) + }) + .take(limit); + match std::panic::AssertUnwindSafe(posts.collect::>()).catch_unwind().await { + Ok(posts) => feed["children"] = json!(posts), + Err(err) => return Err(StorageError::new(ErrorKind::Other, "Unknown error encountered while assembling feed, see logs for more info")) + } + } + return Ok(Some(feed)); + } + Err(err) => Err(err.into()) + } + } + + async fn update_post<'a>(&self, mut url: &'a str, update: serde_json::Value) -> Result<()> { + match self.redis.get_async_std_connection().await { + Ok(mut conn) => { + if !conn.hexists::<&str, &str, bool>("posts", url).await.unwrap() { + return Err(StorageError::new(ErrorKind::NotFound, "can't edit a non-existent post")) + } + let post: serde_json::Value = serde_json::from_str(&conn.hget::<&str, &str, String>("posts", url).await.unwrap()).unwrap(); + if let Some(new_url) = post["see_other"].as_str() { + url = new_url + } + if let Err(err) = SCRIPTS.edit_post.key("posts").arg(url).arg(update.to_string()).invoke_async::<_, ()>(&mut conn).await { + return Err(err.into()) + } + }, + Err(err) => return Err(err.into()) + } + Ok(()) + } +} + + +impl RedisStorage { + /// Create a new RedisDatabase that will connect to Redis at `redis_uri` to store data. 
+ pub async fn new(redis_uri: String) -> Result { + match redis::Client::open(redis_uri) { + Ok(client) => Ok(Self { redis: client }), + Err(e) => Err(e.into()) + } + } +} + +#[cfg(test)] +mod tests { + use super::{Storage, MicropubChannel}; + use serde_json::json; + + async fn test_backend_basic_operations(backend: Backend) { + let post: serde_json::Value = json!({ + "type": ["h-entry"], + "properties": { + "content": ["Test content"], + "author": ["https://fireburn.ru/"], + "uid": ["https://fireburn.ru/posts/hello"], + "url": ["https://fireburn.ru/posts/hello", "https://fireburn.ru/posts/test"] + } + }); + let key = post["properties"]["uid"][0].as_str().unwrap().to_string(); + let alt_url = post["properties"]["url"][1].as_str().unwrap().to_string(); + + // Reading and writing + backend.put_post(&post).await.unwrap(); + if let Ok(Some(returned_post)) = backend.get_post(&key).await { + assert!(returned_post.is_object()); + assert_eq!(returned_post["type"].as_array().unwrap().len(), post["type"].as_array().unwrap().len()); + assert_eq!(returned_post["type"].as_array().unwrap(), post["type"].as_array().unwrap()); + let props: &serde_json::Map = post["properties"].as_object().unwrap(); + for key in props.keys() { + assert_eq!(returned_post["properties"][key].as_array().unwrap(), post["properties"][key].as_array().unwrap()) + } + } else { panic!("For some reason the backend did not return the post.") } + // Check the alternative URL - it should return the same post + if let Ok(Some(returned_post)) = backend.get_post(&alt_url).await { + assert!(returned_post.is_object()); + assert_eq!(returned_post["type"].as_array().unwrap().len(), post["type"].as_array().unwrap().len()); + assert_eq!(returned_post["type"].as_array().unwrap(), post["type"].as_array().unwrap()); + let props: &serde_json::Map = post["properties"].as_object().unwrap(); + for key in props.keys() { + assert_eq!(returned_post["properties"][key].as_array().unwrap(), 
post["properties"][key].as_array().unwrap()) + } + } else { panic!("For some reason the backend did not return the post.") } + } + + async fn test_backend_channel_support(backend: Backend) { + let feed = json!({ + "type": ["h-feed"], + "properties": { + "name": ["Main Page"], + "author": ["https://fireburn.ru/"], + "uid": ["https://fireburn.ru/feeds/main"] + }, + "children": [] + }); + let key = feed["properties"]["uid"][0].as_str().unwrap().to_string(); + backend.put_post(&feed).await.unwrap(); + let chans = backend.get_channels(&crate::indieauth::User::new("https://fireburn.ru/", "https://quill.p3k.io/", "create update media")).await.unwrap(); + assert_eq!(chans.len(), 1); + assert_eq!(chans[0], MicropubChannel { uid: "https://fireburn.ru/feeds/main".to_string(), name: "Main Page".to_string() }); + } + + #[async_std::test] + async fn test_memory_storage_basic_operations() { + let backend = super::MemoryStorage::new(); + test_backend_basic_operations(backend).await + } + #[async_std::test] + async fn test_memory_storage_channel_support() { + let backend = super::MemoryStorage::new(); + test_backend_channel_support(backend).await + } + + #[async_std::test] + #[ignore] + async fn test_redis_storage_basic_operations() { + let backend = super::RedisStorage::new(std::env::var("REDIS_URI").unwrap_or("redis://127.0.0.1:6379/0".to_string())).await.unwrap(); + redis::cmd("FLUSHDB").query_async::<_, ()>(&mut backend.redis.get_async_std_connection().await.unwrap()).await.unwrap(); + test_backend_basic_operations(backend).await + } + #[async_std::test] + #[ignore] + async fn test_redis_storage_channel_support() { + let backend = super::RedisStorage::new(std::env::var("REDIS_URI").unwrap_or("redis://127.0.0.1:6379/1".to_string())).await.unwrap(); + redis::cmd("FLUSHDB").query_async::<_, ()>(&mut backend.redis.get_async_std_connection().await.unwrap()).await.unwrap(); + test_backend_channel_support(backend).await + } +} diff --git a/src/edit_post.lua b/src/edit_post.lua new 
file mode 100644 index 0000000..a398f8d --- /dev/null +++ b/src/edit_post.lua @@ -0,0 +1,93 @@ +local posts = KEYS[1] +local update_desc = cjson.decode(ARGV[2]) +local post = cjson.decode(redis.call("HGET", posts, ARGV[1])) + +local delete_keys = {} +local delete_kvs = {} +local add_keys = {} + +if update_desc.replace ~= nil then + for k, v in pairs(update_desc.replace) do + table.insert(delete_keys, k) + add_keys[k] = v + end +end +if update_desc.delete ~= nil then + if update_desc.delete[0] == nil then + -- Table has string keys. Probably! + for k, v in pairs(update_desc.delete) do + delete_kvs[k] = v + end + else + -- Table has numeric keys. Probably! + for i, v in ipairs(update_desc.delete) do + table.insert(delete_keys, v) + end + end +end +if update_desc.add ~= nil then + for k, v in pairs(update_desc.add) do + add_keys[k] = v + end +end + +for i, v in ipairs(delete_keys) do + post["properties"][v] = nil + -- TODO delete URL links +end + +for k, v in pairs(delete_kvs) do + local index = -1 + if k == "children" then + for j, w in ipairs(post[k]) do + if w == v then + index = j + break + end + end + if index > -1 then + table.remove(post[k], index) + end + else + for j, w in ipairs(post["properties"][k]) do + if w == v then + index = j + break + end + end + if index > -1 then + table.remove(post["properties"][k], index) + -- TODO delete URL links + end + end +end + +for k, v in pairs(add_keys) do + if k == "children" then + if post["children"] == nil then + post["children"] = {} + end + for i, w in ipairs(v) do + table.insert(post["children"], 1, w) + end + else + if post["properties"][k] == nil then + post["properties"][k] = {} + end + for i, w in ipairs(v) do + table.insert(post["properties"][k], w) + end + if k == "url" then + redis.call("HSET", posts, v, cjson.encode({ see_other = post["properties"]["uid"][1] })) + elseif k == "channel" then + local feed = cjson.decode(redis.call("HGET", posts, v)) + table.insert(feed["children"], 1, 
post["properties"]["uid"][1]) + redis.call("HSET", posts, v, cjson.encode(feed)) + end + end +end + +local encoded = cjson.encode(post) +redis.call("SET", "debug", encoded) +redis.call("HSET", posts, post["properties"]["uid"][1], encoded) +return \ No newline at end of file diff --git a/src/index.html b/src/index.html new file mode 100644 index 0000000..15ccfc0 --- /dev/null +++ b/src/index.html @@ -0,0 +1,172 @@ + + + + Kittybox-Micropub debug client + + + + +

Kittybox-Micropub debug client

+ +
+

+ In a pinch? Lost your Micropub client, but need to make a quick announcement? + Worry not, the debug client has your back. I just hope you have a spare Micropub token stored somewhere like I do... +

+ +
+
+ Authorization details +
+ + + +

Get an access token (will open in a new tab)

+
+
+
+ Post details: +
+ + +
+
+ + +
+
+ + +
+
+ Channels +
+ + +
+ +
+ + +
+ + +
+
+ +
+
+ + \ No newline at end of file diff --git a/src/indieauth.rs b/src/indieauth.rs new file mode 100644 index 0000000..8d41577 --- /dev/null +++ b/src/indieauth.rs @@ -0,0 +1,116 @@ +use log::{error,info}; +use std::future::Future; +use std::pin::Pin; +use url::Url; +use tide::prelude::*; +use tide::{Request, Response, Next, Result}; + +use crate::database; +use crate::ApplicationState; + +#[derive(Deserialize, Serialize, Debug, PartialEq)] +pub struct User { + pub me: Url, + pub client_id: Url, + scope: String +} + +impl User { + pub fn check_scope(&self, scope: &str) -> bool { + self.scopes().any(|i| i == scope) + } + pub fn scopes(&self) -> std::str::SplitAsciiWhitespace<'_> { + self.scope.split_ascii_whitespace() + } + #[cfg(test)] + pub fn new(me: &str, client_id: &str, scope: &str) -> Self { + Self { + me: Url::parse(me).unwrap(), + client_id: Url::parse(client_id).unwrap(), + scope: scope.to_string() + } + } +} + +async fn get_token_data(token: String, token_endpoint: &http_types::Url, http_client: &surf::Client) -> (http_types::StatusCode, Option) { + match http_client.get(token_endpoint).header("Authorization", token).header("Accept", "application/json").send().await { + Ok(mut resp) => { + if resp.status() == 200 { + match resp.body_json::().await { + Ok(user) => { + info!("Token endpoint request successful. 
Validated user: {}", user.me); + (resp.status(), Some(user)) + }, + Err(err) => { + error!("Token endpoint parsing error (HTTP status {}): {}", resp.status(), err); + (http_types::StatusCode::InternalServerError, None) + } + } + } else { + error!("Token endpoint returned non-200: {}", resp.status()); + (resp.status(), None) + } + } + Err(err) => { + error!("Token endpoint connection error: {}", err); + (http_types::StatusCode::InternalServerError, None) + } + } +} + +// TODO: Figure out how to cache these authorization values - they can potentially take a lot of processing time +pub fn check_auth<'a, Backend>(mut req: Request>, next: Next<'a, ApplicationState>) -> Pin + Send + 'a>> +where + Backend: database::Storage + Send + Sync + Clone +{ + Box::pin(async { + let header = req.header("Authorization"); + match header { + None => { + Ok(Response::builder(401).body(json!({ + "error": "unauthorized", + "error_description": "Please provide an access token." + })).build()) + }, + Some(value) => { + // TODO check the token + let endpoint = &req.state().token_endpoint; + let http_client = &req.state().http_client; + match get_token_data(value.last().to_string(), endpoint, http_client).await { + (http_types::StatusCode::Ok, Some(user)) => { + req.set_ext(user); + Ok(next.run(req).await) + }, + (http_types::StatusCode::InternalServerError, None) => { + Ok(Response::builder(500).body(json!({ + "error": "token_endpoint_fail", + "error_description": "Token endpoint made a boo-boo and refused to answer." + })).build()) + }, + (_, None) => { + Ok(Response::builder(401).body(json!({ + "error": "unauthorized", + "error_description": "The token endpoint refused to accept your token." + })).build()) + }, + (_, Some(_)) => { + // This shouldn't happen. + panic!("The token validation function has caught rabies and returns malformed responses. 
Aborting."); + } + } + } + } + }) +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn user_scopes_are_checkable() { + let user = User::new("https://fireburn.ru/", "https://quill.p3k.io/", "create update media"); + + assert!(user.check_scope("create")); + assert!(!user.check_scope("delete")); + } +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..459ad23 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,276 @@ +#[cfg(debug_assertions)] +use log::info; +#[cfg(debug_assertions)] +use serde::Deserialize; +use tide::{Request, Response}; + +mod database; +mod indieauth; +mod micropub; + +use crate::indieauth::check_auth; +use crate::micropub::{get_handler,post_handler}; + +#[derive(Clone)] +pub struct ApplicationState +where + StorageBackend: database::Storage + Send + Sync + 'static +{ + token_endpoint: surf::Url, + media_endpoint: Option, + http_client: surf::Client, + storage: StorageBackend +} + +type App = tide::Server>; + +static INDEX_PAGE: &[u8] = include_bytes!("./index.html"); + +#[cfg(debug_assertions)] +#[derive(Deserialize)] +struct Mf2JsonQuery { + url: String, + limit: usize, + user: Option, + after: Option +} + +fn equip_app(mut app: App) -> App +where + Storage: database::Storage + Send + Sync + Clone +{ + app.at("/").get(|_: Request<_>| async move { + Ok(Response::builder(200).body(INDEX_PAGE).content_type("text/html").build()) + }); + app.at("/micropub").with(check_auth).get(get_handler).post(post_handler); + #[cfg(debug_assertions)] + info!("Outfitting app with the debug function"); + #[cfg(debug_assertions)] + app.at("/mf2-json").get(|req: Request>| async move { + info!("DEBUG FUNCTION: Reading MF2-JSON"); + let backend = &req.state().storage; + let query = req.query::()?; + match backend.read_feed_with_limit(&query.url, &query.after, query.limit, &query.user).await { + Ok(result) => match result { + Some(post) => Ok(Response::builder(200).body(post).build()), + None => 
Ok(Response::builder(404).build()) + }, + Err(err) => match err.kind() { + database::ErrorKind::PermissionDenied => { + if let Some(_) = query.user { + Ok(Response::builder(403).build()) + } else { + Ok(Response::builder(401).build()) + } + } + _ => Ok(Response::builder(500).body(serde_json::json!({"error": "database_error", "error_description": format!("{}", err)})).build()) + } + } + }); + + return app +} + +pub async fn get_app_with_redis(token_endpoint: surf::Url, redis_uri: String, media_endpoint: Option) -> App { + let app = tide::with_state(ApplicationState { + token_endpoint, media_endpoint, + storage: database::RedisStorage::new(redis_uri).await.unwrap(), + http_client: surf::Client::new(), + }); + + equip_app(app) +} + +#[cfg(test)] +pub async fn get_app_with_memory_for_testing(token_endpoint: surf::Url) -> (database::MemoryStorage, App) { + let database = database::MemoryStorage::new(); + let app = tide::with_state(ApplicationState { + token_endpoint, media_endpoint: None, + storage: database.clone(), + http_client: surf::Client::new(), + }); + + return (database, equip_app(app)) +} + +#[cfg(test)] +#[allow(unused_variables,unused_imports)] +mod tests { + use super::*; + use serde_json::json; + use tide_testing::TideTestingExt; + use crate::database::Storage; + use mockito::mock; + + async fn create_app() -> (database::MemoryStorage, App) { + get_app_with_memory_for_testing(surf::Url::parse(&*mockito::server_url()).unwrap()).await + } + #[async_std::test] + async fn test_no_posting_to_others_websites() { + let _m = mock("GET", "/") + .with_status(200) + .with_header("Content-Type", "application/json") + .with_body(r#"{"me": "https://fireburn.ru", "client_id": "https://quill.p3k.io/", "scope": "create update media"}"#) + .create(); + + let (db, app) = create_app().await; + + let request: surf::RequestBuilder = app.post("/micropub") + .header("Authorization", "Bearer test") + .header("Content-Type", "application/json") + .body(json!({ + "type": 
["h-entry"], + "properties": { + "content": ["Fake news about Aaron Parecki!"], + "uid": ["https://aaronparecki.com/posts/fake-news"] + } + })); + let response = request.send().await.unwrap(); + assert_eq!(response.status(), 403); + + let request: surf::RequestBuilder = app.post("/micropub") + .header("Authorization", "Bearer test") + .header("Content-Type", "application/json") + .body(json!({ + "type": ["h-entry"], + "properties": { + "content": ["More fake news about Aaron Parecki!"], + "url": ["https://aaronparecki.com/posts/more-fake-news"] + } + })); + let response = request.send().await.unwrap(); + assert_eq!(response.status(), 403); + + let request: surf::RequestBuilder = app.post("/micropub") + .header("Authorization", "Bearer test") + .header("Content-Type", "application/json") + .body(json!({ + "type": ["h-entry"], + "properties": { + "content": ["Sneaky advertisement designed to creep into someone else's feed! Buy whatever I'm promoting!"], + "channel": ["https://aaronparecki.com/feeds/main"] + } + })); + let response = request.send().await.unwrap(); + assert_eq!(response.status(), 403); + } + + #[async_std::test] + async fn test_successful_authorization() { + let _m = mock("GET", "/") + .with_status(200) + .with_header("Content-Type", "application/json") + .with_body(r#"{"me": "https://fireburn.ru", "client_id": "https://quill.p3k.io/", "scope": "create update media"}"#) + .create(); + + let (db, app) = create_app().await; + + let response: serde_json::Value = app.get("/micropub?q=config") + .header("Authorization", "test") + .recv_json().await.unwrap(); + assert!(!response["q"].as_array().unwrap().is_empty()); + } + + #[async_std::test] + async fn test_unsuccessful_authorization() { + let _m = mock("GET", "/") + .with_status(400) + .with_header("Content-Type", "application/json") + .with_body(r#"{"error":"unauthorized","error_description":"A valid access token is required."}"#) + .create(); + + let (db, app) = create_app().await; + + let response: 
surf::Response = app.get("/micropub?q=config") + .header("Authorization", "test") + .send().await.unwrap(); + assert_eq!(response.status(), 401); + } + + #[async_std::test] + async fn test_no_auth_header() { + let (db, app) = create_app().await; + + let request: surf::RequestBuilder = app.get("/micropub?q=config"); + let response: surf::Response = request.send().await.unwrap(); + assert_eq!(response.status(), 401); + } + + #[async_std::test] + async fn test_create_post_form_encoded() { + let _m = mock("GET", "/") + .with_status(200) + .with_header("Content-Type", "application/json") + .with_body(r#"{"me": "https://fireburn.ru", "client_id": "https://quill.p3k.io/", "scope": "create update media"}"#) + .create(); + + let (storage, app) = create_app().await; + + let request: surf::RequestBuilder = app.post("/micropub") + .header("Authorization", "Bearer test") + .header("Content-Type", "application/x-www-form-urlencoded") + .body("h=entry&content=something%20interesting&category[]=test&category[]=stuff"); + let mut response: surf::Response = request.send().await.unwrap(); + println!("{:#}", response.body_json::().await.unwrap()); + assert!(response.status() == 201 || response.status() == 202); + let uid = response.header("Location").unwrap().last().to_string(); + // Assume the post is in the database at this point. + let post = storage.get_post(&uid).await.unwrap().unwrap(); + assert_eq!(post["properties"]["content"][0]["html"].as_str().unwrap().trim(), "

something interesting

"); + } + + #[async_std::test] + async fn test_create_post_json() { + let _m = mock("GET", "/") + .with_status(200) + .with_header("Content-Type", "application/json") + .with_body(r#"{"me": "https://fireburn.ru", "client_id": "https://quill.p3k.io/", "scope": "create update media"}"#) + .create(); + + let (storage, app) = create_app().await; + + let request: surf::RequestBuilder = app.post("/micropub") + .header("Authorization", "Bearer test") + .header("Content-Type", "application/json") + .body(json!({ + "type": ["h-entry"], + "properties": { + "content": ["This is content!"] + } + })); + let mut response: surf::Response = request.send().await.unwrap(); + println!("{:#}", response.body_json::().await.unwrap()); + assert!(response.status() == 201 || response.status() == 202); + let uid = response.header("Location").unwrap().last().to_string(); + // Assume the post is in the database at this point. + let post = storage.get_post(&uid).await.unwrap().unwrap(); + assert_eq!(post["properties"]["content"][0]["html"].as_str().unwrap().trim(), "

This is content!

"); + let feed = storage.get_post("https://fireburn.ru/feeds/main").await.unwrap().unwrap(); + assert_eq!(feed["children"].as_array().unwrap().len(), 1); + assert_eq!(feed["children"][0].as_str().unwrap(), uid); + + let request: surf::RequestBuilder = app.post("/micropub") + .header("Authorization", "Bearer test") + .header("Content-Type", "application/json") + .body(json!({ + "type": ["h-entry"], + "properties": { + "content": ["#moar content for you!"] + } + })); + + let first_uid = uid; + + let mut response: surf::Response = request.send().await.unwrap(); + println!("{:#}", response.body_json::().await.unwrap()); + assert!(response.status() == 201 || response.status() == 202); + let uid = response.header("Location").unwrap().last().to_string(); + // Assume the post is in the database at this point. + println!("Keys in database: {:?}", storage.mapping.read().await.keys()); + let new_feed = storage.get_post("https://fireburn.ru/feeds/main").await.unwrap().unwrap(); + println!("{}", new_feed["children"]); + assert_eq!(new_feed["children"].as_array().unwrap().len(), 2); + assert_eq!(new_feed["children"][0].as_str().unwrap(), uid); + assert_eq!(new_feed["children"][1].as_str().unwrap(), first_uid); + } +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..3d0831e --- /dev/null +++ b/src/main.rs @@ -0,0 +1,48 @@ +use std::env; +use log::{error,info,debug}; +use env_logger; +use surf::Url; +use kittybox_micropub as micropub; + +#[async_std::main] +async fn main() -> Result<(), std::io::Error> { + // TODO json logging in the future? 
+ let logger_env = env_logger::Env::new().filter_or("RUST_LOG", "info"); + env_logger::init_from_env(logger_env); + + info!("Starting the Micropub server..."); + + let redis_uri: String; + match env::var("REDIS_URI") { + Ok(val) => { + debug!("Redis connection: {}", val); + redis_uri = val + }, + Err(_) => { + error!("REDIS_URI is not set, cannot find a database"); + std::process::exit(1); + } + }; + let token_endpoint: Url; + match env::var("TOKEN_ENDPOINT") { + Ok(val) => { + debug!("Token endpoint: {}", val); + match Url::parse(&val) { + Ok(val) => token_endpoint = val, + _ => { + error!("Token endpoint URL cannot be parsed, aborting."); + std::process::exit(1) + } + } + } + Err(_) => { + error!("TOKEN_ENDPOINT is not set, will not be able to authorize users!"); + std::process::exit(1) + } + } + let media_endpoint: Option = env::var("MEDIA_ENDPOINT").ok(); + + let host = env::var("SERVE_AT").ok().unwrap_or("0.0.0.0:8080".to_string()); + let app = micropub::get_app_with_redis(token_endpoint, redis_uri, media_endpoint).await; + app.listen(host).await +} \ No newline at end of file diff --git a/src/micropub/get.rs b/src/micropub/get.rs new file mode 100644 index 0000000..9a12316 --- /dev/null +++ b/src/micropub/get.rs @@ -0,0 +1,86 @@ +use tide::prelude::{Deserialize, json}; +use tide::{Request, Response, Result}; +use crate::ApplicationState; +use crate::database::{MicropubChannel,Storage}; +use crate::indieauth::User; + +#[derive(Deserialize)] +struct QueryOptions { + q: String, + url: Option +} + +pub async fn get_handler(req: Request>) -> Result +where + Backend: Storage + Send + Sync +{ + let user = req.ext::().unwrap(); + let backend = &req.state().storage; + let media_endpoint = &req.state().media_endpoint; + let query = req.query::().unwrap_or(QueryOptions { q: "".to_string(), url: None }); + match &*query.q { + "config" => { + let channels: Vec; + match backend.get_channels(&user).await { + Ok(chans) => channels = chans, + Err(err) => return 
Ok(Response::builder(500).body(json!({ + "error": "database_error", + "error_description": format!("Couldn't fetch channel list from the database: {:?}", err) + })).build()) + } + Ok(Response::builder(200).body(json!({ + "q": ["source", "config", "channel"], + "channels": channels, + "media-endpoint": media_endpoint + })).build()) + }, + "channel" => { + let channels: Vec; + match backend.get_channels(&user).await { + Ok(chans) => channels = chans, + Err(err) => return Ok(Response::builder(500).body(json!({ + "error": "database_error", + "error_description": format!("Couldn't fetch channel list from the database: {:?}", err) + })).build()) + } + return Ok(Response::builder(200).body(json!(channels)).build()) + } + "source" => { + if user.check_scope("create") || user.check_scope("update") || user.check_scope("delete") || user.check_scope("undelete") { + if let Some(url) = query.url { + match backend.get_post(&url).await { + Ok(post) => if let Some(post) = post { + return Ok(Response::builder(200).body(post).build()) + } else { + return Ok(Response::builder(404).build()) + }, + Err(err) => return Ok(Response::builder(500).body(json!({ + "error": "database_error", + "error_description": err + })).build()) + } + } else { + return Ok(Response::builder(400).body(json!({ + "error": "invalid_request", + "error_description": "Please provide `url`." + })).build()) + } + } else { + Ok(Response::builder(401).body(json!({ + "error": "insufficient_scope", + "error_description": "You don't have the required scopes to proceed.", + "scope": "update" + })).build()) + } + }, + // Errors + "" => Ok(Response::builder(400).body(json!({ + "error": "invalid_request", + "error_description": "No ?q= parameter specified. Try ?q=config maybe?" + })).build()), + _ => Ok(Response::builder(400).body(json!({ + "error": "invalid_request", + "error_description": "Unsupported ?q= query. Try ?q=config and see the q array for supported values." 
+ })).build()) + } +} diff --git a/src/micropub/mod.rs b/src/micropub/mod.rs new file mode 100644 index 0000000..ec5cd87 --- /dev/null +++ b/src/micropub/mod.rs @@ -0,0 +1,5 @@ +mod get; +mod post; + +pub use get::get_handler; +pub use post::post_handler; \ No newline at end of file diff --git a/src/micropub/post.rs b/src/micropub/post.rs new file mode 100644 index 0000000..38b205b --- /dev/null +++ b/src/micropub/post.rs @@ -0,0 +1,433 @@ +use core::iter::Iterator; +use std::str::FromStr; +use std::convert::TryInto; +use async_std::sync::RwLockUpgradableReadGuard; +use chrono::prelude::*; +use http_types::Mime; +use tide::prelude::json; +use tide::{Request, Response, Result}; +use newbase60::num_to_sxg; +use crate::ApplicationState; +use crate::database::{Storage}; +use crate::indieauth::User; + +static DEFAULT_CHANNEL_PATH: &str = "/feeds/main"; +static DEFAULT_CHANNEL_NAME: &str = "Main feed"; + +macro_rules! response { + ($($code:expr, $json:tt)+) => { + $( + Ok(Response::builder($code).body(json!($json)).build()) + )+ + }; +} + +macro_rules! error_json { + ($($code:expr, $error:expr, $error_desc:expr)+) => { + $( + response!($code, { + "error": $error, + "error_description": $error_desc + }) + )+ + } +} + +fn get_folder_from_type(post_type: &str) -> String { + (match post_type { + "h-feed" => "feeds/", + "h-event" => "events/", + _ => "posts/" + }).to_string() +} + +fn normalize_mf2<'a>(mut body: serde_json::Value, user: &User) -> (String, serde_json::Value) { + // Normalize the MF2 object here. + let me = &user.me; + let published: DateTime; + let folder = get_folder_from_type(body["type"][0].as_str().unwrap()); + if let Some(dt) = body["properties"]["published"][0].as_str() { + // Check if the datetime is parsable. + match DateTime::parse_from_rfc3339(dt) { + Ok(dt) => { + published = dt; + } + Err(_) => { + // Reset the datetime to a proper datetime. + // Do not attempt to recover the information. + // Do not pass GO. Do not collect $200. 
+ let curtime: DateTime = Local::now(); + body["properties"]["published"] = serde_json::Value::Array(vec![ + serde_json::Value::String(curtime.to_rfc3339()) + ]); + published = chrono::DateTime::from(curtime); + } + } + } else { + // Set the datetime. + let curtime: DateTime = Local::now(); + body["properties"]["published"] = serde_json::Value::Array(vec![ + serde_json::Value::String(curtime.to_rfc3339()) + ]); + published = chrono::DateTime::from(curtime); + } + match body["properties"]["uid"][0].as_str() { + None => { + let uid = serde_json::Value::String( + me.join( + &(folder.clone() + &num_to_sxg(published.timestamp_millis().try_into().unwrap())) + ).unwrap().to_string()); + body["properties"]["uid"] = serde_json::Value::Array(vec![uid.clone()]); + match body["properties"]["url"].as_array_mut() { + Some(array) => { + array.push(uid) + } + None => { + body["properties"]["url"] = body["properties"]["uid"].clone() + } + } + } + Some(uid_str) => { + let uid = uid_str.to_string(); + match body["properties"]["url"].as_array_mut() { + Some(array) => { + if !array.iter().any(|i| i.as_str().unwrap_or("") == uid) { + array.push(serde_json::Value::String(uid.to_string())) + } + } + None => { + body["properties"]["url"] = body["properties"]["uid"].clone() + } + } + } + } + if let Some(slugs) = body["properties"]["mp-slug"].as_array() { + let new_urls = slugs.iter() + .map(|i| i.as_str().unwrap_or("")) + .filter(|i| i != &"") + .map(|i| me.join(&((&folder).clone() + i)).unwrap().to_string()) + .collect::>(); + let urls = body["properties"]["url"].as_array_mut().unwrap(); + new_urls.iter().for_each(|i| urls.push(json!(i))); + } + let props = body["properties"].as_object_mut().unwrap(); + props.remove("mp-slug"); + + if body["properties"]["content"][0].is_string() { + // Convert the content to HTML using the `markdown` crate + body["properties"]["content"] = json!([{ + "html": markdown::to_html(body["properties"]["content"][0].as_str().unwrap()), + "value": 
body["properties"]["content"][0] + }]) + } + if body["properties"]["channel"][0].as_str().is_none() && body["type"][0] != "h-feed" { + // Set the channel to the main channel... + /* use the shared constant instead of duplicating the "/feeds/main" literal (kept in sync with new_post) */ + let default_channel = me.join(DEFAULT_CHANNEL_PATH).unwrap().to_string(); + + body["properties"]["channel"] = json!([default_channel]); + } + body["properties"]["posted-with"] = json!([user.client_id]); + if body["properties"]["author"][0].as_str().is_none() { + body["properties"]["author"] = json!([me.as_str()]) + } + // TODO: maybe highlight #hashtags? + // Find other processing to do and insert it here + return (body["properties"]["uid"][0].as_str().unwrap().to_string(), body) +} + +async fn new_post(req: Request>, body: serde_json::Value) -> Result { + // First, check for rights. + let user = req.ext::().unwrap(); + if !user.check_scope("create") { + return error_json!(401, "invalid_scope", "Not enough privileges to post. Try a token with a \"create\" scope instead.") + } + let (uid, post) = normalize_mf2(body, user); + + // Security check! + // This software might also be used in a multi-user setting + // where several users or identities share one Micropub server + // (maybe a family website or a shitpost sideblog?)
+ if post["properties"]["url"].as_array().unwrap().iter().any(|url| !url.as_str().unwrap().starts_with(user.me.as_str())) + || !post["properties"]["uid"][0].as_str().unwrap().starts_with(user.me.as_str()) + || post["properties"]["channel"].as_array().unwrap().iter().any(|url| !url.as_str().unwrap().starts_with(user.me.as_str())) + { + return error_json!(403, "forbidden", "You're trying to post to someone else's website...") + } + + let storage = &req.state().storage; + match storage.post_exists(&uid).await { + Ok(exists) => if exists { + return error_json!(409, "already_exists", format!("A post with the exact same UID already exists in the database: {}", uid)) + }, + Err(err) => return error_json!(500, "database_error", err) + } + // WARNING: WRITE BOUNDARY + //let mut storage = RwLockUpgradableReadGuard::upgrade(storage).await; + if let Err(err) = storage.put_post(&post).await { + return error_json!(500, "database_error", format!("{}", err)) + } + /* loop-invariant: compute the default channel URL once instead of per iteration */ + let default_channel = user.me.join(DEFAULT_CHANNEL_PATH).unwrap().to_string(); + for channel in post["properties"]["channel"] + .as_array().unwrap().iter() + .map(|i| i.as_str().unwrap_or("").to_string()) + .filter(|i| !i.is_empty()) + .collect::>() + { + match storage.post_exists(&channel).await { + Ok(exists) => if exists { + if let Err(err) = storage.update_post(&channel, json!({ + "add": { + "children": [uid] + } + })).await { + return error_json!(500, "database_error", format!("Couldn't insert post into the channel due to a database error: {}", err)) + } + } else if channel == default_channel { + let (_, feed) = normalize_mf2(json!({ + "type": ["h-feed"], + "properties": { + "name": [DEFAULT_CHANNEL_NAME], + "mp-slug": ["main"], + }, + "children": [uid] + }), &user); + if let Err(err) = storage.put_post(&feed).await { + return error_json!(500, "database_error", format!("Couldn't save feed: {}", err)) + } + }, + Err(err) => return error_json!(500, "database_error", err) + } + } + // END WRITE BOUNDARY + //drop(storage); + 
// TODO: Post-processing the post (aka second write pass) + // - [ ] Send webmentions + // - [ ] Download rich reply contexts + // - [ ] Send WebSub notifications to the hub (if we happen to have one) + // - [ ] Syndicate the post if requested, add links to the syndicated copies + + return Ok(Response::builder(202) + .header("Location", &uid) + .body(json!({"status": "accepted", "location": &uid})) + .build()); +} + +async fn process_json(req: Request>, body: serde_json::Value) -> Result { + let is_action = body["action"].is_string() && body["url"].is_string(); + if is_action { + // This could be an update, a deletion or an undeletion request. + // Process it separately. + let action = body["action"].as_str().unwrap(); + let url = body["url"].as_str().unwrap(); + let user = req.ext::().unwrap(); + match action { + "delete" => { + if !user.check_scope("delete") { + return error_json!(401, "insufficient_scope", "You need a `delete` scope to delete posts.") + } + if let Err(error) = req.state().storage.delete_post(&url).await { + return error_json!(500, "database_error", error) + } + return Ok(Response::builder(200).build()); + }, + "update" => { + if !user.check_scope("update") { + return error_json!(401, "insufficient_scope", "You need an `update` scope to update posts.") + } + if let Err(error) = req.state().storage.update_post(&url, body.clone()).await { + return error_json!(500, "database_error", error) + } else { + return Ok(Response::builder(204).build()) + } + }, + _ => { + return error_json!(400, "invalid_request", "This action is not supported.") + } + } + } else if let Some(_) = body["type"][0].as_str() { + // This is definitely an h-entry or something similar. Check if it has properties? + if let Some(_) = body["properties"].as_object() { + // Ok, this is definitely a new h-entry. Let's save it. + return new_post(req, body).await + } else { + return error_json!(400, "invalid_request", "This MF2-JSON object has a type, but not properties. 
This makes no sense to post.") + } + } else { + return error_json!(400, "invalid_request", "Try sending MF2-structured data or an object with an \"action\" and \"url\" keys.") + } +} + +/* Converts urlencoded form pairs into an MF2-JSON object; `h=entry` → type, `key[]` repeats accumulate, access_token is dropped. */ +fn convert_form_to_mf2_json(form: Vec<(String, String)>) -> serde_json::Value { + let mut mf2 = json!({"type": [], "properties": {}}); + for (k, v) in form { + if k == "h" { + mf2["type"].as_array_mut().unwrap().push(json!("h-".to_string() + &v)); + } else if k != "access_token" { + let key = k.strip_suffix("[]").unwrap_or(&k); + match mf2["properties"][key].as_array_mut() { + Some(prop) => prop.push(json!(v)), + None => mf2["properties"][key] = json!([v]) + } + } + } + if mf2["type"].as_array().unwrap().is_empty() { + mf2["type"].as_array_mut().unwrap().push(json!("h-entry")); + } + mf2 +} + +async fn process_form(req: Request>, form: Vec<(String, String)>) -> Result { + if let Some((_, v)) = form.iter().find(|(k, _)| k == "action") { + if v == "delete" { + let user = req.ext::().unwrap(); + if !user.check_scope("delete") { + return error_json!(401, "insufficient_scope", "You cannot delete posts without a `delete` scope.") + } + match form.iter().find(|(k, _)| k == "url") { + Some((_, url)) => { + if let Err(error) = req.state().storage.delete_post(&url).await { + return error_json!(500, "database_error", error) + } + return Ok(Response::builder(200).build()) + }, + None => return error_json!(400, "invalid_request", "Please provide an `url` to delete.") + } + } else { + return error_json!(400, "invalid_request", "This action is not supported in form-encoded mode. 
(JSON requests support more actions, use them!)") + } + } + + let mf2 = convert_form_to_mf2_json(form); + + if mf2["properties"].as_object().unwrap().keys().len() > 0 { + return new_post(req, mf2).await; + } + return error_json!(400, "invalid_request", "Try sending h=entry&content=something%20interesting"); +} + +pub async fn post_handler(mut req: Request>) -> Result { + match req.content_type() { + Some(value) => { + if value == Mime::from_str("application/json").unwrap() { + match req.body_json::().await { + Ok(parsed) => { + return process_json(req, parsed).await + }, + Err(err) => return error_json!( + 400, "invalid_request", + format!("Parsing JSON failed: {:?}", err) + ) + } + } else if value == Mime::from_str("application/x-www-form-urlencoded").unwrap() { + match req.body_form::>().await { + Ok(parsed) => { + return process_form(req, parsed).await + }, + Err(err) => return error_json!( + 400, "invalid_request", + format!("Parsing form failed: {:?}", err) + ) + } + } else { + return error_json!( + 415, "unsupported_media_type", + "What's this? Try sending JSON instead. (urlencoded form also works but is less cute)" + ) + } + } + _ => { + return error_json!( + 415, "unsupported_media_type", + "You didn't send a Content-Type header, so we don't know how to parse your request." 
+ ); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_form_to_mf2() { + use serde_urlencoded::from_str; + + assert_eq!( + convert_form_to_mf2_json(from_str("h=entry&content=something%20interesting").unwrap()), + json!({ + "type": ["h-entry"], + "properties": { + "content": ["something interesting"] + } + }) + ) + } + + #[test] + fn test_normalize_mf2() { + let mf2 = json!({ + "type": ["h-entry"], + "properties": { + "content": ["This is content!"] + } + }); + + let (uid, post) = normalize_mf2(mf2, &User::new("https://fireburn.ru/", "https://quill.p3k.io/", "create update media")); + assert!(post["properties"]["published"].as_array().unwrap().len() > 0); + DateTime::parse_from_rfc3339(post["properties"]["published"][0].as_str().unwrap()).unwrap(); + assert!(post["properties"]["url"].as_array().unwrap().len() > 0); + assert!(post["properties"]["uid"].as_array().unwrap().len() > 0); + assert_eq!(post["properties"]["uid"][0].as_str().unwrap(), &uid); + assert!(uid.starts_with("https://fireburn.ru/posts/")); + assert_eq!(post["properties"]["content"][0]["html"].as_str().unwrap().trim(), "

This is content!

"); + assert_eq!(post["properties"]["channel"][0], "https://fireburn.ru/feeds/main"); + assert_eq!(post["properties"]["author"][0], "https://fireburn.ru/"); + } + + #[test] + fn test_mp_slug() { + let mf2 = json!({ + "type": ["h-entry"], + "properties": { + "content": ["This is content!"], + "mp-slug": ["hello-post"] + }, + }); + + let (_, post) = normalize_mf2(mf2, &User::new("https://fireburn.ru/", "https://quill.p3k.io/", "create update media")); + assert!(post["properties"]["url"] + .as_array() + .unwrap() + .iter() + .map(|i| i.as_str().unwrap()) + .any(|i| i == "https://fireburn.ru/posts/hello-post") + ); + /* normalize_mf2 must strip mp-slug after translating it into url entries */ + assert!(post["properties"]["mp-slug"].as_array().is_none(), "mp-slug wasn't deleted from the array!"); + } + + #[test] + fn test_normalize_feed() { + let mf2 = json!({ + "type": ["h-feed"], + "properties": { + "name": "Main feed", + "mp-slug": ["main"] + } + }); + + let (uid, post) = normalize_mf2(mf2, &User::new("https://fireburn.ru/", "https://quill.p3k.io/", "create update media")); + assert_eq!(post["properties"]["uid"][0].as_str().unwrap(), &uid); + assert_eq!(post["properties"]["author"][0], "https://fireburn.ru/"); + assert!(post["properties"]["url"] + .as_array() + .unwrap() + .iter() + .map(|i| i.as_str().unwrap()) + .any(|i| i == "https://fireburn.ru/feeds/main")); + /* normalize_mf2 must strip mp-slug after translating it into url entries */ + assert!(post["properties"]["mp-slug"].as_array().is_none(), "mp-slug wasn't deleted from the array!"); + } +} \ No newline at end of file -- cgit 1.4.1