#![warn(missing_docs)]
use std::borrow::Cow;
use async_trait::async_trait;
use kittybox_util::MentionType;
mod file;
pub use crate::database::file::FileStorage;
use crate::micropub::MicropubUpdate;
#[cfg(feature = "postgres")]
mod postgres;
#[cfg(feature = "postgres")]
pub use postgres::PostgresStorage;
#[cfg(test)]
mod memory;
#[cfg(test)]
pub use crate::database::memory::MemoryStorage;
pub use kittybox_util::MicropubChannel;
use self::settings::Setting;
/// Enum representing different errors that might occur during the database query.
#[derive(Debug, Clone, Copy)]
pub enum ErrorKind {
/// Backend error (e.g. database connection error)
Backend,
/// Error due to insufficient contextual permissions for the query
PermissionDenied,
/// Error due to the database being unable to parse JSON returned from the backing storage.
/// Usually indicative of someone fiddling with the database manually instead of using proper tools.
JsonParsing,
/// - ErrorKind::NotFound - equivalent to a 404 error. Note, some requests return an Option,
/// in which case None is also equivalent to a 404.
NotFound,
/// The user's query or request to the database was malformed. Used whenever the database processes
/// the user's query directly, such as when editing posts inside of the database (e.g. Redis backend)
BadRequest,
/// the user's query collided with an in-flight request and needs to be retried
Conflict,
/// - ErrorKind::Other - when something so weird happens that it becomes undescribable.
Other,
}
/// Settings that can be stored in the database.
pub mod settings {
mod private {
pub trait Sealed {}
}
/// A trait for various settings that should be contained here.
///
/// **Note**: this trait is sealed to prevent external
/// implementations, as it wouldn't make sense to add new settings
/// that aren't used by Kittybox itself.
pub trait Setting<'de>: private::Sealed + std::fmt::Debug + Default + Clone + serde::Serialize + serde::de::DeserializeOwned + /*From<Settings> +*/ Send + Sync {
type Data: std::fmt::Debug + Send + Sync;
const ID: &'static str;
/// Unwrap the setting type, returning owned data contained within.
fn into_inner(self) -> Self::Data;
/// Create a new instance of this type containing certain data.
fn new(data: Self::Data) -> Self;
}
/// A website's title, shown in the header.
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)]
pub struct SiteName(String);
impl Default for SiteName {
fn default() -> Self {
Self("Kittybox".to_string())
}
}
impl AsRef<str> for SiteName {
fn as_ref(&self) -> &str {
self.0.as_str()
}
}
impl private::Sealed for SiteName {}
impl Setting<'_> for SiteName {
type Data = String;
const ID: &'static str = "site_name";
fn into_inner(self) -> String {
self.0
}
fn new(data: Self::Data) -> Self {
Self(data)
}
}
impl SiteName {
fn from_str(data: &str) -> Self {
Self(data.to_owned())
}
}
/// Participation status in the IndieWeb Webring: https://🕸💍.ws/dashboard
#[derive(Debug, Default, serde::Deserialize, serde::Serialize, Clone, Copy, PartialEq, Eq)]
pub struct Webring(bool);
impl private::Sealed for Webring {}
impl Setting<'_> for Webring {
type Data = bool;
const ID: &'static str = "webring";
fn into_inner(self) -> Self::Data {
self.0
}
fn new(data: Self::Data) -> Self {
Self(data)
}
}
}
/// Error signalled from the database.
#[derive(Debug)]
pub struct StorageError {
msg: std::borrow::Cow<'static, str>,
source: Option<Box<dyn std::error::Error + Send + Sync>>,
kind: ErrorKind,
}
impl std::error::Error for StorageError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.source
.as_ref()
.map(|e| e.as_ref() as &dyn std::error::Error)
}
}
impl From<serde_json::Error> for StorageError {
fn from(err: serde_json::Error) -> Self {
Self {
msg: std::borrow::Cow::Owned(format!("{}", err)),
source: Some(Box::new(err)),
kind: ErrorKind::JsonParsing,
}
}
}
impl std::fmt::Display for StorageError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}: {}",
match self.kind {
ErrorKind::Backend => "backend error",
ErrorKind::JsonParsing => "JSON parsing error",
ErrorKind::PermissionDenied => "permission denied",
ErrorKind::NotFound => "not found",
ErrorKind::BadRequest => "bad request",
ErrorKind::Conflict => "conflict with an in-flight request or existing data",
ErrorKind::Other => "generic storage layer error",
},
self.msg
)
}
}
impl serde::Serialize for StorageError {
fn serialize<S: serde::Serializer>(
&self,
serializer: S,
) -> std::result::Result<S::Ok, S::Error> {
serializer.serialize_str(&self.to_string())
}
}
impl StorageError {
/// Create a new StorageError of an ErrorKind with a message.
pub fn new(kind: ErrorKind, msg: String) -> Self {
Self {
msg: Cow::Owned(msg),
source: None,
kind,
}
}
/// Create a new StorageError of an ErrorKind with a message from
/// a static string.
///
/// This saves an allocation for a new string and is the preferred
/// way in case the error message doesn't change.
pub fn from_static(kind: ErrorKind, msg: &'static str) -> Self {
Self {
msg: Cow::Borrowed(msg),
source: None,
kind
}
}
/// Create a StorageError using another arbitrary Error as a source.
pub fn with_source(
kind: ErrorKind,
msg: std::borrow::Cow<'static, str>,
source: Box<dyn std::error::Error + Send + Sync>,
) -> Self {
Self {
msg,
source: Some(source),
kind,
}
}
/// Get the kind of an error.
pub fn kind(&self) -> ErrorKind {
self.kind
}
/// Get the message as a string slice.
pub fn msg(&self) -> &str {
&self.msg
}
}
/// A special Result type for the Micropub backing storage.
pub type Result<T> = std::result::Result<T, StorageError>;
/// A storage backend for the Micropub server.
///
/// Implementations should note that all methods listed on this trait MUST be fully atomic
/// or lock the database so that write conflicts or reading half-written data should not occur.
#[async_trait]
pub trait Storage: std::fmt::Debug + Clone + Send + Sync {
/// Check if a post exists in the database.
async fn post_exists(&self, url: &str) -> Result<bool>;
/// Load a post from the database in MF2-JSON format, deserialized from JSON.
async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>>;
/// Save a post to the database as an MF2-JSON structure.
///
/// Note that the `post` object MUST have `post["properties"]["uid"][0]` defined.
async fn put_post(&self, post: &'_ serde_json::Value, user: &'_ str) -> Result<()>;
/// Add post to feed. Some database implementations might have optimized ways to do this.
#[tracing::instrument(skip(self))]
async fn add_to_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> {
tracing::debug!("Inserting {} into {} using `update_post`", post, feed);
self.update_post(feed, serde_json::from_value(
serde_json::json!({"add": {"children": [post]}})).unwrap()
).await
}
/// Remove post from feed. Some database implementations might have optimized ways to do this.
#[tracing::instrument(skip(self))]
async fn remove_from_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> {
tracing::debug!("Removing {} into {} using `update_post`", post, feed);
self.update_post(feed, serde_json::from_value(
serde_json::json!({"delete": {"children": [post]}})).unwrap()
).await
}
/// Modify a post using an update object as defined in the
/// Micropub spec.
///
/// Note to implementors: the update operation MUST be atomic and
/// SHOULD lock the database to prevent two clients overwriting
/// each other's changes or simply corrupting something. Rejecting
/// is allowed in case of concurrent updates if waiting for a lock
/// cannot be done.
async fn update_post(&self, url: &str, update: MicropubUpdate) -> Result<()>;
/// Get a list of channels available for the user represented by
/// the `user` domain to write to.
async fn get_channels(&self, user: &'_ str) -> Result<Vec<MicropubChannel>>;
/// Fetch a feed at `url` and return an h-feed object containing
/// `limit` posts after a post by url `after`, filtering the content
/// in context of a user specified by `user` (or an anonymous user).
///
/// This method MUST hydrate the `author` property with an h-card
/// from the database by replacing URLs with corresponding h-cards.
///
/// When encountering posts which the `user` is not authorized to
/// access, this method MUST elide such posts (as an optimization
/// for the frontend) and not return them, but still return up to
/// `limit` posts (to not reveal the hidden posts' presence).
///
/// Note for implementors: if you use streams to fetch posts in
/// parallel from the database, preferably make this method use a
/// connection pool to reduce overhead of creating a database
/// connection per post for parallel fetching.
async fn read_feed_with_limit(
&self,
url: &'_ str,
after: &'_ Option<String>,
limit: usize,
user: &'_ Option<String>,
) -> Result<Option<serde_json::Value>>;
/// Fetch a feed at `url` and return an h-feed object containing
/// `limit` posts after a `cursor` (filtering the content in
/// context of a user specified by `user`, or an anonymous user),
/// as well as a new cursor to paginate with.
///
/// This method MUST hydrate the `author` property with an h-card
/// from the database by replacing URLs with corresponding h-cards.
///
/// When encountering posts which the `user` is not authorized to
/// access, this method MUST elide such posts (as an optimization
/// for the frontend) and not return them, but still return an
/// amount of posts as close to `limit` as possible (to avoid
/// revealing the existence of the hidden post).
///
/// Note for implementors: if you use streams to fetch posts in
/// parallel from the database, preferably make this method use a
/// connection pool to reduce overhead of creating a database
/// connection per post for parallel fetching.
async fn read_feed_with_cursor(
&self,
url: &'_ str,
cursor: Option<&'_ str>,
limit: usize,
user: Option<&'_ str>
) -> Result<Option<(serde_json::Value, Option<String>)>>;
/// Deletes a post from the database irreversibly. Must be idempotent.
async fn delete_post(&self, url: &'_ str) -> Result<()>;
/// Gets a setting from the setting store and passes the result.
async fn get_setting<S: Setting<'a>, 'a>(&'_ self, user: &'_ str) -> Result<S>;
/// Commits a setting to the setting store.
async fn set_setting<S: Setting<'a> + 'a, 'a>(&self, user: &'a str, value: S::Data) -> Result<()>;
/// Add (or update) a webmention on a certian post.
///
/// The MF2 object describing the webmention content will always
/// be of type `h-cite`, and the `uid` property on the object will
/// always be set.
///
/// The rationale for this function is as follows: webmentions
/// might be duplicated, and we need to deduplicate them first. As
/// we lack support for transactions and locking posts on the
/// database, the only way is to implement the operation on the
/// database itself.
///
/// Besides, it may even allow for nice tricks like storing the
/// webmentions separately and rehydrating them on feed reads.
async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()>;
}
#[cfg(test)]
mod tests {
use super::settings;
use super::{MicropubChannel, Storage};
use kittybox_util::MentionType;
use serde_json::json;
async fn test_basic_operations<Backend: Storage>(backend: Backend) {
let post: serde_json::Value = json!({
"type": ["h-entry"],
"properties": {
"content": ["Test content"],
"author": ["https://fireburn.ru/"],
"uid": ["https://fireburn.ru/posts/hello"],
"url": ["https://fireburn.ru/posts/hello", "https://fireburn.ru/posts/test"]
}
});
let key = post["properties"]["uid"][0].as_str().unwrap().to_string();
let alt_url = post["properties"]["url"][1].as_str().unwrap().to_string();
// Reading and writing
backend
.put_post(&post, "fireburn.ru")
.await
.unwrap();
if let Some(returned_post) = backend.get_post(&key).await.unwrap() {
assert!(returned_post.is_object());
assert_eq!(
returned_post["type"].as_array().unwrap().len(),
post["type"].as_array().unwrap().len()
);
assert_eq!(
returned_post["type"].as_array().unwrap(),
post["type"].as_array().unwrap()
);
let props: &serde_json::Map<String, serde_json::Value> =
post["properties"].as_object().unwrap();
for key in props.keys() {
assert_eq!(
returned_post["properties"][key].as_array().unwrap(),
post["properties"][key].as_array().unwrap()
)
}
} else {
panic!("For some reason the backend did not return the post.")
}
// Check the alternative URL - it should return the same post
if let Ok(Some(returned_post)) = backend.get_post(&alt_url).await {
assert!(returned_post.is_object());
assert_eq!(
returned_post["type"].as_array().unwrap().len(),
post["type"].as_array().unwrap().len()
);
assert_eq!(
returned_post["type"].as_array().unwrap(),
post["type"].as_array().unwrap()
);
let props: &serde_json::Map<String, serde_json::Value> =
post["properties"].as_object().unwrap();
for key in props.keys() {
assert_eq!(
returned_post["properties"][key].as_array().unwrap(),
post["properties"][key].as_array().unwrap()
)
}
} else {
panic!("For some reason the backend did not return the post.")
}
}
/// Note: this is merely a smoke check and is in no way comprehensive.
// TODO updates for feeds must update children using special logic
async fn test_update<Backend: Storage>(backend: Backend) {
let post: serde_json::Value = json!({
"type": ["h-entry"],
"properties": {
"content": ["Test content"],
"author": ["https://fireburn.ru/"],
"uid": ["https://fireburn.ru/posts/hello"],
"url": ["https://fireburn.ru/posts/hello", "https://fireburn.ru/posts/test"]
}
});
let key = post["properties"]["uid"][0].as_str().unwrap().to_string();
// Reading and writing
backend
.put_post(&post, "fireburn.ru")
.await
.unwrap();
backend
.update_post(
&key,
serde_json::from_value(json!({
"url": &key,
"add": {
"category": ["testing"],
},
"replace": {
"content": ["Different test content"]
}
})).unwrap(),
)
.await
.unwrap();
match backend.get_post(&key).await {
Ok(Some(returned_post)) => {
assert!(returned_post.is_object());
assert_eq!(
returned_post["type"].as_array().unwrap().len(),
post["type"].as_array().unwrap().len()
);
assert_eq!(
returned_post["type"].as_array().unwrap(),
post["type"].as_array().unwrap()
);
assert_eq!(
returned_post["properties"]["content"][0].as_str().unwrap(),
"Different test content"
);
assert_eq!(
returned_post["properties"]["category"].as_array().unwrap(),
&vec![json!("testing")]
);
}
something_else => {
something_else
.expect("Shouldn't error")
.expect("Should have the post");
}
}
}
async fn test_get_channel_list<Backend: Storage>(backend: Backend) {
let feed = json!({
"type": ["h-feed"],
"properties": {
"name": ["Main Page"],
"author": ["https://fireburn.ru/"],
"uid": ["https://fireburn.ru/feeds/main"]
},
"children": []
});
backend
.put_post(&feed, "fireburn.ru")
.await
.unwrap();
let chans = backend.get_channels("fireburn.ru").await.unwrap();
assert_eq!(chans.len(), 1);
assert_eq!(
chans[0],
MicropubChannel {
uid: "https://fireburn.ru/feeds/main".to_string(),
name: "Main Page".to_string()
}
);
}
async fn test_settings<Backend: Storage>(backend: Backend) {
backend
.set_setting::<settings::SiteName>(
"https://fireburn.ru/",
"Vika's Hideout".to_owned()
)
.await
.unwrap();
assert_eq!(
backend
.get_setting::<settings::SiteName>("https://fireburn.ru/")
.await
.unwrap()
.as_ref(),
"Vika's Hideout"
);
}
fn gen_random_post(domain: &str) -> serde_json::Value {
use faker_rand::lorem::{Paragraphs, Word};
let uid = format!(
"https://{domain}/posts/{}-{}-{}",
rand::random::<Word>(),
rand::random::<Word>(),
rand::random::<Word>()
);
let time = chrono::Local::now().to_rfc3339();
let post = json!({
"type": ["h-entry"],
"properties": {
"content": [rand::random::<Paragraphs>().to_string()],
"uid": [&uid],
"url": [&uid],
"published": [&time]
}
});
post
}
fn gen_random_mention(domain: &str, mention_type: MentionType, url: &str) -> serde_json::Value {
use faker_rand::lorem::{Paragraphs, Word};
let uid = format!(
"https://{domain}/posts/{}-{}-{}",
rand::random::<Word>(),
rand::random::<Word>(),
rand::random::<Word>()
);
let time = chrono::Local::now().to_rfc3339();
let post = json!({
"type": ["h-cite"],
"properties": {
"content": [rand::random::<Paragraphs>().to_string()],
"uid": [&uid],
"url": [&uid],
"published": [&time],
(match mention_type {
MentionType::Reply => "in-reply-to",
MentionType::Like => "like-of",
MentionType::Repost => "repost-of",
MentionType::Bookmark => "bookmark-of",
MentionType::Mention => unimplemented!(),
}): [url]
}
});
post
}
async fn test_feed_pagination<Backend: Storage>(backend: Backend) {
let posts = {
let mut posts = std::iter::from_fn(
|| Some(gen_random_post("fireburn.ru"))
)
.take(40)
.collect::<Vec<serde_json::Value>>();
// Reverse the array so it's in reverse-chronological order
posts.reverse();
posts
};
let feed = json!({
"type": ["h-feed"],
"properties": {
"name": ["Main Page"],
"author": ["https://fireburn.ru/"],
"uid": ["https://fireburn.ru/feeds/main"]
},
});
let key = feed["properties"]["uid"][0].as_str().unwrap();
backend
.put_post(&feed, "fireburn.ru")
.await
.unwrap();
for (i, post) in posts.iter().rev().enumerate() {
backend
.put_post(post, "fireburn.ru")
.await
.unwrap();
backend.add_to_feed(key, post["properties"]["uid"][0].as_str().unwrap()).await.unwrap();
}
let limit: usize = 10;
tracing::debug!("Starting feed reading...");
let (result, cursor) = backend
.read_feed_with_cursor(key, None, limit, None)
.await
.unwrap()
.unwrap();
assert_eq!(result["children"].as_array().unwrap().len(), limit);
assert_eq!(
result["children"]
.as_array()
.unwrap()
.iter()
.map(|post| post["properties"]["uid"][0].as_str().unwrap())
.collect::<Vec<_>>()
[0..10],
posts
.iter()
.map(|post| post["properties"]["uid"][0].as_str().unwrap())
.collect::<Vec<_>>()
[0..10]
);
tracing::debug!("Continuing with cursor: {:?}", cursor);
let (result2, cursor2) = backend
.read_feed_with_cursor(
key,
cursor.as_deref(),
limit,
None,
)
.await
.unwrap()
.unwrap();
assert_eq!(
result2["children"].as_array().unwrap()[0..10],
posts[10..20]
);
tracing::debug!("Continuing with cursor: {:?}", cursor);
let (result3, cursor3) = backend
.read_feed_with_cursor(
key,
cursor2.as_deref(),
limit,
None,
)
.await
.unwrap()
.unwrap();
assert_eq!(
result3["children"].as_array().unwrap()[0..10],
posts[20..30]
);
tracing::debug!("Continuing with cursor: {:?}", cursor);
let (result4, _) = backend
.read_feed_with_cursor(
key,
cursor3.as_deref(),
limit,
None,
)
.await
.unwrap()
.unwrap();
assert_eq!(
result4["children"].as_array().unwrap()[0..10],
posts[30..40]
);
// Regression test for #4
//
// Results for a bogus cursor are undefined, so we aren't
// checking them. But the function at least shouldn't hang.
let nonsense_after = Some("1010101010");
let _ = tokio::time::timeout(tokio::time::Duration::from_secs(10), async move {
backend
.read_feed_with_cursor(key, nonsense_after, limit, None)
.await
})
.await
.expect("Operation should not hang: see https://gitlab.com/kittybox/kittybox/-/issues/4");
}
async fn test_webmention_addition<Backend: Storage>(db: Backend) {
let post = gen_random_post("fireburn.ru");
db.put_post(&post, "fireburn.ru").await.unwrap();
const TYPE: MentionType = MentionType::Reply;
let target = post["properties"]["uid"][0].as_str().unwrap();
let mut reply = gen_random_mention("aaronparecki.com", TYPE, target);
let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap();
assert_eq!(post, read_post);
db.add_or_update_webmention(target, TYPE, reply.clone()).await.unwrap();
let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap();
assert_eq!(read_post["properties"]["comment"][0], reply);
reply["properties"]["content"][0] = json!(rand::random::<faker_rand::lorem::Paragraphs>().to_string());
db.add_or_update_webmention(target, TYPE, reply.clone()).await.unwrap();
let (read_post, _) = db.read_feed_with_cursor(target, None, 20, None).await.unwrap().unwrap();
assert_eq!(read_post["properties"]["comment"][0], reply);
}
/// Automatically generates a test suite for
macro_rules! test_all {
($func_name:ident, $mod_name:ident) => {
mod $mod_name {
$func_name!(test_basic_operations);
$func_name!(test_get_channel_list);
$func_name!(test_settings);
$func_name!(test_update);
$func_name!(test_feed_pagination);
$func_name!(test_webmention_addition);
}
};
}
macro_rules! file_test {
($func_name:ident) => {
#[tokio::test]
#[tracing_test::traced_test]
async fn $func_name() {
let tempdir = tempfile::tempdir().expect("Failed to create tempdir");
let backend = super::super::FileStorage::new(
tempdir.path().to_path_buf()
)
.await
.unwrap();
super::$func_name(backend).await
}
};
}
macro_rules! postgres_test {
($func_name:ident) => {
#[cfg(feature = "sqlx")]
#[sqlx::test]
#[tracing_test::traced_test]
async fn $func_name(
pool_opts: sqlx::postgres::PgPoolOptions,
connect_opts: sqlx::postgres::PgConnectOptions
) -> Result<(), sqlx::Error> {
let db = {
//use sqlx::ConnectOptions;
//connect_opts.log_statements(log::LevelFilter::Debug);
pool_opts.connect_with(connect_opts).await?
};
let backend = super::super::PostgresStorage::from_pool(db).await.unwrap();
Ok(super::$func_name(backend).await)
}
};
}
test_all!(file_test, file);
test_all!(postgres_test, postgres);
}