//#![warn(clippy::unwrap_used)] use crate::database::{ErrorKind, Result, settings, Storage, StorageError}; use crate::micropub::{MicropubUpdate, MicropubPropertyDeletion}; use futures::{stream, StreamExt, TryStreamExt}; use kittybox_util::MentionType; use serde_json::json; use std::borrow::Cow; use std::collections::HashMap; use std::io::ErrorKind as IOErrorKind; use std::path::{Path, PathBuf}; use tokio::fs::{File, OpenOptions}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::task::spawn_blocking; use tracing::{debug, error}; impl From for StorageError { fn from(source: std::io::Error) -> Self { Self::with_source( match source.kind() { IOErrorKind::NotFound => ErrorKind::NotFound, IOErrorKind::AlreadyExists => ErrorKind::Conflict, _ => ErrorKind::Backend, }, Cow::Owned(format!("file I/O error: {}", &source)), Box::new(source), ) } } impl From for StorageError { fn from(source: tokio::time::error::Elapsed) -> Self { Self::with_source( ErrorKind::Backend, Cow::Borrowed("timeout on I/O operation"), Box::new(source), ) } } // Copied from https://stackoverflow.com/questions/39340924 // This routine is adapted from the *old* Path's `path_relative_from` // function, which works differently from the new `relative_from` function. // In particular, this handles the case on unix where both paths are // absolute but with only the root as the common directory. fn path_relative_from(path: &Path, base: &Path) -> Option { use std::path::Component; if path.is_absolute() != base.is_absolute() { if path.is_absolute() { Some(PathBuf::from(path)) } else { None } } else { let mut ita = path.components(); let mut itb = base.components(); let mut comps: Vec = vec![]; loop { match (ita.next(), itb.next()) { (None, None) => break, (Some(a), None) => { comps.push(a); comps.extend(ita.by_ref()); break; } (None, _) => comps.push(Component::ParentDir), (Some(a), Some(b)) if comps.is_empty() && a == b => (), (Some(a), Some(Component::CurDir)) => comps.push(a), (Some(_), Some(Component::ParentDir)) => return None, (Some(a), Some(_)) => { comps.push(Component::ParentDir); for _ in itb { comps.push(Component::ParentDir); } comps.push(a); comps.extend(ita.by_ref()); break; } } } Some(comps.iter().map(|c| c.as_os_str()).collect()) } } #[allow(clippy::unwrap_used, clippy::expect_used)] #[cfg(test)] mod tests { #[test] fn test_relative_path_resolving() { let path1 = std::path::Path::new("/home/vika/Projects/kittybox"); let path2 = std::path::Path::new("/home/vika/Projects/nixpkgs"); let relative_path = super::path_relative_from(path2, path1).unwrap(); assert_eq!(relative_path, std::path::Path::new("../nixpkgs")) } } // TODO: Check that the path ACTUALLY IS INSIDE THE ROOT FOLDER // This could be checked by completely resolving the path // and checking if it has a common prefix fn url_to_path(root: &Path, url: &str) -> PathBuf { let path = url_to_relative_path(url).to_logical_path(root); if !path.starts_with(root) { // TODO: handle more gracefully panic!("Security error: {:?} is not a prefix of {:?}", path, root) } else { path } } fn url_to_relative_path(url: &str) -> relative_path::RelativePathBuf { let url = url::Url::try_from(url).expect("Couldn't parse a URL"); let mut path = relative_path::RelativePathBuf::new(); let user_domain = format!( "{}{}", url.host_str().unwrap(), url.port() .map(|port| format!(":{}", port)) .unwrap_or_default() ); path.push(user_domain + url.path() + ".json"); path } fn modify_post(post: &serde_json::Value, update: MicropubUpdate) -> Result { let mut post = post.clone(); let mut add_keys: HashMap> = HashMap::new(); let mut remove_keys: Vec = vec![]; let mut remove_values: HashMap> = HashMap::new(); if let Some(MicropubPropertyDeletion::Properties(delete)) = update.delete { remove_keys.extend(delete.iter().cloned()); } else if let Some(MicropubPropertyDeletion::Values(delete)) = update.delete { for (k, v) in delete { remove_values .entry(k.to_string()) .or_default() .extend(v.clone()); } } if let Some(add) = update.add { for (k, v) in add { add_keys.insert(k.to_string(), v.clone()); } } if let Some(replace) = update.replace { for (k, v) in replace { remove_keys.push(k.to_string()); add_keys.insert(k.to_string(), v.clone()); } } if let Some(props) = post["properties"].as_object_mut() { for k in remove_keys { props.remove(&k); } } for (k, v) in remove_values { let k = &k; let props = if k == "children" { &mut post } else { &mut post["properties"] }; v.iter().for_each(|v| { if let Some(vec) = props[k].as_array_mut() { if let Some(index) = vec.iter().position(|w| w == v) { vec.remove(index); } } }); } for (k, v) in add_keys { tracing::debug!("Adding k/v to post: {} => {:?}", k, v); let props = if k == "children" { &mut post } else { &mut post["properties"] }; if let Some(prop) = props[&k].as_array_mut() { if k == "children" { v.into_iter().rev().for_each(|v| prop.insert(0, v)); } else { prop.extend(v.into_iter()); } } else { props[&k] = serde_json::Value::Array(v) } } Ok(post) } #[derive(Clone, Debug)] /// A backend using a folder with JSON files as a backing store. /// Uses symbolic links to represent a many-to-one mapping of URLs to a post. pub struct FileStorage { pub(super) root_dir: PathBuf, } async fn hydrate_author( feed: &mut serde_json::Value, // Unused? user: Option<&url::Url>, storage: &S, ) { let url = feed["properties"]["uid"][0] .as_str() .expect("MF2 value should have a UID set! Check if you used normalize_mf2 before recording the post!"); if let Some(author) = feed["properties"]["author"].as_array().cloned() { if !feed["type"] .as_array() .expect("MF2 value should have a type set!") .iter() .any(|i| i == "h-card") { let author_list: Vec = stream::iter(author.iter()) .then(|i| async move { if let Some(i) = i.as_str() { // BUG: Use `user` to sanitize? match storage.get_post(i).await { Ok(post) => match post { Some(post) => post, None => json!(i), }, Err(e) => { error!("Error while hydrating post {}: {}", url, e); json!(i) } } } else { i.clone() } }) .collect::>() .await; if let Some(props) = feed["properties"].as_object_mut() { props["author"] = json!(author_list); } else { feed["properties"] = json!({ "author": author_list }); } } } } impl Storage for FileStorage { async fn new(url: &'_ url::Url) -> Result { // TODO: sanity check Ok(Self { root_dir: PathBuf::from(url.path()) }) } #[tracing::instrument(skip(self))] async fn categories(&self, url: &str) -> Result> { // This requires an expensive scan through the entire // directory tree. // // Until this backend has some kind of caching/indexing for // categories (consider using symlinks?), this query won't // perform well. Err(std::io::Error::new( std::io::ErrorKind::Unsupported, "?q=category queries are not implemented due to resource constraints" ))? } #[tracing::instrument(skip(self))] async fn post_exists(&self, url: &str) -> Result { let path = url_to_path(&self.root_dir, url); debug!("Checking if {:?} exists...", path); /*let result = match tokio::fs::metadata(path).await { Ok(metadata) => { Ok(true) }, Err(err) => { if err.kind() == IOErrorKind::NotFound { Ok(false) } else { Err(err.into()) } } };*/ #[allow(clippy::unwrap_used)] // JoinHandle captures panics, this closure shouldn't panic Ok(spawn_blocking(move || path.is_file()).await.unwrap()) } #[tracing::instrument(skip(self))] async fn get_post(&self, url: &str) -> Result> { let path = url_to_path(&self.root_dir, url); // TODO: check that the path actually belongs to the dir of user who requested it // it's not like you CAN access someone else's private posts with it // so it's not exactly a security issue, but it's still not good debug!("Opening {:?}", path); match File::open(&path).await { Ok(mut file) => { let mut content = String::new(); // Typechecks because OS magic acts on references // to FDs as if they were behind a mutex AsyncReadExt::read_to_string(&mut file, &mut content).await?; debug!( "Read {} bytes successfully from {:?}", content.as_bytes().len(), &path ); Ok(Some(serde_json::from_str(&content)?)) } Err(err) => { if err.kind() == IOErrorKind::NotFound { Ok(None) } else { Err(err.into()) } } } } #[tracing::instrument(skip(self))] async fn put_post(&self, post: &'_ serde_json::Value, user: &url::Url) -> Result<()> { let key = post["properties"]["uid"][0] .as_str() .expect("Tried to save a post without UID"); let path = url_to_path(&self.root_dir, key); let tempfile = path.with_extension("tmp"); debug!("Creating {:?}", path); let parent = path .parent() .expect("Parent for this directory should always exist") .to_owned(); tokio::fs::create_dir_all(&parent).await?; let mut file = tokio::fs::OpenOptions::new() .write(true) .create_new(true) .open(&tempfile) .await?; file.write_all(post.to_string().as_bytes()).await?; file.flush().await?; file.sync_all().await?; drop(file); tokio::fs::rename(&tempfile, &path).await?; tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; if let Some(urls) = post["properties"]["url"].as_array() { for url in urls.iter().map(|i| i.as_str().unwrap()) { let url_domain = { let url = url::Url::parse(url).unwrap(); format!( "{}{}", url.host_str().unwrap(), url.port() .map(|port| format!(":{}", port)) .unwrap_or_default() ) }; if url != key && url_domain == user.authority() { let link = url_to_path(&self.root_dir, url); debug!("Creating a symlink at {:?}", link); let orig = path.clone(); // We're supposed to have a parent here. let basedir = link.parent().ok_or_else(|| { StorageError::from_static( ErrorKind::Backend, "Failed to calculate parent directory when creating a symlink", ) })?; let relative = path_relative_from(&orig, basedir).unwrap(); println!("{:?} - {:?} = {:?}", &orig, &basedir, &relative); tokio::fs::symlink(relative, link).await?; } } } if post["type"] .as_array() .unwrap() .iter() .any(|s| s.as_str() == Some("h-feed")) { tracing::debug!("Adding to channel list..."); // Add the h-feed to the channel list let path = { let mut path = relative_path::RelativePathBuf::new(); path.push(user.authority()); path.push("channels"); path.to_path(&self.root_dir) }; tokio::fs::create_dir_all(path.parent().unwrap()).await?; tracing::debug!("Channels file path: {}", path.display()); let tempfilename = path.with_extension("tmp"); let channel_name = post["properties"]["name"][0] .as_str() .map(|s| s.to_string()) .unwrap_or_default(); let key = key.to_string(); tracing::debug!("Opening temporary file to modify chnanels..."); let mut tempfile = OpenOptions::new() .write(true) .create_new(true) .open(&tempfilename) .await?; tracing::debug!("Opening real channel file..."); let mut channels: Vec = { match OpenOptions::new() .read(true) .write(false) .truncate(false) .create(false) .open(&path) .await { Err(err) if err.kind() == std::io::ErrorKind::NotFound => { Vec::default() } Err(err) => { // Propagate the error upwards return Err(err.into()); } Ok(mut file) => { let mut content = String::new(); file.read_to_string(&mut content).await?; drop(file); if !content.is_empty() { serde_json::from_str(&content)? } else { Vec::default() } } } }; channels.push(super::MicropubChannel { uid: key.to_string(), name: channel_name, }); tempfile .write_all(serde_json::to_string(&channels)?.as_bytes()) .await?; tempfile.flush().await?; tempfile.sync_all().await?; drop(tempfile); tokio::fs::rename(tempfilename, &path).await?; tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; } Ok(()) } #[tracing::instrument(skip(self))] async fn update_post(&self, url: &str, update: MicropubUpdate) -> Result<()> { let path = url_to_path(&self.root_dir, url); let tempfilename = path.with_extension("tmp"); #[allow(unused_variables)] let (old_json, new_json) = { let mut temp = OpenOptions::new() .write(true) .create_new(true) .open(&tempfilename) .await?; let mut file = OpenOptions::new().read(true).open(&path).await?; let mut content = String::new(); file.read_to_string(&mut content).await?; let json: serde_json::Value = serde_json::from_str(&content)?; drop(file); // Apply the editing algorithms. We can't use the stock // `update.apply` function due to special requirements of // the file backend, so we're implementing our own. let new_json = modify_post(&json, update)?; temp.write_all(new_json.to_string().as_bytes()).await?; temp.flush().await?; temp.sync_all().await?; drop(temp); tokio::fs::rename(tempfilename, &path).await?; tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; (json, new_json) }; // TODO check if URLs changed between old and new JSON Ok(()) } #[tracing::instrument(skip(self, f), fields(f = std::any::type_name::()))] async fn update_with( &self, url: &str, f: F ) -> Result<(serde_json::Value, serde_json::Value)> { todo!("update_with is not yet implemented due to special requirements of the file backend") } #[tracing::instrument(skip(self))] async fn get_channels(&self, user: &url::Url) -> Result> { let mut path = relative_path::RelativePathBuf::new(); path.push(user.authority()); path.push("channels"); let path = path.to_path(&self.root_dir); tracing::debug!("Channels file path: {}", path.display()); match File::open(&path).await { Ok(mut f) => { let mut content = String::new(); f.read_to_string(&mut content).await?; // This should not happen, but if it does, handle it gracefully if content.is_empty() { return Ok(vec![]); } let channels: Vec = serde_json::from_str(&content)?; Ok(channels) } Err(e) => { if e.kind() == IOErrorKind::NotFound { Ok(vec![]) } else { Err(e.into()) } } } } async fn read_feed_with_cursor( &self, url: &'_ str, cursor: Option<&'_ str>, limit: usize, user: Option<&url::Url> ) -> Result)>> { Ok(self.read_feed_with_limit( url, cursor, limit, user ).await? .map(|feed| { tracing::debug!("Feed: {:#}", serde_json::Value::Array( feed["children"] .as_array() .map(|v| v.as_slice()) .unwrap_or_default() .iter() .map(|mf2| mf2["properties"]["uid"][0].clone()) .collect::>() )); let cursor: Option = feed["children"] .as_array() .map(|v| v.as_slice()) .unwrap_or_default() .last() .map(|v| v["properties"]["uid"][0].as_str().unwrap().to_owned()); tracing::debug!("Extracted the cursor: {:?}", cursor); (feed, cursor) }) ) } #[tracing::instrument(skip(self))] async fn read_feed_with_limit( &self, url: &'_ str, after: Option<&str>, limit: usize, user: Option<&url::Url>, ) -> Result> { if let Some(mut feed) = self.get_post(url).await? { if feed["children"].is_array() { // Take this out of the MF2-JSON document to save memory // // This uses a clever match with enum destructuring // to extract the underlying Vec without cloning it let children: Vec = match feed["children"].take() { serde_json::Value::Array(children) => children, // We've already checked it's an array _ => unreachable!() }; tracing::debug!("Full children array: {:#}", serde_json::Value::Array(children.clone())); let mut posts_iter = children .into_iter() .map(|s: serde_json::Value| s.as_str().unwrap().to_string()); // Note: we can't actually use `skip_while` here because we end up emitting `after`. // This imperative snippet consumes after instead of emitting it, allowing the // stream of posts to return only those items that truly come *after* that one. // If I would implement an Iter combinator like this, I would call it `skip_until` // // Uses `tokio::task::block_in_place` to prevent starvation in case of rewinding // incredibly long feeds. if let Some(after) = after { tokio::task::block_in_place(|| { for s in posts_iter.by_ref() { if s == after { break; } } }) } let posts = stream::iter(posts_iter) .map(|url: String| async move { self.get_post(&url).await }) .buffered(std::cmp::min(3, limit)) // Hack to unwrap the Option and sieve out broken links // Broken links return None, and Stream::filter_map skips Nones. .try_filter_map(|post: Option| async move { Ok(post) }) .and_then(|mut post| async move { hydrate_author(&mut post, user, self).await; Ok(post) }) .take(limit); match posts.try_collect::>().await { Ok(posts) => feed["children"] = serde_json::json!(posts), Err(err) => { return Err(StorageError::with_source( ErrorKind::Other, Cow::Owned(format!("Feed assembly error: {}", &err)), Box::new(err), )); } } } hydrate_author(&mut feed, user, self).await; Ok(Some(feed)) } else { Ok(None) } } #[tracing::instrument(skip(self))] async fn delete_post(&self, url: &'_ str) -> Result<()> { let path = url_to_path(&self.root_dir, url); if let Err(e) = tokio::fs::remove_file(path).await { Err(e.into()) } else { // TODO check for dangling references in the channel list Ok(()) } } #[tracing::instrument(skip(self))] async fn get_setting(&self, user: &url::Url) -> Result { debug!("User for getting settings: {}", user); let mut path = relative_path::RelativePathBuf::new(); path.push(user.authority()); path.push("settings"); let path = path.to_path(&self.root_dir); debug!("Getting settings from {:?}", &path); let mut file = File::open(path).await?; let mut content = String::new(); file.read_to_string(&mut content).await?; let settings: HashMap<&str, serde_json::Value> = serde_json::from_str(&content)?; match settings.get(S::ID) { Some(value) => Ok(serde_json::from_value::(value.clone())?), None => Err(StorageError::from_static(ErrorKind::Backend, "Setting not set")) } } #[tracing::instrument(skip(self))] async fn set_setting(&self, user: &url::Url, value: S::Data) -> Result<()> { let mut path = relative_path::RelativePathBuf::new(); path.push(user.authority()); path.push("settings"); let path = path.to_path(&self.root_dir); let temppath = path.with_extension("tmp"); let parent = path.parent().unwrap().to_owned(); tokio::fs::create_dir_all(&parent).await?; let mut tempfile = OpenOptions::new() .write(true) .create_new(true) .open(&temppath) .await?; let mut settings: HashMap = match File::open(&path).await { Ok(mut f) => { let mut content = String::new(); f.read_to_string(&mut content).await?; if content.is_empty() { Default::default() } else { serde_json::from_str(&content)? } } Err(err) => { if err.kind() == IOErrorKind::NotFound { Default::default() } else { return Err(err.into()); } } }; settings.insert(S::ID.to_owned(), serde_json::to_value(S::new(value))?); tempfile .write_all(serde_json::to_string(&settings)?.as_bytes()) .await?; tempfile.flush().await?; tempfile.sync_all().await?; drop(tempfile); tokio::fs::rename(temppath, &path).await?; tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; Ok(()) } #[tracing::instrument(skip(self))] async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()> { let path = url_to_path(&self.root_dir, target); let tempfilename = path.with_extension("tmp"); let mut temp = OpenOptions::new() .write(true) .create_new(true) .open(&tempfilename) .await?; let mut file = OpenOptions::new().read(true).open(&path).await?; let mut post: serde_json::Value = { let mut content = String::new(); file.read_to_string(&mut content).await?; drop(file); serde_json::from_str(&content)? }; let key: &'static str = match mention_type { MentionType::Reply => "comment", MentionType::Like => "like", MentionType::Repost => "repost", MentionType::Bookmark => "bookmark", MentionType::Mention => "mention", }; let mention_uid = mention["properties"]["uid"][0].clone(); if let Some(values) = post["properties"][key].as_array_mut() { for value in values.iter_mut() { if value["properties"]["uid"][0] == mention_uid { *value = mention; break; } } } else { post["properties"][key] = serde_json::Value::Array(vec![mention]); } temp.write_all(post.to_string().as_bytes()).await?; temp.flush().await?; temp.sync_all().await?; drop(temp); tokio::fs::rename(tempfilename, &path).await?; tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; Ok(()) } async fn all_posts<'this>(&'this self, user: &url::Url) -> Result + Send + 'this> { todo!(); Ok(futures::stream::empty()) // for type inference } }