From 82346b89c7d166817f3450759cc9f6df1ec7c74d Mon Sep 17 00:00:00 2001
From: Vika
Date: Sun, 9 Jul 2023 01:22:46 +0300
Subject: database{,/file}: clean up code, add documentation and logging

`filter_post` is now out of here and moved into the frontend. This
kind of non-intrusive filtering can be done on the frontend, and the
database need not concern itself with it.

It could still be done as an optimisation, but the frontend is going
to sanitize things like location data in posts by itself now, so the
database-side filtering is no longer required (and might even be
harmful, if the frontend starts indicating that there are hidden
fields by replacing them with placeholders that ask one to log in to
view the information).
---
 kittybox-rs/src/database/file/mod.rs | 188 ++++++++++++++++------
 kittybox-rs/src/database/mod.rs      | 233 ++++++++++++++++-------------------
 2 files changed, 215 insertions(+), 206 deletions(-)

diff --git a/kittybox-rs/src/database/file/mod.rs b/kittybox-rs/src/database/file/mod.rs
index 0f63c9d..842a834 100644
--- a/kittybox-rs/src/database/file/mod.rs
+++ b/kittybox-rs/src/database/file/mod.rs
@@ -1,5 +1,5 @@
 //#![warn(clippy::unwrap_used)]
-use crate::database::{filter_post, ErrorKind, Result, settings, Storage, StorageError};
+use crate::database::{ErrorKind, Result, settings, Storage, StorageError};
 use crate::micropub::{MicropubUpdate, MicropubPropertyDeletion};
 use async_trait::async_trait;
 use futures::{stream, StreamExt, TryStreamExt};
@@ -173,20 +173,20 @@ fn modify_post(post: &serde_json::Value, update: MicropubUpdate) -> Result<serde_json::Value> {
     for (k, v) in add {
+        tracing::debug!("Adding k/v pair: {} => {:?}", k, v);
         let props = if k == "children" {
             &mut post
         } else {
             &mut post["properties"]
         };
-        let k = &k;
-        if let Some(prop) = props[k].as_array_mut() {
+        if let Some(prop) = props[&k].as_array_mut() {
             if k == "children" {
                 v.into_iter().rev().for_each(|v| prop.insert(0, v));
             } else {
                 prop.extend(v.into_iter());
             }
         } else {
-            post["properties"][k] = serde_json::Value::Array(v)
+            props[&k] = serde_json::Value::Array(v)
         }
     }
     Ok(post)
 }
 
@@ -227,10 +227,7 @@ async fn hydrate_author<S: Storage>(
         if let Some(i) = i.as_str() {
             match storage.get_post(i).await {
                 Ok(post) => match post {
-                    Some(post) => match filter_post(post, user) {
-                        Some(author) => author,
-                        None => json!(i),
-                    },
+                    Some(post) => post,
                     None => json!(i),
                 },
                 Err(e) => {
@@ -383,27 +380,41 @@ impl Storage for FileStorage {
             .map(|s| s.to_string())
             .unwrap_or_else(String::default);
         let key = key.to_string();
-
+        tracing::debug!("Opening temporary file to modify channels...");
         let mut tempfile = OpenOptions::new()
             .write(true)
             .create_new(true)
             .open(&tempfilename)
             .await?;
-        let mut file = OpenOptions::new()
-            .read(true)
-            .write(true)
-            .truncate(false)
-            .create(true)
-            .open(&path)
-            .await?;
-
-        let mut content = String::new();
-        file.read_to_string(&mut content).await?;
-        drop(file);
-        let mut channels: Vec<super::MicropubChannel> = if !content.is_empty() {
-            serde_json::from_str(&content)?
-        } else {
-            Vec::default()
+        tracing::debug!("Opening real channel file...");
+        let mut channels: Vec<super::MicropubChannel> = {
+            match OpenOptions::new()
+                .read(true)
+                .write(false)
+                .truncate(false)
+                .create(false)
+                .open(&path)
+                .await
+            {
+                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
+                    Vec::default()
+                }
+                Err(err) => {
+                    // Propagate the error upwards
+                    return Err(err.into());
+                }
+                Ok(mut file) => {
+                    let mut content = String::new();
+                    file.read_to_string(&mut content).await?;
+                    drop(file);
+
+                    if !content.is_empty() {
+                        serde_json::from_str(&content)?
+                    } else {
+                        Vec::default()
+                    }
+                }
+            }
         };
 
         channels.push(super::MicropubChannel {
@@ -489,7 +500,32 @@ impl Storage for FileStorage {
         limit: usize,
         user: Option<&'_ str>
     ) -> Result<Option<(serde_json::Value, Option<String>)>> {
-        todo!()
+        Ok(self.read_feed_with_limit(
+            url,
+            &cursor.map(|v| v.to_owned()),
+            limit,
+            &user.map(|v| v.to_owned())
+        ).await?
+            .map(|feed| {
+                tracing::debug!("Feed: {:#}", serde_json::Value::Array(
+                    feed["children"]
+                        .as_array()
+                        .map(|v| v.as_slice())
+                        .unwrap_or_default()
+                        .iter()
+                        .map(|mf2| mf2["properties"]["uid"][0].clone())
+                        .collect::<Vec<_>>()
+                ));
+                let cursor: Option<String> = feed["children"]
+                    .as_array()
+                    .map(|v| v.as_slice())
+                    .unwrap_or_default()
+                    .last()
+                    .map(|v| v["properties"]["uid"][0].as_str().unwrap().to_owned());
+                tracing::debug!("Extracted the cursor: {:?}", cursor);
+                (feed, cursor)
+            })
+        )
     }
 
     #[tracing::instrument(skip(self))]
@@ -500,65 +536,57 @@ impl Storage for FileStorage {
         limit: usize,
         user: &'_ Option<String>,
     ) -> Result<Option<serde_json::Value>> {
-        if let Some(feed) = self.get_post(url).await? {
-            if let Some(mut feed) = filter_post(feed, user) {
-                if feed["children"].is_array() {
-                    // Take this out of the MF2-JSON document to save memory
-                    //
-                    // This uses a clever match with enum destructuring
-                    // to extract the underlying Vec without cloning it
-                    let children: Vec<serde_json::Value> = match feed["children"].take() {
-                        serde_json::Value::Array(children) => children,
-                        // We've already checked it's an array
-                        _ => unreachable!()
-                    };
-
-                    let mut posts_iter = children
-                        .into_iter()
-                        .map(|s: serde_json::Value| s.as_str().unwrap().to_string());
-                    // Note: we can't actually use `skip_while` here because we end up emitting `after`.
-                    // This imperative snippet consumes after instead of emitting it, allowing the
-                    // stream of posts to return only those items that truly come *after* that one.
-                    // If I would implement an Iter combinator like this, I would call it `skip_until`
-                    if let Some(after) = after {
-                        for s in posts_iter.by_ref() {
-                            if &s == after {
-                                break;
-                            }
-                        }
-                    };
-                    let posts = stream::iter(posts_iter)
-                        .map(|url: String| async move { self.get_post(&url).await })
-                        .buffered(std::cmp::min(3, limit))
-                        // Hack to unwrap the Option and sieve out broken links
-                        // Broken links return None, and Stream::filter_map skips Nones.
-                        .try_filter_map(|post: Option<serde_json::Value>| async move { Ok(post) })
-                        .try_filter_map(|post| async move { Ok(filter_post(post, user)) })
-                        .and_then(|mut post| async move {
-                            hydrate_author(&mut post, user, self).await;
-                            Ok(post)
-                        })
-                        .take(limit);
-
-                    match posts.try_collect::<Vec<serde_json::Value>>().await {
-                        Ok(posts) => feed["children"] = serde_json::json!(posts),
-                        Err(err) => {
-                            return Err(StorageError::with_source(
-                                ErrorKind::Other,
-                                Cow::Owned(format!("Feed assembly error: {}", &err)),
-                                Box::new(err),
-                            ));
+        if let Some(mut feed) = self.get_post(url).await? {
+            if feed["children"].is_array() {
+                // Take this out of the MF2-JSON document to save memory
+                //
+                // This uses a clever match with enum destructuring
+                // to extract the underlying Vec without cloning it
+                let children: Vec<serde_json::Value> = match feed["children"].take() {
+                    serde_json::Value::Array(children) => children,
+                    // We've already checked it's an array
+                    _ => unreachable!()
+                };
+                tracing::debug!("Full children array: {:#}", serde_json::Value::Array(children.clone()));
+                let mut posts_iter = children
+                    .into_iter()
+                    .map(|s: serde_json::Value| s.as_str().unwrap().to_string());
+                // Note: we can't actually use `skip_while` here because we end up emitting `after`.
+                // This imperative snippet consumes after instead of emitting it, allowing the
+                // stream of posts to return only those items that truly come *after* that one.
+                // If I were to implement an Iter combinator like this, I would call it `skip_until`
+                if let Some(after) = after {
+                    for s in posts_iter.by_ref() {
+                        if &s == after {
+                            break;
+                        }
+                    }
+                };
+                let posts = stream::iter(posts_iter)
+                    .map(|url: String| async move { self.get_post(&url).await })
+                    .buffered(std::cmp::min(3, limit))
+                    // Hack to unwrap the Option and sieve out broken links
+                    // Broken links return None, and Stream::filter_map skips Nones.
+                    .try_filter_map(|post: Option<serde_json::Value>| async move { Ok(post) })
+                    .and_then(|mut post| async move {
+                        hydrate_author(&mut post, user, self).await;
+                        Ok(post)
+                    })
+                    .take(limit);
+
+                match posts.try_collect::<Vec<serde_json::Value>>().await {
+                    Ok(posts) => feed["children"] = serde_json::json!(posts),
+                    Err(err) => {
+                        return Err(StorageError::with_source(
+                            ErrorKind::Other,
+                            Cow::Owned(format!("Feed assembly error: {}", &err)),
+                            Box::new(err),
+                        ));
+                    }
                 }
             }
-                hydrate_author(&mut feed, user, self).await;
-                Ok(Some(feed))
-            } else {
-                Err(StorageError::from_static(
-                    ErrorKind::PermissionDenied,
-                    "specified user cannot access this post",
-                ))
             }
+            hydrate_author(&mut feed, user, self).await;
+            Ok(Some(feed))
         } else {
             Ok(None)
         }
diff --git a/kittybox-rs/src/database/mod.rs b/kittybox-rs/src/database/mod.rs
index baae81d..3086623 100644
--- a/kittybox-rs/src/database/mod.rs
+++ b/kittybox-rs/src/database/mod.rs
@@ -42,21 +42,6 @@ pub mod settings {
     mod private {
         pub trait Sealed {}
     }
-    /*#[derive(serde::Deserialize, serde::Serialize, Default)]
-    pub struct Settings {
-        pub site_name: SiteName,
-        pub webring: Webring
-    }
-    impl From<Settings> for SiteName {
-        fn from(settings: Settings) -> Self {
-            settings.site_name
-        }
-    }
-    impl From<Settings> for Webring {
-        fn from(settings: Settings) -> Self {
-            settings.webring
-        }
-    }*/
 
     /// A trait for various settings that should be contained here.
     ///
@@ -218,65 +203,6 @@ impl StorageError {
 
 /// A special Result type for the Micropub backing storage.
 pub type Result<T> = std::result::Result<T, StorageError>;
 
-/// Filter the post according to the value of `user`.
-///
-/// Anonymous users cannot view private posts and protected locations;
-/// Logged-in users can only view private posts targeted at them;
-/// Logged-in users can't view private location data
-pub fn filter_post(
-    mut post: serde_json::Value,
-    user: &'_ Option<String>,
-) -> Option<serde_json::Value> {
-    if post["properties"]["deleted"][0].is_string() {
-        return Some(serde_json::json!({
-            "type": post["type"],
-            "properties": {
-                "deleted": post["properties"]["deleted"]
-            }
-        }));
-    }
-    let empty_vec: Vec<serde_json::Value> = vec![];
-    let author = post["properties"]["author"]
-        .as_array()
-        .unwrap_or(&empty_vec)
-        .iter()
-        .map(|i| i.as_str().unwrap().to_string());
-    let visibility = post["properties"]["visibility"][0]
-        .as_str()
-        .unwrap_or("public");
-    let mut audience = author.chain(
-        post["properties"]["audience"]
-            .as_array()
-            .unwrap_or(&empty_vec)
-            .iter()
-            .map(|i| i.as_str().unwrap().to_string()),
-    );
-    if (visibility == "private" && !audience.any(|i| Some(i) == *user))
-        || (visibility == "protected" && user.is_none())
-    {
-        return None;
-    }
-    if post["properties"]["location"].is_array() {
-        let location_visibility = post["properties"]["location-visibility"][0]
-            .as_str()
-            .unwrap_or("private");
-        let mut author = post["properties"]["author"]
-            .as_array()
-            .unwrap_or(&empty_vec)
-            .iter()
-            .map(|i| i.as_str().unwrap().to_string());
-        if (location_visibility == "private" && !author.any(|i| Some(i) == *user))
-            || (location_visibility == "protected" && user.is_none())
-        {
-            post["properties"]
-                .as_object_mut()
-                .unwrap()
-                .remove("location");
-        }
-    }
-    Some(post)
-}
-
 /// A storage backend for the Micropub server.
 ///
 /// Implementations should note that all methods listed on this trait MUST be fully atomic
@@ -295,14 +221,24 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync {
     async fn put_post(&self, post: &'_ serde_json::Value, user: &'_ str) -> Result<()>;
 
     /// Add post to feed. Some database implementations might have optimized ways to do this.
+    #[tracing::instrument(skip(self))]
     async fn add_to_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> {
-        self.update_post(feed, serde_json::from_str(r#"{"add": {"children": [post]}}"#).unwrap()).await
+        tracing::debug!("Inserting {} into {} using `update_post`", post, feed);
+        self.update_post(feed, serde_json::from_value(
+            serde_json::json!({"add": {"children": [post]}})).unwrap()
+        ).await
     }
+    /// Remove post from feed. Some database implementations might have optimized ways to do this.
+    #[tracing::instrument(skip(self))]
     async fn remove_from_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> {
-        self.update_post(feed, serde_json::from_str(r#"{"delete": {"children": [post]}}"#).unwrap()).await
+        tracing::debug!("Removing {} from {} using `update_post`", post, feed);
+        self.update_post(feed, serde_json::from_value(
+            serde_json::json!({"delete": {"children": [post]}})).unwrap()
+        ).await
     }
 
-    /// Modify a post using an update object as defined in the Micropub spec.
+    /// Modify a post using an update object as defined in the
+    /// Micropub spec.
     ///
     /// Note to implementors: the update operation MUST be atomic and
     /// SHOULD lock the database to prevent two clients overwriting
     /// each other's changes, and return an error if the update
     /// cannot be done.
     async fn update_post(&self, url: &str, update: MicropubUpdate) -> Result<()>;
 
-    /// Get a list of channels available for the user represented by the URL `user` to write to.
+    /// Get a list of channels available for the user represented by
+    /// the `user` domain to write to.
     async fn get_channels(&self, user: &'_ str) -> Result<Vec<MicropubChannel>>;
 
     /// Fetch a feed at `url` and return an h-feed object containing
     /// `limit` posts after a post by url `after`, filtering the content
     /// in context of a user specified by `user` (or an anonymous user).
     ///
-    /// Specifically, private posts that don't include the user in the audience
-    /// will be elided from the feed, and the posts containing location and not
-    /// specifying post["properties"]["location-visibility"][0] == "public"
-    /// will have their location data (but not check-in data) stripped.
+    /// This method MUST hydrate the `author` property with an h-card
+    /// from the database by replacing URLs with corresponding h-cards.
     ///
-    /// This function is used as an optimization so the client, whatever it is,
-    /// doesn't have to fetch posts, then realize some of them are private, and
-    /// fetch more posts.
+    /// When encountering posts which the `user` is not authorized to
+    /// access, this method MUST elide such posts (as an optimization
+    /// for the frontend) and not return them, but still return up to
+    /// `limit` posts (to not reveal the hidden posts' presence).
     ///
-    /// Note for implementors: if you use streams to fetch posts in parallel
-    /// from the database, preferably make this method use a connection pool
-    /// to reduce overhead of creating a database connection per post for
-    /// parallel fetching.
+    /// Note for implementors: if you use streams to fetch posts in
+    /// parallel from the database, preferably make this method use a
+    /// connection pool to reduce overhead of creating a database
+    /// connection per post for parallel fetching.
     async fn read_feed_with_limit(
         &self,
         url: &'_ str,
@@ -347,7 +283,7 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync {
         user: Option<&'_ str>
     ) -> Result<Option<(serde_json::Value, Option<String>)>>;
 
-    /// Deletes a post from the database irreversibly. 'nuff said. Must be idempotent.
+    /// Deletes a post from the database irreversibly. Must be idempotent.
     async fn delete_post(&self, url: &'_ str) -> Result<()>;
 
     /// Gets a setting from the setting store and passes the result.
@@ -559,13 +495,19 @@ mod tests {
     }
 
     async fn test_feed_pagination<Backend: Storage>(backend: Backend) {
-        let posts = std::iter::from_fn(|| Some(gen_random_post("fireburn.ru")))
-            .take(20)
-            .collect::<Vec<serde_json::Value>>()
-            .into_iter()
-            .rev()
+        let posts = {
+            let mut posts = std::iter::from_fn(
+                || Some(gen_random_post("fireburn.ru"))
+            )
+                .take(40)
                 .collect::<Vec<serde_json::Value>>();
+            // Reverse the array so it's in reverse-chronological order
+            posts.reverse();
+
+            posts
+        };
+
         let feed = json!({
             "type": ["h-feed"],
             "properties": {
@@ -573,9 +515,6 @@ mod tests {
                 "author": ["https://fireburn.ru/"],
                 "uid": ["https://fireburn.ru/feeds/main"]
             },
-            "children": posts.iter()
-                .filter_map(|json| json["properties"]["uid"][0].as_str())
-                .collect::<Vec<&str>>()
         });
 
         let key = feed["properties"]["uid"][0].as_str().unwrap();
         backend
             .put_post(&feed, "fireburn.ru")
             .await
             .unwrap();
-        println!("---");
-        for (i, post) in posts.iter().enumerate() {
+
+        for (i, post) in posts.iter().rev().enumerate() {
             backend
                 .put_post(post, "fireburn.ru")
                 .await
                 .unwrap();
-            println!("posts[{}] = {}", i, post["properties"]["uid"][0]);
+            backend.add_to_feed(key, post["properties"]["uid"][0].as_str().unwrap()).await.unwrap();
         }
-        println!("---");
+
         let limit: usize = 10;
-        let result = backend
-            .read_feed_with_limit(key, &None, limit, &None)
+
+        tracing::debug!("Starting feed reading...");
+        let (result, cursor) = backend
+            .read_feed_with_cursor(key, None, limit, None)
             .await
             .unwrap()
             .unwrap();
-        for (i, post) in result["children"].as_array().unwrap().iter().enumerate() {
-            println!("feed[0][{}] = {}", i, post["properties"]["uid"][0]);
-        }
-        println!("---");
-        assert!(result["children"].as_array().unwrap().len() <= limit);
-        assert_eq!(result["children"].as_array().unwrap()[0..10], posts[0..10]);
-        let result2 = backend
-            .read_feed_with_limit(
+
+        assert_eq!(result["children"].as_array().unwrap().len(), limit);
+        assert_eq!(
+            result["children"]
+                .as_array()
+                .unwrap()
+                .iter()
+                .map(|post| post["properties"]["uid"][0].as_str().unwrap())
+                .collect::<Vec<_>>()
+                [0..10],
+            posts
+                .iter()
+                .map(|post| post["properties"]["uid"][0].as_str().unwrap())
+                .collect::<Vec<_>>()
+                [0..10]
+        );
+
+        tracing::debug!("Continuing with cursor: {:?}", cursor);
+        let (result2, cursor2) = backend
+            .read_feed_with_cursor(
                 key,
-                &result["children"].as_array().unwrap().last().unwrap()["properties"]["uid"][0]
-                    .as_str()
-                    .map(|i| i.to_owned()),
+                cursor.as_deref(),
                 limit,
-                &None,
+                None,
             )
             .await
             .unwrap()
             .unwrap();
-        for (i, post) in result2["children"].as_array().unwrap().iter().enumerate() {
-            println!("feed[1][{}] = {}", i, post["properties"]["uid"][0]);
-        }
-        println!("---");
+
         assert_eq!(
             result2["children"].as_array().unwrap()[0..10],
             posts[10..20]
         );
+
+        tracing::debug!("Continuing with cursor: {:?}", cursor2);
+        let (result3, cursor3) = backend
+            .read_feed_with_cursor(
+                key,
+                cursor2.as_deref(),
+                limit,
+                None,
+            )
+            .await
+            .unwrap()
+            .unwrap();
+
+        assert_eq!(
+            result3["children"].as_array().unwrap()[0..10],
+            posts[20..30]
+        );
+
+        tracing::debug!("Continuing with cursor: {:?}", cursor3);
+        let (result4, cursor4) = backend
+            .read_feed_with_cursor(
+                key,
+                cursor3.as_deref(),
+                limit,
+                None,
+            )
+            .await
+            .unwrap()
+            .unwrap();
+
+        assert_eq!(
+            result4["children"].as_array().unwrap()[0..10],
+            posts[30..40]
+        );
+
         // Regression test for #4
-        let nonsense_after = Some("1010101010".to_owned());
-        let result3 = tokio::time::timeout(tokio::time::Duration::from_secs(10), async move {
+        //
+        // Results for a bogus cursor are undefined, so we aren't
+        // checking them. But the function at least shouldn't hang.
+        let nonsense_after = Some("1010101010");
+        let result_bogus = tokio::time::timeout(tokio::time::Duration::from_secs(10), async move {
             backend
-                .read_feed_with_limit(key, &nonsense_after, limit, &None)
+                .read_feed_with_cursor(key, nonsense_after, limit, None)
                 .await
-                .unwrap()
-                .unwrap()
         })
         .await
         .expect("Operation should not hang: see https://gitlab.com/kittybox/kittybox/-/issues/4");
-        assert!(result3["children"].as_array().unwrap().is_empty());
     }
 
     /// Automatically generates a test suite for
-- 
cgit 1.4.1
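
The visibility rules that `filter_post` enforced are expected to move into the frontend. A minimal sketch of what that frontend-side filter could look like, reconstructed from the deleted function above; the name `filter_post_frontend` and its placement are hypothetical and not part of this patch, and the signature is adapted to `Option<&str>` to match the new trait methods:

use serde_json::json;

/// Hypothetical frontend-side replacement for the deleted
/// `filter_post`: private posts are hidden from everyone outside
/// their audience, protected posts from anonymous users, and
/// non-public location data is stripped for non-authors.
fn filter_post_frontend(
    mut post: serde_json::Value,
    user: Option<&str>,
) -> Option<serde_json::Value> {
    // Deleted posts are collapsed into a tombstone.
    if post["properties"]["deleted"][0].is_string() {
        return Some(json!({
            "type": post["type"],
            "properties": { "deleted": post["properties"]["deleted"] }
        }));
    }
    // Collect a property's string values into owned Strings.
    let strings = |v: &serde_json::Value| -> Vec<String> {
        v.as_array()
            .map(|a| a.iter().filter_map(|i| i.as_str().map(String::from)).collect())
            .unwrap_or_default()
    };
    let author = strings(&post["properties"]["author"]);
    let mut audience = author.clone();
    audience.extend(strings(&post["properties"]["audience"]));
    let visibility = post["properties"]["visibility"][0]
        .as_str()
        .unwrap_or("public");
    if (visibility == "private" && !audience.iter().any(|i| Some(i.as_str()) == user))
        || (visibility == "protected" && user.is_none())
    {
        return None;
    }
    if post["properties"]["location"].is_array() {
        let location_visibility = post["properties"]["location-visibility"][0]
            .as_str()
            .unwrap_or("private");
        if (location_visibility == "private"
            && !author.iter().any(|i| Some(i.as_str()) == user))
            || (location_visibility == "protected" && user.is_none())
        {
            post["properties"].as_object_mut().unwrap().remove("location");
        }
    }
    Some(post)
}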
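The new `read_feed_with_cursor` method is easiest to understand from the consumer side: pass `None` to start, then feed each returned cursor back in until none is returned. In the file backend the cursor is the UID of the last post in the page. A sketch of such a consumer loop, assuming it lives next to the `Storage` trait (so the `Result` alias is in scope); the `dump_feed` helper is illustrative only:

/// Page through a feed ten posts at a time, printing each post's UID.
async fn dump_feed<S: Storage>(storage: &S, url: &str) -> Result<()> {
    let mut cursor: Option<String> = None;
    loop {
        // Each page holds up to `limit` posts; the cursor is the UID of
        // the last post in the page, or None when the page is empty.
        let Some((feed, next)) = storage
            .read_feed_with_cursor(url, cursor.as_deref(), 10, None)
            .await?
        else {
            break; // the feed itself doesn't exist
        };
        for post in feed["children"].as_array().into_iter().flatten() {
            println!("{}", post["properties"]["uid"][0]);
        }
        match next {
            Some(uid) => cursor = Some(uid),
            None => break, // reached the end of the feed
        }
    }
    Ok(())
}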