//#![warn(clippy::unwrap_used)]
use crate::database::{ErrorKind, Result, settings, Storage, StorageError};
use crate::micropub::{MicropubPropertyDeletion, MicropubUpdate};
use futures::{stream, StreamExt, TryStreamExt};
use kittybox_util::MentionType;
use serde_json::json;
use std::borrow::Cow;
use std::collections::HashMap;
use std::io::ErrorKind as IOErrorKind;
use std::path::{Path, PathBuf};
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::task::spawn_blocking;
use tracing::{debug, error};
impl From<std::io::Error> for StorageError {
fn from(source: std::io::Error) -> Self {
Self::with_source(
match source.kind() {
IOErrorKind::NotFound => ErrorKind::NotFound,
IOErrorKind::AlreadyExists => ErrorKind::Conflict,
_ => ErrorKind::Backend,
},
Cow::Owned(format!("file I/O error: {}", &source)),
Box::new(source),
)
}
}
impl From<tokio::time::error::Elapsed> for StorageError {
fn from(source: tokio::time::error::Elapsed) -> Self {
Self::with_source(
ErrorKind::Backend,
Cow::Borrowed("timeout on I/O operation"),
Box::new(source),
)
}
}
// Copied from https://stackoverflow.com/questions/39340924
// This routine is adapted from the *old* Path's `path_relative_from`
// function, which works differently from the new `relative_from` function.
// In particular, this handles the case on unix where both paths are
// absolute but with only the root as the common directory.
fn path_relative_from(path: &Path, base: &Path) -> Option<PathBuf> {
use std::path::Component;
if path.is_absolute() != base.is_absolute() {
if path.is_absolute() {
Some(PathBuf::from(path))
} else {
None
}
} else {
let mut ita = path.components();
let mut itb = base.components();
let mut comps: Vec<Component> = vec![];
loop {
match (ita.next(), itb.next()) {
(None, None) => break,
(Some(a), None) => {
comps.push(a);
comps.extend(ita.by_ref());
break;
}
(None, _) => comps.push(Component::ParentDir),
(Some(a), Some(b)) if comps.is_empty() && a == b => (),
(Some(a), Some(Component::CurDir)) => comps.push(a),
(Some(_), Some(Component::ParentDir)) => return None,
(Some(a), Some(_)) => {
comps.push(Component::ParentDir);
for _ in itb {
comps.push(Component::ParentDir);
}
comps.push(a);
comps.extend(ita.by_ref());
break;
}
}
}
Some(comps.iter().map(|c| c.as_os_str()).collect())
}
}
#[allow(clippy::unwrap_used, clippy::expect_used)]
#[cfg(test)]
mod tests {
#[test]
fn test_relative_path_resolving() {
let path1 = std::path::Path::new("/home/vika/Projects/kittybox");
let path2 = std::path::Path::new("/home/vika/Projects/nixpkgs");
let relative_path = super::path_relative_from(path2, path1).unwrap();
assert_eq!(relative_path, std::path::Path::new("../nixpkgs"))
}
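    // Covers the mixed absolute/relative branch of `path_relative_from`:
    // an absolute path measured against a relative base is returned
    // unchanged, while the reverse cannot be expressed and yields None.
    #[test]
    fn test_relative_path_absoluteness_mismatch() {
        let absolute = std::path::Path::new("/home/vika/Projects/kittybox");
        let relative = std::path::Path::new("Projects/kittybox");
        assert_eq!(
            super::path_relative_from(absolute, relative),
            Some(std::path::PathBuf::from(absolute))
        );
        assert_eq!(super::path_relative_from(relative, absolute), None);
    }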
}
// TODO: Check that the path ACTUALLY IS INSIDE THE ROOT FOLDER
// This could be checked by completely resolving the path
// and checking if it has a common prefix
fn url_to_path(root: &Path, url: &str) -> PathBuf {
let path = url_to_relative_path(url).to_logical_path(root);
if !path.starts_with(root) {
// TODO: handle more gracefully
panic!("Security error: {:?} is not a prefix of {:?}", path, root)
} else {
path
}
}
fn url_to_relative_path(url: &str) -> relative_path::RelativePathBuf {
let url = url::Url::try_from(url).expect("Couldn't parse a URL");
let mut path = relative_path::RelativePathBuf::new();
let user_domain = format!(
"{}{}",
url.host_str().unwrap(),
url.port()
.map(|port| format!(":{}", port))
.unwrap_or_default()
);
path.push(user_domain + url.path() + ".json");
path
}
fn modify_post(post: &serde_json::Value, update: MicropubUpdate) -> Result<serde_json::Value> {
let mut post = post.clone();
let mut add_keys: HashMap<String, Vec<serde_json::Value>> = HashMap::new();
let mut remove_keys: Vec<String> = vec![];
let mut remove_values: HashMap<String, Vec<serde_json::Value>> = HashMap::new();
if let Some(MicropubPropertyDeletion::Properties(delete)) = update.delete {
remove_keys.extend(delete.iter().cloned());
} else if let Some(MicropubPropertyDeletion::Values(delete)) = update.delete {
for (k, v) in delete {
remove_values
.entry(k.to_string())
.or_default()
.extend(v.clone());
}
}
if let Some(add) = update.add {
for (k, v) in add {
add_keys.insert(k.to_string(), v.clone());
}
}
if let Some(replace) = update.replace {
for (k, v) in replace {
remove_keys.push(k.to_string());
add_keys.insert(k.to_string(), v.clone());
}
}
if let Some(props) = post["properties"].as_object_mut() {
for k in remove_keys {
props.remove(&k);
}
}
for (k, v) in remove_values {
let k = &k;
let props = if k == "children" {
&mut post
} else {
&mut post["properties"]
};
v.iter().for_each(|v| {
if let Some(vec) = props[k].as_array_mut() {
if let Some(index) = vec.iter().position(|w| w == v) {
vec.remove(index);
}
}
});
}
for (k, v) in add_keys {
tracing::debug!("Adding k/v to post: {} => {:?}", k, v);
let props = if k == "children" {
&mut post
} else {
&mut post["properties"]
};
if let Some(prop) = props[&k].as_array_mut() {
if k == "children" {
v.into_iter().rev().for_each(|v| prop.insert(0, v));
} else {
prop.extend(v.into_iter());
}
} else {
props[&k] = serde_json::Value::Array(v)
}
}
Ok(post)
}
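// A sketch of the update semantics above: `replace` drops the old values of
// a property and inserts the new ones, leaving other properties untouched.
// This assumes `MicropubUpdate` implements `serde::Deserialize` for the
// standard Micropub JSON update syntax (it is parsed from request bodies
// elsewhere); if it is constructed differently, adjust the setup below.
#[cfg(test)]
mod modify_post_tests {
    use serde_json::json;

    #[test]
    fn replace_overwrites_only_the_named_property() {
        let post = json!({
            "type": ["h-entry"],
            "properties": {
                "content": ["Hello"],
                "category": ["test"]
            }
        });
        let update: crate::micropub::MicropubUpdate = serde_json::from_value(json!({
            "replace": { "content": ["Hello, world!"] }
        })).unwrap();
        let new_post = super::modify_post(&post, update).unwrap();
        assert_eq!(new_post["properties"]["content"], json!(["Hello, world!"]));
        assert_eq!(new_post["properties"]["category"], json!(["test"]));
    }
}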
#[derive(Clone, Debug)]
/// A backend using a folder with JSON files as a backing store.
/// Uses symbolic links to represent a many-to-one mapping of URLs to a post.
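///
/// For example, a post at `https://example.com/posts/hello` would be stored
/// at `<root>/example.com/posts/hello.json`, with per-user `channels` and
/// `settings` files at the top of `<root>/example.com/` (hypothetical domain,
/// for illustration).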
pub struct FileStorage {
pub(super) root_dir: PathBuf,
}
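/// Replace author URLs in a feed's `author` property with the full MF2
/// posts they point to, when those posts can be loaded from storage.
/// Values that fail to load (or aren't stored) are kept as plain URLs.
/// Feeds that are themselves `h-card`s are left untouched.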
async fn hydrate_author<S: Storage>(
feed: &mut serde_json::Value,
// Unused?
user: Option<&url::Url>,
storage: &S,
) {
let url = feed["properties"]["uid"][0]
.as_str()
.expect("MF2 value should have a UID set! Check if you used normalize_mf2 before recording the post!");
if let Some(author) = feed["properties"]["author"].as_array().cloned() {
if !feed["type"]
.as_array()
.expect("MF2 value should have a type set!")
.iter()
.any(|i| i == "h-card")
{
let author_list: Vec<serde_json::Value> = stream::iter(author.iter())
.then(|i| async move {
if let Some(i) = i.as_str() {
// BUG: Use `user` to sanitize?
match storage.get_post(i).await {
Ok(post) => match post {
Some(post) => post,
None => json!(i),
},
Err(e) => {
error!("Error while hydrating post {}: {}", url, e);
json!(i)
}
}
} else {
i.clone()
}
})
.collect::<Vec<_>>()
.await;
if let Some(props) = feed["properties"].as_object_mut() {
props["author"] = json!(author_list);
} else {
feed["properties"] = json!({ "author": author_list });
}
}
}
}
impl Storage for FileStorage {
async fn new(url: &'_ url::Url) -> Result<Self> {
// TODO: sanity check
Ok(Self { root_dir: PathBuf::from(url.path()) })
}
#[tracing::instrument(skip(self))]
async fn categories(&self, url: &str) -> Result<Vec<String>> {
// This requires an expensive scan through the entire
// directory tree.
//
// Until this backend has some kind of caching/indexing for
// categories (consider using symlinks?), this query won't
// perform well.
Err(std::io::Error::new(
std::io::ErrorKind::Unsupported,
"?q=category queries are not implemented due to resource constraints"
))?
}
#[tracing::instrument(skip(self))]
async fn post_exists(&self, url: &str) -> Result<bool> {
let path = url_to_path(&self.root_dir, url);
debug!("Checking if {:?} exists...", path);
#[allow(clippy::unwrap_used)] // JoinHandle captures panics, this closure shouldn't panic
Ok(spawn_blocking(move || path.is_file()).await.unwrap())
}
#[tracing::instrument(skip(self))]
async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> {
let path = url_to_path(&self.root_dir, url);
        // TODO: check that the path actually belongs to the directory of the
        // user who requested it. It's not like you CAN access someone else's
        // private posts this way, so it's not exactly a security issue, but
        // it's still not good.
debug!("Opening {:?}", path);
match File::open(&path).await {
Ok(mut file) => {
let mut content = String::new();
                // Read the whole file into memory before parsing it as JSON.
                AsyncReadExt::read_to_string(&mut file, &mut content).await?;
                debug!("Read {} bytes successfully from {:?}", content.len(), &path);
Ok(Some(serde_json::from_str(&content)?))
}
Err(err) => {
if err.kind() == IOErrorKind::NotFound {
Ok(None)
} else {
Err(err.into())
}
}
}
}
#[tracing::instrument(skip(self))]
async fn put_post(&self, post: &'_ serde_json::Value, user: &url::Url) -> Result<()> {
let key = post["properties"]["uid"][0]
.as_str()
.expect("Tried to save a post without UID");
let path = url_to_path(&self.root_dir, key);
let tempfile = path.with_extension("tmp");
debug!("Creating {:?}", path);
let parent = path
.parent()
.expect("Parent for this directory should always exist")
.to_owned();
tokio::fs::create_dir_all(&parent).await?;
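        // Write the post to a temporary file, fsync it, atomically rename it
        // over the destination, then fsync the parent directory so the rename
        // itself is durable. A crash can't leave a half-written post behind.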
let mut file = tokio::fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(&tempfile)
.await?;
file.write_all(post.to_string().as_bytes()).await?;
file.flush().await?;
file.sync_all().await?;
drop(file);
tokio::fs::rename(&tempfile, &path).await?;
tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?;
if let Some(urls) = post["properties"]["url"].as_array() {
for url in urls.iter().map(|i| i.as_str().unwrap()) {
let url_domain = {
let url = url::Url::parse(url).unwrap();
format!(
"{}{}",
url.host_str().unwrap(),
url.port()
.map(|port| format!(":{}", port))
.unwrap_or_default()
)
};
if url != key && url_domain == user.authority() {
let link = url_to_path(&self.root_dir, url);
debug!("Creating a symlink at {:?}", link);
let orig = path.clone();
// We're supposed to have a parent here.
let basedir = link.parent().ok_or_else(|| {
StorageError::from_static(
ErrorKind::Backend,
"Failed to calculate parent directory when creating a symlink",
)
})?;
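                    // Make the symlink target relative, so the storage root
                    // can be moved without breaking the links.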
let relative = path_relative_from(&orig, basedir).unwrap();
                    debug!("{:?} - {:?} = {:?}", &orig, &basedir, &relative);
tokio::fs::symlink(relative, link).await?;
}
}
}
if post["type"]
.as_array()
.unwrap()
.iter()
.any(|s| s.as_str() == Some("h-feed"))
{
tracing::debug!("Adding to channel list...");
// Add the h-feed to the channel list
let path = {
let mut path = relative_path::RelativePathBuf::new();
path.push(user.authority());
path.push("channels");
path.to_path(&self.root_dir)
};
tokio::fs::create_dir_all(path.parent().unwrap()).await?;
tracing::debug!("Channels file path: {}", path.display());
let tempfilename = path.with_extension("tmp");
let channel_name = post["properties"]["name"][0]
.as_str()
.map(|s| s.to_string())
.unwrap_or_default();
            tracing::debug!("Opening temporary file to modify channels...");
let mut tempfile = OpenOptions::new()
.write(true)
.create_new(true)
.open(&tempfilename)
.await?;
tracing::debug!("Opening real channel file...");
let mut channels: Vec<super::MicropubChannel> = {
match OpenOptions::new()
.read(true)
.write(false)
.truncate(false)
.create(false)
.open(&path)
.await
{
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
Vec::default()
}
Err(err) => {
// Propagate the error upwards
return Err(err.into());
}
Ok(mut file) => {
let mut content = String::new();
file.read_to_string(&mut content).await?;
drop(file);
if !content.is_empty() {
serde_json::from_str(&content)?
} else {
Vec::default()
}
}
}
};
channels.push(super::MicropubChannel {
uid: key.to_string(),
name: channel_name,
});
tempfile
.write_all(serde_json::to_string(&channels)?.as_bytes())
.await?;
tempfile.flush().await?;
tempfile.sync_all().await?;
drop(tempfile);
tokio::fs::rename(tempfilename, &path).await?;
tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?;
}
Ok(())
}
#[tracing::instrument(skip(self))]
async fn update_post(&self, url: &str, update: MicropubUpdate) -> Result<()> {
let path = url_to_path(&self.root_dir, url);
let tempfilename = path.with_extension("tmp");
#[allow(unused_variables)]
let (old_json, new_json) = {
let mut temp = OpenOptions::new()
.write(true)
.create_new(true)
.open(&tempfilename)
.await?;
let mut file = OpenOptions::new().read(true).open(&path).await?;
let mut content = String::new();
file.read_to_string(&mut content).await?;
let json: serde_json::Value = serde_json::from_str(&content)?;
drop(file);
// Apply the editing algorithms. We can't use the stock
// `update.apply` function due to special requirements of
// the file backend, so we're implementing our own.
let new_json = modify_post(&json, update)?;
temp.write_all(new_json.to_string().as_bytes()).await?;
temp.flush().await?;
temp.sync_all().await?;
drop(temp);
tokio::fs::rename(tempfilename, &path).await?;
tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?;
(json, new_json)
};
// TODO check if URLs changed between old and new JSON
Ok(())
}
#[tracing::instrument(skip(self, f), fields(f = std::any::type_name::<F>()))]
async fn update_with<F: FnOnce(&mut serde_json::Value) + Send>(
&self, url: &str, f: F
) -> Result<(serde_json::Value, serde_json::Value)> {
todo!("update_with is not yet implemented due to special requirements of the file backend")
}
#[tracing::instrument(skip(self))]
async fn get_channels(&self, user: &url::Url) -> Result<Vec<super::MicropubChannel>> {
let mut path = relative_path::RelativePathBuf::new();
path.push(user.authority());
path.push("channels");
let path = path.to_path(&self.root_dir);
tracing::debug!("Channels file path: {}", path.display());
match File::open(&path).await {
Ok(mut f) => {
let mut content = String::new();
f.read_to_string(&mut content).await?;
// This should not happen, but if it does, handle it gracefully
if content.is_empty() {
return Ok(vec![]);
}
let channels: Vec<super::MicropubChannel> = serde_json::from_str(&content)?;
Ok(channels)
}
Err(e) => {
if e.kind() == IOErrorKind::NotFound {
Ok(vec![])
} else {
Err(e.into())
}
}
}
}
async fn read_feed_with_cursor(
&self,
url: &'_ str,
cursor: Option<&'_ str>,
limit: usize,
user: Option<&url::Url>
) -> Result<Option<(serde_json::Value, Option<String>)>> {
#[allow(deprecated)]
Ok(self.read_feed_with_limit(
url,
cursor,
limit,
user
).await?
.map(|feed| {
tracing::debug!("Feed: {:#}", serde_json::Value::Array(
feed["children"]
.as_array()
.map(|v| v.as_slice())
.unwrap_or_default()
.iter()
.map(|mf2| mf2["properties"]["uid"][0].clone())
.collect::<Vec<_>>()
));
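                // The cursor is the UID of the last post in this page;
                // passing it back as `cursor` on the next call resumes
                // reading right after that post.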
let cursor: Option<String> = feed["children"]
.as_array()
.map(|v| v.as_slice())
.unwrap_or_default()
.last()
.map(|v| v["properties"]["uid"][0].as_str().unwrap().to_owned());
tracing::debug!("Extracted the cursor: {:?}", cursor);
(feed, cursor)
})
)
}
#[tracing::instrument(skip(self))]
async fn read_feed_with_limit(
&self,
url: &'_ str,
after: Option<&str>,
limit: usize,
user: Option<&url::Url>,
) -> Result<Option<serde_json::Value>> {
if let Some(mut feed) = self.get_post(url).await? {
if feed["children"].is_array() {
// Take this out of the MF2-JSON document to save memory
//
// This uses a clever match with enum destructuring
// to extract the underlying Vec without cloning it
let children: Vec<serde_json::Value> = match feed["children"].take() {
serde_json::Value::Array(children) => children,
// We've already checked it's an array
_ => unreachable!()
};
tracing::debug!("Full children array: {:#}", serde_json::Value::Array(children.clone()));
let mut posts_iter = children
.into_iter()
.map(|s: serde_json::Value| s.as_str().unwrap().to_string());
                // Note: we can't use `skip_while` here, because it would
                // emit `after` itself. This imperative snippet consumes
                // `after` instead, so the stream of posts contains only the
                // items that truly come *after* it. If I were to implement
                // an iterator combinator like this, I'd call it `skip_until`.
                //
                // `tokio::task::block_in_place` prevents starving the
                // executor while rewinding incredibly long feeds.
if let Some(after) = after {
tokio::task::block_in_place(|| {
for s in posts_iter.by_ref() {
if s == after {
break;
}
}
})
}
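                // Fetch the remaining posts concurrently, keeping up to
                // three reads in flight at once, while preserving feed order.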
let posts = stream::iter(posts_iter)
.map(|url: String| async move { self.get_post(&url).await })
.buffered(std::cmp::min(3, limit))
                    // Unwrap the Option and sieve out broken links:
                    // broken links yield None, which `try_filter_map` drops.
                    .try_filter_map(|post: Option<serde_json::Value>| async move { Ok(post) })
.and_then(|mut post| async move {
hydrate_author(&mut post, user, self).await;
Ok(post)
})
.take(limit);
match posts.try_collect::<Vec<serde_json::Value>>().await {
Ok(posts) => feed["children"] = serde_json::json!(posts),
Err(err) => {
return Err(StorageError::with_source(
ErrorKind::Other,
Cow::Owned(format!("Feed assembly error: {}", &err)),
Box::new(err),
));
}
}
}
hydrate_author(&mut feed, user, self).await;
Ok(Some(feed))
} else {
Ok(None)
}
}
#[tracing::instrument(skip(self))]
async fn delete_post(&self, url: &'_ str) -> Result<()> {
let path = url_to_path(&self.root_dir, url);
if let Err(e) = tokio::fs::remove_file(path).await {
Err(e.into())
} else {
// TODO check for dangling references in the channel list
Ok(())
}
}
#[tracing::instrument(skip(self))]
async fn get_setting<S: settings::Setting>(&self, user: &url::Url) -> Result<S> {
debug!("User for getting settings: {}", user);
let mut path = relative_path::RelativePathBuf::new();
path.push(user.authority());
path.push("settings");
let path = path.to_path(&self.root_dir);
debug!("Getting settings from {:?}", &path);
let mut file = File::open(path).await?;
let mut content = String::new();
file.read_to_string(&mut content).await?;
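        // The settings file is a single JSON object keyed by setting IDs.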
let settings: HashMap<&str, serde_json::Value> = serde_json::from_str(&content)?;
match settings.get(S::ID) {
Some(value) => Ok(serde_json::from_value::<S>(value.clone())?),
None => Err(StorageError::from_static(ErrorKind::Backend, "Setting not set"))
}
}
#[tracing::instrument(skip(self))]
async fn set_setting<S: settings::Setting>(&self, user: &url::Url, value: S::Data) -> Result<()> {
let mut path = relative_path::RelativePathBuf::new();
path.push(user.authority());
path.push("settings");
let path = path.to_path(&self.root_dir);
let temppath = path.with_extension("tmp");
let parent = path.parent().unwrap().to_owned();
tokio::fs::create_dir_all(&parent).await?;
let mut tempfile = OpenOptions::new()
.write(true)
.create_new(true)
.open(&temppath)
.await?;
let mut settings: HashMap<String, serde_json::Value> = match File::open(&path).await {
Ok(mut f) => {
let mut content = String::new();
f.read_to_string(&mut content).await?;
if content.is_empty() {
Default::default()
} else {
serde_json::from_str(&content)?
}
}
Err(err) => {
if err.kind() == IOErrorKind::NotFound {
Default::default()
} else {
return Err(err.into());
}
}
};
settings.insert(S::ID.to_owned(), serde_json::to_value(S::new(value))?);
tempfile
.write_all(serde_json::to_string(&settings)?.as_bytes())
.await?;
tempfile.flush().await?;
tempfile.sync_all().await?;
drop(tempfile);
tokio::fs::rename(temppath, &path).await?;
tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?;
Ok(())
}
#[tracing::instrument(skip(self))]
async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()> {
let path = url_to_path(&self.root_dir, target);
let tempfilename = path.with_extension("tmp");
let mut temp = OpenOptions::new()
.write(true)
.create_new(true)
.open(&tempfilename)
.await?;
let mut file = OpenOptions::new().read(true).open(&path).await?;
let mut post: serde_json::Value = {
let mut content = String::new();
file.read_to_string(&mut content).await?;
drop(file);
serde_json::from_str(&content)?
};
let key: &'static str = match mention_type {
MentionType::Reply => "comment",
MentionType::Like => "like",
MentionType::Repost => "repost",
MentionType::Bookmark => "bookmark",
MentionType::Mention => "mention",
};
let mention_uid = mention["properties"]["uid"][0].clone();
        if let Some(values) = post["properties"][key].as_array_mut() {
            // Update a previously recorded mention with the same UID in
            // place, or append the mention if it hasn't been seen before.
            if let Some(value) = values
                .iter_mut()
                .find(|value| value["properties"]["uid"][0] == mention_uid)
            {
                *value = mention;
            } else {
                values.push(mention);
            }
        } else {
            post["properties"][key] = serde_json::Value::Array(vec![mention]);
        }
temp.write_all(post.to_string().as_bytes()).await?;
temp.flush().await?;
temp.sync_all().await?;
drop(temp);
tokio::fs::rename(tempfilename, &path).await?;
tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?;
Ok(())
}
async fn all_posts<'this>(&'this self, user: &url::Url) -> Result<impl futures::Stream<Item = serde_json::Value> + Send + 'this> {
todo!();
Ok(futures::stream::empty()) // for type inference
}
}