diff options
author | Vika <vika@fireburn.ru> | 2023-07-29 21:59:56 +0300 |
---|---|---|
committer | Vika <vika@fireburn.ru> | 2023-07-29 21:59:56 +0300 |
commit | 0617663b249f9ca488e5de652108b17d67fbaf45 (patch) | |
tree | 11564b6c8fa37bf9203a0a4cc1c4e9cc088cb1a5 /kittybox-rs/src/database | |
parent | 26c2b79f6a6380ae3224e9309b9f3352f5717bd7 (diff) | |
download | kittybox-0617663b249f9ca488e5de652108b17d67fbaf45.tar.zst |
Moved the entire Kittybox tree into the root
Diffstat (limited to 'kittybox-rs/src/database')
-rw-r--r-- | kittybox-rs/src/database/file/mod.rs | 733 | ||||
-rw-r--r-- | kittybox-rs/src/database/memory.rs | 249 | ||||
-rw-r--r-- | kittybox-rs/src/database/mod.rs | 793 | ||||
-rw-r--r-- | kittybox-rs/src/database/postgres/mod.rs | 416 | ||||
-rw-r--r-- | kittybox-rs/src/database/redis/edit_post.lua | 93 | ||||
-rw-r--r-- | kittybox-rs/src/database/redis/mod.rs | 398 |
6 files changed, 0 insertions, 2682 deletions
diff --git a/kittybox-rs/src/database/file/mod.rs b/kittybox-rs/src/database/file/mod.rs deleted file mode 100644 index 27d3da1..0000000 --- a/kittybox-rs/src/database/file/mod.rs +++ /dev/null @@ -1,733 +0,0 @@ -//#![warn(clippy::unwrap_used)] -use crate::database::{ErrorKind, Result, settings, Storage, StorageError}; -use crate::micropub::{MicropubUpdate, MicropubPropertyDeletion}; -use async_trait::async_trait; -use futures::{stream, StreamExt, TryStreamExt}; -use kittybox_util::MentionType; -use serde_json::json; -use std::borrow::Cow; -use std::collections::HashMap; -use std::io::ErrorKind as IOErrorKind; -use std::path::{Path, PathBuf}; -use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::task::spawn_blocking; -use tracing::{debug, error}; - -impl From<std::io::Error> for StorageError { - fn from(source: std::io::Error) -> Self { - Self::with_source( - match source.kind() { - IOErrorKind::NotFound => ErrorKind::NotFound, - IOErrorKind::AlreadyExists => ErrorKind::Conflict, - _ => ErrorKind::Backend, - }, - Cow::Owned(format!("file I/O error: {}", &source)), - Box::new(source), - ) - } -} - -impl From<tokio::time::error::Elapsed> for StorageError { - fn from(source: tokio::time::error::Elapsed) -> Self { - Self::with_source( - ErrorKind::Backend, - Cow::Borrowed("timeout on I/O operation"), - Box::new(source), - ) - } -} - -// Copied from https://stackoverflow.com/questions/39340924 -// This routine is adapted from the *old* Path's `path_relative_from` -// function, which works differently from the new `relative_from` function. -// In particular, this handles the case on unix where both paths are -// absolute but with only the root as the common directory. -fn path_relative_from(path: &Path, base: &Path) -> Option<PathBuf> { - use std::path::Component; - - if path.is_absolute() != base.is_absolute() { - if path.is_absolute() { - Some(PathBuf::from(path)) - } else { - None - } - } else { - let mut ita = path.components(); - let mut itb = base.components(); - let mut comps: Vec<Component> = vec![]; - loop { - match (ita.next(), itb.next()) { - (None, None) => break, - (Some(a), None) => { - comps.push(a); - comps.extend(ita.by_ref()); - break; - } - (None, _) => comps.push(Component::ParentDir), - (Some(a), Some(b)) if comps.is_empty() && a == b => (), - (Some(a), Some(b)) if b == Component::CurDir => comps.push(a), - (Some(_), Some(b)) if b == Component::ParentDir => return None, - (Some(a), Some(_)) => { - comps.push(Component::ParentDir); - for _ in itb { - comps.push(Component::ParentDir); - } - comps.push(a); - comps.extend(ita.by_ref()); - break; - } - } - } - Some(comps.iter().map(|c| c.as_os_str()).collect()) - } -} - -#[allow(clippy::unwrap_used, clippy::expect_used)] -#[cfg(test)] -mod tests { - #[test] - fn test_relative_path_resolving() { - let path1 = std::path::Path::new("/home/vika/Projects/kittybox"); - let path2 = std::path::Path::new("/home/vika/Projects/nixpkgs"); - let relative_path = super::path_relative_from(path2, path1).unwrap(); - - assert_eq!(relative_path, std::path::Path::new("../nixpkgs")) - } -} - -// TODO: Check that the path ACTUALLY IS INSIDE THE ROOT FOLDER -// This could be checked by completely resolving the path -// and checking if it has a common prefix -fn url_to_path(root: &Path, url: &str) -> PathBuf { - let path = url_to_relative_path(url).to_logical_path(root); - if !path.starts_with(root) { - // TODO: handle more gracefully - panic!("Security error: {:?} is not a prefix of {:?}", path, root) - } else { - path - } -} - -fn url_to_relative_path(url: &str) -> relative_path::RelativePathBuf { - let url = url::Url::try_from(url).expect("Couldn't parse a URL"); - let mut path = relative_path::RelativePathBuf::new(); - let user_domain = format!( - "{}{}", - url.host_str().unwrap(), - url.port() - .map(|port| format!(":{}", port)) - .unwrap_or_default() - ); - path.push(user_domain + url.path() + ".json"); - - path -} - -fn modify_post(post: &serde_json::Value, update: MicropubUpdate) -> Result<serde_json::Value> { - let mut post = post.clone(); - - let mut add_keys: HashMap<String, Vec<serde_json::Value>> = HashMap::new(); - let mut remove_keys: Vec<String> = vec![]; - let mut remove_values: HashMap<String, Vec<serde_json::Value>> = HashMap::new(); - - if let Some(MicropubPropertyDeletion::Properties(delete)) = update.delete { - remove_keys.extend(delete.iter().cloned()); - } else if let Some(MicropubPropertyDeletion::Values(delete)) = update.delete { - for (k, v) in delete { - remove_values - .entry(k.to_string()) - .or_default() - .extend(v.clone()); - } - } - if let Some(add) = update.add { - for (k, v) in add { - add_keys.insert(k.to_string(), v.clone()); - } - } - if let Some(replace) = update.replace { - for (k, v) in replace { - remove_keys.push(k.to_string()); - add_keys.insert(k.to_string(), v.clone()); - } - } - - if let Some(props) = post["properties"].as_object_mut() { - for k in remove_keys { - props.remove(&k); - } - } - for (k, v) in remove_values { - let k = &k; - let props = if k == "children" { - &mut post - } else { - &mut post["properties"] - }; - v.iter().for_each(|v| { - if let Some(vec) = props[k].as_array_mut() { - if let Some(index) = vec.iter().position(|w| w == v) { - vec.remove(index); - } - } - }); - } - for (k, v) in add_keys { - tracing::debug!("Adding k/v to post: {} => {:?}", k, v); - let props = if k == "children" { - &mut post - } else { - &mut post["properties"] - }; - if let Some(prop) = props[&k].as_array_mut() { - if k == "children" { - v.into_iter().rev().for_each(|v| prop.insert(0, v)); - } else { - prop.extend(v.into_iter()); - } - } else { - props[&k] = serde_json::Value::Array(v) - } - } - Ok(post) -} - -#[derive(Clone, Debug)] -/// A backend using a folder with JSON files as a backing store. -/// Uses symbolic links to represent a many-to-one mapping of URLs to a post. -pub struct FileStorage { - root_dir: PathBuf, -} - -impl FileStorage { - /// Create a new storage wrapping a folder specified by root_dir. - pub async fn new(root_dir: PathBuf) -> Result<Self> { - // TODO check if the dir is writable - Ok(Self { root_dir }) - } -} - -async fn hydrate_author<S: Storage>( - feed: &mut serde_json::Value, - user: &'_ Option<String>, - storage: &S, -) { - let url = feed["properties"]["uid"][0] - .as_str() - .expect("MF2 value should have a UID set! Check if you used normalize_mf2 before recording the post!"); - if let Some(author) = feed["properties"]["author"].as_array().cloned() { - if !feed["type"] - .as_array() - .expect("MF2 value should have a type set!") - .iter() - .any(|i| i == "h-card") - { - let author_list: Vec<serde_json::Value> = stream::iter(author.iter()) - .then(|i| async move { - if let Some(i) = i.as_str() { - match storage.get_post(i).await { - Ok(post) => match post { - Some(post) => post, - None => json!(i), - }, - Err(e) => { - error!("Error while hydrating post {}: {}", url, e); - json!(i) - } - } - } else { - i.clone() - } - }) - .collect::<Vec<_>>() - .await; - if let Some(props) = feed["properties"].as_object_mut() { - props["author"] = json!(author_list); - } else { - feed["properties"] = json!({ "author": author_list }); - } - } - } -} - -#[async_trait] -impl Storage for FileStorage { - #[tracing::instrument(skip(self))] - async fn post_exists(&self, url: &str) -> Result<bool> { - let path = url_to_path(&self.root_dir, url); - debug!("Checking if {:?} exists...", path); - /*let result = match tokio::fs::metadata(path).await { - Ok(metadata) => { - Ok(true) - }, - Err(err) => { - if err.kind() == IOErrorKind::NotFound { - Ok(false) - } else { - Err(err.into()) - } - } - };*/ - #[allow(clippy::unwrap_used)] // JoinHandle captures panics, this closure shouldn't panic - Ok(spawn_blocking(move || path.is_file()).await.unwrap()) - } - - #[tracing::instrument(skip(self))] - async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> { - let path = url_to_path(&self.root_dir, url); - // TODO: check that the path actually belongs to the dir of user who requested it - // it's not like you CAN access someone else's private posts with it - // so it's not exactly a security issue, but it's still not good - debug!("Opening {:?}", path); - - match File::open(&path).await { - Ok(mut file) => { - let mut content = String::new(); - // Typechecks because OS magic acts on references - // to FDs as if they were behind a mutex - AsyncReadExt::read_to_string(&mut file, &mut content).await?; - debug!( - "Read {} bytes successfully from {:?}", - content.as_bytes().len(), - &path - ); - Ok(Some(serde_json::from_str(&content)?)) - } - Err(err) => { - if err.kind() == IOErrorKind::NotFound { - Ok(None) - } else { - Err(err.into()) - } - } - } - } - - #[tracing::instrument(skip(self))] - async fn put_post(&self, post: &'_ serde_json::Value, user: &'_ str) -> Result<()> { - let key = post["properties"]["uid"][0] - .as_str() - .expect("Tried to save a post without UID"); - let path = url_to_path(&self.root_dir, key); - let tempfile = (&path).with_extension("tmp"); - debug!("Creating {:?}", path); - - let parent = path - .parent() - .expect("Parent for this directory should always exist") - .to_owned(); - tokio::fs::create_dir_all(&parent).await?; - - let mut file = tokio::fs::OpenOptions::new() - .write(true) - .create_new(true) - .open(&tempfile) - .await?; - - file.write_all(post.to_string().as_bytes()).await?; - file.flush().await?; - file.sync_all().await?; - drop(file); - tokio::fs::rename(&tempfile, &path).await?; - tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; - - if let Some(urls) = post["properties"]["url"].as_array() { - for url in urls.iter().map(|i| i.as_str().unwrap()) { - let url_domain = { - let url = url::Url::parse(url).unwrap(); - format!( - "{}{}", - url.host_str().unwrap(), - url.port() - .map(|port| format!(":{}", port)) - .unwrap_or_default() - ) - }; - if url != key && url_domain == user { - let link = url_to_path(&self.root_dir, url); - debug!("Creating a symlink at {:?}", link); - let orig = path.clone(); - // We're supposed to have a parent here. - let basedir = link.parent().ok_or_else(|| { - StorageError::from_static( - ErrorKind::Backend, - "Failed to calculate parent directory when creating a symlink", - ) - })?; - let relative = path_relative_from(&orig, basedir).unwrap(); - println!("{:?} - {:?} = {:?}", &orig, &basedir, &relative); - tokio::fs::symlink(relative, link).await?; - } - } - } - - if post["type"] - .as_array() - .unwrap() - .iter() - .any(|s| s.as_str() == Some("h-feed")) - { - tracing::debug!("Adding to channel list..."); - // Add the h-feed to the channel list - let path = { - let mut path = relative_path::RelativePathBuf::new(); - path.push(user); - path.push("channels"); - - path.to_path(&self.root_dir) - }; - tokio::fs::create_dir_all(path.parent().unwrap()).await?; - tracing::debug!("Channels file path: {}", path.display()); - let tempfilename = path.with_extension("tmp"); - let channel_name = post["properties"]["name"][0] - .as_str() - .map(|s| s.to_string()) - .unwrap_or_else(String::default); - let key = key.to_string(); - tracing::debug!("Opening temporary file to modify chnanels..."); - let mut tempfile = OpenOptions::new() - .write(true) - .create_new(true) - .open(&tempfilename) - .await?; - tracing::debug!("Opening real channel file..."); - let mut channels: Vec<super::MicropubChannel> = { - match OpenOptions::new() - .read(true) - .write(false) - .truncate(false) - .create(false) - .open(&path) - .await - { - Err(err) if err.kind() == std::io::ErrorKind::NotFound => { - Vec::default() - } - Err(err) => { - // Propagate the error upwards - return Err(err.into()); - } - Ok(mut file) => { - let mut content = String::new(); - file.read_to_string(&mut content).await?; - drop(file); - - if !content.is_empty() { - serde_json::from_str(&content)? - } else { - Vec::default() - } - } - } - }; - - channels.push(super::MicropubChannel { - uid: key.to_string(), - name: channel_name, - }); - - tempfile - .write_all(serde_json::to_string(&channels)?.as_bytes()) - .await?; - tempfile.flush().await?; - tempfile.sync_all().await?; - drop(tempfile); - tokio::fs::rename(tempfilename, &path).await?; - tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; - } - Ok(()) - } - - #[tracing::instrument(skip(self))] - async fn update_post(&self, url: &str, update: MicropubUpdate) -> Result<()> { - let path = url_to_path(&self.root_dir, url); - let tempfilename = path.with_extension("tmp"); - #[allow(unused_variables)] - let (old_json, new_json) = { - let mut temp = OpenOptions::new() - .write(true) - .create_new(true) - .open(&tempfilename) - .await?; - let mut file = OpenOptions::new().read(true).open(&path).await?; - - let mut content = String::new(); - file.read_to_string(&mut content).await?; - let json: serde_json::Value = serde_json::from_str(&content)?; - drop(file); - // Apply the editing algorithms - let new_json = modify_post(&json, update)?; - - temp.write_all(new_json.to_string().as_bytes()).await?; - temp.flush().await?; - temp.sync_all().await?; - drop(temp); - tokio::fs::rename(tempfilename, &path).await?; - tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; - - (json, new_json) - }; - // TODO check if URLs changed between old and new JSON - Ok(()) - } - - #[tracing::instrument(skip(self))] - async fn get_channels(&self, user: &'_ str) -> Result<Vec<super::MicropubChannel>> { - let mut path = relative_path::RelativePathBuf::new(); - path.push(user); - path.push("channels"); - - let path = path.to_path(&self.root_dir); - tracing::debug!("Channels file path: {}", path.display()); - - match File::open(&path).await { - Ok(mut f) => { - let mut content = String::new(); - f.read_to_string(&mut content).await?; - // This should not happen, but if it does, handle it gracefully - if content.is_empty() { - return Ok(vec![]); - } - let channels: Vec<super::MicropubChannel> = serde_json::from_str(&content)?; - Ok(channels) - } - Err(e) => { - if e.kind() == IOErrorKind::NotFound { - Ok(vec![]) - } else { - Err(e.into()) - } - } - } - } - - async fn read_feed_with_cursor( - &self, - url: &'_ str, - cursor: Option<&'_ str>, - limit: usize, - user: Option<&'_ str> - ) -> Result<Option<(serde_json::Value, Option<String>)>> { - Ok(self.read_feed_with_limit( - url, - &cursor.map(|v| v.to_owned()), - limit, - &user.map(|v| v.to_owned()) - ).await? - .map(|feed| { - tracing::debug!("Feed: {:#}", serde_json::Value::Array( - feed["children"] - .as_array() - .map(|v| v.as_slice()) - .unwrap_or_default() - .iter() - .map(|mf2| mf2["properties"]["uid"][0].clone()) - .collect::<Vec<_>>() - )); - let cursor: Option<String> = feed["children"] - .as_array() - .map(|v| v.as_slice()) - .unwrap_or_default() - .last() - .map(|v| v["properties"]["uid"][0].as_str().unwrap().to_owned()); - tracing::debug!("Extracted the cursor: {:?}", cursor); - (feed, cursor) - }) - ) - } - - #[tracing::instrument(skip(self))] - async fn read_feed_with_limit( - &self, - url: &'_ str, - after: &'_ Option<String>, - limit: usize, - user: &'_ Option<String>, - ) -> Result<Option<serde_json::Value>> { - if let Some(mut feed) = self.get_post(url).await? { - if feed["children"].is_array() { - // Take this out of the MF2-JSON document to save memory - // - // This uses a clever match with enum destructuring - // to extract the underlying Vec without cloning it - let children: Vec<serde_json::Value> = match feed["children"].take() { - serde_json::Value::Array(children) => children, - // We've already checked it's an array - _ => unreachable!() - }; - tracing::debug!("Full children array: {:#}", serde_json::Value::Array(children.clone())); - let mut posts_iter = children - .into_iter() - .map(|s: serde_json::Value| s.as_str().unwrap().to_string()); - // Note: we can't actually use `skip_while` here because we end up emitting `after`. - // This imperative snippet consumes after instead of emitting it, allowing the - // stream of posts to return only those items that truly come *after* that one. - // If I would implement an Iter combinator like this, I would call it `skip_until` - if let Some(after) = after { - for s in posts_iter.by_ref() { - if &s == after { - break; - } - } - }; - let posts = stream::iter(posts_iter) - .map(|url: String| async move { self.get_post(&url).await }) - .buffered(std::cmp::min(3, limit)) - // Hack to unwrap the Option and sieve out broken links - // Broken links return None, and Stream::filter_map skips Nones. - .try_filter_map(|post: Option<serde_json::Value>| async move { Ok(post) }) - .and_then(|mut post| async move { - hydrate_author(&mut post, user, self).await; - Ok(post) - }) - .take(limit); - - match posts.try_collect::<Vec<serde_json::Value>>().await { - Ok(posts) => feed["children"] = serde_json::json!(posts), - Err(err) => { - return Err(StorageError::with_source( - ErrorKind::Other, - Cow::Owned(format!("Feed assembly error: {}", &err)), - Box::new(err), - )); - } - } - } - hydrate_author(&mut feed, user, self).await; - Ok(Some(feed)) - } else { - Ok(None) - } - } - - #[tracing::instrument(skip(self))] - async fn delete_post(&self, url: &'_ str) -> Result<()> { - let path = url_to_path(&self.root_dir, url); - if let Err(e) = tokio::fs::remove_file(path).await { - Err(e.into()) - } else { - // TODO check for dangling references in the channel list - Ok(()) - } - } - - #[tracing::instrument(skip(self))] - async fn get_setting<S: settings::Setting<'a>, 'a>(&self, user: &'_ str) -> Result<S> { - debug!("User for getting settings: {}", user); - let mut path = relative_path::RelativePathBuf::new(); - path.push(user); - path.push("settings"); - - let path = path.to_path(&self.root_dir); - debug!("Getting settings from {:?}", &path); - - let mut file = File::open(path).await?; - let mut content = String::new(); - file.read_to_string(&mut content).await?; - - let settings: HashMap<&str, serde_json::Value> = serde_json::from_str(&content)?; - match settings.get(S::ID) { - Some(value) => Ok(serde_json::from_value::<S>(value.clone())?), - None => Err(StorageError::from_static(ErrorKind::Backend, "Setting not set")) - } - } - - #[tracing::instrument(skip(self))] - async fn set_setting<S: settings::Setting<'a> + 'a, 'a>(&self, user: &'a str, value: S::Data) -> Result<()> { - let mut path = relative_path::RelativePathBuf::new(); - path.push(user); - path.push("settings"); - - let path = path.to_path(&self.root_dir); - let temppath = path.with_extension("tmp"); - - let parent = path.parent().unwrap().to_owned(); - tokio::fs::create_dir_all(&parent).await?; - - let mut tempfile = OpenOptions::new() - .write(true) - .create_new(true) - .open(&temppath) - .await?; - - let mut settings: HashMap<String, serde_json::Value> = match File::open(&path).await { - Ok(mut f) => { - let mut content = String::new(); - f.read_to_string(&mut content).await?; - if content.is_empty() { - Default::default() - } else { - serde_json::from_str(&content)? - } - } - Err(err) => { - if err.kind() == IOErrorKind::NotFound { - Default::default() - } else { - return Err(err.into()); - } - } - }; - settings.insert(S::ID.to_owned(), serde_json::to_value(S::new(value))?); - - tempfile - .write_all(serde_json::to_string(&settings)?.as_bytes()) - .await?; - tempfile.flush().await?; - tempfile.sync_all().await?; - drop(tempfile); - tokio::fs::rename(temppath, &path).await?; - tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; - Ok(()) - } - - #[tracing::instrument(skip(self))] - async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()> { - let path = url_to_path(&self.root_dir, target); - let tempfilename = path.with_extension("tmp"); - - let mut temp = OpenOptions::new() - .write(true) - .create_new(true) - .open(&tempfilename) - .await?; - let mut file = OpenOptions::new().read(true).open(&path).await?; - - let mut post: serde_json::Value = { - let mut content = String::new(); - file.read_to_string(&mut content).await?; - drop(file); - - serde_json::from_str(&content)? - }; - - let key: &'static str = match mention_type { - MentionType::Reply => "comment", - MentionType::Like => "like", - MentionType::Repost => "repost", - MentionType::Bookmark => "bookmark", - MentionType::Mention => "mention", - }; - let mention_uid = mention["properties"]["uid"][0].clone(); - if let Some(values) = post["properties"][key].as_array_mut() { - for value in values.iter_mut() { - if value["properties"]["uid"][0] == mention_uid { - *value = mention; - break; - } - } - } else { - post["properties"][key] = serde_json::Value::Array(vec![mention]); - } - - temp.write_all(post.to_string().as_bytes()).await?; - temp.flush().await?; - temp.sync_all().await?; - drop(temp); - tokio::fs::rename(tempfilename, &path).await?; - tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; - - Ok(()) - } -} diff --git a/kittybox-rs/src/database/memory.rs b/kittybox-rs/src/database/memory.rs deleted file mode 100644 index 6339e7a..0000000 --- a/kittybox-rs/src/database/memory.rs +++ /dev/null @@ -1,249 +0,0 @@ -#![allow(clippy::todo)] -use async_trait::async_trait; -use futures_util::FutureExt; -use serde_json::json; -use std::collections::HashMap; -use std::sync::Arc; -use tokio::sync::RwLock; - -use crate::database::{ErrorKind, MicropubChannel, Result, settings, Storage, StorageError}; - -#[derive(Clone, Debug)] -pub struct MemoryStorage { - pub mapping: Arc<RwLock<HashMap<String, serde_json::Value>>>, - pub channels: Arc<RwLock<HashMap<String, Vec<String>>>>, -} - -#[async_trait] -impl Storage for MemoryStorage { - async fn post_exists(&self, url: &str) -> Result<bool> { - return Ok(self.mapping.read().await.contains_key(url)); - } - - async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> { - let mapping = self.mapping.read().await; - match mapping.get(url) { - Some(val) => { - if let Some(new_url) = val["see_other"].as_str() { - match mapping.get(new_url) { - Some(val) => Ok(Some(val.clone())), - None => { - drop(mapping); - self.mapping.write().await.remove(url); - Ok(None) - } - } - } else { - Ok(Some(val.clone())) - } - } - _ => Ok(None), - } - } - - async fn put_post(&self, post: &'_ serde_json::Value, _user: &'_ str) -> Result<()> { - let mapping = &mut self.mapping.write().await; - let key: &str = match post["properties"]["uid"][0].as_str() { - Some(uid) => uid, - None => { - return Err(StorageError::from_static( - ErrorKind::Other, - "post doesn't have a UID", - )) - } - }; - mapping.insert(key.to_string(), post.clone()); - if post["properties"]["url"].is_array() { - for url in post["properties"]["url"] - .as_array() - .unwrap() - .iter() - .map(|i| i.as_str().unwrap().to_string()) - { - if url != key { - mapping.insert(url, json!({ "see_other": key })); - } - } - } - if post["type"] - .as_array() - .unwrap() - .iter() - .any(|i| i == "h-feed") - { - // This is a feed. Add it to the channels array if it's not already there. - println!("{:#}", post); - self.channels - .write() - .await - .entry( - post["properties"]["author"][0] - .as_str() - .unwrap() - .to_string(), - ) - .or_insert_with(Vec::new) - .push(key.to_string()) - } - Ok(()) - } - - async fn update_post(&self, url: &'_ str, update: crate::micropub::MicropubUpdate) -> Result<()> { - let mut guard = self.mapping.write().await; - let mut post = guard.get_mut(url).ok_or(StorageError::from_static(ErrorKind::NotFound, "The specified post wasn't found in the database."))?; - - use crate::micropub::MicropubPropertyDeletion; - - let mut add_keys: HashMap<String, Vec<serde_json::Value>> = HashMap::new(); - let mut remove_keys: Vec<String> = vec![]; - let mut remove_values: HashMap<String, Vec<serde_json::Value>> = HashMap::new(); - - if let Some(MicropubPropertyDeletion::Properties(delete)) = update.delete { - remove_keys.extend(delete.iter().cloned()); - } else if let Some(MicropubPropertyDeletion::Values(delete)) = update.delete { - for (k, v) in delete { - remove_values - .entry(k.to_string()) - .or_default() - .extend(v.clone()); - } - } - if let Some(add) = update.add { - for (k, v) in add { - add_keys.insert(k.to_string(), v.clone()); - } - } - if let Some(replace) = update.replace { - for (k, v) in replace { - remove_keys.push(k.to_string()); - add_keys.insert(k.to_string(), v.clone()); - } - } - - if let Some(props) = post["properties"].as_object_mut() { - for k in remove_keys { - props.remove(&k); - } - } - for (k, v) in remove_values { - let k = &k; - let props = if k == "children" { - &mut post - } else { - &mut post["properties"] - }; - v.iter().for_each(|v| { - if let Some(vec) = props[k].as_array_mut() { - if let Some(index) = vec.iter().position(|w| w == v) { - vec.remove(index); - } - } - }); - } - for (k, v) in add_keys { - tracing::debug!("Adding k/v to post: {} => {:?}", k, v); - let props = if k == "children" { - &mut post - } else { - &mut post["properties"] - }; - if let Some(prop) = props[&k].as_array_mut() { - if k == "children" { - v.into_iter().rev().for_each(|v| prop.insert(0, v)); - } else { - prop.extend(v.into_iter()); - } - } else { - props[&k] = serde_json::Value::Array(v) - } - } - - Ok(()) - } - - async fn get_channels(&self, user: &'_ str) -> Result<Vec<MicropubChannel>> { - match self.channels.read().await.get(user) { - Some(channels) => Ok(futures_util::future::join_all( - channels - .iter() - .map(|channel| { - self.get_post(channel).map(|result| result.unwrap()).map( - |post: Option<serde_json::Value>| { - post.map(|post| MicropubChannel { - uid: post["properties"]["uid"][0].as_str().unwrap().to_string(), - name: post["properties"]["name"][0] - .as_str() - .unwrap() - .to_string(), - }) - }, - ) - }) - .collect::<Vec<_>>(), - ) - .await - .into_iter() - .flatten() - .collect::<Vec<_>>()), - None => Ok(vec![]), - } - } - - #[allow(unused_variables)] - async fn read_feed_with_limit( - &self, - url: &'_ str, - after: &'_ Option<String>, - limit: usize, - user: &'_ Option<String>, - ) -> Result<Option<serde_json::Value>> { - todo!() - } - - #[allow(unused_variables)] - async fn read_feed_with_cursor( - &self, - url: &'_ str, - cursor: Option<&'_ str>, - limit: usize, - user: Option<&'_ str> - ) -> Result<Option<(serde_json::Value, Option<String>)>> { - todo!() - } - - async fn delete_post(&self, url: &'_ str) -> Result<()> { - self.mapping.write().await.remove(url); - Ok(()) - } - - #[allow(unused_variables)] - async fn get_setting<S: settings::Setting<'a>, 'a>(&'_ self, user: &'_ str) -> Result<S> { - todo!() - } - - #[allow(unused_variables)] - async fn set_setting<S: settings::Setting<'a> + 'a, 'a>(&self, user: &'a str, value: S::Data) -> Result<()> { - todo!() - } - - #[allow(unused_variables)] - async fn add_or_update_webmention(&self, target: &str, mention_type: kittybox_util::MentionType, mention: serde_json::Value) -> Result<()> { - todo!() - } - -} - -impl Default for MemoryStorage { - fn default() -> Self { - Self::new() - } -} - -impl MemoryStorage { - pub fn new() -> Self { - Self { - mapping: Arc::new(RwLock::new(HashMap::new())), - channels: Arc::new(RwLock::new(HashMap::new())), - } - } -} diff --git a/kittybox-rs/src/database/mod.rs b/kittybox-rs/src/database/mod.rs deleted file mode 100644 index b4b70b2..0000000 --- a/kittybox-rs/src/database/mod.rs +++ /dev/null @@ -1,793 +0,0 @@ -#![warn(missing_docs)] -use std::borrow::Cow; - -use async_trait::async_trait; -use kittybox_util::MentionType; - -mod file; -pub use crate::database::file::FileStorage; -use crate::micropub::MicropubUpdate; -#[cfg(feature = "postgres")] -mod postgres; -#[cfg(feature = "postgres")] -pub use postgres::PostgresStorage; - -#[cfg(test)] -mod memory; -#[cfg(test)] -pub use crate::database::memory::MemoryStorage; - -pub use kittybox_util::MicropubChannel; - -use self::settings::Setting; - -/// Enum representing different errors that might occur during the database query. -#[derive(Debug, Clone, Copy)] -pub enum ErrorKind { - /// Backend error (e.g. database connection error) - Backend, - /// Error due to insufficient contextual permissions for the query - PermissionDenied, - /// Error due to the database being unable to parse JSON returned from the backing storage. - /// Usually indicative of someone fiddling with the database manually instead of using proper tools. - JsonParsing, - /// - ErrorKind::NotFound - equivalent to a 404 error. Note, some requests return an Option, - /// in which case None is also equivalent to a 404. - NotFound, - /// The user's query or request to the database was malformed. Used whenever the database processes - /// the user's query directly, such as when editing posts inside of the database (e.g. Redis backend) - BadRequest, - /// the user's query collided with an in-flight request and needs to be retried - Conflict, - /// - ErrorKind::Other - when something so weird happens that it becomes undescribable. - Other, -} - -/// Settings that can be stored in the database. -pub mod settings { - mod private { - pub trait Sealed {} - } - - /// A trait for various settings that should be contained here. - /// - /// **Note**: this trait is sealed to prevent external - /// implementations, as it wouldn't make sense to add new settings - /// that aren't used by Kittybox itself. - pub trait Setting<'de>: private::Sealed + std::fmt::Debug + Default + Clone + serde::Serialize + serde::de::DeserializeOwned + /*From<Settings> +*/ Send + Sync { - type Data: std::fmt::Debug + Send + Sync; - const ID: &'static str; - - /// Unwrap the setting type, returning owned data contained within. - fn into_inner(self) -> Self::Data; - /// Create a new instance of this type containing certain data. - fn new(data: Self::Data) -> Self; - } - - /// A website's title, shown in the header. - #[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)] - pub struct SiteName(String); - impl Default for SiteName { - fn default() -> Self { - Self("Kittybox".to_string()) - } - } - impl AsRef<str> for SiteName { - fn as_ref(&self) -> &str { - self.0.as_str() - } - } - impl private::Sealed for SiteName {} - impl Setting<'_> for SiteName { - type Data = String; - const ID: &'static str = "site_name"; - - fn into_inner(self) -> String { - self.0 - } - fn new(data: Self::Data) -> Self { - Self(data) - } - } - impl SiteName { - fn from_str(data: &str) -> Self { - Self(data.to_owned()) - } - } - - /// Participation status in the IndieWeb Webring: https://🕸💍.ws/dashboard - #[derive(Debug, Default, serde::Deserialize, serde::Serialize, Clone, Copy, PartialEq, Eq)] - pub struct Webring(bool); - impl private::Sealed for Webring {} - impl Setting<'_> for Webring { - type Data = bool; - const ID: &'static str = "webring"; - - fn into_inner(self) -> Self::Data { - self.0 - } - - fn new(data: Self::Data) -> Self { - Self(data) - } - } -} - -/// Error signalled from the database. -#[derive(Debug)] -pub struct StorageError { - msg: std::borrow::Cow<'static, str>, - source: Option<Box<dyn std::error::Error + Send + Sync>>, - kind: ErrorKind, -} - -impl std::error::Error for StorageError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - self.source - .as_ref() - .map(|e| e.as_ref() as &dyn std::error::Error) - } -} -impl From<serde_json::Error> for StorageError { - fn from(err: serde_json::Error) -> Self { - Self { - msg: std::borrow::Cow::Owned(format!("{}", err)), - source: Some(Box::new(err)), - kind: ErrorKind::JsonParsing, - } - } -} -impl std::fmt::Display for StorageError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{}: {}", - match self.kind { - ErrorKind::Backend => "backend error", - ErrorKind::JsonParsing => "JSON parsing error", - ErrorKind::PermissionDenied => "permission denied", - ErrorKind::NotFound => "not found", - ErrorKind::BadRequest => "bad request", - ErrorKind::Conflict => "conflict with an in-flight request or existing data", - ErrorKind::Other => "generic storage layer error", - }, - self.msg - ) - } -} -impl serde::Serialize for StorageError { - fn serialize<S: serde::Serializer>( - &self, - serializer: S, - ) -> std::result::Result<S::Ok, S::Error> { - serializer.serialize_str(&self.to_string()) - } -} -impl StorageError { - /// Create a new StorageError of an ErrorKind with a message. - pub fn new(kind: ErrorKind, msg: String) -> Self { - Self { - msg: Cow::Owned(msg), - source: None, - kind, - } - } - /// Create a new StorageError of an ErrorKind with a message from - /// a static string. - /// - /// This saves an allocation for a new string and is the preferred - /// way in case the error message doesn't change. - pub fn from_static(kind: ErrorKind, msg: &'static str) -> Self { - Self { - msg: Cow::Borrowed(msg), - source: None, - kind - } - } - /// Create a StorageError using another arbitrary Error as a source. - pub fn with_source( - kind: ErrorKind, - msg: std::borrow::Cow<'static, str>, - source: Box<dyn std::error::Error + Send + Sync>, - ) -> Self { - Self { - msg, - source: Some(source), - kind, - } - } - /// Get the kind of an error. - pub fn kind(&self) -> ErrorKind { - self.kind - } - /// Get the message as a string slice. - pub fn msg(&self) -> &str { - &self.msg - } -} - -/// A special Result type for the Micropub backing storage. -pub type Result<T> = std::result::Result<T, StorageError>; - -/// A storage backend for the Micropub server. -/// -/// Implementations should note that all methods listed on this trait MUST be fully atomic -/// or lock the database so that write conflicts or reading half-written data should not occur. -#[async_trait] -pub trait Storage: std::fmt::Debug + Clone + Send + Sync { - /// Check if a post exists in the database. - async fn post_exists(&self, url: &str) -> Result<bool>; - - /// Load a post from the database in MF2-JSON format, deserialized from JSON. - async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>>; - - /// Save a post to the database as an MF2-JSON structure. - /// - /// Note that the `post` object MUST have `post["properties"]["uid"][0]` defined. - async fn put_post(&self, post: &'_ serde_json::Value, user: &'_ str) -> Result<()>; - - /// Add post to feed. Some database implementations might have optimized ways to do this. - #[tracing::instrument(skip(self))] - async fn add_to_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> { - tracing::debug!("Inserting {} into {} using `update_post`", post, feed); - self.update_post(feed, serde_json::from_value( - serde_json::json!({"add": {"children": [post]}})).unwrap() - ).await - } - /// Remove post from feed. Some database implementations might have optimized ways to do this. - #[tracing::instrument(skip(self))] - async fn remove_from_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> { - tracing::debug!("Removing {} into {} using `update_post`", post, feed); - self.update_post(feed, serde_json::from_value( - serde_json::json!({"delete": {"children": [post]}})).unwrap() - ).await - } - - /// Modify a post using an update object as defined in the - /// Micropub spec. - /// - /// Note to implementors: the update operation MUST be atomic and - /// SHOULD lock the database to prevent two clients overwriting - /// each other's changes or simply corrupting something. Rejecting - /// is allowed in case of concurrent updates if waiting for a lock - /// cannot be done. - async fn update_post(&self, url: &str, update: MicropubUpdate) -> Result<()>; - - /// Get a list of channels available for the user represented by - /// the `user` domain to write to. - async fn get_channels(&self, user: &'_ str) -> Result<Vec<MicropubChannel>>; - - /// Fetch a feed at `url` and return an h-feed object containing - /// `limit` posts after a post by url `after`, filtering the content - /// in context of a user specified by `user` (or an anonymous user). - /// - /// This method MUST hydrate the `author` property with an h-card - /// from the database by replacing URLs with corresponding h-cards. - /// - /// When encountering posts which the `user` is not authorized to - /// access, this method MUST elide such posts (as an optimization - /// for the frontend) and not return them, but still return up to - /// `limit` posts (to not reveal the hidden posts' presence). - /// - /// Note for implementors: if you use streams to fetch posts in - /// parallel from the database, preferably make this method use a - /// connection pool to reduce overhead of creating a database - /// connection per post for parallel fetching. - async fn read_feed_with_limit( - &self, - url: &'_ str, - after: &'_ Option<String>, - limit: usize, - user: &'_ Option<String>, - ) -> Result<Option<serde_json::Value>>; - - /// Fetch a feed at `url` and return an h-feed object containing - /// `limit` posts after a `cursor` (filtering the content in - /// context of a user specified by `user`, or an anonymous user), - /// as well as a new cursor to paginate with. - /// - /// This method MUST hydrate the `author` property with an h-card - /// from the database by replacing URLs with corresponding h-cards. - /// - /// When encountering posts which the `user` is not authorized to - /// access, this method MUST elide such posts (as an optimization - /// for the frontend) and not return them, but still return an - /// amount of posts as close to `limit` as possible (to avoid - /// revealing the existence of the hidden post). - /// - /// Note for implementors: if you use streams to fetch posts in - /// parallel from the database, preferably make this method use a - /// connection pool to reduce overhead of creating a database - /// connection per post for parallel fetching. - async fn read_feed_with_cursor( - &self, - url: &'_ str, - cursor: Option<&'_ str>, - limit: usize, - user: Option<&'_ str> - ) -> Result<Option<(serde_json::Value, Option<String>)>>; - - /// Deletes a post from the database irreversibly. Must be idempotent. - async fn delete_post(&self, url: &'_ str) -> Result<()>; - - /// Gets a setting from the setting store and passes the result. - async fn get_setting<S: Setting<'a>, 'a>(&'_ self, user: &'_ str) -> Result<S>; - - /// Commits a setting to the setting store. - async fn set_setting<S: Setting<'a> + 'a, 'a>(&self, user: &'a str, value: S::Data) -> Result<()>; - - /// Add (or update) a webmention on a certian post. - /// - /// The MF2 object describing the webmention content will always - /// be of type `h-cite`, and the `uid` property on the object will - /// always be set. - /// - /// The rationale for this function is as follows: webmentions - /// might be duplicated, and we need to deduplicate them first. As - /// we lack support for transactions and locking posts on the - /// database, the only way is to implement the operation on the - /// database itself. - /// - /// Besides, it may even allow for nice tricks like storing the - /// webmentions separately and rehydrating them on feed reads. - async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()>; -} - -#[cfg(test)] -mod tests { - use super::settings; - - use super::{MicropubChannel, Storage}; - use kittybox_util::MentionType; - use serde_json::json; - - async fn test_basic_operations<Backend: Storage>(backend: Backend) { - let post: serde_json::Value = json!({ - "type": ["h-entry"], - "properties": { - "content": ["Test content"], - "author": ["https://fireburn.ru/"], - "uid": ["https://fireburn.ru/posts/hello"], - "url": ["https://fireburn.ru/posts/hello", "https://fireburn.ru/posts/test"] - } - }); - let key = post["properties"]["uid"][0].as_str().unwrap().to_string(); - let alt_url = post["properties"]["url"][1].as_str().unwrap().to_string(); - - // Reading and writing - backend - .put_post(&post, "fireburn.ru") - .await - .unwrap(); - if let Some(returned_post) = backend.get_post(&key).await.unwrap() { - assert!(returned_post.is_object()); - assert_eq!( - returned_post["type"].as_array().unwrap().len(), - post["type"].as_array().unwrap().len() - ); - assert_eq!( - returned_post["type"].as_array().unwrap(), - post["type"].as_array().unwrap() - ); - let props: &serde_json::Map<String, serde_json::Value> = - post["properties"].as_object().unwrap(); - for key in props.keys() { - assert_eq!( - returned_post["properties"][key].as_array().unwrap(), - post["properties"][key].as_array().unwrap() - ) - } - } else { - panic!("For some reason the backend did not return the post.") - } - // Check the alternative URL - it should return the same post - if let Ok(Some(returned_post)) = backend.get_post(&alt_url).await { - assert!(returned_post.is_object()); - assert_eq!( - returned_post["type"].as_array().unwrap().len(), - post["type"].as_array().unwrap().len() - ); - assert_eq!( - returned_post["type"].as_array().unwrap(), - post["type"].as_array().unwrap() - ); - let props: &serde_json::Map<String, serde_json::Value> = - post["properties"].as_object().unwrap(); - for key in props.keys() { - assert_eq!( - returned_post["properties"][key].as_array().unwrap(), - post["properties"][key].as_array().unwrap() - ) - } - } else { - panic!("For some reason the backend did not return the post.") - } - } - - /// Note: this is merely a smoke check and is in no way comprehensive. - // TODO updates for feeds must update children using special logic - async fn test_update<Backend: Storage>(backend: Backend) { - let post: serde_json::Value = json!({ - "type": ["h-entry"], - "properties": { - "content": ["Test content"], - "author": ["https://fireburn.ru/"], - "uid": ["https://fireburn.ru/posts/hello"], - "url": ["https://fireburn.ru/posts/hello", "https://fireburn.ru/posts/test"] - } - }); - let key = post["properties"]["uid"][0].as_str().unwrap().to_string(); - - // Reading and writing - backend - .put_post(&post, "fireburn.ru") - .await - .unwrap(); - - backend - .update_post( - &key, - serde_json::from_value(json!({ - "url": &key, - "add": { - "category": ["testing"], - }, - "replace": { - "content": ["Different test content"] - } - })).unwrap(), - ) - .await - .unwrap(); - - match backend.get_post(&key).await { - Ok(Some(returned_post)) => { - assert!(returned_post.is_object()); - assert_eq!( - returned_post["type"].as_array().unwrap().len(), - post["type"].as_array().unwrap().len() - ); - assert_eq!( - returned_post["type"].as_array().unwrap(), - post["type"].as_array().unwrap() - ); - assert_eq!( - returned_post["properties"]["content"][0].as_str().unwrap(), - "Different test content" - ); - assert_eq!( - returned_post["properties"]["category"].as_array().unwrap(), - &vec![json!("testing")] - ); - } - something_else => { - something_else - .expect("Shouldn't error") - .expect("Should have the post"); - } - } - } - - async fn test_get_channel_list<Backend: Storage>(backend: Backend) { - let feed = json!({ - "type": ["h-feed"], - "properties": { - "name": ["Main Page"], - "author": ["https://fireburn.ru/"], - "uid": ["https://fireburn.ru/feeds/main"] - }, - "children": [] - }); - backend - .put_post(&feed, "fireburn.ru") - .await - .unwrap(); - let chans = backend.get_channels("fireburn.ru").await.unwrap(); - assert_eq!(chans.len(), 1); - assert_eq!( - chans[0], - MicropubChannel { - uid: "https://fireburn.ru/feeds/main".to_string(), - name: "Main Page".to_string() - } - ); - } - - async fn test_settings<Backend: Storage>(backend: Backend) { - backend - .set_setting::<settings::SiteName>( - "https://fireburn.ru/", - "Vika's Hideout".to_owned() - ) - .await - .unwrap(); - assert_eq!( - backend - .get_setting::<settings::SiteName>("https://fireburn.ru/") - .await - .unwrap() - .as_ref(), - "Vika's Hideout" - ); - } - - fn gen_random_post(domain: &str) -> serde_json::Value { - use faker_rand::lorem::{Paragraphs, Word}; - - let uid = format!( - "https://{domain}/posts/{}-{}-{}", - rand::random::<Word>(), - rand::random::<Word>(), - rand::random::<Word>() - ); - - let time = chrono::Local::now().to_rfc3339(); - let post = json!({ - "type": ["h-entry"], - "properties": { - "content": [rand::random::<Paragraphs>().to_string()], - "uid": [&uid], - "url": [&uid], - "published": [&time] - } - }); - - post - } - - fn gen_random_mention(domain: &str, mention_type: MentionType, url: &str) -> serde_json::Value { - use faker_rand::lorem::{Paragraphs, Word}; - - let uid = format!( - "https://{domain}/posts/{}-{}-{}", - rand::random::<Word>(), - rand::random::<Word>(), - rand::random::<Word>() - ); - - let time = chrono::Local::now().to_rfc3339(); - let post = json!({ - "type": ["h-cite"], - "properties": { - "content": [rand::random::<Paragraphs>().to_string()], - "uid": [&uid], - "url": [&uid], - "published": [&time], - (match mention_type { - MentionType::Reply => "in-reply-to", - MentionType::Like => "like-of", - MentionType::Repost => "repost-of", - MentionType::Bookmark => "bookmark-of", - MentionType::Mention => unimplemented!(), - }): [url] - } - }); - - post - } - - async fn test_feed_pagination<Backend: Storage>(backend: Backend) { - let posts = { - let mut posts = std::iter::from_fn( - || Some(gen_random_post("fireburn.ru")) - ) - .take(40) - .collect::<Vec<serde_json::Value>>(); - - // Reverse the array so it's in reverse-chronological order - posts.reverse(); - - posts - }; - - let feed = json!({ - "type": ["h-feed"], - "properties": { - "name": ["Main Page"], - "author": ["https://fireburn.ru/"], - "uid": ["https://fireburn.ru/feeds/main"] - }, - }); - let key = feed["properties"]["uid"][0].as_str().unwrap(); - - backend - .put_post(&feed, "fireburn.ru") - .await - .unwrap(); - - for (i, post) in posts.iter().rev().enumerate() { - backend - .put_post(post, "fireburn.ru") - .await - .unwrap(); - backend.add_to_feed(key, post["properties"]["uid"][0].as_str().unwrap()).await.unwrap(); - } - - let limit: usize = 10; - - tracing::debug!("Starting feed reading..."); - let (result, cursor) = backend - .read_feed_with_cursor(key, None, limit, None) - .await - .unwrap() - .unwrap(); - - assert_eq!(result["children"].as_array().unwrap().len(), limit); - assert_eq!( - result["children"] - .as_array() - .unwrap() - .iter() - .map(|post| post["properties"]["uid"][0].as_str().unwrap()) - .collect::<Vec<_>>() - [0..10], - posts - .iter() - .map(|post| post["properties"]["uid"][0].as_str().unwrap()) - .collect::<Vec<_>>() - [0..10] - ); - - tracing::debug!("Continuing with cursor: {:?}", cursor); - let (result2, cursor2) = backend - .read_feed_with_cursor( - key, - cursor.as_deref(), - limit, - None, - ) - .await - .unwrap() - .unwrap(); - - assert_eq!( - result2["children"].as_array().unwrap()[0..10], - posts[10..20] - ); - - tracing::debug!("Continuing with cursor: {:?}", cursor); - let (result3, cursor3) = backend - .read_feed_with_cursor( - key, - cursor2.as_deref(), - limit, - None, - ) - .await - .unwrap() - .unwrap(); - - assert_eq!( - result3["children"].as_array().unwrap()[0..10], - posts[20..30] - ); - - tracing::debug!("Continuing with cursor: {:?}", cursor); - let (result4, _) = backend - .read_feed_with_cursor( - key, - cursor3.as_deref(), - limit, - None, - ) - .await - .unwrap() - .unwrap(); - - assert_eq!( - result4["children"].as_array().unwrap()[0..10], - posts[30..40] - ); - - // Regression test for #4 - // - // Results for a bogus cursor are undefined, so we aren't - // checking them. But the function at least shouldn't hang. - let nonsense_after = Some("1010101010"); - let _ = tokio::time::timeout(tokio::time::Duration::from_secs(10), async move { - backend - .read_feed_with_cursor(key, nonsense_after, limit, None) - .await - }) - .await - .expect("Operation should not hang: see https://gitlab.com/kittybox/kittybox/-/issues/4"); - } - - async fn test_webmention_addition<Backend: Storage>(db: Backend) { - let post = gen_random_post("fireburn.ru"); - - db.put_post(&post, "fireburn.ru").await.unwrap(); - const TYPE: MentionType = MentionType::Reply; - - let target = post["properties"]["uid"][0].as_str().unwrap(); - let mut reply = gen_random_mention("aaronparecki.com", TYPE, target); - - let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap(); - assert_eq!(post, read_post); - - db.add_or_update_webmention(target, TYPE, reply.clone()).await.unwrap(); - - let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap(); - assert_eq!(read_post["properties"]["comment"][0], reply); - - reply["properties"]["content"][0] = json!(rand::random::<faker_rand::lorem::Paragraphs>().to_string()); - - db.add_or_update_webmention(target, TYPE, reply.clone()).await.unwrap(); - let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap(); - assert_eq!(read_post["properties"]["comment"][0], reply); - } - - async fn test_pretty_permalinks<Backend: Storage>(db: Backend) { - const PERMALINK: &str = "https://fireburn.ru/posts/pretty-permalink"; - - let post = { - let mut post = gen_random_post("fireburn.ru"); - let urls = post["properties"]["url"].as_array_mut().unwrap(); - urls.push(serde_json::Value::String( - PERMALINK.to_owned() - )); - - post - }; - db.put_post(&post, "fireburn.ru").await.unwrap(); - - for i in post["properties"]["url"].as_array().unwrap() { - let (read_post, _) = db.read_feed_with_cursor(i.as_str().unwrap(), None, 20, None).await.unwrap().unwrap(); - assert_eq!(read_post, post); - } - } - /// Automatically generates a test suite for - macro_rules! test_all { - ($func_name:ident, $mod_name:ident) => { - mod $mod_name { - $func_name!(test_basic_operations); - $func_name!(test_get_channel_list); - $func_name!(test_settings); - $func_name!(test_update); - $func_name!(test_feed_pagination); - $func_name!(test_webmention_addition); - $func_name!(test_pretty_permalinks); - } - }; - } - macro_rules! file_test { - ($func_name:ident) => { - #[tokio::test] - #[tracing_test::traced_test] - async fn $func_name() { - let tempdir = tempfile::tempdir().expect("Failed to create tempdir"); - let backend = super::super::FileStorage::new( - tempdir.path().to_path_buf() - ) - .await - .unwrap(); - super::$func_name(backend).await - } - }; - } - - macro_rules! postgres_test { - ($func_name:ident) => { - #[cfg(feature = "sqlx")] - #[sqlx::test] - #[tracing_test::traced_test] - async fn $func_name( - pool_opts: sqlx::postgres::PgPoolOptions, - connect_opts: sqlx::postgres::PgConnectOptions - ) -> Result<(), sqlx::Error> { - let db = { - //use sqlx::ConnectOptions; - //connect_opts.log_statements(log::LevelFilter::Debug); - - pool_opts.connect_with(connect_opts).await? - }; - let backend = super::super::PostgresStorage::from_pool(db).await.unwrap(); - - Ok(super::$func_name(backend).await) - } - }; - } - - test_all!(file_test, file); - test_all!(postgres_test, postgres); -} diff --git a/kittybox-rs/src/database/postgres/mod.rs b/kittybox-rs/src/database/postgres/mod.rs deleted file mode 100644 index 9176d12..0000000 --- a/kittybox-rs/src/database/postgres/mod.rs +++ /dev/null @@ -1,416 +0,0 @@ -#![allow(unused_variables)] -use std::borrow::Cow; -use std::str::FromStr; - -use kittybox_util::{MicropubChannel, MentionType}; -use sqlx::{PgPool, Executor}; -use crate::micropub::{MicropubUpdate, MicropubPropertyDeletion}; - -use super::settings::Setting; -use super::{Storage, Result, StorageError, ErrorKind}; - -static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!(); - -impl From<sqlx::Error> for StorageError { - fn from(value: sqlx::Error) -> Self { - Self::with_source( - super::ErrorKind::Backend, - Cow::Owned(format!("sqlx error: {}", &value)), - Box::new(value) - ) - } -} - -impl From<sqlx::migrate::MigrateError> for StorageError { - fn from(value: sqlx::migrate::MigrateError) -> Self { - Self::with_source( - super::ErrorKind::Backend, - Cow::Owned(format!("sqlx migration error: {}", &value)), - Box::new(value) - ) - } -} - -#[derive(Debug, Clone)] -pub struct PostgresStorage { - db: PgPool -} - -impl PostgresStorage { - /// Construct a new [`PostgresStorage`] from an URI string and run - /// migrations on the database. - /// - /// If `PGPASS_FILE` environment variable is defined, read the - /// password from the file at the specified path. If, instead, - /// the `PGPASS` environment variable is present, read the - /// password from it. - pub async fn new(uri: &str) -> Result<Self> { - tracing::debug!("Postgres URL: {uri}"); - let mut options = sqlx::postgres::PgConnectOptions::from_str(uri)? - .options([("search_path", "kittybox")]); - if let Ok(password_file) = std::env::var("PGPASS_FILE") { - let password = tokio::fs::read_to_string(password_file).await.unwrap(); - options = options.password(&password); - } else if let Ok(password) = std::env::var("PGPASS") { - options = options.password(&password) - } - Self::from_pool( - sqlx::postgres::PgPoolOptions::new() - .max_connections(50) - .connect_with(options) - .await? - ).await - - } - - /// Construct a [`PostgresStorage`] from a [`sqlx::PgPool`], - /// running appropriate migrations. - pub async fn from_pool(db: sqlx::PgPool) -> Result<Self> { - db.execute(sqlx::query("CREATE SCHEMA IF NOT EXISTS kittybox")).await?; - MIGRATOR.run(&db).await?; - Ok(Self { db }) - } -} - -#[async_trait::async_trait] -impl Storage for PostgresStorage { - #[tracing::instrument(skip(self))] - async fn post_exists(&self, url: &str) -> Result<bool> { - sqlx::query_as::<_, (bool,)>("SELECT exists(SELECT 1 FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1)") - .bind(url) - .fetch_one(&self.db) - .await - .map(|v| v.0) - .map_err(|err| err.into()) - } - - #[tracing::instrument(skip(self))] - async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> { - sqlx::query_as::<_, (serde_json::Value,)>("SELECT mf2 FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1") - .bind(url) - .fetch_optional(&self.db) - .await - .map(|v| v.map(|v| v.0)) - .map_err(|err| err.into()) - - } - - #[tracing::instrument(skip(self))] - async fn put_post(&self, post: &'_ serde_json::Value, user: &'_ str) -> Result<()> { - tracing::debug!("New post: {}", post); - sqlx::query("INSERT INTO kittybox.mf2_json (uid, mf2, owner) VALUES ($1 #>> '{properties,uid,0}', $1, $2)") - .bind(post) - .bind(user) - .execute(&self.db) - .await - .map(|_| ()) - .map_err(Into::into) - } - - #[tracing::instrument(skip(self))] - async fn add_to_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> { - tracing::debug!("Inserting {} into {}", post, feed); - sqlx::query("INSERT INTO kittybox.children (parent, child) VALUES ($1, $2) ON CONFLICT DO NOTHING") - .bind(feed) - .bind(post) - .execute(&self.db) - .await - .map(|_| ()) - .map_err(Into::into) - } - - #[tracing::instrument(skip(self))] - async fn remove_from_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> { - sqlx::query("DELETE FROM kittybox.children WHERE parent = $1 AND child = $2") - .bind(feed) - .bind(post) - .execute(&self.db) - .await - .map_err(Into::into) - .map(|_| ()) - } - - #[tracing::instrument(skip(self))] - async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()> { - let mut txn = self.db.begin().await?; - - let (uid, mut post) = sqlx::query_as::<_, (String, serde_json::Value)>("SELECT uid, mf2 FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1 FOR UPDATE") - .bind(target) - .fetch_optional(&mut *txn) - .await? - .ok_or(StorageError::from_static( - ErrorKind::NotFound, - "The specified post wasn't found in the database." - ))?; - - tracing::debug!("Loaded post for target {} with uid {}", target, uid); - - let key: &'static str = match mention_type { - MentionType::Reply => "comment", - MentionType::Like => "like", - MentionType::Repost => "repost", - MentionType::Bookmark => "bookmark", - MentionType::Mention => "mention", - }; - - tracing::debug!("Mention type -> key: {}", key); - - let mention_uid = mention["properties"]["uid"][0].clone(); - if let Some(values) = post["properties"][key].as_array_mut() { - for value in values.iter_mut() { - if value["properties"]["uid"][0] == mention_uid { - *value = mention; - break; - } - } - } else { - post["properties"][key] = serde_json::Value::Array(vec![mention]); - } - - sqlx::query("UPDATE kittybox.mf2_json SET mf2 = $2 WHERE uid = $1") - .bind(uid) - .bind(post) - .execute(&mut *txn) - .await?; - - txn.commit().await.map_err(Into::into) - } - #[tracing::instrument(skip(self))] - async fn update_post(&self, url: &'_ str, update: MicropubUpdate) -> Result<()> { - tracing::debug!("Updating post {}", url); - let mut txn = self.db.begin().await?; - let (uid, mut post) = sqlx::query_as::<_, (String, serde_json::Value)>("SELECT uid, mf2 FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1 FOR UPDATE") - .bind(url) - .fetch_optional(&mut *txn) - .await? - .ok_or(StorageError::from_static( - ErrorKind::NotFound, - "The specified post wasn't found in the database." - ))?; - - if let Some(MicropubPropertyDeletion::Properties(ref delete)) = update.delete { - if let Some(props) = post["properties"].as_object_mut() { - for key in delete { - props.remove(key); - } - } - } else if let Some(MicropubPropertyDeletion::Values(ref delete)) = update.delete { - if let Some(props) = post["properties"].as_object_mut() { - for (key, values) in delete { - if let Some(prop) = props.get_mut(key).and_then(serde_json::Value::as_array_mut) { - prop.retain(|v| { values.iter().all(|i| i != v) }) - } - } - } - } - if let Some(replace) = update.replace { - if let Some(props) = post["properties"].as_object_mut() { - for (key, value) in replace { - props.insert(key, serde_json::Value::Array(value)); - } - } - } - if let Some(add) = update.add { - if let Some(props) = post["properties"].as_object_mut() { - for (key, value) in add { - if let Some(prop) = props.get_mut(&key).and_then(serde_json::Value::as_array_mut) { - prop.extend_from_slice(value.as_slice()); - } else { - props.insert(key, serde_json::Value::Array(value)); - } - } - } - } - - sqlx::query("UPDATE kittybox.mf2_json SET mf2 = $2 WHERE uid = $1") - .bind(uid) - .bind(post) - .execute(&mut *txn) - .await?; - - txn.commit().await.map_err(Into::into) - } - - #[tracing::instrument(skip(self))] - async fn get_channels(&self, user: &'_ str) -> Result<Vec<MicropubChannel>> { - /*sqlx::query_as::<_, MicropubChannel>("SELECT name, uid FROM kittybox.channels WHERE owner = $1") - .bind(user) - .fetch_all(&self.db) - .await - .map_err(|err| err.into())*/ - sqlx::query_as::<_, MicropubChannel>(r#"SELECT mf2 #>> '{properties,name,0}' as name, uid FROM kittybox.mf2_json WHERE '["h-feed"]'::jsonb @> mf2['type'] AND owner = $1"#) - .bind(user) - .fetch_all(&self.db) - .await - .map_err(|err| err.into()) - } - - #[tracing::instrument(skip(self))] - async fn read_feed_with_limit( - &self, - url: &'_ str, - after: &'_ Option<String>, - limit: usize, - user: &'_ Option<String>, - ) -> Result<Option<serde_json::Value>> { - let mut feed = match sqlx::query_as::<_, (serde_json::Value,)>(" -SELECT jsonb_set( - mf2, - '{properties,author,0}', - (SELECT mf2 FROM kittybox.mf2_json - WHERE uid = mf2 #>> '{properties,author,0}') -) FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1 -") - .bind(url) - .fetch_optional(&self.db) - .await? - .map(|v| v.0) - { - Some(feed) => feed, - None => return Ok(None) - }; - - let posts: Vec<String> = { - let mut posts_iter = feed["children"] - .as_array() - .cloned() - .unwrap_or_default() - .into_iter() - .map(|s| s.as_str().unwrap().to_string()); - if let Some(after) = after { - for s in posts_iter.by_ref() { - if &s == after { - break; - } - } - }; - - posts_iter.take(limit).collect::<Vec<_>>() - }; - feed["children"] = serde_json::Value::Array( - sqlx::query_as::<_, (serde_json::Value,)>(" -SELECT jsonb_set( - mf2, - '{properties,author,0}', - (SELECT mf2 FROM kittybox.mf2_json - WHERE uid = mf2 #>> '{properties,author,0}') -) FROM kittybox.mf2_json -WHERE uid = ANY($1) -ORDER BY mf2 #>> '{properties,published,0}' DESC -") - .bind(&posts[..]) - .fetch_all(&self.db) - .await? - .into_iter() - .map(|v| v.0) - .collect::<Vec<_>>() - ); - - Ok(Some(feed)) - - } - - #[tracing::instrument(skip(self))] - async fn read_feed_with_cursor( - &self, - url: &'_ str, - cursor: Option<&'_ str>, - limit: usize, - user: Option<&'_ str> - ) -> Result<Option<(serde_json::Value, Option<String>)>> { - let mut txn = self.db.begin().await?; - sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ, READ ONLY") - .execute(&mut *txn) - .await?; - tracing::debug!("Started txn: {:?}", txn); - let mut feed = match sqlx::query_scalar::<_, serde_json::Value>(" -SELECT kittybox.hydrate_author(mf2) FROM kittybox.mf2_json WHERE uid = $1 OR mf2['properties']['url'] ? $1 -") - .bind(url) - .fetch_optional(&mut *txn) - .await? - { - Some(feed) => feed, - None => return Ok(None) - }; - - // Don't query for children if this isn't a feed. - // - // The second query is very long and will probably be extremely - // expensive. It's best to skip it on types where it doesn't make sense - // (Kittybox doesn't support rendering children on non-feeds) - if !feed["type"].as_array().unwrap().iter().any(|t| *t == serde_json::json!("h-feed")) { - return Ok(Some((feed, None))); - } - - feed["children"] = sqlx::query_scalar::<_, serde_json::Value>(" -SELECT kittybox.hydrate_author(mf2) FROM kittybox.mf2_json -INNER JOIN kittybox.children -ON mf2_json.uid = children.child -WHERE - children.parent = $1 - AND ( - ( - (mf2 #>> '{properties,visibility,0}') = 'public' - OR - NOT (mf2['properties'] ? 'visibility') - ) - OR - ( - $3 != null AND ( - mf2['properties']['audience'] ? $3 - OR mf2['properties']['author'] ? $3 - ) - ) - ) - AND ($4 IS NULL OR ((mf2_json.mf2 #>> '{properties,published,0}') < $4)) -ORDER BY (mf2_json.mf2 #>> '{properties,published,0}') DESC -LIMIT $2" - ) - .bind(url) - .bind(limit as i64) - .bind(user) - .bind(cursor) - .fetch_all(&mut *txn) - .await - .map(serde_json::Value::Array)?; - - let new_cursor = feed["children"].as_array().unwrap() - .last() - .map(|v| v["properties"]["published"][0].as_str().unwrap().to_owned()); - - txn.commit().await?; - - Ok(Some((feed, new_cursor))) - } - - #[tracing::instrument(skip(self))] - async fn delete_post(&self, url: &'_ str) -> Result<()> { - todo!() - } - - #[tracing::instrument(skip(self))] - async fn get_setting<S: Setting<'a>, 'a>(&'_ self, user: &'_ str) -> Result<S> { - match sqlx::query_as::<_, (serde_json::Value,)>("SELECT kittybox.get_setting($1, $2)") - .bind(user) - .bind(S::ID) - .fetch_one(&self.db) - .await - { - Ok((value,)) => Ok(serde_json::from_value(value)?), - Err(err) => Err(err.into()) - } - } - - #[tracing::instrument(skip(self))] - async fn set_setting<S: Setting<'a> + 'a, 'a>(&self, user: &'a str, value: S::Data) -> Result<()> { - sqlx::query("SELECT kittybox.set_setting($1, $2, $3)") - .bind(user) - .bind(S::ID) - .bind(serde_json::to_value(S::new(value)).unwrap()) - .execute(&self.db) - .await - .map_err(Into::into) - .map(|_| ()) - } -} diff --git a/kittybox-rs/src/database/redis/edit_post.lua b/kittybox-rs/src/database/redis/edit_post.lua deleted file mode 100644 index a398f8d..0000000 --- a/kittybox-rs/src/database/redis/edit_post.lua +++ /dev/null @@ -1,93 +0,0 @@ -local posts = KEYS[1] -local update_desc = cjson.decode(ARGV[2]) -local post = cjson.decode(redis.call("HGET", posts, ARGV[1])) - -local delete_keys = {} -local delete_kvs = {} -local add_keys = {} - -if update_desc.replace ~= nil then - for k, v in pairs(update_desc.replace) do - table.insert(delete_keys, k) - add_keys[k] = v - end -end -if update_desc.delete ~= nil then - if update_desc.delete[0] == nil then - -- Table has string keys. Probably! - for k, v in pairs(update_desc.delete) do - delete_kvs[k] = v - end - else - -- Table has numeric keys. Probably! - for i, v in ipairs(update_desc.delete) do - table.insert(delete_keys, v) - end - end -end -if update_desc.add ~= nil then - for k, v in pairs(update_desc.add) do - add_keys[k] = v - end -end - -for i, v in ipairs(delete_keys) do - post["properties"][v] = nil - -- TODO delete URL links -end - -for k, v in pairs(delete_kvs) do - local index = -1 - if k == "children" then - for j, w in ipairs(post[k]) do - if w == v then - index = j - break - end - end - if index > -1 then - table.remove(post[k], index) - end - else - for j, w in ipairs(post["properties"][k]) do - if w == v then - index = j - break - end - end - if index > -1 then - table.remove(post["properties"][k], index) - -- TODO delete URL links - end - end -end - -for k, v in pairs(add_keys) do - if k == "children" then - if post["children"] == nil then - post["children"] = {} - end - for i, w in ipairs(v) do - table.insert(post["children"], 1, w) - end - else - if post["properties"][k] == nil then - post["properties"][k] = {} - end - for i, w in ipairs(v) do - table.insert(post["properties"][k], w) - end - if k == "url" then - redis.call("HSET", posts, v, cjson.encode({ see_other = post["properties"]["uid"][1] })) - elseif k == "channel" then - local feed = cjson.decode(redis.call("HGET", posts, v)) - table.insert(feed["children"], 1, post["properties"]["uid"][1]) - redis.call("HSET", posts, v, cjson.encode(feed)) - end - end -end - -local encoded = cjson.encode(post) -redis.call("SET", "debug", encoded) -redis.call("HSET", posts, post["properties"]["uid"][1], encoded) -return \ No newline at end of file diff --git a/kittybox-rs/src/database/redis/mod.rs b/kittybox-rs/src/database/redis/mod.rs deleted file mode 100644 index 39ee852..0000000 --- a/kittybox-rs/src/database/redis/mod.rs +++ /dev/null @@ -1,398 +0,0 @@ -use async_trait::async_trait; -use futures::stream; -use futures_util::FutureExt; -use futures_util::StreamExt; -use futures_util::TryStream; -use futures_util::TryStreamExt; -use lazy_static::lazy_static; -use log::error; -use mobc::Pool; -use mobc_redis::redis; -use mobc_redis::redis::AsyncCommands; -use mobc_redis::RedisConnectionManager; -use serde_json::json; -use std::time::Duration; - -use crate::database::{ErrorKind, MicropubChannel, Result, Storage, StorageError, filter_post}; -use crate::indieauth::User; - -struct RedisScripts { - edit_post: redis::Script, -} - -impl From<mobc_redis::redis::RedisError> for StorageError { - fn from(err: mobc_redis::redis::RedisError) -> Self { - Self { - msg: format!("{}", err), - source: Some(Box::new(err)), - kind: ErrorKind::Backend, - } - } -} -impl From<mobc::Error<mobc_redis::redis::RedisError>> for StorageError { - fn from(err: mobc::Error<mobc_redis::redis::RedisError>) -> Self { - Self { - msg: format!("{}", err), - source: Some(Box::new(err)), - kind: ErrorKind::Backend, - } - } -} - -lazy_static! { - static ref SCRIPTS: RedisScripts = RedisScripts { - edit_post: redis::Script::new(include_str!("./edit_post.lua")) - }; -} -/*#[cfg(feature(lazy_cell))] -static SCRIPTS_CELL: std::cell::LazyCell<RedisScripts> = std::cell::LazyCell::new(|| { - RedisScripts { - edit_post: redis::Script::new(include_str!("./edit_post.lua")) - } -});*/ - -#[derive(Clone)] -pub struct RedisStorage { - // note to future Vika: - // mobc::Pool is actually a fancy name for an Arc - // around a shared connection pool with a manager - // which makes it safe to implement [`Clone`] and - // not worry about new pools being suddenly made - // - // stop worrying and start coding, you dum-dum - redis: mobc::Pool<RedisConnectionManager>, -} - -#[async_trait] -impl Storage for RedisStorage { - async fn get_setting<'a>(&self, setting: &'a str, user: &'a str) -> Result<String> { - let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; - Ok(conn - .hget::<String, &str, String>(format!("settings_{}", user), setting) - .await?) - } - - async fn set_setting<'a>(&self, setting: &'a str, user: &'a str, value: &'a str) -> Result<()> { - let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; - Ok(conn - .hset::<String, &str, &str, ()>(format!("settings_{}", user), setting, value) - .await?) - } - - async fn delete_post<'a>(&self, url: &'a str) -> Result<()> { - let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; - Ok(conn.hdel::<&str, &str, ()>("posts", url).await?) - } - - async fn post_exists(&self, url: &str) -> Result<bool> { - let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; - Ok(conn.hexists::<&str, &str, bool>("posts", url).await?) - } - - async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> { - let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; - match conn - .hget::<&str, &str, Option<String>>("posts", url) - .await? - { - Some(val) => { - let parsed = serde_json::from_str::<serde_json::Value>(&val)?; - if let Some(new_url) = parsed["see_other"].as_str() { - match conn - .hget::<&str, &str, Option<String>>("posts", new_url) - .await? - { - Some(val) => Ok(Some(serde_json::from_str::<serde_json::Value>(&val)?)), - None => Ok(None), - } - } else { - Ok(Some(parsed)) - } - } - None => Ok(None), - } - } - - async fn get_channels(&self, user: &User) -> Result<Vec<MicropubChannel>> { - let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; - let channels = conn - .smembers::<String, Vec<String>>("channels_".to_string() + user.me.as_str()) - .await?; - // TODO: use streams here instead of this weird thing... how did I even write this?! - Ok(futures_util::future::join_all( - channels - .iter() - .map(|channel| { - self.get_post(channel).map(|result| result.unwrap()).map( - |post: Option<serde_json::Value>| { - post.map(|post| MicropubChannel { - uid: post["properties"]["uid"][0].as_str().unwrap().to_string(), - name: post["properties"]["name"][0].as_str().unwrap().to_string(), - }) - }, - ) - }) - .collect::<Vec<_>>(), - ) - .await - .into_iter() - .flatten() - .collect::<Vec<_>>()) - } - - async fn put_post<'a>(&self, post: &'a serde_json::Value, user: &'a str) -> Result<()> { - let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; - let key: &str; - match post["properties"]["uid"][0].as_str() { - Some(uid) => key = uid, - None => { - return Err(StorageError::new( - ErrorKind::BadRequest, - "post doesn't have a UID", - )) - } - } - conn.hset::<&str, &str, String, ()>("posts", key, post.to_string()) - .await?; - if post["properties"]["url"].is_array() { - for url in post["properties"]["url"] - .as_array() - .unwrap() - .iter() - .map(|i| i.as_str().unwrap().to_string()) - { - if url != key && url.starts_with(user) { - conn.hset::<&str, &str, String, ()>( - "posts", - &url, - json!({ "see_other": key }).to_string(), - ) - .await?; - } - } - } - if post["type"] - .as_array() - .unwrap() - .iter() - .any(|i| i == "h-feed") - { - // This is a feed. Add it to the channels array if it's not already there. - conn.sadd::<String, &str, ()>( - "channels_".to_string() + post["properties"]["author"][0].as_str().unwrap(), - key, - ) - .await? - } - Ok(()) - } - - async fn read_feed_with_limit<'a>( - &self, - url: &'a str, - after: &'a Option<String>, - limit: usize, - user: &'a Option<String>, - ) -> Result<Option<serde_json::Value>> { - let mut conn = self.redis.get().await?; - let mut feed; - match conn - .hget::<&str, &str, Option<String>>("posts", url) - .await - .map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))? - { - Some(post) => feed = serde_json::from_str::<serde_json::Value>(&post)?, - None => return Ok(None), - } - if feed["see_other"].is_string() { - match conn - .hget::<&str, &str, Option<String>>("posts", feed["see_other"].as_str().unwrap()) - .await? - { - Some(post) => feed = serde_json::from_str::<serde_json::Value>(&post)?, - None => return Ok(None), - } - } - if let Some(post) = filter_post(feed, user) { - feed = post - } else { - return Err(StorageError::new( - ErrorKind::PermissionDenied, - "specified user cannot access this post", - )); - } - if feed["children"].is_array() { - let children = feed["children"].as_array().unwrap(); - let mut posts_iter = children.iter().map(|i| i.as_str().unwrap().to_string()); - if after.is_some() { - loop { - let i = posts_iter.next(); - if &i == after { - break; - } - } - } - async fn fetch_post_for_feed(url: String) -> Option<serde_json::Value> { - return Some(serde_json::json!({})); - } - let posts = stream::iter(posts_iter) - .map(|url: String| async move { - return Ok(fetch_post_for_feed(url).await); - /*match self.redis.get().await { - Ok(mut conn) => { - match conn.hget::<&str, &str, Option<String>>("posts", &url).await { - Ok(post) => match post { - Some(post) => { - Ok(Some(serde_json::from_str(&post)?)) - } - // Happens because of a broken link (result of an improper deletion?) - None => Ok(None), - }, - Err(err) => Err(StorageError::with_source(ErrorKind::Backend, "Error executing a Redis command", Box::new(err))) - } - } - Err(err) => Err(StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(err))) - }*/ - }) - // TODO: determine the optimal value for this buffer - // It will probably depend on how often can you encounter a private post on the page - // It shouldn't be too large, or we'll start fetching too many posts from the database - // It MUST NOT be larger than the typical page size - // It MUST NOT be a significant amount of the connection pool size - //.buffered(std::cmp::min(3, limit)) - // Hack to unwrap the Option and sieve out broken links - // Broken links return None, and Stream::filter_map skips all Nones. - // I wonder if one can use try_flatten() here somehow akin to iters - .try_filter_map(|post| async move { Ok(post) }) - .try_filter_map(|post| async move { - Ok(filter_post(post, user)) - }) - .take(limit); - match posts.try_collect::<Vec<serde_json::Value>>().await { - Ok(posts) => feed["children"] = json!(posts), - Err(err) => { - let e = StorageError::with_source( - ErrorKind::Other, - "An error was encountered while processing the feed", - Box::new(err) - ); - error!("Error while assembling feed: {}", e); - return Err(e); - } - } - } - return Ok(Some(feed)); - } - - async fn update_post<'a>(&self, mut url: &'a str, update: serde_json::Value) -> Result<()> { - let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; - if !conn - .hexists::<&str, &str, bool>("posts", url) - .await - .unwrap() - { - return Err(StorageError::new( - ErrorKind::NotFound, - "can't edit a non-existent post", - )); - } - let post: serde_json::Value = - serde_json::from_str(&conn.hget::<&str, &str, String>("posts", url).await?)?; - if let Some(new_url) = post["see_other"].as_str() { - url = new_url - } - Ok(SCRIPTS - .edit_post - .key("posts") - .arg(url) - .arg(update.to_string()) - .invoke_async::<_, ()>(&mut conn as &mut redis::aio::Connection) - .await?) - } -} - -impl RedisStorage { - /// Create a new RedisDatabase that will connect to Redis at `redis_uri` to store data. - pub async fn new(redis_uri: String) -> Result<Self> { - match redis::Client::open(redis_uri) { - Ok(client) => Ok(Self { - redis: Pool::builder() - .max_open(20) - .max_idle(5) - .get_timeout(Some(Duration::from_secs(3))) - .max_lifetime(Some(Duration::from_secs(120))) - .build(RedisConnectionManager::new(client)), - }), - Err(e) => Err(e.into()), - } - } - - pub async fn conn(&self) -> Result<mobc::Connection<mobc_redis::RedisConnectionManager>> { - self.redis.get().await.map_err(|e| StorageError::with_source( - ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e) - )) - } -} - -#[cfg(test)] -pub mod tests { - use mobc_redis::redis; - use std::process; - use std::time::Duration; - - pub struct RedisInstance { - // We just need to hold on to it so it won't get dropped and remove the socket - _tempdir: tempdir::TempDir, - uri: String, - child: std::process::Child, - } - impl Drop for RedisInstance { - fn drop(&mut self) { - self.child.kill().expect("Failed to kill the child!"); - } - } - impl RedisInstance { - pub fn uri(&self) -> &str { - &self.uri - } - } - - pub async fn get_redis_instance() -> RedisInstance { - let tempdir = tempdir::TempDir::new("redis").expect("failed to create tempdir"); - let socket = tempdir.path().join("redis.sock"); - let redis_child = process::Command::new("redis-server") - .current_dir(&tempdir) - .arg("--port") - .arg("0") - .arg("--unixsocket") - .arg(&socket) - .stdout(process::Stdio::null()) - .stderr(process::Stdio::null()) - .spawn() - .expect("Failed to spawn Redis"); - println!("redis+unix:///{}", socket.to_str().unwrap()); - let uri = format!("redis+unix:///{}", socket.to_str().unwrap()); - // There should be a slight delay, we need to wait for Redis to spin up - let client = redis::Client::open(uri.clone()).unwrap(); - let millisecond = Duration::from_millis(1); - let mut retries: usize = 0; - const MAX_RETRIES: usize = 60 * 1000/*ms*/; - while let Err(err) = client.get_connection() { - if err.is_connection_refusal() { - async_std::task::sleep(millisecond).await; - retries += 1; - if retries > MAX_RETRIES { - panic!("Timeout waiting for Redis, last error: {}", err); - } - } else { - panic!("Could not connect: {}", err); - } - } - - RedisInstance { - uri, - child: redis_child, - _tempdir: tempdir, - } - } -} |