diff options
author | Vika <vika@fireburn.ru> | 2023-07-29 21:59:56 +0300 |
---|---|---|
committer | Vika <vika@fireburn.ru> | 2023-07-29 21:59:56 +0300 |
commit | 0617663b249f9ca488e5de652108b17d67fbaf45 (patch) | |
tree | 11564b6c8fa37bf9203a0a4cc1c4e9cc088cb1a5 /src/database/file | |
parent | 26c2b79f6a6380ae3224e9309b9f3352f5717bd7 (diff) | |
download | kittybox-0617663b249f9ca488e5de652108b17d67fbaf45.tar.zst |
Moved the entire Kittybox tree into the root
Diffstat (limited to 'src/database/file')
-rw-r--r-- | src/database/file/mod.rs | 733 |
1 files changed, 733 insertions, 0 deletions
diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs new file mode 100644 index 0000000..27d3da1 --- /dev/null +++ b/src/database/file/mod.rs @@ -0,0 +1,733 @@ +//#![warn(clippy::unwrap_used)] +use crate::database::{ErrorKind, Result, settings, Storage, StorageError}; +use crate::micropub::{MicropubUpdate, MicropubPropertyDeletion}; +use async_trait::async_trait; +use futures::{stream, StreamExt, TryStreamExt}; +use kittybox_util::MentionType; +use serde_json::json; +use std::borrow::Cow; +use std::collections::HashMap; +use std::io::ErrorKind as IOErrorKind; +use std::path::{Path, PathBuf}; +use tokio::fs::{File, OpenOptions}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::task::spawn_blocking; +use tracing::{debug, error}; + +impl From<std::io::Error> for StorageError { + fn from(source: std::io::Error) -> Self { + Self::with_source( + match source.kind() { + IOErrorKind::NotFound => ErrorKind::NotFound, + IOErrorKind::AlreadyExists => ErrorKind::Conflict, + _ => ErrorKind::Backend, + }, + Cow::Owned(format!("file I/O error: {}", &source)), + Box::new(source), + ) + } +} + +impl From<tokio::time::error::Elapsed> for StorageError { + fn from(source: tokio::time::error::Elapsed) -> Self { + Self::with_source( + ErrorKind::Backend, + Cow::Borrowed("timeout on I/O operation"), + Box::new(source), + ) + } +} + +// Copied from https://stackoverflow.com/questions/39340924 +// This routine is adapted from the *old* Path's `path_relative_from` +// function, which works differently from the new `relative_from` function. +// In particular, this handles the case on unix where both paths are +// absolute but with only the root as the common directory. +fn path_relative_from(path: &Path, base: &Path) -> Option<PathBuf> { + use std::path::Component; + + if path.is_absolute() != base.is_absolute() { + if path.is_absolute() { + Some(PathBuf::from(path)) + } else { + None + } + } else { + let mut ita = path.components(); + let mut itb = base.components(); + let mut comps: Vec<Component> = vec![]; + loop { + match (ita.next(), itb.next()) { + (None, None) => break, + (Some(a), None) => { + comps.push(a); + comps.extend(ita.by_ref()); + break; + } + (None, _) => comps.push(Component::ParentDir), + (Some(a), Some(b)) if comps.is_empty() && a == b => (), + (Some(a), Some(b)) if b == Component::CurDir => comps.push(a), + (Some(_), Some(b)) if b == Component::ParentDir => return None, + (Some(a), Some(_)) => { + comps.push(Component::ParentDir); + for _ in itb { + comps.push(Component::ParentDir); + } + comps.push(a); + comps.extend(ita.by_ref()); + break; + } + } + } + Some(comps.iter().map(|c| c.as_os_str()).collect()) + } +} + +#[allow(clippy::unwrap_used, clippy::expect_used)] +#[cfg(test)] +mod tests { + #[test] + fn test_relative_path_resolving() { + let path1 = std::path::Path::new("/home/vika/Projects/kittybox"); + let path2 = std::path::Path::new("/home/vika/Projects/nixpkgs"); + let relative_path = super::path_relative_from(path2, path1).unwrap(); + + assert_eq!(relative_path, std::path::Path::new("../nixpkgs")) + } +} + +// TODO: Check that the path ACTUALLY IS INSIDE THE ROOT FOLDER +// This could be checked by completely resolving the path +// and checking if it has a common prefix +fn url_to_path(root: &Path, url: &str) -> PathBuf { + let path = url_to_relative_path(url).to_logical_path(root); + if !path.starts_with(root) { + // TODO: handle more gracefully + panic!("Security error: {:?} is not a prefix of {:?}", path, root) + } else { + path + } +} + +fn url_to_relative_path(url: &str) -> relative_path::RelativePathBuf { + let url = url::Url::try_from(url).expect("Couldn't parse a URL"); + let mut path = relative_path::RelativePathBuf::new(); + let user_domain = format!( + "{}{}", + url.host_str().unwrap(), + url.port() + .map(|port| format!(":{}", port)) + .unwrap_or_default() + ); + path.push(user_domain + url.path() + ".json"); + + path +} + +fn modify_post(post: &serde_json::Value, update: MicropubUpdate) -> Result<serde_json::Value> { + let mut post = post.clone(); + + let mut add_keys: HashMap<String, Vec<serde_json::Value>> = HashMap::new(); + let mut remove_keys: Vec<String> = vec![]; + let mut remove_values: HashMap<String, Vec<serde_json::Value>> = HashMap::new(); + + if let Some(MicropubPropertyDeletion::Properties(delete)) = update.delete { + remove_keys.extend(delete.iter().cloned()); + } else if let Some(MicropubPropertyDeletion::Values(delete)) = update.delete { + for (k, v) in delete { + remove_values + .entry(k.to_string()) + .or_default() + .extend(v.clone()); + } + } + if let Some(add) = update.add { + for (k, v) in add { + add_keys.insert(k.to_string(), v.clone()); + } + } + if let Some(replace) = update.replace { + for (k, v) in replace { + remove_keys.push(k.to_string()); + add_keys.insert(k.to_string(), v.clone()); + } + } + + if let Some(props) = post["properties"].as_object_mut() { + for k in remove_keys { + props.remove(&k); + } + } + for (k, v) in remove_values { + let k = &k; + let props = if k == "children" { + &mut post + } else { + &mut post["properties"] + }; + v.iter().for_each(|v| { + if let Some(vec) = props[k].as_array_mut() { + if let Some(index) = vec.iter().position(|w| w == v) { + vec.remove(index); + } + } + }); + } + for (k, v) in add_keys { + tracing::debug!("Adding k/v to post: {} => {:?}", k, v); + let props = if k == "children" { + &mut post + } else { + &mut post["properties"] + }; + if let Some(prop) = props[&k].as_array_mut() { + if k == "children" { + v.into_iter().rev().for_each(|v| prop.insert(0, v)); + } else { + prop.extend(v.into_iter()); + } + } else { + props[&k] = serde_json::Value::Array(v) + } + } + Ok(post) +} + +#[derive(Clone, Debug)] +/// A backend using a folder with JSON files as a backing store. +/// Uses symbolic links to represent a many-to-one mapping of URLs to a post. +pub struct FileStorage { + root_dir: PathBuf, +} + +impl FileStorage { + /// Create a new storage wrapping a folder specified by root_dir. + pub async fn new(root_dir: PathBuf) -> Result<Self> { + // TODO check if the dir is writable + Ok(Self { root_dir }) + } +} + +async fn hydrate_author<S: Storage>( + feed: &mut serde_json::Value, + user: &'_ Option<String>, + storage: &S, +) { + let url = feed["properties"]["uid"][0] + .as_str() + .expect("MF2 value should have a UID set! Check if you used normalize_mf2 before recording the post!"); + if let Some(author) = feed["properties"]["author"].as_array().cloned() { + if !feed["type"] + .as_array() + .expect("MF2 value should have a type set!") + .iter() + .any(|i| i == "h-card") + { + let author_list: Vec<serde_json::Value> = stream::iter(author.iter()) + .then(|i| async move { + if let Some(i) = i.as_str() { + match storage.get_post(i).await { + Ok(post) => match post { + Some(post) => post, + None => json!(i), + }, + Err(e) => { + error!("Error while hydrating post {}: {}", url, e); + json!(i) + } + } + } else { + i.clone() + } + }) + .collect::<Vec<_>>() + .await; + if let Some(props) = feed["properties"].as_object_mut() { + props["author"] = json!(author_list); + } else { + feed["properties"] = json!({ "author": author_list }); + } + } + } +} + +#[async_trait] +impl Storage for FileStorage { + #[tracing::instrument(skip(self))] + async fn post_exists(&self, url: &str) -> Result<bool> { + let path = url_to_path(&self.root_dir, url); + debug!("Checking if {:?} exists...", path); + /*let result = match tokio::fs::metadata(path).await { + Ok(metadata) => { + Ok(true) + }, + Err(err) => { + if err.kind() == IOErrorKind::NotFound { + Ok(false) + } else { + Err(err.into()) + } + } + };*/ + #[allow(clippy::unwrap_used)] // JoinHandle captures panics, this closure shouldn't panic + Ok(spawn_blocking(move || path.is_file()).await.unwrap()) + } + + #[tracing::instrument(skip(self))] + async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> { + let path = url_to_path(&self.root_dir, url); + // TODO: check that the path actually belongs to the dir of user who requested it + // it's not like you CAN access someone else's private posts with it + // so it's not exactly a security issue, but it's still not good + debug!("Opening {:?}", path); + + match File::open(&path).await { + Ok(mut file) => { + let mut content = String::new(); + // Typechecks because OS magic acts on references + // to FDs as if they were behind a mutex + AsyncReadExt::read_to_string(&mut file, &mut content).await?; + debug!( + "Read {} bytes successfully from {:?}", + content.as_bytes().len(), + &path + ); + Ok(Some(serde_json::from_str(&content)?)) + } + Err(err) => { + if err.kind() == IOErrorKind::NotFound { + Ok(None) + } else { + Err(err.into()) + } + } + } + } + + #[tracing::instrument(skip(self))] + async fn put_post(&self, post: &'_ serde_json::Value, user: &'_ str) -> Result<()> { + let key = post["properties"]["uid"][0] + .as_str() + .expect("Tried to save a post without UID"); + let path = url_to_path(&self.root_dir, key); + let tempfile = (&path).with_extension("tmp"); + debug!("Creating {:?}", path); + + let parent = path + .parent() + .expect("Parent for this directory should always exist") + .to_owned(); + tokio::fs::create_dir_all(&parent).await?; + + let mut file = tokio::fs::OpenOptions::new() + .write(true) + .create_new(true) + .open(&tempfile) + .await?; + + file.write_all(post.to_string().as_bytes()).await?; + file.flush().await?; + file.sync_all().await?; + drop(file); + tokio::fs::rename(&tempfile, &path).await?; + tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; + + if let Some(urls) = post["properties"]["url"].as_array() { + for url in urls.iter().map(|i| i.as_str().unwrap()) { + let url_domain = { + let url = url::Url::parse(url).unwrap(); + format!( + "{}{}", + url.host_str().unwrap(), + url.port() + .map(|port| format!(":{}", port)) + .unwrap_or_default() + ) + }; + if url != key && url_domain == user { + let link = url_to_path(&self.root_dir, url); + debug!("Creating a symlink at {:?}", link); + let orig = path.clone(); + // We're supposed to have a parent here. + let basedir = link.parent().ok_or_else(|| { + StorageError::from_static( + ErrorKind::Backend, + "Failed to calculate parent directory when creating a symlink", + ) + })?; + let relative = path_relative_from(&orig, basedir).unwrap(); + println!("{:?} - {:?} = {:?}", &orig, &basedir, &relative); + tokio::fs::symlink(relative, link).await?; + } + } + } + + if post["type"] + .as_array() + .unwrap() + .iter() + .any(|s| s.as_str() == Some("h-feed")) + { + tracing::debug!("Adding to channel list..."); + // Add the h-feed to the channel list + let path = { + let mut path = relative_path::RelativePathBuf::new(); + path.push(user); + path.push("channels"); + + path.to_path(&self.root_dir) + }; + tokio::fs::create_dir_all(path.parent().unwrap()).await?; + tracing::debug!("Channels file path: {}", path.display()); + let tempfilename = path.with_extension("tmp"); + let channel_name = post["properties"]["name"][0] + .as_str() + .map(|s| s.to_string()) + .unwrap_or_else(String::default); + let key = key.to_string(); + tracing::debug!("Opening temporary file to modify chnanels..."); + let mut tempfile = OpenOptions::new() + .write(true) + .create_new(true) + .open(&tempfilename) + .await?; + tracing::debug!("Opening real channel file..."); + let mut channels: Vec<super::MicropubChannel> = { + match OpenOptions::new() + .read(true) + .write(false) + .truncate(false) + .create(false) + .open(&path) + .await + { + Err(err) if err.kind() == std::io::ErrorKind::NotFound => { + Vec::default() + } + Err(err) => { + // Propagate the error upwards + return Err(err.into()); + } + Ok(mut file) => { + let mut content = String::new(); + file.read_to_string(&mut content).await?; + drop(file); + + if !content.is_empty() { + serde_json::from_str(&content)? + } else { + Vec::default() + } + } + } + }; + + channels.push(super::MicropubChannel { + uid: key.to_string(), + name: channel_name, + }); + + tempfile + .write_all(serde_json::to_string(&channels)?.as_bytes()) + .await?; + tempfile.flush().await?; + tempfile.sync_all().await?; + drop(tempfile); + tokio::fs::rename(tempfilename, &path).await?; + tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; + } + Ok(()) + } + + #[tracing::instrument(skip(self))] + async fn update_post(&self, url: &str, update: MicropubUpdate) -> Result<()> { + let path = url_to_path(&self.root_dir, url); + let tempfilename = path.with_extension("tmp"); + #[allow(unused_variables)] + let (old_json, new_json) = { + let mut temp = OpenOptions::new() + .write(true) + .create_new(true) + .open(&tempfilename) + .await?; + let mut file = OpenOptions::new().read(true).open(&path).await?; + + let mut content = String::new(); + file.read_to_string(&mut content).await?; + let json: serde_json::Value = serde_json::from_str(&content)?; + drop(file); + // Apply the editing algorithms + let new_json = modify_post(&json, update)?; + + temp.write_all(new_json.to_string().as_bytes()).await?; + temp.flush().await?; + temp.sync_all().await?; + drop(temp); + tokio::fs::rename(tempfilename, &path).await?; + tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; + + (json, new_json) + }; + // TODO check if URLs changed between old and new JSON + Ok(()) + } + + #[tracing::instrument(skip(self))] + async fn get_channels(&self, user: &'_ str) -> Result<Vec<super::MicropubChannel>> { + let mut path = relative_path::RelativePathBuf::new(); + path.push(user); + path.push("channels"); + + let path = path.to_path(&self.root_dir); + tracing::debug!("Channels file path: {}", path.display()); + + match File::open(&path).await { + Ok(mut f) => { + let mut content = String::new(); + f.read_to_string(&mut content).await?; + // This should not happen, but if it does, handle it gracefully + if content.is_empty() { + return Ok(vec![]); + } + let channels: Vec<super::MicropubChannel> = serde_json::from_str(&content)?; + Ok(channels) + } + Err(e) => { + if e.kind() == IOErrorKind::NotFound { + Ok(vec![]) + } else { + Err(e.into()) + } + } + } + } + + async fn read_feed_with_cursor( + &self, + url: &'_ str, + cursor: Option<&'_ str>, + limit: usize, + user: Option<&'_ str> + ) -> Result<Option<(serde_json::Value, Option<String>)>> { + Ok(self.read_feed_with_limit( + url, + &cursor.map(|v| v.to_owned()), + limit, + &user.map(|v| v.to_owned()) + ).await? + .map(|feed| { + tracing::debug!("Feed: {:#}", serde_json::Value::Array( + feed["children"] + .as_array() + .map(|v| v.as_slice()) + .unwrap_or_default() + .iter() + .map(|mf2| mf2["properties"]["uid"][0].clone()) + .collect::<Vec<_>>() + )); + let cursor: Option<String> = feed["children"] + .as_array() + .map(|v| v.as_slice()) + .unwrap_or_default() + .last() + .map(|v| v["properties"]["uid"][0].as_str().unwrap().to_owned()); + tracing::debug!("Extracted the cursor: {:?}", cursor); + (feed, cursor) + }) + ) + } + + #[tracing::instrument(skip(self))] + async fn read_feed_with_limit( + &self, + url: &'_ str, + after: &'_ Option<String>, + limit: usize, + user: &'_ Option<String>, + ) -> Result<Option<serde_json::Value>> { + if let Some(mut feed) = self.get_post(url).await? { + if feed["children"].is_array() { + // Take this out of the MF2-JSON document to save memory + // + // This uses a clever match with enum destructuring + // to extract the underlying Vec without cloning it + let children: Vec<serde_json::Value> = match feed["children"].take() { + serde_json::Value::Array(children) => children, + // We've already checked it's an array + _ => unreachable!() + }; + tracing::debug!("Full children array: {:#}", serde_json::Value::Array(children.clone())); + let mut posts_iter = children + .into_iter() + .map(|s: serde_json::Value| s.as_str().unwrap().to_string()); + // Note: we can't actually use `skip_while` here because we end up emitting `after`. + // This imperative snippet consumes after instead of emitting it, allowing the + // stream of posts to return only those items that truly come *after* that one. + // If I would implement an Iter combinator like this, I would call it `skip_until` + if let Some(after) = after { + for s in posts_iter.by_ref() { + if &s == after { + break; + } + } + }; + let posts = stream::iter(posts_iter) + .map(|url: String| async move { self.get_post(&url).await }) + .buffered(std::cmp::min(3, limit)) + // Hack to unwrap the Option and sieve out broken links + // Broken links return None, and Stream::filter_map skips Nones. + .try_filter_map(|post: Option<serde_json::Value>| async move { Ok(post) }) + .and_then(|mut post| async move { + hydrate_author(&mut post, user, self).await; + Ok(post) + }) + .take(limit); + + match posts.try_collect::<Vec<serde_json::Value>>().await { + Ok(posts) => feed["children"] = serde_json::json!(posts), + Err(err) => { + return Err(StorageError::with_source( + ErrorKind::Other, + Cow::Owned(format!("Feed assembly error: {}", &err)), + Box::new(err), + )); + } + } + } + hydrate_author(&mut feed, user, self).await; + Ok(Some(feed)) + } else { + Ok(None) + } + } + + #[tracing::instrument(skip(self))] + async fn delete_post(&self, url: &'_ str) -> Result<()> { + let path = url_to_path(&self.root_dir, url); + if let Err(e) = tokio::fs::remove_file(path).await { + Err(e.into()) + } else { + // TODO check for dangling references in the channel list + Ok(()) + } + } + + #[tracing::instrument(skip(self))] + async fn get_setting<S: settings::Setting<'a>, 'a>(&self, user: &'_ str) -> Result<S> { + debug!("User for getting settings: {}", user); + let mut path = relative_path::RelativePathBuf::new(); + path.push(user); + path.push("settings"); + + let path = path.to_path(&self.root_dir); + debug!("Getting settings from {:?}", &path); + + let mut file = File::open(path).await?; + let mut content = String::new(); + file.read_to_string(&mut content).await?; + + let settings: HashMap<&str, serde_json::Value> = serde_json::from_str(&content)?; + match settings.get(S::ID) { + Some(value) => Ok(serde_json::from_value::<S>(value.clone())?), + None => Err(StorageError::from_static(ErrorKind::Backend, "Setting not set")) + } + } + + #[tracing::instrument(skip(self))] + async fn set_setting<S: settings::Setting<'a> + 'a, 'a>(&self, user: &'a str, value: S::Data) -> Result<()> { + let mut path = relative_path::RelativePathBuf::new(); + path.push(user); + path.push("settings"); + + let path = path.to_path(&self.root_dir); + let temppath = path.with_extension("tmp"); + + let parent = path.parent().unwrap().to_owned(); + tokio::fs::create_dir_all(&parent).await?; + + let mut tempfile = OpenOptions::new() + .write(true) + .create_new(true) + .open(&temppath) + .await?; + + let mut settings: HashMap<String, serde_json::Value> = match File::open(&path).await { + Ok(mut f) => { + let mut content = String::new(); + f.read_to_string(&mut content).await?; + if content.is_empty() { + Default::default() + } else { + serde_json::from_str(&content)? + } + } + Err(err) => { + if err.kind() == IOErrorKind::NotFound { + Default::default() + } else { + return Err(err.into()); + } + } + }; + settings.insert(S::ID.to_owned(), serde_json::to_value(S::new(value))?); + + tempfile + .write_all(serde_json::to_string(&settings)?.as_bytes()) + .await?; + tempfile.flush().await?; + tempfile.sync_all().await?; + drop(tempfile); + tokio::fs::rename(temppath, &path).await?; + tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; + Ok(()) + } + + #[tracing::instrument(skip(self))] + async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()> { + let path = url_to_path(&self.root_dir, target); + let tempfilename = path.with_extension("tmp"); + + let mut temp = OpenOptions::new() + .write(true) + .create_new(true) + .open(&tempfilename) + .await?; + let mut file = OpenOptions::new().read(true).open(&path).await?; + + let mut post: serde_json::Value = { + let mut content = String::new(); + file.read_to_string(&mut content).await?; + drop(file); + + serde_json::from_str(&content)? + }; + + let key: &'static str = match mention_type { + MentionType::Reply => "comment", + MentionType::Like => "like", + MentionType::Repost => "repost", + MentionType::Bookmark => "bookmark", + MentionType::Mention => "mention", + }; + let mention_uid = mention["properties"]["uid"][0].clone(); + if let Some(values) = post["properties"][key].as_array_mut() { + for value in values.iter_mut() { + if value["properties"]["uid"][0] == mention_uid { + *value = mention; + break; + } + } + } else { + post["properties"][key] = serde_json::Value::Array(vec![mention]); + } + + temp.write_all(post.to_string().as_bytes()).await?; + temp.flush().await?; + temp.sync_all().await?; + drop(temp); + tokio::fs::rename(tempfilename, &path).await?; + tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; + + Ok(()) + } +} |