diff options
author | Vika <vika@fireburn.ru> | 2023-07-29 21:59:56 +0300 |
---|---|---|
committer | Vika <vika@fireburn.ru> | 2023-07-29 21:59:56 +0300 |
commit | 0617663b249f9ca488e5de652108b17d67fbaf45 (patch) | |
tree | 11564b6c8fa37bf9203a0a4cc1c4e9cc088cb1a5 /kittybox-rs/src/database/file | |
parent | 26c2b79f6a6380ae3224e9309b9f3352f5717bd7 (diff) | |
download | kittybox-0617663b249f9ca488e5de652108b17d67fbaf45.tar.zst |
Moved the entire Kittybox tree into the root
Diffstat (limited to 'kittybox-rs/src/database/file')
-rw-r--r-- | kittybox-rs/src/database/file/mod.rs | 733 |
1 files changed, 0 insertions, 733 deletions
diff --git a/kittybox-rs/src/database/file/mod.rs b/kittybox-rs/src/database/file/mod.rs deleted file mode 100644 index 27d3da1..0000000 --- a/kittybox-rs/src/database/file/mod.rs +++ /dev/null @@ -1,733 +0,0 @@ -//#![warn(clippy::unwrap_used)] -use crate::database::{ErrorKind, Result, settings, Storage, StorageError}; -use crate::micropub::{MicropubUpdate, MicropubPropertyDeletion}; -use async_trait::async_trait; -use futures::{stream, StreamExt, TryStreamExt}; -use kittybox_util::MentionType; -use serde_json::json; -use std::borrow::Cow; -use std::collections::HashMap; -use std::io::ErrorKind as IOErrorKind; -use std::path::{Path, PathBuf}; -use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::task::spawn_blocking; -use tracing::{debug, error}; - -impl From<std::io::Error> for StorageError { - fn from(source: std::io::Error) -> Self { - Self::with_source( - match source.kind() { - IOErrorKind::NotFound => ErrorKind::NotFound, - IOErrorKind::AlreadyExists => ErrorKind::Conflict, - _ => ErrorKind::Backend, - }, - Cow::Owned(format!("file I/O error: {}", &source)), - Box::new(source), - ) - } -} - -impl From<tokio::time::error::Elapsed> for StorageError { - fn from(source: tokio::time::error::Elapsed) -> Self { - Self::with_source( - ErrorKind::Backend, - Cow::Borrowed("timeout on I/O operation"), - Box::new(source), - ) - } -} - -// Copied from https://stackoverflow.com/questions/39340924 -// This routine is adapted from the *old* Path's `path_relative_from` -// function, which works differently from the new `relative_from` function. -// In particular, this handles the case on unix where both paths are -// absolute but with only the root as the common directory. -fn path_relative_from(path: &Path, base: &Path) -> Option<PathBuf> { - use std::path::Component; - - if path.is_absolute() != base.is_absolute() { - if path.is_absolute() { - Some(PathBuf::from(path)) - } else { - None - } - } else { - let mut ita = path.components(); - let mut itb = base.components(); - let mut comps: Vec<Component> = vec![]; - loop { - match (ita.next(), itb.next()) { - (None, None) => break, - (Some(a), None) => { - comps.push(a); - comps.extend(ita.by_ref()); - break; - } - (None, _) => comps.push(Component::ParentDir), - (Some(a), Some(b)) if comps.is_empty() && a == b => (), - (Some(a), Some(b)) if b == Component::CurDir => comps.push(a), - (Some(_), Some(b)) if b == Component::ParentDir => return None, - (Some(a), Some(_)) => { - comps.push(Component::ParentDir); - for _ in itb { - comps.push(Component::ParentDir); - } - comps.push(a); - comps.extend(ita.by_ref()); - break; - } - } - } - Some(comps.iter().map(|c| c.as_os_str()).collect()) - } -} - -#[allow(clippy::unwrap_used, clippy::expect_used)] -#[cfg(test)] -mod tests { - #[test] - fn test_relative_path_resolving() { - let path1 = std::path::Path::new("/home/vika/Projects/kittybox"); - let path2 = std::path::Path::new("/home/vika/Projects/nixpkgs"); - let relative_path = super::path_relative_from(path2, path1).unwrap(); - - assert_eq!(relative_path, std::path::Path::new("../nixpkgs")) - } -} - -// TODO: Check that the path ACTUALLY IS INSIDE THE ROOT FOLDER -// This could be checked by completely resolving the path -// and checking if it has a common prefix -fn url_to_path(root: &Path, url: &str) -> PathBuf { - let path = url_to_relative_path(url).to_logical_path(root); - if !path.starts_with(root) { - // TODO: handle more gracefully - panic!("Security error: {:?} is not a prefix of {:?}", path, root) - } else { - path - } -} - -fn url_to_relative_path(url: &str) -> relative_path::RelativePathBuf { - let url = url::Url::try_from(url).expect("Couldn't parse a URL"); - let mut path = relative_path::RelativePathBuf::new(); - let user_domain = format!( - "{}{}", - url.host_str().unwrap(), - url.port() - .map(|port| format!(":{}", port)) - .unwrap_or_default() - ); - path.push(user_domain + url.path() + ".json"); - - path -} - -fn modify_post(post: &serde_json::Value, update: MicropubUpdate) -> Result<serde_json::Value> { - let mut post = post.clone(); - - let mut add_keys: HashMap<String, Vec<serde_json::Value>> = HashMap::new(); - let mut remove_keys: Vec<String> = vec![]; - let mut remove_values: HashMap<String, Vec<serde_json::Value>> = HashMap::new(); - - if let Some(MicropubPropertyDeletion::Properties(delete)) = update.delete { - remove_keys.extend(delete.iter().cloned()); - } else if let Some(MicropubPropertyDeletion::Values(delete)) = update.delete { - for (k, v) in delete { - remove_values - .entry(k.to_string()) - .or_default() - .extend(v.clone()); - } - } - if let Some(add) = update.add { - for (k, v) in add { - add_keys.insert(k.to_string(), v.clone()); - } - } - if let Some(replace) = update.replace { - for (k, v) in replace { - remove_keys.push(k.to_string()); - add_keys.insert(k.to_string(), v.clone()); - } - } - - if let Some(props) = post["properties"].as_object_mut() { - for k in remove_keys { - props.remove(&k); - } - } - for (k, v) in remove_values { - let k = &k; - let props = if k == "children" { - &mut post - } else { - &mut post["properties"] - }; - v.iter().for_each(|v| { - if let Some(vec) = props[k].as_array_mut() { - if let Some(index) = vec.iter().position(|w| w == v) { - vec.remove(index); - } - } - }); - } - for (k, v) in add_keys { - tracing::debug!("Adding k/v to post: {} => {:?}", k, v); - let props = if k == "children" { - &mut post - } else { - &mut post["properties"] - }; - if let Some(prop) = props[&k].as_array_mut() { - if k == "children" { - v.into_iter().rev().for_each(|v| prop.insert(0, v)); - } else { - prop.extend(v.into_iter()); - } - } else { - props[&k] = serde_json::Value::Array(v) - } - } - Ok(post) -} - -#[derive(Clone, Debug)] -/// A backend using a folder with JSON files as a backing store. -/// Uses symbolic links to represent a many-to-one mapping of URLs to a post. -pub struct FileStorage { - root_dir: PathBuf, -} - -impl FileStorage { - /// Create a new storage wrapping a folder specified by root_dir. - pub async fn new(root_dir: PathBuf) -> Result<Self> { - // TODO check if the dir is writable - Ok(Self { root_dir }) - } -} - -async fn hydrate_author<S: Storage>( - feed: &mut serde_json::Value, - user: &'_ Option<String>, - storage: &S, -) { - let url = feed["properties"]["uid"][0] - .as_str() - .expect("MF2 value should have a UID set! Check if you used normalize_mf2 before recording the post!"); - if let Some(author) = feed["properties"]["author"].as_array().cloned() { - if !feed["type"] - .as_array() - .expect("MF2 value should have a type set!") - .iter() - .any(|i| i == "h-card") - { - let author_list: Vec<serde_json::Value> = stream::iter(author.iter()) - .then(|i| async move { - if let Some(i) = i.as_str() { - match storage.get_post(i).await { - Ok(post) => match post { - Some(post) => post, - None => json!(i), - }, - Err(e) => { - error!("Error while hydrating post {}: {}", url, e); - json!(i) - } - } - } else { - i.clone() - } - }) - .collect::<Vec<_>>() - .await; - if let Some(props) = feed["properties"].as_object_mut() { - props["author"] = json!(author_list); - } else { - feed["properties"] = json!({ "author": author_list }); - } - } - } -} - -#[async_trait] -impl Storage for FileStorage { - #[tracing::instrument(skip(self))] - async fn post_exists(&self, url: &str) -> Result<bool> { - let path = url_to_path(&self.root_dir, url); - debug!("Checking if {:?} exists...", path); - /*let result = match tokio::fs::metadata(path).await { - Ok(metadata) => { - Ok(true) - }, - Err(err) => { - if err.kind() == IOErrorKind::NotFound { - Ok(false) - } else { - Err(err.into()) - } - } - };*/ - #[allow(clippy::unwrap_used)] // JoinHandle captures panics, this closure shouldn't panic - Ok(spawn_blocking(move || path.is_file()).await.unwrap()) - } - - #[tracing::instrument(skip(self))] - async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> { - let path = url_to_path(&self.root_dir, url); - // TODO: check that the path actually belongs to the dir of user who requested it - // it's not like you CAN access someone else's private posts with it - // so it's not exactly a security issue, but it's still not good - debug!("Opening {:?}", path); - - match File::open(&path).await { - Ok(mut file) => { - let mut content = String::new(); - // Typechecks because OS magic acts on references - // to FDs as if they were behind a mutex - AsyncReadExt::read_to_string(&mut file, &mut content).await?; - debug!( - "Read {} bytes successfully from {:?}", - content.as_bytes().len(), - &path - ); - Ok(Some(serde_json::from_str(&content)?)) - } - Err(err) => { - if err.kind() == IOErrorKind::NotFound { - Ok(None) - } else { - Err(err.into()) - } - } - } - } - - #[tracing::instrument(skip(self))] - async fn put_post(&self, post: &'_ serde_json::Value, user: &'_ str) -> Result<()> { - let key = post["properties"]["uid"][0] - .as_str() - .expect("Tried to save a post without UID"); - let path = url_to_path(&self.root_dir, key); - let tempfile = (&path).with_extension("tmp"); - debug!("Creating {:?}", path); - - let parent = path - .parent() - .expect("Parent for this directory should always exist") - .to_owned(); - tokio::fs::create_dir_all(&parent).await?; - - let mut file = tokio::fs::OpenOptions::new() - .write(true) - .create_new(true) - .open(&tempfile) - .await?; - - file.write_all(post.to_string().as_bytes()).await?; - file.flush().await?; - file.sync_all().await?; - drop(file); - tokio::fs::rename(&tempfile, &path).await?; - tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; - - if let Some(urls) = post["properties"]["url"].as_array() { - for url in urls.iter().map(|i| i.as_str().unwrap()) { - let url_domain = { - let url = url::Url::parse(url).unwrap(); - format!( - "{}{}", - url.host_str().unwrap(), - url.port() - .map(|port| format!(":{}", port)) - .unwrap_or_default() - ) - }; - if url != key && url_domain == user { - let link = url_to_path(&self.root_dir, url); - debug!("Creating a symlink at {:?}", link); - let orig = path.clone(); - // We're supposed to have a parent here. - let basedir = link.parent().ok_or_else(|| { - StorageError::from_static( - ErrorKind::Backend, - "Failed to calculate parent directory when creating a symlink", - ) - })?; - let relative = path_relative_from(&orig, basedir).unwrap(); - println!("{:?} - {:?} = {:?}", &orig, &basedir, &relative); - tokio::fs::symlink(relative, link).await?; - } - } - } - - if post["type"] - .as_array() - .unwrap() - .iter() - .any(|s| s.as_str() == Some("h-feed")) - { - tracing::debug!("Adding to channel list..."); - // Add the h-feed to the channel list - let path = { - let mut path = relative_path::RelativePathBuf::new(); - path.push(user); - path.push("channels"); - - path.to_path(&self.root_dir) - }; - tokio::fs::create_dir_all(path.parent().unwrap()).await?; - tracing::debug!("Channels file path: {}", path.display()); - let tempfilename = path.with_extension("tmp"); - let channel_name = post["properties"]["name"][0] - .as_str() - .map(|s| s.to_string()) - .unwrap_or_else(String::default); - let key = key.to_string(); - tracing::debug!("Opening temporary file to modify chnanels..."); - let mut tempfile = OpenOptions::new() - .write(true) - .create_new(true) - .open(&tempfilename) - .await?; - tracing::debug!("Opening real channel file..."); - let mut channels: Vec<super::MicropubChannel> = { - match OpenOptions::new() - .read(true) - .write(false) - .truncate(false) - .create(false) - .open(&path) - .await - { - Err(err) if err.kind() == std::io::ErrorKind::NotFound => { - Vec::default() - } - Err(err) => { - // Propagate the error upwards - return Err(err.into()); - } - Ok(mut file) => { - let mut content = String::new(); - file.read_to_string(&mut content).await?; - drop(file); - - if !content.is_empty() { - serde_json::from_str(&content)? - } else { - Vec::default() - } - } - } - }; - - channels.push(super::MicropubChannel { - uid: key.to_string(), - name: channel_name, - }); - - tempfile - .write_all(serde_json::to_string(&channels)?.as_bytes()) - .await?; - tempfile.flush().await?; - tempfile.sync_all().await?; - drop(tempfile); - tokio::fs::rename(tempfilename, &path).await?; - tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; - } - Ok(()) - } - - #[tracing::instrument(skip(self))] - async fn update_post(&self, url: &str, update: MicropubUpdate) -> Result<()> { - let path = url_to_path(&self.root_dir, url); - let tempfilename = path.with_extension("tmp"); - #[allow(unused_variables)] - let (old_json, new_json) = { - let mut temp = OpenOptions::new() - .write(true) - .create_new(true) - .open(&tempfilename) - .await?; - let mut file = OpenOptions::new().read(true).open(&path).await?; - - let mut content = String::new(); - file.read_to_string(&mut content).await?; - let json: serde_json::Value = serde_json::from_str(&content)?; - drop(file); - // Apply the editing algorithms - let new_json = modify_post(&json, update)?; - - temp.write_all(new_json.to_string().as_bytes()).await?; - temp.flush().await?; - temp.sync_all().await?; - drop(temp); - tokio::fs::rename(tempfilename, &path).await?; - tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; - - (json, new_json) - }; - // TODO check if URLs changed between old and new JSON - Ok(()) - } - - #[tracing::instrument(skip(self))] - async fn get_channels(&self, user: &'_ str) -> Result<Vec<super::MicropubChannel>> { - let mut path = relative_path::RelativePathBuf::new(); - path.push(user); - path.push("channels"); - - let path = path.to_path(&self.root_dir); - tracing::debug!("Channels file path: {}", path.display()); - - match File::open(&path).await { - Ok(mut f) => { - let mut content = String::new(); - f.read_to_string(&mut content).await?; - // This should not happen, but if it does, handle it gracefully - if content.is_empty() { - return Ok(vec![]); - } - let channels: Vec<super::MicropubChannel> = serde_json::from_str(&content)?; - Ok(channels) - } - Err(e) => { - if e.kind() == IOErrorKind::NotFound { - Ok(vec![]) - } else { - Err(e.into()) - } - } - } - } - - async fn read_feed_with_cursor( - &self, - url: &'_ str, - cursor: Option<&'_ str>, - limit: usize, - user: Option<&'_ str> - ) -> Result<Option<(serde_json::Value, Option<String>)>> { - Ok(self.read_feed_with_limit( - url, - &cursor.map(|v| v.to_owned()), - limit, - &user.map(|v| v.to_owned()) - ).await? - .map(|feed| { - tracing::debug!("Feed: {:#}", serde_json::Value::Array( - feed["children"] - .as_array() - .map(|v| v.as_slice()) - .unwrap_or_default() - .iter() - .map(|mf2| mf2["properties"]["uid"][0].clone()) - .collect::<Vec<_>>() - )); - let cursor: Option<String> = feed["children"] - .as_array() - .map(|v| v.as_slice()) - .unwrap_or_default() - .last() - .map(|v| v["properties"]["uid"][0].as_str().unwrap().to_owned()); - tracing::debug!("Extracted the cursor: {:?}", cursor); - (feed, cursor) - }) - ) - } - - #[tracing::instrument(skip(self))] - async fn read_feed_with_limit( - &self, - url: &'_ str, - after: &'_ Option<String>, - limit: usize, - user: &'_ Option<String>, - ) -> Result<Option<serde_json::Value>> { - if let Some(mut feed) = self.get_post(url).await? { - if feed["children"].is_array() { - // Take this out of the MF2-JSON document to save memory - // - // This uses a clever match with enum destructuring - // to extract the underlying Vec without cloning it - let children: Vec<serde_json::Value> = match feed["children"].take() { - serde_json::Value::Array(children) => children, - // We've already checked it's an array - _ => unreachable!() - }; - tracing::debug!("Full children array: {:#}", serde_json::Value::Array(children.clone())); - let mut posts_iter = children - .into_iter() - .map(|s: serde_json::Value| s.as_str().unwrap().to_string()); - // Note: we can't actually use `skip_while` here because we end up emitting `after`. - // This imperative snippet consumes after instead of emitting it, allowing the - // stream of posts to return only those items that truly come *after* that one. - // If I would implement an Iter combinator like this, I would call it `skip_until` - if let Some(after) = after { - for s in posts_iter.by_ref() { - if &s == after { - break; - } - } - }; - let posts = stream::iter(posts_iter) - .map(|url: String| async move { self.get_post(&url).await }) - .buffered(std::cmp::min(3, limit)) - // Hack to unwrap the Option and sieve out broken links - // Broken links return None, and Stream::filter_map skips Nones. - .try_filter_map(|post: Option<serde_json::Value>| async move { Ok(post) }) - .and_then(|mut post| async move { - hydrate_author(&mut post, user, self).await; - Ok(post) - }) - .take(limit); - - match posts.try_collect::<Vec<serde_json::Value>>().await { - Ok(posts) => feed["children"] = serde_json::json!(posts), - Err(err) => { - return Err(StorageError::with_source( - ErrorKind::Other, - Cow::Owned(format!("Feed assembly error: {}", &err)), - Box::new(err), - )); - } - } - } - hydrate_author(&mut feed, user, self).await; - Ok(Some(feed)) - } else { - Ok(None) - } - } - - #[tracing::instrument(skip(self))] - async fn delete_post(&self, url: &'_ str) -> Result<()> { - let path = url_to_path(&self.root_dir, url); - if let Err(e) = tokio::fs::remove_file(path).await { - Err(e.into()) - } else { - // TODO check for dangling references in the channel list - Ok(()) - } - } - - #[tracing::instrument(skip(self))] - async fn get_setting<S: settings::Setting<'a>, 'a>(&self, user: &'_ str) -> Result<S> { - debug!("User for getting settings: {}", user); - let mut path = relative_path::RelativePathBuf::new(); - path.push(user); - path.push("settings"); - - let path = path.to_path(&self.root_dir); - debug!("Getting settings from {:?}", &path); - - let mut file = File::open(path).await?; - let mut content = String::new(); - file.read_to_string(&mut content).await?; - - let settings: HashMap<&str, serde_json::Value> = serde_json::from_str(&content)?; - match settings.get(S::ID) { - Some(value) => Ok(serde_json::from_value::<S>(value.clone())?), - None => Err(StorageError::from_static(ErrorKind::Backend, "Setting not set")) - } - } - - #[tracing::instrument(skip(self))] - async fn set_setting<S: settings::Setting<'a> + 'a, 'a>(&self, user: &'a str, value: S::Data) -> Result<()> { - let mut path = relative_path::RelativePathBuf::new(); - path.push(user); - path.push("settings"); - - let path = path.to_path(&self.root_dir); - let temppath = path.with_extension("tmp"); - - let parent = path.parent().unwrap().to_owned(); - tokio::fs::create_dir_all(&parent).await?; - - let mut tempfile = OpenOptions::new() - .write(true) - .create_new(true) - .open(&temppath) - .await?; - - let mut settings: HashMap<String, serde_json::Value> = match File::open(&path).await { - Ok(mut f) => { - let mut content = String::new(); - f.read_to_string(&mut content).await?; - if content.is_empty() { - Default::default() - } else { - serde_json::from_str(&content)? - } - } - Err(err) => { - if err.kind() == IOErrorKind::NotFound { - Default::default() - } else { - return Err(err.into()); - } - } - }; - settings.insert(S::ID.to_owned(), serde_json::to_value(S::new(value))?); - - tempfile - .write_all(serde_json::to_string(&settings)?.as_bytes()) - .await?; - tempfile.flush().await?; - tempfile.sync_all().await?; - drop(tempfile); - tokio::fs::rename(temppath, &path).await?; - tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; - Ok(()) - } - - #[tracing::instrument(skip(self))] - async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()> { - let path = url_to_path(&self.root_dir, target); - let tempfilename = path.with_extension("tmp"); - - let mut temp = OpenOptions::new() - .write(true) - .create_new(true) - .open(&tempfilename) - .await?; - let mut file = OpenOptions::new().read(true).open(&path).await?; - - let mut post: serde_json::Value = { - let mut content = String::new(); - file.read_to_string(&mut content).await?; - drop(file); - - serde_json::from_str(&content)? - }; - - let key: &'static str = match mention_type { - MentionType::Reply => "comment", - MentionType::Like => "like", - MentionType::Repost => "repost", - MentionType::Bookmark => "bookmark", - MentionType::Mention => "mention", - }; - let mention_uid = mention["properties"]["uid"][0].clone(); - if let Some(values) = post["properties"][key].as_array_mut() { - for value in values.iter_mut() { - if value["properties"]["uid"][0] == mention_uid { - *value = mention; - break; - } - } - } else { - post["properties"][key] = serde_json::Value::Array(vec![mention]); - } - - temp.write_all(post.to_string().as_bytes()).await?; - temp.flush().await?; - temp.sync_all().await?; - drop(temp); - tokio::fs::rename(tempfilename, &path).await?; - tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; - - Ok(()) - } -} |