use crate::database::{filter_post, ErrorKind, Result, Storage, StorageError};
use std::fs::{File, OpenOptions};
use std::io::{ErrorKind as IOErrorKind, Seek, SeekFrom, Read, Write};
use std::time::Duration;
use async_std::future::TimeoutError;
use async_std::task::spawn_blocking;
use async_trait::async_trait;
use fd_lock::RwLock;
use futures::stream;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use log::debug;
use serde_json::json;
use std::collections::HashMap;
use std::path::{Path, PathBuf};

/// Wraps a filesystem error into a [`StorageError`].
///
/// A missing file is reported as `ErrorKind::NotFound`; every other I/O
/// failure is reported as `ErrorKind::Backend`. The original error is kept
/// as the source.
impl From<std::io::Error> for StorageError {
    fn from(source: std::io::Error) -> Self {
        let kind = if source.kind() == IOErrorKind::NotFound {
            ErrorKind::NotFound
        } else {
            ErrorKind::Backend
        };
        Self::with_source(kind, "file I/O error", Box::new(source))
    }
}

/// Wraps an elapsed I/O timeout into a [`StorageError`] of kind
/// `ErrorKind::Backend`, keeping the timeout error as the source.
impl From<TimeoutError> for StorageError {
    fn from(source: TimeoutError) -> Self {
        let cause = Box::new(source);
        Self::with_source(ErrorKind::Backend, "timeout on I/O operation", cause)
    }
}

// Copied from https://stackoverflow.com/questions/39340924
// This routine is adapted from the *old* Path's `path_relative_from`
// function, which works differently from the new `relative_from` function.
// In particular, this handles the case on unix where both paths are
// absolute but with only the root as the common directory.
fn path_relative_from(path: &Path, base: &Path) -> Option<PathBuf> { use std::path::Component; if path.is_absolute() != base.is_absolute() { if path.is_absolute() { Some(PathBuf::from(path)) } else { None } } else { let mut ita = path.components(); let mut itb = base.components(); let mut comps: Vec<Component> = vec![]; loop { match (ita.next(), itb.next()) { (None, None) => break, (Some(a), None) => { comps.push(a); comps.extend(ita.by_ref()); break; } (None, _) => comps.push(Component::ParentDir), (Some(a), Some(b)) if comps.is_empty() && a == b => (), (Some(a), Some(b)) if b == Component::CurDir => comps.push(a), (Some(_), Some(b)) if b == Component::ParentDir => return None, (Some(a), Some(_)) => { comps.push(Component::ParentDir); for _ in itb { comps.push(Component::ParentDir); } comps.push(a); comps.extend(ita.by_ref()); break; } } } Some(comps.iter().map(|c| c.as_os_str()).collect()) } } mod tests { #[test] fn test_relative_path_resolving() { let path1 = std::path::Path::new("/home/vika/Projects/kittybox"); let path2 = std::path::Path::new("/home/vika/Projects/nixpkgs"); let relative_path = super::path_relative_from(path2, path1).unwrap(); assert_eq!(relative_path, std::path::Path::new("../nixpkgs")) } } fn url_to_path(root: &Path, url: &str) -> PathBuf { url_to_relative_path(url).to_path(root) } fn url_to_relative_path(url: &str) -> relative_path::RelativePathBuf { let url = http_types::Url::parse(url).expect("Couldn't parse a URL"); let mut path = relative_path::RelativePathBuf::new(); path.push(url.origin().ascii_serialization() + &url.path().to_string() + ".json"); path } fn modify_post(post: &serde_json::Value, update: &serde_json::Value) -> Result<serde_json::Value> { let mut add_keys: HashMap<String, serde_json::Value> = HashMap::new(); let mut remove_keys: Vec<String> = vec![]; let mut remove_values: HashMap<String, Vec<serde_json::Value>> = HashMap::new(); let mut post = post.clone(); if let Some(delete) = update["delete"].as_array() { 
remove_keys.extend( delete .iter() .filter_map(|v| v.as_str()) .map(|v| v.to_string()), ); } else if let Some(delete) = update["delete"].as_object() { for (k, v) in delete { if let Some(v) = v.as_array() { remove_values .entry(k.to_string()) .or_default() .extend(v.clone()); } else { return Err(StorageError::new( ErrorKind::BadRequest, "Malformed update object", )); } } } if let Some(add) = update["add"].as_object() { for (k, v) in add { if v.is_array() { add_keys.insert(k.to_string(), v.clone()); } else { return Err(StorageError::new( ErrorKind::BadRequest, "Malformed update object", )); } } } if let Some(replace) = update["replace"].as_object() { for (k, v) in replace { remove_keys.push(k.to_string()); add_keys.insert(k.to_string(), v.clone()); } } for k in remove_keys { post["properties"].as_object_mut().unwrap().remove(&k); } for (k, v) in remove_values { let k = &k; let props; if k == "children" { props = &mut post; } else { props = &mut post["properties"]; } v.iter().for_each(|v| { if let Some(vec) = props[k].as_array_mut() { if let Some(index) = vec.iter().position(|w| w == v) { vec.remove(index); } } }); } for (k, v) in add_keys { let props; if k == "children" { props = &mut post; } else { props = &mut post["properties"]; } let k = &k; if let Some(prop) = props[k].as_array_mut() { if k == "children" { v.as_array() .unwrap() .iter() .cloned() .rev() .for_each(|v| prop.insert(0, v)); } else { prop.extend(v.as_array().unwrap().iter().cloned()); } } else { post["properties"][k] = v } } Ok(post) } #[derive(Clone)] /// A backend using a folder with JSON files as a backing store. /// Uses symbolic links to represent a many-to-one mapping of URLs to a post. pub struct FileStorage { root_dir: PathBuf, } impl FileStorage { /// Create a new storage wrapping a folder specified by root_dir. 
pub async fn new(root_dir: PathBuf) -> Result<Self> { // TODO check if the dir is writable Ok(Self { root_dir }) } } async fn hydrate_author<S: Storage>( feed: &mut serde_json::Value, user: &'_ Option<String>, storage: &S, ) { let url = feed["properties"]["uid"][0].as_str().unwrap(); if let Some(author) = feed["properties"]["author"].clone().as_array() { if !feed["type"] .as_array() .unwrap() .iter() .any(|i| i == "h-card") { let author_list: Vec<serde_json::Value> = stream::iter(author.iter()) .then(|i| async move { if let Some(i) = i.as_str() { match storage.get_post(i).await { Ok(post) => match post { Some(post) => match filter_post(post, user) { Some(author) => author, None => json!(i), }, None => json!(i), }, Err(e) => { log::error!("Error while hydrating post {}: {}", url, e); json!(i) } } } else { i.clone() } }) .collect::<Vec<_>>() .await; feed["properties"].as_object_mut().unwrap()["author"] = json!(author_list); } } } const IO_TIMEOUT: u64 = 3; #[async_trait] impl Storage for FileStorage { async fn post_exists(&self, url: &str) -> Result<bool> { let path = url_to_path(&self.root_dir, url); debug!("Checking if {:?} exists...", path); Ok(spawn_blocking(move || path.is_file()).await) } async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> { let path = url_to_path(&self.root_dir, url); debug!("Opening {:?}", path); // Use exclusively synchronous operations to never transfer a lock over an await boundary async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { match File::open(&path) { Ok(file) => { let lock = RwLock::new(file); debug!("Trying to get a lock for file {:?}", &path); let guard = lock.read()?; let mut content = String::new(); // Typechecks because OS magic acts on references // to FDs as if they were behind a mutex (&mut &*guard).read_to_string(&mut content)?; Ok(Some(serde_json::from_str(&content)?)) } Err(err) => { // We have to special-case in here because // the function should return Ok(None) on 
404 if err.kind() == IOErrorKind::NotFound { Ok(None) } else { Err(err.into()) } } } })).await? } async fn put_post<'a>(&self, post: &'a serde_json::Value, user: &'a str) -> Result<()> { let key = post["properties"]["uid"][0] .as_str() .expect("Tried to save a post without UID"); let path = url_to_path(&self.root_dir, key); debug!("Creating {:?}", path); // To move these into the closure, we have to clone values let post_json = post.to_string(); let post_path = path.clone(); // Use exclusively synchronous operations to never transfer a lock over an await boundary async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let parent = post_path.parent().unwrap().to_owned(); if !parent.is_dir() { std::fs::create_dir_all(post_path.parent().unwrap())?; } let f = OpenOptions::new() .write(true) .create_new(true) .open(&post_path)?; let mut lock = RwLock::new(f); debug!("Waiting for lock on {:?}", &post_path); let mut guard = lock.write()?; (*guard).write_all(post_json.as_bytes())?; (*guard).flush()?; drop(guard); Result::Ok(()) })).await??; if post["properties"]["url"].is_array() { for url in post["properties"]["url"] .as_array() .unwrap() .iter() .map(|i| i.as_str().unwrap()) { if url != key && url.starts_with(user) { let link = url_to_path(&self.root_dir, url); debug!("Creating a symlink at {:?}", link); let orig = path.clone(); // We're supposed to have a parent here. let basedir = link.parent().ok_or_else(|| { StorageError::new( ErrorKind::Backend, "Failed to calculate parent directory when creating a symlink", ) })?; let relative = path_relative_from(&orig, basedir).unwrap(); println!("{:?} - {:?} = {:?}", &orig, &basedir, &relative); async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { println!("Created a symlink at {:?}", &link); let symlink_result; #[cfg(unix)] { symlink_result = std::os::unix::fs::symlink(relative, link); } // Wow it even supports windows. 
Windows is weird #[cfg(windows)] { symlink_result = std::os::windows::fs::symlink_file(relative, link); } if let Err(e) = symlink_result { Err(e.into()) } else { Result::Ok(()) } })).await??; } } } if post["type"] .as_array() .unwrap() .iter() .any(|s| s.as_str() == Some("h-feed")) { println!("Adding to channel list..."); // Add the h-feed to the channel list let mut path = relative_path::RelativePathBuf::new(); path.push(user); path.push("channels"); let path = path.to_path(&self.root_dir); let channel_name = post["properties"]["name"][0] .as_str() .map(|s| s.to_string()) .unwrap_or_else(String::default); let key = key.to_string(); drop(post); async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let file = OpenOptions::new() .read(true) .write(true) .truncate(false) .create(true) .open(&path)?; let mut lock = RwLock::new(file); debug!("Trying to lock feed {:?}", &path); let mut guard = lock.write()?; let mut content = String::new(); (*guard).read_to_string(&mut content)?; let mut channels: Vec<super::MicropubChannel>; if !content.is_empty() { channels = serde_json::from_str(&content)?; } else { channels = Vec::default(); } channels.push(super::MicropubChannel { uid: key.to_string(), name: channel_name, }); (*guard).seek(SeekFrom::Start(0))?; (*guard).set_len(0)?; (*guard).write_all(serde_json::to_string(&channels)?.as_bytes())?; Result::Ok(()) })).await??; } Ok(()) } async fn update_post<'a>(&self, url: &'a str, update: serde_json::Value) -> Result<()> { let path = url_to_path(&self.root_dir, url); let (old_json, new_json) = async_std::future::timeout( Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let f = OpenOptions::new() .write(true) .read(true) .truncate(false) .open(&path)?; let mut lock = RwLock::new(f); let mut guard = lock.write()?; let mut content = String::new(); (*guard).read_to_string(&mut content)?; let json: serde_json::Value = serde_json::from_str(&content)?; // Apply the editing algorithms let new_json = 
modify_post(&json, &update)?; (*guard).set_len(0)?; (*guard).seek(SeekFrom::Start(0))?; (*guard).write_all(new_json.to_string().as_bytes())?; (*guard).flush()?; Result::Ok((json, new_json)) }) ).await??; // TODO check if URLs changed between old and new JSON Ok(()) } async fn get_channels<'a>(&self, user: &'a str) -> Result<Vec<super::MicropubChannel>> { let mut path = relative_path::RelativePathBuf::new(); path.push(user.to_string()); path.push("channels"); let path = path.to_path(&self.root_dir); async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { match File::open(&path) { Ok(f) => { let lock = RwLock::new(f); let guard = lock.read()?; let mut content = String::new(); (&mut &*guard).read_to_string(&mut content)?; // This should not happen, but if it does, handle it gracefully if content.is_empty() { return Ok(vec![]); } let channels: Vec<super::MicropubChannel> = serde_json::from_str(&content)?; Ok(channels) } Err(e) => { if e.kind() == IOErrorKind::NotFound { Ok(vec![]) } else { Err(e.into()) } } } })).await? } async fn read_feed_with_limit<'a>( &self, url: &'a str, after: &'a Option<String>, limit: usize, user: &'a Option<String>, ) -> Result<Option<serde_json::Value>> { if let Some(feed) = self.get_post(url).await? { if let Some(mut feed) = filter_post(feed, user) { if feed["children"].is_array() { let children = feed["children"].as_array().unwrap().clone(); let mut posts_iter = children .into_iter() .map(|s: serde_json::Value| s.as_str().unwrap().to_string()); if after.is_some() { loop { let i = posts_iter.next(); if &i == after { break; } } } let posts = stream::iter(posts_iter) .map(|url: String| async move { self.get_post(&url).await }) .buffered(std::cmp::min(3, limit)) // Hack to unwrap the Option and sieve out broken links // Broken links return None, and Stream::filter_map skips Nones. 
.try_filter_map(|post: Option<serde_json::Value>| async move { Ok(post) }) .try_filter_map(|post| async move { Ok(filter_post(post, user)) }) .and_then(|mut post| async move { hydrate_author(&mut post, user, self).await; Ok(post) }) .take(limit); match posts.try_collect::<Vec<serde_json::Value>>().await { Ok(posts) => feed["children"] = serde_json::json!(posts), Err(err) => { return Err(StorageError::with_source( ErrorKind::Other, "Feed assembly error", Box::new(err), )); } } } hydrate_author(&mut feed, user, self).await; Ok(Some(feed)) } else { Err(StorageError::new( ErrorKind::PermissionDenied, "specified user cannot access this post", )) } } else { Ok(None) } } async fn delete_post<'a>(&self, url: &'a str) -> Result<()> { let path = url_to_path(&self.root_dir, url); if let Err(e) = async_std::fs::remove_file(path).await { Err(e.into()) } else { // TODO check for dangling references in the channel list Ok(()) } } async fn get_setting<'a>(&self, setting: &'a str, user: &'a str) -> Result<String> { log::debug!("User for getting settings: {}", user); let url = http_types::Url::parse(user).expect("Couldn't parse a URL"); let mut path = relative_path::RelativePathBuf::new(); path.push(url.origin().ascii_serialization()); path.push("settings"); let path = path.to_path(&self.root_dir); let setting = setting.to_string(); async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let lock = RwLock::new(File::open(path)?); let guard = lock.read()?; let mut content = String::new(); (&mut &*guard).read_to_string(&mut content)?; drop(guard); let settings: HashMap<String, String> = serde_json::from_str(&content)?; // XXX consider returning string slices instead of cloning a string every time // it might come with a performance hit and/or memory usage inflation settings .get(&setting) .cloned() .ok_or_else(|| StorageError::new(ErrorKind::Backend, "Setting not set")) })).await? 
} async fn set_setting<'a>(&self, setting: &'a str, user: &'a str, value: &'a str) -> Result<()> { let url = http_types::Url::parse(user).expect("Couldn't parse a URL"); let mut path = relative_path::RelativePathBuf::new(); path.push(url.origin().ascii_serialization()); path.push("settings"); let path = path.to_path(&self.root_dir); let parent = path.parent().unwrap().to_owned(); if !spawn_blocking(move || parent.is_dir()).await { async_std::fs::create_dir_all(path.parent().unwrap()).await?; } let (setting, value) = (setting.to_string(), value.to_string()); async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let file = OpenOptions::new() .write(true) .read(true) .truncate(false) .create(true) .open(&path)?; let mut lock = RwLock::new(file); log::debug!("Created a lock. Locking for writing..."); let mut guard = lock.write()?; log::debug!("Locked. Writing."); let mut content = String::new(); (&mut &*guard).read_to_string(&mut content)?; let mut settings: HashMap<String, String> = if content.is_empty() { HashMap::default() } else { serde_json::from_str(&content)? }; settings.insert(setting.to_string(), value.to_string()); (&mut *guard).seek(SeekFrom::Start(0))?; (&mut *guard).set_len(0)?; (&mut *guard).write_all(serde_json::to_string(&settings)?.as_bytes())?; Result::Ok(()) })).await? } }