diff options
-rw-r--r-- | kittybox-rs/src/database/file/mod.rs | 188 | ||||
-rw-r--r-- | kittybox-rs/src/database/mod.rs | 233 |
2 files changed, 215 insertions, 206 deletions
diff --git a/kittybox-rs/src/database/file/mod.rs b/kittybox-rs/src/database/file/mod.rs index 0f63c9d..842a834 100644 --- a/kittybox-rs/src/database/file/mod.rs +++ b/kittybox-rs/src/database/file/mod.rs @@ -1,5 +1,5 @@ //#![warn(clippy::unwrap_used)] -use crate::database::{filter_post, ErrorKind, Result, settings, Storage, StorageError}; +use crate::database::{ErrorKind, Result, settings, Storage, StorageError}; use crate::micropub::{MicropubUpdate, MicropubPropertyDeletion}; use async_trait::async_trait; use futures::{stream, StreamExt, TryStreamExt}; @@ -173,20 +173,20 @@ fn modify_post(post: &serde_json::Value, update: MicropubUpdate) -> Result<serde }); } for (k, v) in add_keys { + tracing::debug!("Adding k/v to post: {} => {:?}", k, v); let props = if k == "children" { &mut post } else { &mut post["properties"] }; - let k = &k; - if let Some(prop) = props[k].as_array_mut() { + if let Some(prop) = props[&k].as_array_mut() { if k == "children" { v.into_iter().rev().for_each(|v| prop.insert(0, v)); } else { prop.extend(v.into_iter()); } } else { - post["properties"][k] = serde_json::Value::Array(v) + props[&k] = serde_json::Value::Array(v) } } Ok(post) @@ -227,10 +227,7 @@ async fn hydrate_author<S: Storage>( if let Some(i) = i.as_str() { match storage.get_post(i).await { Ok(post) => match post { - Some(post) => match filter_post(post, user) { - Some(author) => author, - None => json!(i), - }, + Some(post) => post, None => json!(i), }, Err(e) => { @@ -383,27 +380,41 @@ impl Storage for FileStorage { .map(|s| s.to_string()) .unwrap_or_else(String::default); let key = key.to_string(); - + tracing::debug!("Opening temporary file to modify chnanels..."); let mut tempfile = OpenOptions::new() .write(true) .create_new(true) .open(&tempfilename) .await?; - let mut file = OpenOptions::new() - .read(true) - .write(true) - .truncate(false) - .create(true) - .open(&path) - .await?; - - let mut content = String::new(); - file.read_to_string(&mut content).await?; - drop(file); - let mut channels: Vec<super::MicropubChannel> = if !content.is_empty() { - serde_json::from_str(&content)? - } else { - Vec::default() + tracing::debug!("Opening real channel file..."); + let mut channels: Vec<super::MicropubChannel> = { + match OpenOptions::new() + .read(true) + .write(false) + .truncate(false) + .create(false) + .open(&path) + .await + { + Err(err) if err.kind() == std::io::ErrorKind::NotFound => { + Vec::default() + } + Err(err) => { + // Propagate the error upwards + return Err(err.into()); + } + Ok(mut file) => { + let mut content = String::new(); + file.read_to_string(&mut content).await?; + drop(file); + + if !content.is_empty() { + serde_json::from_str(&content)? + } else { + Vec::default() + } + } + } }; channels.push(super::MicropubChannel { @@ -489,7 +500,32 @@ impl Storage for FileStorage { limit: usize, user: Option<&'_ str> ) -> Result<Option<(serde_json::Value, Option<String>)>> { - todo!() + Ok(self.read_feed_with_limit( + url, + &cursor.map(|v| v.to_owned()), + limit, + &user.map(|v| v.to_owned()) + ).await? + .map(|feed| { + tracing::debug!("Feed: {:#}", serde_json::Value::Array( + feed["children"] + .as_array() + .map(|v| v.as_slice()) + .unwrap_or_default() + .iter() + .map(|mf2| mf2["properties"]["uid"][0].clone()) + .collect::<Vec<_>>() + )); + let cursor: Option<String> = feed["children"] + .as_array() + .map(|v| v.as_slice()) + .unwrap_or_default() + .last() + .map(|v| v["properties"]["uid"][0].as_str().unwrap().to_owned()); + tracing::debug!("Extracted the cursor: {:?}", cursor); + (feed, cursor) + }) + ) } #[tracing::instrument(skip(self))] @@ -500,65 +536,57 @@ impl Storage for FileStorage { limit: usize, user: &'_ Option<String>, ) -> Result<Option<serde_json::Value>> { - if let Some(feed) = self.get_post(url).await? { - if let Some(mut feed) = filter_post(feed, user) { - if feed["children"].is_array() { - // Take this out of the MF2-JSON document to save memory - // - // This uses a clever match with enum destructuring - // to extract the underlying Vec without cloning it - let children: Vec<serde_json::Value> = match feed["children"].take() { - serde_json::Value::Array(children) => children, - // We've already checked it's an array - _ => unreachable!() - }; - - let mut posts_iter = children - .into_iter() - .map(|s: serde_json::Value| s.as_str().unwrap().to_string()); - // Note: we can't actually use `skip_while` here because we end up emitting `after`. - // This imperative snippet consumes after instead of emitting it, allowing the - // stream of posts to return only those items that truly come *after* that one. - // If I would implement an Iter combinator like this, I would call it `skip_until` - if let Some(after) = after { - for s in posts_iter.by_ref() { - if &s == after { - break; - } - } - }; - let posts = stream::iter(posts_iter) - .map(|url: String| async move { self.get_post(&url).await }) - .buffered(std::cmp::min(3, limit)) - // Hack to unwrap the Option and sieve out broken links - // Broken links return None, and Stream::filter_map skips Nones. - .try_filter_map(|post: Option<serde_json::Value>| async move { Ok(post) }) - .try_filter_map(|post| async move { Ok(filter_post(post, user)) }) - .and_then(|mut post| async move { - hydrate_author(&mut post, user, self).await; - Ok(post) - }) - .take(limit); - - match posts.try_collect::<Vec<serde_json::Value>>().await { - Ok(posts) => feed["children"] = serde_json::json!(posts), - Err(err) => { - return Err(StorageError::with_source( - ErrorKind::Other, - Cow::Owned(format!("Feed assembly error: {}", &err)), - Box::new(err), - )); + if let Some(mut feed) = self.get_post(url).await? { + if feed["children"].is_array() { + // Take this out of the MF2-JSON document to save memory + // + // This uses a clever match with enum destructuring + // to extract the underlying Vec without cloning it + let children: Vec<serde_json::Value> = match feed["children"].take() { + serde_json::Value::Array(children) => children, + // We've already checked it's an array + _ => unreachable!() + }; + tracing::debug!("Full children array: {:#}", serde_json::Value::Array(children.clone())); + let mut posts_iter = children + .into_iter() + .map(|s: serde_json::Value| s.as_str().unwrap().to_string()); + // Note: we can't actually use `skip_while` here because we end up emitting `after`. + // This imperative snippet consumes after instead of emitting it, allowing the + // stream of posts to return only those items that truly come *after* that one. + // If I would implement an Iter combinator like this, I would call it `skip_until` + if let Some(after) = after { + for s in posts_iter.by_ref() { + if &s == after { + break; } } + }; + let posts = stream::iter(posts_iter) + .map(|url: String| async move { self.get_post(&url).await }) + .buffered(std::cmp::min(3, limit)) + // Hack to unwrap the Option and sieve out broken links + // Broken links return None, and Stream::filter_map skips Nones. + .try_filter_map(|post: Option<serde_json::Value>| async move { Ok(post) }) + .and_then(|mut post| async move { + hydrate_author(&mut post, user, self).await; + Ok(post) + }) + .take(limit); + + match posts.try_collect::<Vec<serde_json::Value>>().await { + Ok(posts) => feed["children"] = serde_json::json!(posts), + Err(err) => { + return Err(StorageError::with_source( + ErrorKind::Other, + Cow::Owned(format!("Feed assembly error: {}", &err)), + Box::new(err), + )); + } } - hydrate_author(&mut feed, user, self).await; - Ok(Some(feed)) - } else { - Err(StorageError::from_static( - ErrorKind::PermissionDenied, - "specified user cannot access this post", - )) } + hydrate_author(&mut feed, user, self).await; + Ok(Some(feed)) } else { Ok(None) } diff --git a/kittybox-rs/src/database/mod.rs b/kittybox-rs/src/database/mod.rs index baae81d..3086623 100644 --- a/kittybox-rs/src/database/mod.rs +++ b/kittybox-rs/src/database/mod.rs @@ -42,21 +42,6 @@ pub mod settings { mod private { pub trait Sealed {} } - /*#[derive(serde::Deserialize, serde::Serialize, Default)] - pub struct Settings { - pub site_name: SiteName, - pub webring: Webring - } - impl From<Settings> for SiteName { - fn from(settings: Settings) -> Self { - settings.site_name - } - } - impl From<Settings> for Webring { - fn from(settings: Settings) -> Self { - settings.webring - } - }*/ /// A trait for various settings that should be contained here. /// @@ -218,65 +203,6 @@ impl StorageError { /// A special Result type for the Micropub backing storage. pub type Result<T> = std::result::Result<T, StorageError>; -/// Filter the post according to the value of `user`. -/// -/// Anonymous users cannot view private posts and protected locations; -/// Logged-in users can only view private posts targeted at them; -/// Logged-in users can't view private location data -pub fn filter_post( - mut post: serde_json::Value, - user: &'_ Option<String>, -) -> Option<serde_json::Value> { - if post["properties"]["deleted"][0].is_string() { - return Some(serde_json::json!({ - "type": post["type"], - "properties": { - "deleted": post["properties"]["deleted"] - } - })); - } - let empty_vec: Vec<serde_json::Value> = vec![]; - let author = post["properties"]["author"] - .as_array() - .unwrap_or(&empty_vec) - .iter() - .map(|i| i.as_str().unwrap().to_string()); - let visibility = post["properties"]["visibility"][0] - .as_str() - .unwrap_or("public"); - let mut audience = author.chain( - post["properties"]["audience"] - .as_array() - .unwrap_or(&empty_vec) - .iter() - .map(|i| i.as_str().unwrap().to_string()), - ); - if (visibility == "private" && !audience.any(|i| Some(i) == *user)) - || (visibility == "protected" && user.is_none()) - { - return None; - } - if post["properties"]["location"].is_array() { - let location_visibility = post["properties"]["location-visibility"][0] - .as_str() - .unwrap_or("private"); - let mut author = post["properties"]["author"] - .as_array() - .unwrap_or(&empty_vec) - .iter() - .map(|i| i.as_str().unwrap().to_string()); - if (location_visibility == "private" && !author.any(|i| Some(i) == *user)) - || (location_visibility == "protected" && user.is_none()) - { - post["properties"] - .as_object_mut() - .unwrap() - .remove("location"); - } - } - Some(post) -} - /// A storage backend for the Micropub server. /// /// Implementations should note that all methods listed on this trait MUST be fully atomic @@ -295,14 +221,24 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync { async fn put_post(&self, post: &'_ serde_json::Value, user: &'_ str) -> Result<()>; /// Add post to feed. Some database implementations might have optimized ways to do this. + #[tracing::instrument(skip(self))] async fn add_to_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> { - self.update_post(feed, serde_json::from_str(r#"{"add": {"children": [post]}}"#).unwrap()).await + tracing::debug!("Inserting {} into {} using `update_post`", post, feed); + self.update_post(feed, serde_json::from_value( + serde_json::json!({"add": {"children": [post]}})).unwrap() + ).await } + /// Remove post from feed. Some database implementations might have optimized ways to do this. + #[tracing::instrument(skip(self))] async fn remove_from_feed(&self, feed: &'_ str, post: &'_ str) -> Result<()> { - self.update_post(feed, serde_json::from_str(r#"{"delete": {"children": [post]}}"#).unwrap()).await + tracing::debug!("Removing {} into {} using `update_post`", post, feed); + self.update_post(feed, serde_json::from_value( + serde_json::json!({"delete": {"children": [post]}})).unwrap() + ).await } - /// Modify a post using an update object as defined in the Micropub spec. + /// Modify a post using an update object as defined in the + /// Micropub spec. /// /// Note to implementors: the update operation MUST be atomic and /// SHOULD lock the database to prevent two clients overwriting @@ -311,26 +247,26 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync { /// cannot be done. async fn update_post(&self, url: &str, update: MicropubUpdate) -> Result<()>; - /// Get a list of channels available for the user represented by the URL `user` to write to. + /// Get a list of channels available for the user represented by + /// the `user` domain to write to. async fn get_channels(&self, user: &'_ str) -> Result<Vec<MicropubChannel>>; /// Fetch a feed at `url` and return a an h-feed object containing /// `limit` posts after a post by url `after`, filtering the content /// in context of a user specified by `user` (or an anonymous user). /// - /// Specifically, private posts that don't include the user in the audience - /// will be elided from the feed, and the posts containing location and not - /// specifying post["properties"]["location-visibility"][0] == "public" - /// will have their location data (but not check-in data) stripped. + /// This method MUST hydrate the `author` property with an h-card + /// from the database by replacing URLs with corresponding h-cards. /// - /// This function is used as an optimization so the client, whatever it is, - /// doesn't have to fetch posts, then realize some of them are private, and - /// fetch more posts. + /// When encountering posts which the `user` is not authorized to + /// access, this method MUST elide such posts (as an optimization + /// for the frontend) and not return them, but still return up to + /// `limit` posts (to not reveal the hidden posts' presence). /// - /// Note for implementors: if you use streams to fetch posts in parallel - /// from the database, preferably make this method use a connection pool - /// to reduce overhead of creating a database connection per post for - /// parallel fetching. + /// Note for implementors: if you use streams to fetch posts in + /// parallel from the database, preferably make this method use a + /// connection pool to reduce overhead of creating a database + /// connection per post for parallel fetching. async fn read_feed_with_limit( &self, url: &'_ str, @@ -347,7 +283,7 @@ pub trait Storage: std::fmt::Debug + Clone + Send + Sync { user: Option<&'_ str> ) -> Result<Option<(serde_json::Value, Option<String>)>>; - /// Deletes a post from the database irreversibly. 'nuff said. Must be idempotent. + /// Deletes a post from the database irreversibly. Must be idempotent. async fn delete_post(&self, url: &'_ str) -> Result<()>; /// Gets a setting from the setting store and passes the result. @@ -559,13 +495,19 @@ mod tests { } async fn test_feed_pagination<Backend: Storage>(backend: Backend) { - let posts = std::iter::from_fn(|| Some(gen_random_post("fireburn.ru"))) - .take(20) - .collect::<Vec<serde_json::Value>>() - .into_iter() - .rev() + let posts = { + let mut posts = std::iter::from_fn( + || Some(gen_random_post("fireburn.ru")) + ) + .take(40) .collect::<Vec<serde_json::Value>>(); + // Reverse the array so it's in reverse-chronological order + posts.reverse(); + + posts + }; + let feed = json!({ "type": ["h-feed"], "properties": { @@ -573,9 +515,6 @@ mod tests { "author": ["https://fireburn.ru/"], "uid": ["https://fireburn.ru/feeds/main"] }, - "children": posts.iter() - .filter_map(|json| json["properties"]["uid"][0].as_str()) - .collect::<Vec<&str>>() }); let key = feed["properties"]["uid"][0].as_str().unwrap(); @@ -583,61 +522,103 @@ mod tests { .put_post(&feed, "fireburn.ru") .await .unwrap(); - println!("---"); - for (i, post) in posts.iter().enumerate() { + + for (i, post) in posts.iter().rev().enumerate() { backend .put_post(post, "fireburn.ru") .await .unwrap(); - println!("posts[{}] = {}", i, post["properties"]["uid"][0]); + backend.add_to_feed(key, post["properties"]["uid"][0].as_str().unwrap()).await.unwrap(); } - println!("---"); + let limit: usize = 10; - let result = backend - .read_feed_with_limit(key, &None, limit, &None) + + tracing::debug!("Starting feed reading..."); + let (result, cursor) = backend + .read_feed_with_cursor(key, None, limit, None) .await .unwrap() .unwrap(); - for (i, post) in result["children"].as_array().unwrap().iter().enumerate() { - println!("feed[0][{}] = {}", i, post["properties"]["uid"][0]); - } - println!("---"); - assert!(result["children"].as_array().unwrap().len() <= limit); - assert_eq!(result["children"].as_array().unwrap()[0..10], posts[0..10]); - let result2 = backend - .read_feed_with_limit( + assert_eq!(result["children"].as_array().unwrap().len(), limit); + assert_eq!( + result["children"] + .as_array() + .unwrap() + .iter() + .map(|post| post["properties"]["uid"][0].as_str().unwrap()) + .collect::<Vec<_>>() + [0..10], + posts + .iter() + .map(|post| post["properties"]["uid"][0].as_str().unwrap()) + .collect::<Vec<_>>() + [0..10] + ); + + tracing::debug!("Continuing with cursor: {:?}", cursor); + let (result2, cursor2) = backend + .read_feed_with_cursor( key, - &result["children"].as_array().unwrap().last().unwrap()["properties"]["uid"][0] - .as_str() - .map(|i| i.to_owned()), + cursor.as_deref(), limit, - &None, + None, ) .await .unwrap() .unwrap(); - for (i, post) in result2["children"].as_array().unwrap().iter().enumerate() { - println!("feed[1][{}] = {}", i, post["properties"]["uid"][0]); - } - println!("---"); + assert_eq!( result2["children"].as_array().unwrap()[0..10], posts[10..20] ); + tracing::debug!("Continuing with cursor: {:?}", cursor); + let (result3, cursor3) = backend + .read_feed_with_cursor( + key, + cursor2.as_deref(), + limit, + None, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!( + result3["children"].as_array().unwrap()[0..10], + posts[20..30] + ); + + tracing::debug!("Continuing with cursor: {:?}", cursor); + let (result4, cursor4) = backend + .read_feed_with_cursor( + key, + cursor3.as_deref(), + limit, + None, + ) + .await + .unwrap() + .unwrap(); + + assert_eq!( + result4["children"].as_array().unwrap()[0..10], + posts[30..40] + ); + // Regression test for #4 - let nonsense_after = Some("1010101010".to_owned()); - let result3 = tokio::time::timeout(tokio::time::Duration::from_secs(10), async move { + // + // Results for a bogus cursor are undefined, so we aren't + // checking them. But the function at least shouldn't hang. + let nonsense_after = Some("1010101010"); + let result_bogus = tokio::time::timeout(tokio::time::Duration::from_secs(10), async move { backend - .read_feed_with_limit(key, &nonsense_after, limit, &None) + .read_feed_with_cursor(key, nonsense_after, limit, None) .await - .unwrap() - .unwrap() }) .await .expect("Operation should not hang: see https://gitlab.com/kittybox/kittybox/-/issues/4"); - assert!(result3["children"].as_array().unwrap().is_empty()); } /// Automatically generates a test suite for |