use crate::database::{filter_post, ErrorKind, Result, Storage, StorageError};
use async_std::fs::{File, OpenOptions};
use async_std::io::prelude::*;
use async_std::io::ErrorKind as IOErrorKind;
use async_std::task::spawn_blocking;
use async_trait::async_trait;
use fd_lock::RwLock;
use futures::stream;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use log::debug;
use serde_json::json;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
impl From<std::io::Error> for StorageError {
fn from(source: std::io::Error) -> Self {
Self::with_source(
match source.kind() {
IOErrorKind::NotFound => ErrorKind::NotFound,
_ => ErrorKind::Backend,
},
"file I/O error",
Box::new(source),
)
}
}
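/// Wrap an already-open [`File`] in an [`fd_lock::RwLock`] so it can later be
/// locked for shared (`read`) or exclusive (`write`) access. Constructing the
/// lock only wraps the handle; no advisory lock is taken until a guard is
/// requested.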
async fn get_lockable_file(file: File) -> RwLock<File> {
debug!("Trying to create a file lock");
spawn_blocking(move || RwLock::new(file)).await
}
// Copied from https://stackoverflow.com/questions/39340924
// This routine is adapted from the *old* Path's `path_relative_from`
// function, which works differently from the new `relative_from` function.
// In particular, this handles the case on unix where both paths are
// absolute but with only the root as the common directory.
fn path_relative_from(path: &Path, base: &Path) -> Option<PathBuf> {
use std::path::Component;
if path.is_absolute() != base.is_absolute() {
if path.is_absolute() {
Some(PathBuf::from(path))
} else {
None
}
} else {
let mut ita = path.components();
let mut itb = base.components();
let mut comps: Vec<Component> = vec![];
loop {
match (ita.next(), itb.next()) {
(None, None) => break,
(Some(a), None) => {
comps.push(a);
comps.extend(ita.by_ref());
break;
}
(None, _) => comps.push(Component::ParentDir),
(Some(a), Some(b)) if comps.is_empty() && a == b => (),
(Some(a), Some(b)) if b == Component::CurDir => comps.push(a),
(Some(_), Some(b)) if b == Component::ParentDir => return None,
(Some(a), Some(_)) => {
comps.push(Component::ParentDir);
for _ in itb {
comps.push(Component::ParentDir);
}
comps.push(a);
comps.extend(ita.by_ref());
break;
}
}
}
Some(comps.iter().map(|c| c.as_os_str()).collect())
}
}
#[cfg(test)]
mod tests {
#[test]
fn test_relative_path_resolving() {
let path1 = std::path::Path::new("/home/vika/Projects/kittybox");
let path2 = std::path::Path::new("/home/vika/Projects/nixpkgs");
let relative_path = super::path_relative_from(path2, path1).unwrap();
assert_eq!(relative_path, std::path::Path::new("../nixpkgs"))
}
}
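/// Resolve a post URL to an absolute path under the storage root.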
fn url_to_path(root: &Path, url: &str) -> PathBuf {
url_to_relative_path(url).to_path(root)
}
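/// Map a post URL onto a path relative to the storage root by joining the
/// URL's ASCII-serialized origin with its path and appending `.json`.
/// For example (illustrative URL), `https://example.com/posts/hello` maps to
/// `https://example.com/posts/hello.json` under the root directory.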
fn url_to_relative_path(url: &str) -> relative_path::RelativePathBuf {
let url = http_types::Url::parse(url).expect("Couldn't parse a URL");
let mut path = relative_path::RelativePathBuf::new();
path.push(url.origin().ascii_serialization() + &url.path().to_string() + ".json");
path
}
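/// Apply a Micropub update object to a post and return the modified copy.
/// `replace` overwrites whole properties, `add` appends values (prepending for
/// `children`), and `delete` removes either whole properties (array form) or
/// individual values (object form). A malformed update yields a `BadRequest`
/// error. An illustrative update body (property names are hypothetical) could
/// look like `{"replace": {"content": ["new text"]}, "delete": ["category"]}`.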
fn modify_post(post: &serde_json::Value, update: &serde_json::Value) -> Result<serde_json::Value> {
let mut add_keys: HashMap<String, serde_json::Value> = HashMap::new();
let mut remove_keys: Vec<String> = vec![];
let mut remove_values: HashMap<String, Vec<serde_json::Value>> = HashMap::new();
let mut post = post.clone();
if let Some(delete) = update["delete"].as_array() {
remove_keys.extend(
delete
.iter()
.filter_map(|v| v.as_str())
.map(|v| v.to_string()),
);
} else if let Some(delete) = update["delete"].as_object() {
for (k, v) in delete {
if let Some(v) = v.as_array() {
remove_values
.entry(k.to_string())
.or_default()
.extend(v.clone());
} else {
return Err(StorageError::new(
ErrorKind::BadRequest,
"Malformed update object",
));
}
}
}
if let Some(add) = update["add"].as_object() {
for (k, v) in add {
if v.is_array() {
add_keys.insert(k.to_string(), v.clone());
} else {
return Err(StorageError::new(
ErrorKind::BadRequest,
"Malformed update object",
));
}
}
}
if let Some(replace) = update["replace"].as_object() {
for (k, v) in replace {
remove_keys.push(k.to_string());
add_keys.insert(k.to_string(), v.clone());
}
}
for k in remove_keys {
post["properties"].as_object_mut().unwrap().remove(&k);
}
for (k, v) in remove_values {
let k = &k;
// `children` lives at the top level of the post; everything else is a property.
let props = if k == "children" {
&mut post
} else {
&mut post["properties"]
};
v.iter().for_each(|v| {
if let Some(vec) = props[k].as_array_mut() {
if let Some(index) = vec.iter().position(|w| w == v) {
vec.remove(index);
}
}
});
}
for (k, v) in add_keys {
// As above, `children` belongs at the top level of the post object.
let props = if k == "children" {
&mut post
} else {
&mut post["properties"]
};
let k = &k;
if let Some(prop) = props[k].as_array_mut() {
if k == "children" {
v.as_array()
.unwrap()
.iter()
.cloned()
.rev()
.for_each(|v| prop.insert(0, v));
} else {
prop.extend(v.as_array().unwrap().iter().cloned());
}
} else {
// The property does not exist yet; create it in the right place
// (top level for `children`, inside `properties` otherwise).
props[k] = v
}
}
Ok(post)
}
#[derive(Clone)]
/// A backend using a folder with JSON files as a backing store.
/// Uses symbolic links to represent a many-to-one mapping of URLs to a post.
pub struct FileStorage {
root_dir: PathBuf,
}
impl FileStorage {
/// Create a new storage wrapping a folder specified by root_dir.
pub async fn new(root_dir: PathBuf) -> Result<Self> {
// TODO check if the dir is writable
Ok(Self { root_dir })
}
}
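/// Replace author URLs in a feed with the corresponding h-card objects, where
/// those posts exist and are visible to `user`. URLs that cannot be resolved
/// (or values that are not plain strings) are left as-is, and feeds that are
/// themselves h-cards are not touched.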
async fn hydrate_author<S: Storage>(
feed: &mut serde_json::Value,
user: &'_ Option<String>,
storage: &S,
) {
let url = feed["properties"]["uid"][0].as_str().unwrap();
if let Some(author) = feed["properties"]["author"].clone().as_array() {
if !feed["type"]
.as_array()
.unwrap()
.iter()
.any(|i| i == "h-card")
{
let author_list: Vec<serde_json::Value> = stream::iter(author.iter())
.then(|i| async move {
if let Some(i) = i.as_str() {
match storage.get_post(i).await {
Ok(post) => match post {
Some(post) => match filter_post(post, user) {
Some(author) => author,
None => json!(i),
},
None => json!(i),
},
Err(e) => {
log::error!("Error while hydrating author {} of {}: {}", i, url, e);
json!(i)
}
}
} else {
i.clone()
}
})
.collect::<Vec<_>>()
.await;
feed["properties"].as_object_mut().unwrap()["author"] = json!(author_list);
}
}
}
#[async_trait]
impl Storage for FileStorage {
async fn post_exists(&self, url: &str) -> Result<bool> {
let path = url_to_path(&self.root_dir, url);
debug!("Checking if {:?} exists...", path);
Ok(spawn_blocking(move || path.is_file()).await)
}
async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> {
let path = url_to_path(&self.root_dir, url);
debug!("Opening {:?}", path);
// We have to special-case in here because the function should return Ok(None) on 404
match File::open(path).await {
Ok(f) => {
let lock = get_lockable_file(f).await;
let guard = lock.read()?;
let mut content = String::new();
// `Read` is implemented for `&File` as well as `File` (the OS tracks the
// file cursor, so reading needs only a shared handle). Taking `&mut &*guard`
// produces a `&mut &File`, which satisfies `read_to_string`'s `&mut self`
// receiver without requiring exclusive access to the file.
(&mut &*guard).read_to_string(&mut content).await?;
Ok(Some(serde_json::from_str(&content)?))
}
Err(err) => {
if err.kind() == IOErrorKind::NotFound {
Ok(None)
} else {
Err(err.into())
}
}
}
}
async fn put_post<'a>(&self, post: &'a serde_json::Value, user: &'a str) -> Result<()> {
let key = post["properties"]["uid"][0]
.as_str()
.expect("Tried to save a post without UID");
let path = url_to_path(&self.root_dir, key);
debug!("Creating {:?}", path);
let parent = path.parent().unwrap().to_owned();
if !spawn_blocking(move || parent.is_dir()).await {
async_std::fs::create_dir_all(path.parent().unwrap()).await?;
}
let f = OpenOptions::new()
.write(true)
.create_new(true)
.open(&path)
.await?;
let mut lock = get_lockable_file(f).await;
let mut guard = lock.write()?;
(*guard).write_all(post.to_string().as_bytes()).await?;
(*guard).flush().await?;
drop(guard);
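// Every alternative URL of the post (other than the canonical UID, and only
// within the user's own URL space) gets a relative symlink pointing back at
// the canonical JSON file, so lookups by any of those URLs resolve to the
// same post.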
if post["properties"]["url"].is_array() {
for url in post["properties"]["url"]
.as_array()
.unwrap()
.iter()
.map(|i| i.as_str().unwrap())
{
if url != key && url.starts_with(user) {
let link = url_to_path(&self.root_dir, url);
debug!("Creating a symlink at {:?}", link);
let orig = path.clone();
spawn_blocking::<_, Result<()>>(move || {
// We're supposed to have a parent here.
let basedir = link.parent().ok_or(StorageError::new(
ErrorKind::Backend,
"Failed to calculate parent directory when creating a symlink",
))?;
let relative = path_relative_from(&orig, &basedir).unwrap();
debug!("Relative path from {:?} to {:?} is {:?}", &basedir, &orig, &relative);
debug!("Creating a symlink at {:?}", &link);
let symlink_result;
#[cfg(unix)]
{
symlink_result = std::os::unix::fs::symlink(relative, link);
}
// Windows needs a separate symlink API for files; support it just in case.
#[cfg(windows)]
{
symlink_result = std::os::windows::fs::symlink_file(relative, link);
}
match symlink_result {
Ok(()) => Ok(()),
Err(e) => Err(e.into()),
}
})
.await?;
}
}
}
if post["type"]
.as_array()
.unwrap()
.iter()
.any(|s| s.as_str() == Some("h-feed"))
{
println!("Adding to channel list...");
// Add the h-feed to the channel list
let mut path = relative_path::RelativePathBuf::new();
path.push(user);
path.push("channels");
let path = path.to_path(&self.root_dir);
let file = OpenOptions::new()
.read(true)
.write(true)
.truncate(false)
.create(true)
.open(&path)
.await?;
let mut lock = get_lockable_file(file).await;
let mut guard = lock.write()?;
let mut content = String::new();
guard.read_to_string(&mut content).await?;
let mut channels: Vec<super::MicropubChannel> = if !content.is_empty() {
serde_json::from_str(&content)?
} else {
Vec::default()
};
channels.push(super::MicropubChannel {
uid: key.to_string(),
name: post["properties"]["name"][0]
.as_str()
.map(|s| s.to_string())
.unwrap_or_default(),
});
guard.seek(std::io::SeekFrom::Start(0)).await?;
guard.set_len(0).await?;
guard
.write_all(serde_json::to_string(&channels)?.as_bytes())
.await?;
}
Ok(())
}
async fn update_post<'a>(&self, url: &'a str, update: serde_json::Value) -> Result<()> {
let path = url_to_path(&self.root_dir, url);
let f = OpenOptions::new()
.write(true)
.read(true)
.truncate(false)
.open(&path)
.await?;
let mut lock = get_lockable_file(f).await;
let mut guard = lock.write()?;
let mut content = String::new();
guard.read_to_string(&mut content).await?;
let json: serde_json::Value = serde_json::from_str(&content)?;
// Apply the editing algorithms
let new_json = modify_post(&json, &update)?;
(*guard).set_len(0).await?;
(*guard).seek(std::io::SeekFrom::Start(0)).await?;
(*guard).write_all(new_json.to_string().as_bytes()).await?;
(*guard).flush().await?;
drop(guard);
// TODO check if URLs changed between old and new JSON
Ok(())
}
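// The channel list is a JSON array stored in a `channels` file under the
// user's directory; a missing file simply means the user has no channels yet.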
async fn get_channels<'a>(&self, user: &'a str) -> Result<Vec<super::MicropubChannel>> {
let mut path = relative_path::RelativePathBuf::new();
path.push(user.to_string());
path.push("channels");
let path = path.to_path(&self.root_dir);
match File::open(&path).await {
Ok(f) => {
let lock = get_lockable_file(f).await;
let guard = lock.read()?;
let mut content = String::new();
(&mut &*guard).read_to_string(&mut content).await?;
// This should not happen, but if it does, let's handle it gracefully instead of failing.
if content.is_empty() {
return Ok(vec![]);
}
let channels: Vec<super::MicropubChannel> = serde_json::from_str(&content)?;
Ok(channels)
}
Err(e) => {
if e.kind() == IOErrorKind::NotFound {
Ok(vec![])
} else {
Err(e.into())
}
}
}
}
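// Read a feed post and inline its children: skip entries up to and including
// the `after` cursor, fetch at most `limit` posts (buffered concurrently),
// hydrate their authors and filter them for the requesting `user`.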
async fn read_feed_with_limit<'a>(
&self,
url: &'a str,
after: &'a Option<String>,
limit: usize,
user: &'a Option<String>,
) -> Result<Option<serde_json::Value>> {
if let Some(feed) = self.get_post(url).await? {
if let Some(mut feed) = filter_post(feed, user) {
if feed["children"].is_array() {
let children = feed["children"].as_array().unwrap().clone();
let mut posts_iter = children
.into_iter()
.map(|s: serde_json::Value| s.as_str().unwrap().to_string());
if after.is_some() {
// Skip posts up to and including the cursor, and stop once the iterator
// is exhausted so a missing cursor doesn't loop forever.
loop {
let i = posts_iter.next();
if i.is_none() || &i == after {
break;
}
}
}
let posts = stream::iter(posts_iter)
.map(|url: String| async move { self.get_post(&url).await })
.buffered(std::cmp::min(3, limit))
// Unwrap the inner Option and sieve out broken links:
// missing posts come back as None, and try_filter_map drops those.
.try_filter_map(|post: Option<serde_json::Value>| async move { Ok(post) })
.and_then(|mut post| async move {
hydrate_author(&mut post, user, self).await;
Ok(post)
})
.try_filter_map(|post| async move { Ok(filter_post(post, user)) })
.take(limit);
match posts.try_collect::<Vec<serde_json::Value>>().await {
Ok(posts) => feed["children"] = serde_json::json!(posts),
Err(err) => {
return Err(StorageError::with_source(
ErrorKind::Other,
"Feed assembly error",
Box::new(err),
));
}
}
}
hydrate_author(&mut feed, user, self).await;
Ok(Some(feed))
} else {
Err(StorageError::new(
ErrorKind::PermissionDenied,
"specified user cannot access this post",
))
}
} else {
Ok(None)
}
}
async fn delete_post<'a>(&self, url: &'a str) -> Result<()> {
let path = url_to_path(&self.root_dir, url);
async_std::fs::remove_file(path).await?;
Ok(())
}
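// Settings are kept per-origin as a single JSON object mapping setting names
// to string values, stored in a `settings` file under the origin's directory.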
async fn get_setting<'a>(&self, setting: &'a str, user: &'a str) -> Result<String> {
log::debug!("User for getting settings: {}", user);
let url = http_types::Url::parse(user).expect("Couldn't parse a URL");
let mut path = relative_path::RelativePathBuf::new();
path.push(url.origin().ascii_serialization());
path.push("settings");
let path = path.to_path(&self.root_dir);
let lock = get_lockable_file(File::open(path).await?).await;
let guard = lock.read()?;
let mut content = String::new();
(&mut &*guard).read_to_string(&mut content).await?;
drop(guard);
let settings: HashMap<String, String> = serde_json::from_str(&content)?;
// XXX consider returning string slices instead of cloning a string every time
// it might come with a performance hit and/or memory usage inflation
settings
.get(setting)
.cloned()
.ok_or_else(|| StorageError::new(ErrorKind::Backend, "Setting not set"))
}
async fn set_setting<'a>(&self, setting: &'a str, user: &'a str, value: &'a str) -> Result<()> {
let url = http_types::Url::parse(user).expect("Couldn't parse a URL");
let mut path = relative_path::RelativePathBuf::new();
path.push(url.origin().ascii_serialization());
path.push("settings");
let path = path.to_path(&self.root_dir);
let parent = path.parent().unwrap().to_owned();
if !spawn_blocking(move || parent.is_dir()).await {
async_std::fs::create_dir_all(path.parent().unwrap()).await?;
}
let file = OpenOptions::new()
.write(true)
.read(true)
.truncate(false)
.create(true)
.open(&path)
.await?;
let mut lock = get_lockable_file(file).await;
log::debug!("Created a lock. Locking for writing...");
let mut guard = lock.write()?;
log::debug!("Locked. Writing.");
let mut content = String::new();
guard.read_to_string(&mut content).await?;
let mut settings: HashMap<String, String> = if content.is_empty() {
HashMap::default()
} else {
serde_json::from_str(&content)?
};
settings.insert(setting.to_string(), value.to_string());
guard.seek(std::io::SeekFrom::Start(0)).await?;
guard.set_len(0).await?;
guard
.write_all(serde_json::to_string(&settings)?.as_bytes())
.await?;
Ok(())
}
}