Diffstat (limited to 'src/database')
-rw-r--r-- | src/database/file/mod.rs         | 733
-rw-r--r-- | src/database/memory.rs           | 249
-rw-r--r-- | src/database/mod.rs              | 793
-rw-r--r-- | src/database/postgres/mod.rs     | 416
-rw-r--r-- | src/database/redis/edit_post.lua |  93
-rw-r--r-- | src/database/redis/mod.rs        | 398
6 files changed, 2682 insertions, 0 deletions
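The diff below introduces the Storage trait and its file, in-memory, Postgres and Redis backends. As a minimal, hypothetical sketch (not part of this commit) of how that API might be driven from elsewhere in the crate, the following snippet writes and reads back a post through the FileStorage backend; module paths and the example data are assumptions for illustration only.

// Hypothetical smoke test against the Storage trait added in this commit.
use std::path::PathBuf;

use serde_json::json;

// Assumed module paths; the commit defines these under src/database.
use crate::database::{FileStorage, Storage};

async fn smoke_test() -> crate::database::Result<()> {
    // Back the store with a directory of JSON files.
    let storage = FileStorage::new(PathBuf::from("/tmp/kittybox-data")).await?;

    // `put_post` requires `properties.uid[0]` to be present (see the trait docs below).
    let post = json!({
        "type": ["h-entry"],
        "properties": {
            "uid": ["https://example.com/posts/hello"],
            "url": ["https://example.com/posts/hello"],
            "content": ["Hello, world!"]
        }
    });
    storage.put_post(&post, "example.com").await?;

    // The post should now be retrievable by its UID.
    assert!(storage.post_exists("https://example.com/posts/hello").await?);
    let fetched = storage.get_post("https://example.com/posts/hello").await?;
    assert_eq!(fetched.as_ref(), Some(&post));
    Ok(())
}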
diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs new file mode 100644 index 0000000..27d3da1 --- /dev/null +++ b/src/database/file/mod.rs @@ -0,0 +1,733 @@ +//#![warn(clippy::unwrap_used)] +use crate::database::{ErrorKind, Result, settings, Storage, StorageError}; +use crate::micropub::{MicropubUpdate, MicropubPropertyDeletion}; +use async_trait::async_trait; +use futures::{stream, StreamExt, TryStreamExt}; +use kittybox_util::MentionType; +use serde_json::json; +use std::borrow::Cow; +use std::collections::HashMap; +use std::io::ErrorKind as IOErrorKind; +use std::path::{Path, PathBuf}; +use tokio::fs::{File, OpenOptions}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::task::spawn_blocking; +use tracing::{debug, error}; + +impl From<std::io::Error> for StorageError { + fn from(source: std::io::Error) -> Self { + Self::with_source( + match source.kind() { + IOErrorKind::NotFound => ErrorKind::NotFound, + IOErrorKind::AlreadyExists => ErrorKind::Conflict, + _ => ErrorKind::Backend, + }, + Cow::Owned(format!("file I/O error: {}", &source)), + Box::new(source), + ) + } +} + +impl From<tokio::time::error::Elapsed> for StorageError { + fn from(source: tokio::time::error::Elapsed) -> Self { + Self::with_source( + ErrorKind::Backend, + Cow::Borrowed("timeout on I/O operation"), + Box::new(source), + ) + } +} + +// Copied from https://stackoverflow.com/questions/39340924 +// This routine is adapted from the *old* Path's `path_relative_from` +// function, which works differently from the new `relative_from` function. +// In particular, this handles the case on unix where both paths are +// absolute but with only the root as the common directory. +fn path_relative_from(path: &Path, base: &Path) -> Option<PathBuf> { + use std::path::Component; + + if path.is_absolute() != base.is_absolute() { + if path.is_absolute() { + Some(PathBuf::from(path)) + } else { + None + } + } else { + let mut ita = path.components(); + let mut itb = base.components(); + let mut comps: Vec<Component> = vec![]; + loop { + match (ita.next(), itb.next()) { + (None, None) => break, + (Some(a), None) => { + comps.push(a); + comps.extend(ita.by_ref()); + break; + } + (None, _) => comps.push(Component::ParentDir), + (Some(a), Some(b)) if comps.is_empty() && a == b => (), + (Some(a), Some(b)) if b == Component::CurDir => comps.push(a), + (Some(_), Some(b)) if b == Component::ParentDir => return None, + (Some(a), Some(_)) => { + comps.push(Component::ParentDir); + for _ in itb { + comps.push(Component::ParentDir); + } + comps.push(a); + comps.extend(ita.by_ref()); + break; + } + } + } + Some(comps.iter().map(|c| c.as_os_str()).collect()) + } +} + +#[allow(clippy::unwrap_used, clippy::expect_used)] +#[cfg(test)] +mod tests { + #[test] + fn test_relative_path_resolving() { + let path1 = std::path::Path::new("/home/vika/Projects/kittybox"); + let path2 = std::path::Path::new("/home/vika/Projects/nixpkgs"); + let relative_path = super::path_relative_from(path2, path1).unwrap(); + + assert_eq!(relative_path, std::path::Path::new("../nixpkgs")) + } +} + +// TODO: Check that the path ACTUALLY IS INSIDE THE ROOT FOLDER +// This could be checked by completely resolving the path +// and checking if it has a common prefix +fn url_to_path(root: &Path, url: &str) -> PathBuf { + let path = url_to_relative_path(url).to_logical_path(root); + if !path.starts_with(root) { + // TODO: handle more gracefully + panic!("Security error: {:?} is not a prefix of {:?}", path, root) + } else { + path + } +} + +fn 
url_to_relative_path(url: &str) -> relative_path::RelativePathBuf { + let url = url::Url::try_from(url).expect("Couldn't parse a URL"); + let mut path = relative_path::RelativePathBuf::new(); + let user_domain = format!( + "{}{}", + url.host_str().unwrap(), + url.port() + .map(|port| format!(":{}", port)) + .unwrap_or_default() + ); + path.push(user_domain + url.path() + ".json"); + + path +} + +fn modify_post(post: &serde_json::Value, update: MicropubUpdate) -> Result<serde_json::Value> { + let mut post = post.clone(); + + let mut add_keys: HashMap<String, Vec<serde_json::Value>> = HashMap::new(); + let mut remove_keys: Vec<String> = vec![]; + let mut remove_values: HashMap<String, Vec<serde_json::Value>> = HashMap::new(); + + if let Some(MicropubPropertyDeletion::Properties(delete)) = update.delete { + remove_keys.extend(delete.iter().cloned()); + } else if let Some(MicropubPropertyDeletion::Values(delete)) = update.delete { + for (k, v) in delete { + remove_values + .entry(k.to_string()) + .or_default() + .extend(v.clone()); + } + } + if let Some(add) = update.add { + for (k, v) in add { + add_keys.insert(k.to_string(), v.clone()); + } + } + if let Some(replace) = update.replace { + for (k, v) in replace { + remove_keys.push(k.to_string()); + add_keys.insert(k.to_string(), v.clone()); + } + } + + if let Some(props) = post["properties"].as_object_mut() { + for k in remove_keys { + props.remove(&k); + } + } + for (k, v) in remove_values { + let k = &k; + let props = if k == "children" { + &mut post + } else { + &mut post["properties"] + }; + v.iter().for_each(|v| { + if let Some(vec) = props[k].as_array_mut() { + if let Some(index) = vec.iter().position(|w| w == v) { + vec.remove(index); + } + } + }); + } + for (k, v) in add_keys { + tracing::debug!("Adding k/v to post: {} => {:?}", k, v); + let props = if k == "children" { + &mut post + } else { + &mut post["properties"] + }; + if let Some(prop) = props[&k].as_array_mut() { + if k == "children" { + v.into_iter().rev().for_each(|v| prop.insert(0, v)); + } else { + prop.extend(v.into_iter()); + } + } else { + props[&k] = serde_json::Value::Array(v) + } + } + Ok(post) +} + +#[derive(Clone, Debug)] +/// A backend using a folder with JSON files as a backing store. +/// Uses symbolic links to represent a many-to-one mapping of URLs to a post. +pub struct FileStorage { + root_dir: PathBuf, +} + +impl FileStorage { + /// Create a new storage wrapping a folder specified by root_dir. + pub async fn new(root_dir: PathBuf) -> Result<Self> { + // TODO check if the dir is writable + Ok(Self { root_dir }) + } +} + +async fn hydrate_author<S: Storage>( + feed: &mut serde_json::Value, + user: &'_ Option<String>, + storage: &S, +) { + let url = feed["properties"]["uid"][0] + .as_str() + .expect("MF2 value should have a UID set! 
Check if you used normalize_mf2 before recording the post!"); + if let Some(author) = feed["properties"]["author"].as_array().cloned() { + if !feed["type"] + .as_array() + .expect("MF2 value should have a type set!") + .iter() + .any(|i| i == "h-card") + { + let author_list: Vec<serde_json::Value> = stream::iter(author.iter()) + .then(|i| async move { + if let Some(i) = i.as_str() { + match storage.get_post(i).await { + Ok(post) => match post { + Some(post) => post, + None => json!(i), + }, + Err(e) => { + error!("Error while hydrating post {}: {}", url, e); + json!(i) + } + } + } else { + i.clone() + } + }) + .collect::<Vec<_>>() + .await; + if let Some(props) = feed["properties"].as_object_mut() { + props["author"] = json!(author_list); + } else { + feed["properties"] = json!({ "author": author_list }); + } + } + } +} + +#[async_trait] +impl Storage for FileStorage { + #[tracing::instrument(skip(self))] + async fn post_exists(&self, url: &str) -> Result<bool> { + let path = url_to_path(&self.root_dir, url); + debug!("Checking if {:?} exists...", path); + /*let result = match tokio::fs::metadata(path).await { + Ok(metadata) => { + Ok(true) + }, + Err(err) => { + if err.kind() == IOErrorKind::NotFound { + Ok(false) + } else { + Err(err.into()) + } + } + };*/ + #[allow(clippy::unwrap_used)] // JoinHandle captures panics, this closure shouldn't panic + Ok(spawn_blocking(move || path.is_file()).await.unwrap()) + } + + #[tracing::instrument(skip(self))] + async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> { + let path = url_to_path(&self.root_dir, url); + // TODO: check that the path actually belongs to the dir of user who requested it + // it's not like you CAN access someone else's private posts with it + // so it's not exactly a security issue, but it's still not good + debug!("Opening {:?}", path); + + match File::open(&path).await { + Ok(mut file) => { + let mut content = String::new(); + // Typechecks because OS magic acts on references + // to FDs as if they were behind a mutex + AsyncReadExt::read_to_string(&mut file, &mut content).await?; + debug!( + "Read {} bytes successfully from {:?}", + content.as_bytes().len(), + &path + ); + Ok(Some(serde_json::from_str(&content)?)) + } + Err(err) => { + if err.kind() == IOErrorKind::NotFound { + Ok(None) + } else { + Err(err.into()) + } + } + } + } + + #[tracing::instrument(skip(self))] + async fn put_post(&self, post: &'_ serde_json::Value, user: &'_ str) -> Result<()> { + let key = post["properties"]["uid"][0] + .as_str() + .expect("Tried to save a post without UID"); + let path = url_to_path(&self.root_dir, key); + let tempfile = (&path).with_extension("tmp"); + debug!("Creating {:?}", path); + + let parent = path + .parent() + .expect("Parent for this directory should always exist") + .to_owned(); + tokio::fs::create_dir_all(&parent).await?; + + let mut file = tokio::fs::OpenOptions::new() + .write(true) + .create_new(true) + .open(&tempfile) + .await?; + + file.write_all(post.to_string().as_bytes()).await?; + file.flush().await?; + file.sync_all().await?; + drop(file); + tokio::fs::rename(&tempfile, &path).await?; + tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; + + if let Some(urls) = post["properties"]["url"].as_array() { + for url in urls.iter().map(|i| i.as_str().unwrap()) { + let url_domain = { + let url = url::Url::parse(url).unwrap(); + format!( + "{}{}", + url.host_str().unwrap(), + url.port() + .map(|port| format!(":{}", port)) + .unwrap_or_default() + ) + }; + if url != key && 
url_domain == user { + let link = url_to_path(&self.root_dir, url); + debug!("Creating a symlink at {:?}", link); + let orig = path.clone(); + // We're supposed to have a parent here. + let basedir = link.parent().ok_or_else(|| { + StorageError::from_static( + ErrorKind::Backend, + "Failed to calculate parent directory when creating a symlink", + ) + })?; + let relative = path_relative_from(&orig, basedir).unwrap(); + println!("{:?} - {:?} = {:?}", &orig, &basedir, &relative); + tokio::fs::symlink(relative, link).await?; + } + } + } + + if post["type"] + .as_array() + .unwrap() + .iter() + .any(|s| s.as_str() == Some("h-feed")) + { + tracing::debug!("Adding to channel list..."); + // Add the h-feed to the channel list + let path = { + let mut path = relative_path::RelativePathBuf::new(); + path.push(user); + path.push("channels"); + + path.to_path(&self.root_dir) + }; + tokio::fs::create_dir_all(path.parent().unwrap()).await?; + tracing::debug!("Channels file path: {}", path.display()); + let tempfilename = path.with_extension("tmp"); + let channel_name = post["properties"]["name"][0] + .as_str() + .map(|s| s.to_string()) + .unwrap_or_else(String::default); + let key = key.to_string(); + tracing::debug!("Opening temporary file to modify chnanels..."); + let mut tempfile = OpenOptions::new() + .write(true) + .create_new(true) + .open(&tempfilename) + .await?; + tracing::debug!("Opening real channel file..."); + let mut channels: Vec<super::MicropubChannel> = { + match OpenOptions::new() + .read(true) + .write(false) + .truncate(false) + .create(false) + .open(&path) + .await + { + Err(err) if err.kind() == std::io::ErrorKind::NotFound => { + Vec::default() + } + Err(err) => { + // Propagate the error upwards + return Err(err.into()); + } + Ok(mut file) => { + let mut content = String::new(); + file.read_to_string(&mut content).await?; + drop(file); + + if !content.is_empty() { + serde_json::from_str(&content)? 
+ } else { + Vec::default() + } + } + } + }; + + channels.push(super::MicropubChannel { + uid: key.to_string(), + name: channel_name, + }); + + tempfile + .write_all(serde_json::to_string(&channels)?.as_bytes()) + .await?; + tempfile.flush().await?; + tempfile.sync_all().await?; + drop(tempfile); + tokio::fs::rename(tempfilename, &path).await?; + tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; + } + Ok(()) + } + + #[tracing::instrument(skip(self))] + async fn update_post(&self, url: &str, update: MicropubUpdate) -> Result<()> { + let path = url_to_path(&self.root_dir, url); + let tempfilename = path.with_extension("tmp"); + #[allow(unused_variables)] + let (old_json, new_json) = { + let mut temp = OpenOptions::new() + .write(true) + .create_new(true) + .open(&tempfilename) + .await?; + let mut file = OpenOptions::new().read(true).open(&path).await?; + + let mut content = String::new(); + file.read_to_string(&mut content).await?; + let json: serde_json::Value = serde_json::from_str(&content)?; + drop(file); + // Apply the editing algorithms + let new_json = modify_post(&json, update)?; + + temp.write_all(new_json.to_string().as_bytes()).await?; + temp.flush().await?; + temp.sync_all().await?; + drop(temp); + tokio::fs::rename(tempfilename, &path).await?; + tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; + + (json, new_json) + }; + // TODO check if URLs changed between old and new JSON + Ok(()) + } + + #[tracing::instrument(skip(self))] + async fn get_channels(&self, user: &'_ str) -> Result<Vec<super::MicropubChannel>> { + let mut path = relative_path::RelativePathBuf::new(); + path.push(user); + path.push("channels"); + + let path = path.to_path(&self.root_dir); + tracing::debug!("Channels file path: {}", path.display()); + + match File::open(&path).await { + Ok(mut f) => { + let mut content = String::new(); + f.read_to_string(&mut content).await?; + // This should not happen, but if it does, handle it gracefully + if content.is_empty() { + return Ok(vec![]); + } + let channels: Vec<super::MicropubChannel> = serde_json::from_str(&content)?; + Ok(channels) + } + Err(e) => { + if e.kind() == IOErrorKind::NotFound { + Ok(vec![]) + } else { + Err(e.into()) + } + } + } + } + + async fn read_feed_with_cursor( + &self, + url: &'_ str, + cursor: Option<&'_ str>, + limit: usize, + user: Option<&'_ str> + ) -> Result<Option<(serde_json::Value, Option<String>)>> { + Ok(self.read_feed_with_limit( + url, + &cursor.map(|v| v.to_owned()), + limit, + &user.map(|v| v.to_owned()) + ).await? + .map(|feed| { + tracing::debug!("Feed: {:#}", serde_json::Value::Array( + feed["children"] + .as_array() + .map(|v| v.as_slice()) + .unwrap_or_default() + .iter() + .map(|mf2| mf2["properties"]["uid"][0].clone()) + .collect::<Vec<_>>() + )); + let cursor: Option<String> = feed["children"] + .as_array() + .map(|v| v.as_slice()) + .unwrap_or_default() + .last() + .map(|v| v["properties"]["uid"][0].as_str().unwrap().to_owned()); + tracing::debug!("Extracted the cursor: {:?}", cursor); + (feed, cursor) + }) + ) + } + + #[tracing::instrument(skip(self))] + async fn read_feed_with_limit( + &self, + url: &'_ str, + after: &'_ Option<String>, + limit: usize, + user: &'_ Option<String>, + ) -> Result<Option<serde_json::Value>> { + if let Some(mut feed) = self.get_post(url).await? 
{ + if feed["children"].is_array() { + // Take this out of the MF2-JSON document to save memory + // + // This uses a clever match with enum destructuring + // to extract the underlying Vec without cloning it + let children: Vec<serde_json::Value> = match feed["children"].take() { + serde_json::Value::Array(children) => children, + // We've already checked it's an array + _ => unreachable!() + }; + tracing::debug!("Full children array: {:#}", serde_json::Value::Array(children.clone())); + let mut posts_iter = children + .into_iter() + .map(|s: serde_json::Value| s.as_str().unwrap().to_string()); + // Note: we can't actually use `skip_while` here because we end up emitting `after`. + // This imperative snippet consumes after instead of emitting it, allowing the + // stream of posts to return only those items that truly come *after* that one. + // If I would implement an Iter combinator like this, I would call it `skip_until` + if let Some(after) = after { + for s in posts_iter.by_ref() { + if &s == after { + break; + } + } + }; + let posts = stream::iter(posts_iter) + .map(|url: String| async move { self.get_post(&url).await }) + .buffered(std::cmp::min(3, limit)) + // Hack to unwrap the Option and sieve out broken links + // Broken links return None, and Stream::filter_map skips Nones. + .try_filter_map(|post: Option<serde_json::Value>| async move { Ok(post) }) + .and_then(|mut post| async move { + hydrate_author(&mut post, user, self).await; + Ok(post) + }) + .take(limit); + + match posts.try_collect::<Vec<serde_json::Value>>().await { + Ok(posts) => feed["children"] = serde_json::json!(posts), + Err(err) => { + return Err(StorageError::with_source( + ErrorKind::Other, + Cow::Owned(format!("Feed assembly error: {}", &err)), + Box::new(err), + )); + } + } + } + hydrate_author(&mut feed, user, self).await; + Ok(Some(feed)) + } else { + Ok(None) + } + } + + #[tracing::instrument(skip(self))] + async fn delete_post(&self, url: &'_ str) -> Result<()> { + let path = url_to_path(&self.root_dir, url); + if let Err(e) = tokio::fs::remove_file(path).await { + Err(e.into()) + } else { + // TODO check for dangling references in the channel list + Ok(()) + } + } + + #[tracing::instrument(skip(self))] + async fn get_setting<S: settings::Setting<'a>, 'a>(&self, user: &'_ str) -> Result<S> { + debug!("User for getting settings: {}", user); + let mut path = relative_path::RelativePathBuf::new(); + path.push(user); + path.push("settings"); + + let path = path.to_path(&self.root_dir); + debug!("Getting settings from {:?}", &path); + + let mut file = File::open(path).await?; + let mut content = String::new(); + file.read_to_string(&mut content).await?; + + let settings: HashMap<&str, serde_json::Value> = serde_json::from_str(&content)?; + match settings.get(S::ID) { + Some(value) => Ok(serde_json::from_value::<S>(value.clone())?), + None => Err(StorageError::from_static(ErrorKind::Backend, "Setting not set")) + } + } + + #[tracing::instrument(skip(self))] + async fn set_setting<S: settings::Setting<'a> + 'a, 'a>(&self, user: &'a str, value: S::Data) -> Result<()> { + let mut path = relative_path::RelativePathBuf::new(); + path.push(user); + path.push("settings"); + + let path = path.to_path(&self.root_dir); + let temppath = path.with_extension("tmp"); + + let parent = path.parent().unwrap().to_owned(); + tokio::fs::create_dir_all(&parent).await?; + + let mut tempfile = OpenOptions::new() + .write(true) + .create_new(true) + .open(&temppath) + .await?; + + let mut settings: HashMap<String, serde_json::Value> 
= match File::open(&path).await { + Ok(mut f) => { + let mut content = String::new(); + f.read_to_string(&mut content).await?; + if content.is_empty() { + Default::default() + } else { + serde_json::from_str(&content)? + } + } + Err(err) => { + if err.kind() == IOErrorKind::NotFound { + Default::default() + } else { + return Err(err.into()); + } + } + }; + settings.insert(S::ID.to_owned(), serde_json::to_value(S::new(value))?); + + tempfile + .write_all(serde_json::to_string(&settings)?.as_bytes()) + .await?; + tempfile.flush().await?; + tempfile.sync_all().await?; + drop(tempfile); + tokio::fs::rename(temppath, &path).await?; + tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; + Ok(()) + } + + #[tracing::instrument(skip(self))] + async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()> { + let path = url_to_path(&self.root_dir, target); + let tempfilename = path.with_extension("tmp"); + + let mut temp = OpenOptions::new() + .write(true) + .create_new(true) + .open(&tempfilename) + .await?; + let mut file = OpenOptions::new().read(true).open(&path).await?; + + let mut post: serde_json::Value = { + let mut content = String::new(); + file.read_to_string(&mut content).await?; + drop(file); + + serde_json::from_str(&content)? + }; + + let key: &'static str = match mention_type { + MentionType::Reply => "comment", + MentionType::Like => "like", + MentionType::Repost => "repost", + MentionType::Bookmark => "bookmark", + MentionType::Mention => "mention", + }; + let mention_uid = mention["properties"]["uid"][0].clone(); + if let Some(values) = post["properties"][key].as_array_mut() { + for value in values.iter_mut() { + if value["properties"]["uid"][0] == mention_uid { + *value = mention; + break; + } + } + } else { + post["properties"][key] = serde_json::Value::Array(vec![mention]); + } + + temp.write_all(post.to_string().as_bytes()).await?; + temp.flush().await?; + temp.sync_all().await?; + drop(temp); + tokio::fs::rename(tempfilename, &path).await?; + tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; + + Ok(()) + } +} diff --git a/src/database/memory.rs b/src/database/memory.rs new file mode 100644 index 0000000..6339e7a --- /dev/null +++ b/src/database/memory.rs @@ -0,0 +1,249 @@ +#![allow(clippy::todo)] +use async_trait::async_trait; +use futures_util::FutureExt; +use serde_json::json; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; + +use crate::database::{ErrorKind, MicropubChannel, Result, settings, Storage, StorageError}; + +#[derive(Clone, Debug)] +pub struct MemoryStorage { + pub mapping: Arc<RwLock<HashMap<String, serde_json::Value>>>, + pub channels: Arc<RwLock<HashMap<String, Vec<String>>>>, +} + +#[async_trait] +impl Storage for MemoryStorage { + async fn post_exists(&self, url: &str) -> Result<bool> { + return Ok(self.mapping.read().await.contains_key(url)); + } + + async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> { + let mapping = self.mapping.read().await; + match mapping.get(url) { + Some(val) => { + if let Some(new_url) = val["see_other"].as_str() { + match mapping.get(new_url) { + Some(val) => Ok(Some(val.clone())), + None => { + drop(mapping); + self.mapping.write().await.remove(url); + Ok(None) + } + } + } else { + Ok(Some(val.clone())) + } + } + _ => Ok(None), + } + } + + async fn put_post(&self, post: &'_ serde_json::Value, _user: &'_ str) -> Result<()> { + let mapping = &mut self.mapping.write().await; + let 
key: &str = match post["properties"]["uid"][0].as_str() { + Some(uid) => uid, + None => { + return Err(StorageError::from_static( + ErrorKind::Other, + "post doesn't have a UID", + )) + } + }; + mapping.insert(key.to_string(), post.clone()); + if post["properties"]["url"].is_array() { + for url in post["properties"]["url"] + .as_array() + .unwrap() + .iter() + .map(|i| i.as_str().unwrap().to_string()) + { + if url != key { + mapping.insert(url, json!({ "see_other": key })); + } + } + } + if post["type"] + .as_array() + .unwrap() + .iter() + .any(|i| i == "h-feed") + { + // This is a feed. Add it to the channels array if it's not already there. + println!("{:#}", post); + self.channels + .write() + .await + .entry( + post["properties"]["author"][0] + .as_str() + .unwrap() + .to_string(), + ) + .or_insert_with(Vec::new) + .push(key.to_string()) + } + Ok(()) + } + + async fn update_post(&self, url: &'_ str, update: crate::micropub::MicropubUpdate) -> Result<()> { + let mut guard = self.mapping.write().await; + let mut post = guard.get_mut(url).ok_or(StorageError::from_static(ErrorKind::NotFound, "The specified post wasn't found in the database."))?; + + use crate::micropub::MicropubPropertyDeletion; + + let mut add_keys: HashMap<String, Vec<serde_json::Value>> = HashMap::new(); + let mut remove_keys: Vec<String> = vec![]; + let mut remove_values: HashMap<String, Vec<serde_json::Value>> = HashMap::new(); + + if let Some(MicropubPropertyDeletion::Properties(delete)) = update.delete { + remove_keys.extend(delete.iter().cloned()); + } else if let Some(MicropubPropertyDeletion::Values(delete)) = update.delete { + for (k, v) in delete { + remove_values + .entry(k.to_string()) + .or_default() + .extend(v.clone()); + } + } + if let Some(add) = update.add { + for (k, v) in add { + add_keys.insert(k.to_string(), v.clone()); + } + } + if let Some(replace) = update.replace { + for (k, v) in replace { + remove_keys.push(k.to_string()); + add_keys.insert(k.to_string(), v.clone()); + } + } + + if let Some(props) = post["properties"].as_object_mut() { + for k in remove_keys { + props.remove(&k); + } + } + for (k, v) in remove_values { + let k = &k; + let props = if k == "children" { + &mut post + } else { + &mut post["properties"] + }; + v.iter().for_each(|v| { + if let Some(vec) = props[k].as_array_mut() { + if let Some(index) = vec.iter().position(|w| w == v) { + vec.remove(index); + } + } + }); + } + for (k, v) in add_keys { + tracing::debug!("Adding k/v to post: {} => {:?}", k, v); + let props = if k == "children" { + &mut post + } else { + &mut post["properties"] + }; + if let Some(prop) = props[&k].as_array_mut() { + if k == "children" { + v.into_iter().rev().for_each(|v| prop.insert(0, v)); + } else { + prop.extend(v.into_iter()); + } + } else { + props[&k] = serde_json::Value::Array(v) + } + } + + Ok(()) + } + + async fn get_channels(&self, user: &'_ str) -> Result<Vec<MicropubChannel>> { + match self.channels.read().await.get(user) { + Some(channels) => Ok(futures_util::future::join_all( + channels + .iter() + .map(|channel| { + self.get_post(channel).map(|result| result.unwrap()).map( + |post: Option<serde_json::Value>| { + post.map(|post| MicropubChannel { + uid: post["properties"]["uid"][0].as_str().unwrap().to_string(), + name: post["properties"]["name"][0] + .as_str() + .unwrap() + .to_string(), + }) + }, + ) + }) + .collect::<Vec<_>>(), + ) + .await + .into_iter() + .flatten() + .collect::<Vec<_>>()), + None => Ok(vec![]), + } + } + + #[allow(unused_variables)] + async fn read_feed_with_limit( 
+ &self, + url: &'_ str, + after: &'_ Option<String>, + limit: usize, + user: &'_ Option<String>, + ) -> Result<Option<serde_json::Value>> { + todo!() + } + + #[allow(unused_variables)] + async fn read_feed_with_cursor( + &self, + url: &'_ str, + cursor: Option<&'_ str>, + limit: usize, + user: Option<&'_ str> + ) -> Result<Option<(serde_json::Value, Option<String>)>> { + todo!() + } + + async fn delete_post(&self, url: &'_ str) -> Result<()> { + self.mapping.write().await.remove(url); + Ok(()) + } + + #[allow(unused_variables)] + async fn get_setting<S: settings::Setting<'a>, 'a>(&'_ self, user: &'_ str) -> Result<S> { + todo!() + } + + #[allow(unused_variables)] + async fn set_setting<S: settings::Setting<'a> + 'a, 'a>(&self, user: &'a str, value: S::Data) -> Result<()> { + todo!() + } + + #[allow(unused_variables)] + async fn add_or_update_webmention(&self, target: &str, mention_type: kittybox_util::MentionType, mention: serde_json::Value) -> Result<()> { + todo!() + } + +} + +impl Default for MemoryStorage { + fn default() -> Self { + Self::new() + } +} + +impl MemoryStorage { + pub fn new() -> Self { + Self { + mapping: Arc::new(RwLock::new(HashMap::new())), + channels: Arc::new(RwLock::new(HashMap::new())), + } + } +} diff --git a/src/database/mod.rs b/src/database/mod.rs new file mode 100644 index 0000000..b4b70b2 --- /dev/null +++ b/src/database/mod.rs @@ -0,0 +1,793 @@ +#![warn(missing_docs)] +use std::borrow::Cow; + +use async_trait::async_trait; +use kittybox_util::MentionType; + +mod file; +pub use crate::database::file::FileStorage; +use crate::micropub::MicropubUpdate; +#[cfg(feature = "postgres")] +mod postgres; +#[cfg(feature = "postgres")] +pub use postgres::PostgresStorage; + +#[cfg(test)] +mod memory; +#[cfg(test)] +pub use crate::database::memory::MemoryStorage; + +pub use kittybox_util::MicropubChannel; + +use self::settings::Setting; + +/// Enum representing different errors that might occur during the database query. +#[derive(Debug, Clone, Copy)] +pub enum ErrorKind { + /// Backend error (e.g. database connection error) + Backend, + /// Error due to insufficient contextual permissions for the query + PermissionDenied, + /// Error due to the database being unable to parse JSON returned from the backing storage. + /// Usually indicative of someone fiddling with the database manually instead of using proper tools. + JsonParsing, + /// - ErrorKind::NotFound - equivalent to a 404 error. Note, some requests return an Option, + /// in which case None is also equivalent to a 404. + NotFound, + /// The user's query or request to the database was malformed. Used whenever the database processes + /// the user's query directly, such as when editing posts inside of the database (e.g. Redis backend) + BadRequest, + /// the user's query collided with an in-flight request and needs to be retried + Conflict, + /// - ErrorKind::Other - when something so weird happens that it becomes undescribable. + Other, +} + +/// Settings that can be stored in the database. +pub mod settings { + mod private { + pub trait Sealed {} + } + + /// A trait for various settings that should be contained here. + /// + /// **Note**: this trait is sealed to prevent external + /// implementations, as it wouldn't make sense to add new settings + /// that aren't used by Kittybox itself. 
+ pub trait Setting<'de>: private::Sealed + std::fmt::Debug + Default + Clone + serde::Serialize + serde::de::DeserializeOwned + /*From<Settings> +*/ Send + Sync { + type Data: std::fmt::Debug + Send + Sync; + const ID: &'static str; + + /// Unwrap the setting type, returning owned data contained within. + fn into_inner(self) -> Self::Data; + /// Create a new instance of this type containing certain data. + fn new(data: Self::Data) -> Self; + } + + /// A website's title, shown in the header. + #[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)] + pub struct SiteName(String); + impl Default for SiteName { + fn default() -> Self { + Self("Kittybox".to_string()) + } + } + impl AsRef<str> for SiteName { + fn as_ref(&self) -> &str { + self.0.as_str() + } + } + impl private::Sealed for SiteName {} + impl Setting<'_> for SiteName { + type Data = String; + const ID: &'static str = "site_name"; + + fn into_inner(self) -> String { + self.0 + } + fn new(data: Self::Data) -> Self { + Self(data) + } + } + impl SiteName { + fn from_str(data: &str) -> Self { + Self(data.to_owned()) + } + } + + /// Participation status in the IndieWeb Webring: https://🕸💍.ws/dashboard + #[derive(Debug, Default, serde::Deserialize, serde::Serialize, Clone, Copy, PartialEq, Eq)] + pub struct Webring(bool); + impl private::Sealed for Webring {} + impl Setting<'_> for Webring { + type Data = bool; + const ID: &'static str = "webring"; + + fn into_inner(self) -> Self::Data { + self.0 + } + + fn new(data: Self::Data) -> Self { + Self(data) + } + } +} + +/// Error signalled from the database. +#[derive(Debug)] +pub struct StorageError { + msg: std::borrow::Cow<'static, str>, + source: Option<Box<dyn std::error::Error + Send + Sync>>, + kind: ErrorKind, +} + +impl std::error::Error for StorageError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + self.source + .as_ref() + .map(|e| e.as_ref() as &dyn std::error::Error) + } +} +impl From<serde_json::Error> for StorageError { + fn from(err: serde_json::Error) -> Self { + Self { + msg: std::borrow::Cow::Owned(format!("{}", err)), + source: Some(Box::new(err)), + kind: ErrorKind::JsonParsing, + } + } +} +impl std::fmt::Display for StorageError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}: {}", + match self.kind { + ErrorKind::Backend => "backend error", + ErrorKind::JsonParsing => "JSON parsing error", + ErrorKind::PermissionDenied => "permission denied", + ErrorKind::NotFound => "not found", + ErrorKind::BadRequest => "bad request", + ErrorKind::Conflict => "conflict with an in-flight request or existing data", + ErrorKind::Other => "generic storage layer error", + }, + self.msg + ) + } +} +impl serde::Serialize for StorageError { + fn serialize<S: serde::Serializer>( + &self, + serializer: S, + ) -> std::result::Result<S::Ok, S::Error> { + serializer.serialize_str(&self.to_string()) + } +} +impl StorageError { + /// Create a new StorageError of an ErrorKind with a message. + pub fn new(kind: ErrorKind, msg: String) -> Self { + Self { + msg: Cow::Owned(msg), + source: None, + kind, + } + } + /// Create a new StorageError of an ErrorKind with a message from + /// a static string. + /// + /// This saves an allocation for a new string and is the preferred + /// way in case the error message doesn't change. 
+ pub fn from_static(kind: ErrorKind, msg: &'static str) -> Self { + Self { + msg: Cow::Borrowed(msg), + source: None, + kind + } + } + /// Create a StorageError using another arbitrary Error as a source. + pub fn with_source( + kind: ErrorKind, + msg: std::borrow::Cow<'static, str>, + source: Box<dyn std::error::Error + Send + Sync>, + ) -> Self { + Self { + msg, + source: Some(source), + kind, + } + } + /// Get the kind of an error. + pub fn kind(&self) -> ErrorKind { + self.kind + } + /// Get the message as a string slice. + pub fn msg(&self) -> &str { + &self.msg + } +} + +/// A special Result type for the Micropub backing storage. +pub type Result<T> = std::result::Result<T, StorageError>; + +/// A storage backend for the Micropub server. +/// +/// Implementations should note that all methods listed on this trait MUST be fully atomic +/// or lock the database so that write conflicts or reading half-written data should not occur. +#[async_trait] +pub trait Storage: std::fmt::Debug + Clone + Send + Sync { + /// Check if a post exists in the database. + async fn post_exists(&self, url: &str) -> Result<bool>; + + /// Load a post from the database in MF2-JSON format, deserialized from JSON. + async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>>; + + /// Save a post to the database as an MF2-JSON structure. + /// + /// Note that the `post` object MUST have `post["properties"]["uid"][0]` defined. + async fn put_post(&self, post: &'_ serde_json::Value, user: &'_ str) -> Result<()>; + + /// Add post to feed. Some database implementations might have optimized ways to do this. + #[tracing::instrument(skip(self))] + async fn add_to_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> { + tracing::debug!("Inserting {} into {} using `update_post`", post, feed); + self.update_post(feed, serde_json::from_value( + serde_json::json!({"add": {"children": [post]}})).unwrap() + ).await + } + /// Remove post from feed. Some database implementations might have optimized ways to do this. + #[tracing::instrument(skip(self))] + async fn remove_from_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> { + tracing::debug!("Removing {} into {} using `update_post`", post, feed); + self.update_post(feed, serde_json::from_value( + serde_json::json!({"delete": {"children": [post]}})).unwrap() + ).await + } + + /// Modify a post using an update object as defined in the + /// Micropub spec. + /// + /// Note to implementors: the update operation MUST be atomic and + /// SHOULD lock the database to prevent two clients overwriting + /// each other's changes or simply corrupting something. Rejecting + /// is allowed in case of concurrent updates if waiting for a lock + /// cannot be done. + async fn update_post(&self, url: &str, update: MicropubUpdate) -> Result<()>; + + /// Get a list of channels available for the user represented by + /// the `user` domain to write to. + async fn get_channels(&self, user: &'_ str) -> Result<Vec<MicropubChannel>>; + + /// Fetch a feed at `url` and return an h-feed object containing + /// `limit` posts after a post by url `after`, filtering the content + /// in context of a user specified by `user` (or an anonymous user). + /// + /// This method MUST hydrate the `author` property with an h-card + /// from the database by replacing URLs with corresponding h-cards. 
+ /// + /// When encountering posts which the `user` is not authorized to + /// access, this method MUST elide such posts (as an optimization + /// for the frontend) and not return them, but still return up to + /// `limit` posts (to not reveal the hidden posts' presence). + /// + /// Note for implementors: if you use streams to fetch posts in + /// parallel from the database, preferably make this method use a + /// connection pool to reduce overhead of creating a database + /// connection per post for parallel fetching. + async fn read_feed_with_limit( + &self, + url: &'_ str, + after: &'_ Option<String>, + limit: usize, + user: &'_ Option<String>, + ) -> Result<Option<serde_json::Value>>; + + /// Fetch a feed at `url` and return an h-feed object containing + /// `limit` posts after a `cursor` (filtering the content in + /// context of a user specified by `user`, or an anonymous user), + /// as well as a new cursor to paginate with. + /// + /// This method MUST hydrate the `author` property with an h-card + /// from the database by replacing URLs with corresponding h-cards. + /// + /// When encountering posts which the `user` is not authorized to + /// access, this method MUST elide such posts (as an optimization + /// for the frontend) and not return them, but still return an + /// amount of posts as close to `limit` as possible (to avoid + /// revealing the existence of the hidden post). + /// + /// Note for implementors: if you use streams to fetch posts in + /// parallel from the database, preferably make this method use a + /// connection pool to reduce overhead of creating a database + /// connection per post for parallel fetching. + async fn read_feed_with_cursor( + &self, + url: &'_ str, + cursor: Option<&'_ str>, + limit: usize, + user: Option<&'_ str> + ) -> Result<Option<(serde_json::Value, Option<String>)>>; + + /// Deletes a post from the database irreversibly. Must be idempotent. + async fn delete_post(&self, url: &'_ str) -> Result<()>; + + /// Gets a setting from the setting store and passes the result. + async fn get_setting<S: Setting<'a>, 'a>(&'_ self, user: &'_ str) -> Result<S>; + + /// Commits a setting to the setting store. + async fn set_setting<S: Setting<'a> + 'a, 'a>(&self, user: &'a str, value: S::Data) -> Result<()>; + + /// Add (or update) a webmention on a certian post. + /// + /// The MF2 object describing the webmention content will always + /// be of type `h-cite`, and the `uid` property on the object will + /// always be set. + /// + /// The rationale for this function is as follows: webmentions + /// might be duplicated, and we need to deduplicate them first. As + /// we lack support for transactions and locking posts on the + /// database, the only way is to implement the operation on the + /// database itself. + /// + /// Besides, it may even allow for nice tricks like storing the + /// webmentions separately and rehydrating them on feed reads. 
+ async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()>; +} + +#[cfg(test)] +mod tests { + use super::settings; + + use super::{MicropubChannel, Storage}; + use kittybox_util::MentionType; + use serde_json::json; + + async fn test_basic_operations<Backend: Storage>(backend: Backend) { + let post: serde_json::Value = json!({ + "type": ["h-entry"], + "properties": { + "content": ["Test content"], + "author": ["https://fireburn.ru/"], + "uid": ["https://fireburn.ru/posts/hello"], + "url": ["https://fireburn.ru/posts/hello", "https://fireburn.ru/posts/test"] + } + }); + let key = post["properties"]["uid"][0].as_str().unwrap().to_string(); + let alt_url = post["properties"]["url"][1].as_str().unwrap().to_string(); + + // Reading and writing + backend + .put_post(&post, "fireburn.ru") + .await + .unwrap(); + if let Some(returned_post) = backend.get_post(&key).await.unwrap() { + assert!(returned_post.is_object()); + assert_eq!( + returned_post["type"].as_array().unwrap().len(), + post["type"].as_array().unwrap().len() + ); + assert_eq!( + returned_post["type"].as_array().unwrap(), + post["type"].as_array().unwrap() + ); + let props: &serde_json::Map<String, serde_json::Value> = + post["properties"].as_object().unwrap(); + for key in props.keys() { + assert_eq!( + returned_post["properties"][key].as_array().unwrap(), + post["properties"][key].as_array().unwrap() + ) + } + } else { + panic!("For some reason the backend did not return the post.") + } + // Check the alternative URL - it should return the same post + if let Ok(Some(returned_post)) = backend.get_post(&alt_url).await { + assert!(returned_post.is_object()); + assert_eq!( + returned_post["type"].as_array().unwrap().len(), + post["type"].as_array().unwrap().len() + ); + assert_eq!( + returned_post["type"].as_array().unwrap(), + post["type"].as_array().unwrap() + ); + let props: &serde_json::Map<String, serde_json::Value> = + post["properties"].as_object().unwrap(); + for key in props.keys() { + assert_eq!( + returned_post["properties"][key].as_array().unwrap(), + post["properties"][key].as_array().unwrap() + ) + } + } else { + panic!("For some reason the backend did not return the post.") + } + } + + /// Note: this is merely a smoke check and is in no way comprehensive. 
+ // TODO updates for feeds must update children using special logic + async fn test_update<Backend: Storage>(backend: Backend) { + let post: serde_json::Value = json!({ + "type": ["h-entry"], + "properties": { + "content": ["Test content"], + "author": ["https://fireburn.ru/"], + "uid": ["https://fireburn.ru/posts/hello"], + "url": ["https://fireburn.ru/posts/hello", "https://fireburn.ru/posts/test"] + } + }); + let key = post["properties"]["uid"][0].as_str().unwrap().to_string(); + + // Reading and writing + backend + .put_post(&post, "fireburn.ru") + .await + .unwrap(); + + backend + .update_post( + &key, + serde_json::from_value(json!({ + "url": &key, + "add": { + "category": ["testing"], + }, + "replace": { + "content": ["Different test content"] + } + })).unwrap(), + ) + .await + .unwrap(); + + match backend.get_post(&key).await { + Ok(Some(returned_post)) => { + assert!(returned_post.is_object()); + assert_eq!( + returned_post["type"].as_array().unwrap().len(), + post["type"].as_array().unwrap().len() + ); + assert_eq!( + returned_post["type"].as_array().unwrap(), + post["type"].as_array().unwrap() + ); + assert_eq!( + returned_post["properties"]["content"][0].as_str().unwrap(), + "Different test content" + ); + assert_eq!( + returned_post["properties"]["category"].as_array().unwrap(), + &vec![json!("testing")] + ); + } + something_else => { + something_else + .expect("Shouldn't error") + .expect("Should have the post"); + } + } + } + + async fn test_get_channel_list<Backend: Storage>(backend: Backend) { + let feed = json!({ + "type": ["h-feed"], + "properties": { + "name": ["Main Page"], + "author": ["https://fireburn.ru/"], + "uid": ["https://fireburn.ru/feeds/main"] + }, + "children": [] + }); + backend + .put_post(&feed, "fireburn.ru") + .await + .unwrap(); + let chans = backend.get_channels("fireburn.ru").await.unwrap(); + assert_eq!(chans.len(), 1); + assert_eq!( + chans[0], + MicropubChannel { + uid: "https://fireburn.ru/feeds/main".to_string(), + name: "Main Page".to_string() + } + ); + } + + async fn test_settings<Backend: Storage>(backend: Backend) { + backend + .set_setting::<settings::SiteName>( + "https://fireburn.ru/", + "Vika's Hideout".to_owned() + ) + .await + .unwrap(); + assert_eq!( + backend + .get_setting::<settings::SiteName>("https://fireburn.ru/") + .await + .unwrap() + .as_ref(), + "Vika's Hideout" + ); + } + + fn gen_random_post(domain: &str) -> serde_json::Value { + use faker_rand::lorem::{Paragraphs, Word}; + + let uid = format!( + "https://{domain}/posts/{}-{}-{}", + rand::random::<Word>(), + rand::random::<Word>(), + rand::random::<Word>() + ); + + let time = chrono::Local::now().to_rfc3339(); + let post = json!({ + "type": ["h-entry"], + "properties": { + "content": [rand::random::<Paragraphs>().to_string()], + "uid": [&uid], + "url": [&uid], + "published": [&time] + } + }); + + post + } + + fn gen_random_mention(domain: &str, mention_type: MentionType, url: &str) -> serde_json::Value { + use faker_rand::lorem::{Paragraphs, Word}; + + let uid = format!( + "https://{domain}/posts/{}-{}-{}", + rand::random::<Word>(), + rand::random::<Word>(), + rand::random::<Word>() + ); + + let time = chrono::Local::now().to_rfc3339(); + let post = json!({ + "type": ["h-cite"], + "properties": { + "content": [rand::random::<Paragraphs>().to_string()], + "uid": [&uid], + "url": [&uid], + "published": [&time], + (match mention_type { + MentionType::Reply => "in-reply-to", + MentionType::Like => "like-of", + MentionType::Repost => "repost-of", + MentionType::Bookmark => 
"bookmark-of", + MentionType::Mention => unimplemented!(), + }): [url] + } + }); + + post + } + + async fn test_feed_pagination<Backend: Storage>(backend: Backend) { + let posts = { + let mut posts = std::iter::from_fn( + || Some(gen_random_post("fireburn.ru")) + ) + .take(40) + .collect::<Vec<serde_json::Value>>(); + + // Reverse the array so it's in reverse-chronological order + posts.reverse(); + + posts + }; + + let feed = json!({ + "type": ["h-feed"], + "properties": { + "name": ["Main Page"], + "author": ["https://fireburn.ru/"], + "uid": ["https://fireburn.ru/feeds/main"] + }, + }); + let key = feed["properties"]["uid"][0].as_str().unwrap(); + + backend + .put_post(&feed, "fireburn.ru") + .await + .unwrap(); + + for (i, post) in posts.iter().rev().enumerate() { + backend + .put_post(post, "fireburn.ru") + .await + .unwrap(); + backend.add_to_feed(key, post["properties"]["uid"][0].as_str().unwrap()).await.unwrap(); + } + + let limit: usize = 10; + + tracing::debug!("Starting feed reading..."); + let (result, cursor) = backend + .read_feed_with_cursor(key, None, limit, None) + .await + .unwrap() + .unwrap(); + + assert_eq!(result["children"].as_array().unwrap().len(), limit); + assert_eq!( + result["children"] + .as_array() + .unwrap() + .iter() + .map(|post| post["properties"]["uid"][0].as_str().unwrap()) + .collect::<Vec<_>>() + [0..10], + posts + .iter() + .map(|post| post["properties"]["uid"][0].as_str().unwrap()) + .collect::<Vec<_>>() + [0..10] + ); + + tracing::debug!("Continuing with cursor: {:?}", cursor); + let (result2, cursor2) = backend + .read_feed_with_cursor( + key, + cursor.as_deref(), + limit, + None, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!( + result2["children"].as_array().unwrap()[0..10], + posts[10..20] + ); + + tracing::debug!("Continuing with cursor: {:?}", cursor); + let (result3, cursor3) = backend + .read_feed_with_cursor( + key, + cursor2.as_deref(), + limit, + None, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!( + result3["children"].as_array().unwrap()[0..10], + posts[20..30] + ); + + tracing::debug!("Continuing with cursor: {:?}", cursor); + let (result4, _) = backend + .read_feed_with_cursor( + key, + cursor3.as_deref(), + limit, + None, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!( + result4["children"].as_array().unwrap()[0..10], + posts[30..40] + ); + + // Regression test for #4 + // + // Results for a bogus cursor are undefined, so we aren't + // checking them. But the function at least shouldn't hang. 
+ let nonsense_after = Some("1010101010"); + let _ = tokio::time::timeout(tokio::time::Duration::from_secs(10), async move { + backend + .read_feed_with_cursor(key, nonsense_after, limit, None) + .await + }) + .await + .expect("Operation should not hang: see https://gitlab.com/kittybox/kittybox/-/issues/4"); + } + + async fn test_webmention_addition<Backend: Storage>(db: Backend) { + let post = gen_random_post("fireburn.ru"); + + db.put_post(&post, "fireburn.ru").await.unwrap(); + const TYPE: MentionType = MentionType::Reply; + + let target = post["properties"]["uid"][0].as_str().unwrap(); + let mut reply = gen_random_mention("aaronparecki.com", TYPE, target); + + let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap(); + assert_eq!(post, read_post); + + db.add_or_update_webmention(target, TYPE, reply.clone()).await.unwrap(); + + let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap(); + assert_eq!(read_post["properties"]["comment"][0], reply); + + reply["properties"]["content"][0] = json!(rand::random::<faker_rand::lorem::Paragraphs>().to_string()); + + db.add_or_update_webmention(target, TYPE, reply.clone()).await.unwrap(); + let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap(); + assert_eq!(read_post["properties"]["comment"][0], reply); + } + + async fn test_pretty_permalinks<Backend: Storage>(db: Backend) { + const PERMALINK: &str = "https://fireburn.ru/posts/pretty-permalink"; + + let post = { + let mut post = gen_random_post("fireburn.ru"); + let urls = post["properties"]["url"].as_array_mut().unwrap(); + urls.push(serde_json::Value::String( + PERMALINK.to_owned() + )); + + post + }; + db.put_post(&post, "fireburn.ru").await.unwrap(); + + for i in post["properties"]["url"].as_array().unwrap() { + let (read_post, _) = db.read_feed_with_cursor(i.as_str().unwrap(), None, 20, None).await.unwrap().unwrap(); + assert_eq!(read_post, post); + } + } + /// Automatically generates a test suite for + macro_rules! test_all { + ($func_name:ident, $mod_name:ident) => { + mod $mod_name { + $func_name!(test_basic_operations); + $func_name!(test_get_channel_list); + $func_name!(test_settings); + $func_name!(test_update); + $func_name!(test_feed_pagination); + $func_name!(test_webmention_addition); + $func_name!(test_pretty_permalinks); + } + }; + } + macro_rules! file_test { + ($func_name:ident) => { + #[tokio::test] + #[tracing_test::traced_test] + async fn $func_name() { + let tempdir = tempfile::tempdir().expect("Failed to create tempdir"); + let backend = super::super::FileStorage::new( + tempdir.path().to_path_buf() + ) + .await + .unwrap(); + super::$func_name(backend).await + } + }; + } + + macro_rules! postgres_test { + ($func_name:ident) => { + #[cfg(feature = "sqlx")] + #[sqlx::test] + #[tracing_test::traced_test] + async fn $func_name( + pool_opts: sqlx::postgres::PgPoolOptions, + connect_opts: sqlx::postgres::PgConnectOptions + ) -> Result<(), sqlx::Error> { + let db = { + //use sqlx::ConnectOptions; + //connect_opts.log_statements(log::LevelFilter::Debug); + + pool_opts.connect_with(connect_opts).await? 
+ }; + let backend = super::super::PostgresStorage::from_pool(db).await.unwrap(); + + Ok(super::$func_name(backend).await) + } + }; + } + + test_all!(file_test, file); + test_all!(postgres_test, postgres); +} diff --git a/src/database/postgres/mod.rs b/src/database/postgres/mod.rs new file mode 100644 index 0000000..9176d12 --- /dev/null +++ b/src/database/postgres/mod.rs @@ -0,0 +1,416 @@ +#![allow(unused_variables)] +use std::borrow::Cow; +use std::str::FromStr; + +use kittybox_util::{MicropubChannel, MentionType}; +use sqlx::{PgPool, Executor}; +use crate::micropub::{MicropubUpdate, MicropubPropertyDeletion}; + +use super::settings::Setting; +use super::{Storage, Result, StorageError, ErrorKind}; + +static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!(); + +impl From<sqlx::Error> for StorageError { + fn from(value: sqlx::Error) -> Self { + Self::with_source( + super::ErrorKind::Backend, + Cow::Owned(format!("sqlx error: {}", &value)), + Box::new(value) + ) + } +} + +impl From<sqlx::migrate::MigrateError> for StorageError { + fn from(value: sqlx::migrate::MigrateError) -> Self { + Self::with_source( + super::ErrorKind::Backend, + Cow::Owned(format!("sqlx migration error: {}", &value)), + Box::new(value) + ) + } +} + +#[derive(Debug, Clone)] +pub struct PostgresStorage { + db: PgPool +} + +impl PostgresStorage { + /// Construct a new [`PostgresStorage`] from an URI string and run + /// migrations on the database. + /// + /// If `PGPASS_FILE` environment variable is defined, read the + /// password from the file at the specified path. If, instead, + /// the `PGPASS` environment variable is present, read the + /// password from it. + pub async fn new(uri: &str) -> Result<Self> { + tracing::debug!("Postgres URL: {uri}"); + let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)? + .options([("search_path", "kittybox")]); + if let Ok(password_file) = std::env::var("PGPASS_FILE") { + let password = tokio::fs::read_to_string(password_file).await.unwrap(); + options = options.password(&password); + } else if let Ok(password) = std::env::var("PGPASS") { + options = options.password(&password) + } + Self::from_pool( + sqlx::postgres::PgPoolOptions::new() + .max_connections(50) + .connect_with(options) + .await? + ).await + + } + + /// Construct a [`PostgresStorage`] from a [`sqlx::PgPool`], + /// running appropriate migrations. + pub async fn from_pool(db: sqlx::PgPool) -> Result<Self> { + db.execute(sqlx::query("CREATE SCHEMA IF NOT EXISTS kittybox")).await?; + MIGRATOR.run(&db).await?; + Ok(Self { db }) + } +} + +#[async_trait::async_trait] +impl Storage for PostgresStorage { + #[tracing::instrument(skip(self))] + async fn post_exists(&self, url: &str) -> Result<bool> { + sqlx::query_as::<_, (bool,)>("SELECT exists(SELECT 1 FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1)") + .bind(url) + .fetch_one(&self.db) + .await + .map(|v| v.0) + .map_err(|err| err.into()) + } + + #[tracing::instrument(skip(self))] + async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> { + sqlx::query_as::<_, (serde_json::Value,)>("SELECT mf2 FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? 
$1") + .bind(url) + .fetch_optional(&self.db) + .await + .map(|v| v.map(|v| v.0)) + .map_err(|err| err.into()) + + } + + #[tracing::instrument(skip(self))] + async fn put_post(&self, post: &'_ serde_json::Value, user: &'_ str) -> Result<()> { + tracing::debug!("New post: {}", post); + sqlx::query("INSERT INTO kittybox.mf2_json (uid, mf2, owner) VALUES ($1 #>> '{properties,uid,0}', $1, $2)") + .bind(post) + .bind(user) + .execute(&self.db) + .await + .map(|_| ()) + .map_err(Into::into) + } + + #[tracing::instrument(skip(self))] + async fn add_to_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> { + tracing::debug!("Inserting {} into {}", post, feed); + sqlx::query("INSERT INTO kittybox.children (parent, child) VALUES ($1, $2) ON CONFLICT DO NOTHING") + .bind(feed) + .bind(post) + .execute(&self.db) + .await + .map(|_| ()) + .map_err(Into::into) + } + + #[tracing::instrument(skip(self))] + async fn remove_from_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> { + sqlx::query("DELETE FROM kittybox.children WHERE parent = $1 AND child = $2") + .bind(feed) + .bind(post) + .execute(&self.db) + .await + .map_err(Into::into) + .map(|_| ()) + } + + #[tracing::instrument(skip(self))] + async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()> { + let mut txn = self.db.begin().await?; + + let (uid, mut post) = sqlx::query_as::<_, (String, serde_json::Value)>("SELECT uid, mf2 FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1 FOR UPDATE") + .bind(target) + .fetch_optional(&mut *txn) + .await? + .ok_or(StorageError::from_static( + ErrorKind::NotFound, + "The specified post wasn't found in the database." + ))?; + + tracing::debug!("Loaded post for target {} with uid {}", target, uid); + + let key: &'static str = match mention_type { + MentionType::Reply => "comment", + MentionType::Like => "like", + MentionType::Repost => "repost", + MentionType::Bookmark => "bookmark", + MentionType::Mention => "mention", + }; + + tracing::debug!("Mention type -> key: {}", key); + + let mention_uid = mention["properties"]["uid"][0].clone(); + if let Some(values) = post["properties"][key].as_array_mut() { + for value in values.iter_mut() { + if value["properties"]["uid"][0] == mention_uid { + *value = mention; + break; + } + } + } else { + post["properties"][key] = serde_json::Value::Array(vec![mention]); + } + + sqlx::query("UPDATE kittybox.mf2_json SET mf2 = $2 WHERE uid = $1") + .bind(uid) + .bind(post) + .execute(&mut *txn) + .await?; + + txn.commit().await.map_err(Into::into) + } + #[tracing::instrument(skip(self))] + async fn update_post(&self, url: &'_ str, update: MicropubUpdate) -> Result<()> { + tracing::debug!("Updating post {}", url); + let mut txn = self.db.begin().await?; + let (uid, mut post) = sqlx::query_as::<_, (String, serde_json::Value)>("SELECT uid, mf2 FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1 FOR UPDATE") + .bind(url) + .fetch_optional(&mut *txn) + .await? + .ok_or(StorageError::from_static( + ErrorKind::NotFound, + "The specified post wasn't found in the database." 
+ ))?; + + if let Some(MicropubPropertyDeletion::Properties(ref delete)) = update.delete { + if let Some(props) = post["properties"].as_object_mut() { + for key in delete { + props.remove(key); + } + } + } else if let Some(MicropubPropertyDeletion::Values(ref delete)) = update.delete { + if let Some(props) = post["properties"].as_object_mut() { + for (key, values) in delete { + if let Some(prop) = props.get_mut(key).and_then(serde_json::Value::as_array_mut) { + prop.retain(|v| { values.iter().all(|i| i != v) }) + } + } + } + } + if let Some(replace) = update.replace { + if let Some(props) = post["properties"].as_object_mut() { + for (key, value) in replace { + props.insert(key, serde_json::Value::Array(value)); + } + } + } + if let Some(add) = update.add { + if let Some(props) = post["properties"].as_object_mut() { + for (key, value) in add { + if let Some(prop) = props.get_mut(&key).and_then(serde_json::Value::as_array_mut) { + prop.extend_from_slice(value.as_slice()); + } else { + props.insert(key, serde_json::Value::Array(value)); + } + } + } + } + + sqlx::query("UPDATE kittybox.mf2_json SET mf2 = $2 WHERE uid = $1") + .bind(uid) + .bind(post) + .execute(&mut *txn) + .await?; + + txn.commit().await.map_err(Into::into) + } + + #[tracing::instrument(skip(self))] + async fn get_channels(&self, user: &'_ str) -> Result<Vec<MicropubChannel>> { + /*sqlx::query_as::<_, MicropubChannel>("SELECT name, uid FROM kittybox.channels WHERE owner = $1") + .bind(user) + .fetch_all(&self.db) + .await + .map_err(|err| err.into())*/ + sqlx::query_as::<_, MicropubChannel>(r#"SELECT mf2 #>> '{properties,name,0}' as name, uid FROM kittybox.mf2_json WHERE '["h-feed"]'::jsonb @> mf2['type'] AND owner = $1"#) + .bind(user) + .fetch_all(&self.db) + .await + .map_err(|err| err.into()) + } + + #[tracing::instrument(skip(self))] + async fn read_feed_with_limit( + &self, + url: &'_ str, + after: &'_ Option<String>, + limit: usize, + user: &'_ Option<String>, + ) -> Result<Option<serde_json::Value>> { + let mut feed = match sqlx::query_as::<_, (serde_json::Value,)>(" +SELECT jsonb_set( + mf2, + '{properties,author,0}', + (SELECT mf2 FROM kittybox.mf2_json + WHERE uid = mf2 #>> '{properties,author,0}') +) FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1 +") + .bind(url) + .fetch_optional(&self.db) + .await? + .map(|v| v.0) + { + Some(feed) => feed, + None => return Ok(None) + }; + + let posts: Vec<String> = { + let mut posts_iter = feed["children"] + .as_array() + .cloned() + .unwrap_or_default() + .into_iter() + .map(|s| s.as_str().unwrap().to_string()); + if let Some(after) = after { + for s in posts_iter.by_ref() { + if &s == after { + break; + } + } + }; + + posts_iter.take(limit).collect::<Vec<_>>() + }; + feed["children"] = serde_json::Value::Array( + sqlx::query_as::<_, (serde_json::Value,)>(" +SELECT jsonb_set( + mf2, + '{properties,author,0}', + (SELECT mf2 FROM kittybox.mf2_json + WHERE uid = mf2 #>> '{properties,author,0}') +) FROM kittybox.mf2_json +WHERE uid = ANY($1) +ORDER BY mf2 #>> '{properties,published,0}' DESC +") + .bind(&posts[..]) + .fetch_all(&self.db) + .await? 
+ .into_iter() + .map(|v| v.0) + .collect::<Vec<_>>() + ); + + Ok(Some(feed)) + + } + + #[tracing::instrument(skip(self))] + async fn read_feed_with_cursor( + &self, + url: &'_ str, + cursor: Option<&'_ str>, + limit: usize, + user: Option<&'_ str> + ) -> Result<Option<(serde_json::Value, Option<String>)>> { + let mut txn = self.db.begin().await?; + sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ, READ ONLY") + .execute(&mut *txn) + .await?; + tracing::debug!("Started txn: {:?}", txn); + let mut feed = match sqlx::query_scalar::<_, serde_json::Value>(" +SELECT kittybox.hydrate_author(mf2) FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1 +") + .bind(url) + .fetch_optional(&mut *txn) + .await? + { + Some(feed) => feed, + None => return Ok(None) + }; + + // Don't query for children if this isn't a feed. + // + // The second query is very long and will probably be extremely + // expensive. It's best to skip it on types where it doesn't make sense + // (Kittybox doesn't support rendering children on non-feeds) + if !feed["type"].as_array().unwrap().iter().any(|t| *t == serde_json::json!("h-feed")) { + return Ok(Some((feed, None))); + } + + feed["children"] = sqlx::query_scalar::<_, serde_json::Value>(" +SELECT kittybox.hydrate_author(mf2) FROM kittybox.mf2_json +INNER JOIN kittybox.children +ON mf2_json.uid = children.child +WHERE + children.parent = $1 + AND ( + ( + (mf2 #>> '{properties,visibility,0}') = 'public' + OR + NOT (mf2['properties'] ? 'visibility') + ) + OR + ( + $3 != null AND ( + mf2['properties']['audience'] ? $3 + OR mf2['properties']['author'] ? $3 + ) + ) + ) + AND ($4 IS NULL OR ((mf2_json.mf2 #>> '{properties,published,0}') < $4)) +ORDER BY (mf2_json.mf2 #>> '{properties,published,0}') DESC +LIMIT $2" + ) + .bind(url) + .bind(limit as i64) + .bind(user) + .bind(cursor) + .fetch_all(&mut *txn) + .await + .map(serde_json::Value::Array)?; + + let new_cursor = feed["children"].as_array().unwrap() + .last() + .map(|v| v["properties"]["published"][0].as_str().unwrap().to_owned()); + + txn.commit().await?; + + Ok(Some((feed, new_cursor))) + } + + #[tracing::instrument(skip(self))] + async fn delete_post(&self, url: &'_ str) -> Result<()> { + todo!() + } + + #[tracing::instrument(skip(self))] + async fn get_setting<S: Setting<'a>, 'a>(&'_ self, user: &'_ str) -> Result<S> { + match sqlx::query_as::<_, (serde_json::Value,)>("SELECT kittybox.get_setting($1, $2)") + .bind(user) + .bind(S::ID) + .fetch_one(&self.db) + .await + { + Ok((value,)) => Ok(serde_json::from_value(value)?), + Err(err) => Err(err.into()) + } + } + + #[tracing::instrument(skip(self))] + async fn set_setting<S: Setting<'a> + 'a, 'a>(&self, user: &'a str, value: S::Data) -> Result<()> { + sqlx::query("SELECT kittybox.set_setting($1, $2, $3)") + .bind(user) + .bind(S::ID) + .bind(serde_json::to_value(S::new(value)).unwrap()) + .execute(&self.db) + .await + .map_err(Into::into) + .map(|_| ()) + } +} diff --git a/src/database/redis/edit_post.lua b/src/database/redis/edit_post.lua new file mode 100644 index 0000000..a398f8d --- /dev/null +++ b/src/database/redis/edit_post.lua @@ -0,0 +1,93 @@ +local posts = KEYS[1] +local update_desc = cjson.decode(ARGV[2]) +local post = cjson.decode(redis.call("HGET", posts, ARGV[1])) + +local delete_keys = {} +local delete_kvs = {} +local add_keys = {} + +if update_desc.replace ~= nil then + for k, v in pairs(update_desc.replace) do + table.insert(delete_keys, k) + add_keys[k] = v + end +end +if update_desc.delete ~= nil then + if 
update_desc.delete[0] == nil then + -- Table has string keys. Probably! + for k, v in pairs(update_desc.delete) do + delete_kvs[k] = v + end + else + -- Table has numeric keys. Probably! + for i, v in ipairs(update_desc.delete) do + table.insert(delete_keys, v) + end + end +end +if update_desc.add ~= nil then + for k, v in pairs(update_desc.add) do + add_keys[k] = v + end +end + +for i, v in ipairs(delete_keys) do + post["properties"][v] = nil + -- TODO delete URL links +end + +for k, v in pairs(delete_kvs) do + local index = -1 + if k == "children" then + for j, w in ipairs(post[k]) do + if w == v then + index = j + break + end + end + if index > -1 then + table.remove(post[k], index) + end + else + for j, w in ipairs(post["properties"][k]) do + if w == v then + index = j + break + end + end + if index > -1 then + table.remove(post["properties"][k], index) + -- TODO delete URL links + end + end +end + +for k, v in pairs(add_keys) do + if k == "children" then + if post["children"] == nil then + post["children"] = {} + end + for i, w in ipairs(v) do + table.insert(post["children"], 1, w) + end + else + if post["properties"][k] == nil then + post["properties"][k] = {} + end + for i, w in ipairs(v) do + table.insert(post["properties"][k], w) + end + if k == "url" then + redis.call("HSET", posts, v, cjson.encode({ see_other = post["properties"]["uid"][1] })) + elseif k == "channel" then + local feed = cjson.decode(redis.call("HGET", posts, v)) + table.insert(feed["children"], 1, post["properties"]["uid"][1]) + redis.call("HSET", posts, v, cjson.encode(feed)) + end + end +end + +local encoded = cjson.encode(post) +redis.call("SET", "debug", encoded) +redis.call("HSET", posts, post["properties"]["uid"][1], encoded) +return \ No newline at end of file diff --git a/src/database/redis/mod.rs b/src/database/redis/mod.rs new file mode 100644 index 0000000..39ee852 --- /dev/null +++ b/src/database/redis/mod.rs @@ -0,0 +1,398 @@ +use async_trait::async_trait; +use futures::stream; +use futures_util::FutureExt; +use futures_util::StreamExt; +use futures_util::TryStream; +use futures_util::TryStreamExt; +use lazy_static::lazy_static; +use log::error; +use mobc::Pool; +use mobc_redis::redis; +use mobc_redis::redis::AsyncCommands; +use mobc_redis::RedisConnectionManager; +use serde_json::json; +use std::time::Duration; + +use crate::database::{ErrorKind, MicropubChannel, Result, Storage, StorageError, filter_post}; +use crate::indieauth::User; + +struct RedisScripts { + edit_post: redis::Script, +} + +impl From<mobc_redis::redis::RedisError> for StorageError { + fn from(err: mobc_redis::redis::RedisError) -> Self { + Self { + msg: format!("{}", err), + source: Some(Box::new(err)), + kind: ErrorKind::Backend, + } + } +} +impl From<mobc::Error<mobc_redis::redis::RedisError>> for StorageError { + fn from(err: mobc::Error<mobc_redis::redis::RedisError>) -> Self { + Self { + msg: format!("{}", err), + source: Some(Box::new(err)), + kind: ErrorKind::Backend, + } + } +} + +lazy_static! 
{ + static ref SCRIPTS: RedisScripts = RedisScripts { + edit_post: redis::Script::new(include_str!("./edit_post.lua")) + }; +} +/*#[cfg(feature(lazy_cell))] +static SCRIPTS_CELL: std::cell::LazyCell<RedisScripts> = std::cell::LazyCell::new(|| { + RedisScripts { + edit_post: redis::Script::new(include_str!("./edit_post.lua")) + } +});*/ + +#[derive(Clone)] +pub struct RedisStorage { + // note to future Vika: + // mobc::Pool is actually a fancy name for an Arc + // around a shared connection pool with a manager + // which makes it safe to implement [`Clone`] and + // not worry about new pools being suddenly made + // + // stop worrying and start coding, you dum-dum + redis: mobc::Pool<RedisConnectionManager>, +} + +#[async_trait] +impl Storage for RedisStorage { + async fn get_setting<'a>(&self, setting: &'a str, user: &'a str) -> Result<String> { + let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; + Ok(conn + .hget::<String, &str, String>(format!("settings_{}", user), setting) + .await?) + } + + async fn set_setting<'a>(&self, setting: &'a str, user: &'a str, value: &'a str) -> Result<()> { + let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; + Ok(conn + .hset::<String, &str, &str, ()>(format!("settings_{}", user), setting, value) + .await?) + } + + async fn delete_post<'a>(&self, url: &'a str) -> Result<()> { + let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; + Ok(conn.hdel::<&str, &str, ()>("posts", url).await?) + } + + async fn post_exists(&self, url: &str) -> Result<bool> { + let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; + Ok(conn.hexists::<&str, &str, bool>("posts", url).await?) + } + + async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> { + let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; + match conn + .hget::<&str, &str, Option<String>>("posts", url) + .await? + { + Some(val) => { + let parsed = serde_json::from_str::<serde_json::Value>(&val)?; + if let Some(new_url) = parsed["see_other"].as_str() { + match conn + .hget::<&str, &str, Option<String>>("posts", new_url) + .await? + { + Some(val) => Ok(Some(serde_json::from_str::<serde_json::Value>(&val)?)), + None => Ok(None), + } + } else { + Ok(Some(parsed)) + } + } + None => Ok(None), + } + } + + async fn get_channels(&self, user: &User) -> Result<Vec<MicropubChannel>> { + let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; + let channels = conn + .smembers::<String, Vec<String>>("channels_".to_string() + user.me.as_str()) + .await?; + // TODO: use streams here instead of this weird thing... how did I even write this?! 
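+ // Editor's note: an illustrative sketch (not part of the original commit) of
+ // the stream-based rewrite the TODO above refers to. It assumes the
+ // `StreamExt`/`TryStreamExt` imports already present at the top of this file
+ // and keeps the same unwraps as the code below:
+ //
+ //     let channels: Vec<MicropubChannel> = stream::iter(channels)
+ //         .then(|channel| async move { self.get_post(&channel).await })
+ //         .try_filter_map(|post| async move {
+ //             Ok(post.map(|post| MicropubChannel {
+ //                 uid: post["properties"]["uid"][0].as_str().unwrap().to_string(),
+ //                 name: post["properties"]["name"][0].as_str().unwrap().to_string(),
+ //             }))
+ //         })
+ //         .try_collect()
+ //         .await?;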
+ Ok(futures_util::future::join_all( + channels + .iter() + .map(|channel| { + self.get_post(channel).map(|result| result.unwrap()).map( + |post: Option<serde_json::Value>| { + post.map(|post| MicropubChannel { + uid: post["properties"]["uid"][0].as_str().unwrap().to_string(), + name: post["properties"]["name"][0].as_str().unwrap().to_string(), + }) + }, + ) + }) + .collect::<Vec<_>>(), + ) + .await + .into_iter() + .flatten() + .collect::<Vec<_>>()) + } + + async fn put_post<'a>(&self, post: &'a serde_json::Value, user: &'a str) -> Result<()> { + let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; + let key: &str; + match post["properties"]["uid"][0].as_str() { + Some(uid) => key = uid, + None => { + return Err(StorageError::new( + ErrorKind::BadRequest, + "post doesn't have a UID", + )) + } + } + conn.hset::<&str, &str, String, ()>("posts", key, post.to_string()) + .await?; + if post["properties"]["url"].is_array() { + for url in post["properties"]["url"] + .as_array() + .unwrap() + .iter() + .map(|i| i.as_str().unwrap().to_string()) + { + if url != key && url.starts_with(user) { + conn.hset::<&str, &str, String, ()>( + "posts", + &url, + json!({ "see_other": key }).to_string(), + ) + .await?; + } + } + } + if post["type"] + .as_array() + .unwrap() + .iter() + .any(|i| i == "h-feed") + { + // This is a feed. Add it to the channels array if it's not already there. + conn.sadd::<String, &str, ()>( + "channels_".to_string() + post["properties"]["author"][0].as_str().unwrap(), + key, + ) + .await? + } + Ok(()) + } + + async fn read_feed_with_limit<'a>( + &self, + url: &'a str, + after: &'a Option<String>, + limit: usize, + user: &'a Option<String>, + ) -> Result<Option<serde_json::Value>> { + let mut conn = self.redis.get().await?; + let mut feed; + match conn + .hget::<&str, &str, Option<String>>("posts", url) + .await + .map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))? + { + Some(post) => feed = serde_json::from_str::<serde_json::Value>(&post)?, + None => return Ok(None), + } + if feed["see_other"].is_string() { + match conn + .hget::<&str, &str, Option<String>>("posts", feed["see_other"].as_str().unwrap()) + .await? + { + Some(post) => feed = serde_json::from_str::<serde_json::Value>(&post)?, + None => return Ok(None), + } + } + if let Some(post) = filter_post(feed, user) { + feed = post + } else { + return Err(StorageError::new( + ErrorKind::PermissionDenied, + "specified user cannot access this post", + )); + } + if feed["children"].is_array() { + let children = feed["children"].as_array().unwrap(); + let mut posts_iter = children.iter().map(|i| i.as_str().unwrap().to_string()); + if after.is_some() { + loop { + let i = posts_iter.next(); + if &i == after { + break; + } + } + } + async fn fetch_post_for_feed(url: String) -> Option<serde_json::Value> { + return Some(serde_json::json!({})); + } + let posts = stream::iter(posts_iter) + .map(|url: String| async move { + return Ok(fetch_post_for_feed(url).await); + /*match self.redis.get().await { + Ok(mut conn) => { + match conn.hget::<&str, &str, Option<String>>("posts", &url).await { + Ok(post) => match post { + Some(post) => { + Ok(Some(serde_json::from_str(&post)?)) + } + // Happens because of a broken link (result of an improper deletion?) 
+ None => Ok(None), + }, + Err(err) => Err(StorageError::with_source(ErrorKind::Backend, "Error executing a Redis command", Box::new(err))) + } + } + Err(err) => Err(StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(err))) + }*/ + }) + // TODO: determine the optimal value for this buffer + // It will probably depend on how often can you encounter a private post on the page + // It shouldn't be too large, or we'll start fetching too many posts from the database + // It MUST NOT be larger than the typical page size + // It MUST NOT be a significant amount of the connection pool size + //.buffered(std::cmp::min(3, limit)) + // Hack to unwrap the Option and sieve out broken links + // Broken links return None, and Stream::filter_map skips all Nones. + // I wonder if one can use try_flatten() here somehow akin to iters + .try_filter_map(|post| async move { Ok(post) }) + .try_filter_map(|post| async move { + Ok(filter_post(post, user)) + }) + .take(limit); + match posts.try_collect::<Vec<serde_json::Value>>().await { + Ok(posts) => feed["children"] = json!(posts), + Err(err) => { + let e = StorageError::with_source( + ErrorKind::Other, + "An error was encountered while processing the feed", + Box::new(err) + ); + error!("Error while assembling feed: {}", e); + return Err(e); + } + } + } + return Ok(Some(feed)); + } + + async fn update_post<'a>(&self, mut url: &'a str, update: serde_json::Value) -> Result<()> { + let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; + if !conn + .hexists::<&str, &str, bool>("posts", url) + .await + .unwrap() + { + return Err(StorageError::new( + ErrorKind::NotFound, + "can't edit a non-existent post", + )); + } + let post: serde_json::Value = + serde_json::from_str(&conn.hget::<&str, &str, String>("posts", url).await?)?; + if let Some(new_url) = post["see_other"].as_str() { + url = new_url + } + Ok(SCRIPTS + .edit_post + .key("posts") + .arg(url) + .arg(update.to_string()) + .invoke_async::<_, ()>(&mut conn as &mut redis::aio::Connection) + .await?) + } +} + +impl RedisStorage { + /// Create a new RedisDatabase that will connect to Redis at `redis_uri` to store data. 
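+ ///
+ /// A minimal usage sketch (editor's addition, not part of the original
+ /// commit; the crate path and URI below are placeholders):
+ ///
+ /// ```ignore
+ /// use kittybox::database::{RedisStorage, Storage};
+ ///
+ /// # async fn example() {
+ /// let storage = RedisStorage::new("redis://127.0.0.1:6379/".to_string())
+ ///     .await
+ ///     .expect("failed to connect to Redis");
+ /// assert!(!storage.post_exists("https://example.com/posts/1").await.unwrap());
+ /// # }
+ /// ```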
+ pub async fn new(redis_uri: String) -> Result<Self> { + match redis::Client::open(redis_uri) { + Ok(client) => Ok(Self { + redis: Pool::builder() + .max_open(20) + .max_idle(5) + .get_timeout(Some(Duration::from_secs(3))) + .max_lifetime(Some(Duration::from_secs(120))) + .build(RedisConnectionManager::new(client)), + }), + Err(e) => Err(e.into()), + } + } + + pub async fn conn(&self) -> Result<mobc::Connection<mobc_redis::RedisConnectionManager>> { + self.redis.get().await.map_err(|e| StorageError::with_source( + ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e) + )) + } +} + +#[cfg(test)] +pub mod tests { + use mobc_redis::redis; + use std::process; + use std::time::Duration; + + pub struct RedisInstance { + // We just need to hold on to it so it won't get dropped and remove the socket + _tempdir: tempdir::TempDir, + uri: String, + child: std::process::Child, + } + impl Drop for RedisInstance { + fn drop(&mut self) { + self.child.kill().expect("Failed to kill the child!"); + } + } + impl RedisInstance { + pub fn uri(&self) -> &str { + &self.uri + } + } + + pub async fn get_redis_instance() -> RedisInstance { + let tempdir = tempdir::TempDir::new("redis").expect("failed to create tempdir"); + let socket = tempdir.path().join("redis.sock"); + let redis_child = process::Command::new("redis-server") + .current_dir(&tempdir) + .arg("--port") + .arg("0") + .arg("--unixsocket") + .arg(&socket) + .stdout(process::Stdio::null()) + .stderr(process::Stdio::null()) + .spawn() + .expect("Failed to spawn Redis"); + println!("redis+unix:///{}", socket.to_str().unwrap()); + let uri = format!("redis+unix:///{}", socket.to_str().unwrap()); + // There should be a slight delay, we need to wait for Redis to spin up + let client = redis::Client::open(uri.clone()).unwrap(); + let millisecond = Duration::from_millis(1); + let mut retries: usize = 0; + const MAX_RETRIES: usize = 60 * 1000/*ms*/; + while let Err(err) = client.get_connection() { + if err.is_connection_refusal() { + async_std::task::sleep(millisecond).await; + retries += 1; + if retries > MAX_RETRIES { + panic!("Timeout waiting for Redis, last error: {}", err); + } + } else { + panic!("Could not connect: {}", err); + } + } + + RedisInstance { + uri, + child: redis_child, + _tempdir: tempdir, + } + } +} |
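+ // Editor's note (illustrative sketch, not part of the original commit): a test
+ // built on the helper above would typically spawn the throwaway redis-server,
+ // point RedisStorage at its unix socket, and rely on the instance's Drop
+ // implementation to kill the child process afterwards:
+ //
+ //     let instance = get_redis_instance().await;
+ //     let storage = RedisStorage::new(instance.uri().to_string()).await.unwrap();
+ //     // ...exercise the Storage trait against `storage`...
+ //     drop(instance); // terminates the spawned redis-server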