diff options
Diffstat (limited to 'kittybox-rs/src/database/file')
-rw-r--r-- | kittybox-rs/src/database/file/mod.rs | 188 |
1 files changed, 108 insertions, 80 deletions
diff --git a/kittybox-rs/src/database/file/mod.rs b/kittybox-rs/src/database/file/mod.rs index 0f63c9d..842a834 100644 --- a/kittybox-rs/src/database/file/mod.rs +++ b/kittybox-rs/src/database/file/mod.rs @@ -1,5 +1,5 @@ //#![warn(clippy::unwrap_used)] -use crate::database::{filter_post, ErrorKind, Result, settings, Storage, StorageError}; +use crate::database::{ErrorKind, Result, settings, Storage, StorageError}; use crate::micropub::{MicropubUpdate, MicropubPropertyDeletion}; use async_trait::async_trait; use futures::{stream, StreamExt, TryStreamExt}; @@ -173,20 +173,20 @@ fn modify_post(post: &serde_json::Value, update: MicropubUpdate) -> Result<serde }); } for (k, v) in add_keys { + tracing::debug!("Adding k/v to post: {} => {:?}", k, v); let props = if k == "children" { &mut post } else { &mut post["properties"] }; - let k = &k; - if let Some(prop) = props[k].as_array_mut() { + if let Some(prop) = props[&k].as_array_mut() { if k == "children" { v.into_iter().rev().for_each(|v| prop.insert(0, v)); } else { prop.extend(v.into_iter()); } } else { - post["properties"][k] = serde_json::Value::Array(v) + props[&k] = serde_json::Value::Array(v) } } Ok(post) @@ -227,10 +227,7 @@ async fn hydrate_author<S: Storage>( if let Some(i) = i.as_str() { match storage.get_post(i).await { Ok(post) => match post { - Some(post) => match filter_post(post, user) { - Some(author) => author, - None => json!(i), - }, + Some(post) => post, None => json!(i), }, Err(e) => { @@ -383,27 +380,41 @@ impl Storage for FileStorage { .map(|s| s.to_string()) .unwrap_or_else(String::default); let key = key.to_string(); - + tracing::debug!("Opening temporary file to modify chnanels..."); let mut tempfile = OpenOptions::new() .write(true) .create_new(true) .open(&tempfilename) .await?; - let mut file = OpenOptions::new() - .read(true) - .write(true) - .truncate(false) - .create(true) - .open(&path) - .await?; - - let mut content = String::new(); - file.read_to_string(&mut content).await?; - drop(file); - let mut channels: Vec<super::MicropubChannel> = if !content.is_empty() { - serde_json::from_str(&content)? - } else { - Vec::default() + tracing::debug!("Opening real channel file..."); + let mut channels: Vec<super::MicropubChannel> = { + match OpenOptions::new() + .read(true) + .write(false) + .truncate(false) + .create(false) + .open(&path) + .await + { + Err(err) if err.kind() == std::io::ErrorKind::NotFound => { + Vec::default() + } + Err(err) => { + // Propagate the error upwards + return Err(err.into()); + } + Ok(mut file) => { + let mut content = String::new(); + file.read_to_string(&mut content).await?; + drop(file); + + if !content.is_empty() { + serde_json::from_str(&content)? + } else { + Vec::default() + } + } + } }; channels.push(super::MicropubChannel { @@ -489,7 +500,32 @@ impl Storage for FileStorage { limit: usize, user: Option<&'_ str> ) -> Result<Option<(serde_json::Value, Option<String>)>> { - todo!() + Ok(self.read_feed_with_limit( + url, + &cursor.map(|v| v.to_owned()), + limit, + &user.map(|v| v.to_owned()) + ).await? + .map(|feed| { + tracing::debug!("Feed: {:#}", serde_json::Value::Array( + feed["children"] + .as_array() + .map(|v| v.as_slice()) + .unwrap_or_default() + .iter() + .map(|mf2| mf2["properties"]["uid"][0].clone()) + .collect::<Vec<_>>() + )); + let cursor: Option<String> = feed["children"] + .as_array() + .map(|v| v.as_slice()) + .unwrap_or_default() + .last() + .map(|v| v["properties"]["uid"][0].as_str().unwrap().to_owned()); + tracing::debug!("Extracted the cursor: {:?}", cursor); + (feed, cursor) + }) + ) } #[tracing::instrument(skip(self))] @@ -500,65 +536,57 @@ impl Storage for FileStorage { limit: usize, user: &'_ Option<String>, ) -> Result<Option<serde_json::Value>> { - if let Some(feed) = self.get_post(url).await? { - if let Some(mut feed) = filter_post(feed, user) { - if feed["children"].is_array() { - // Take this out of the MF2-JSON document to save memory - // - // This uses a clever match with enum destructuring - // to extract the underlying Vec without cloning it - let children: Vec<serde_json::Value> = match feed["children"].take() { - serde_json::Value::Array(children) => children, - // We've already checked it's an array - _ => unreachable!() - }; - - let mut posts_iter = children - .into_iter() - .map(|s: serde_json::Value| s.as_str().unwrap().to_string()); - // Note: we can't actually use `skip_while` here because we end up emitting `after`. - // This imperative snippet consumes after instead of emitting it, allowing the - // stream of posts to return only those items that truly come *after* that one. - // If I would implement an Iter combinator like this, I would call it `skip_until` - if let Some(after) = after { - for s in posts_iter.by_ref() { - if &s == after { - break; - } - } - }; - let posts = stream::iter(posts_iter) - .map(|url: String| async move { self.get_post(&url).await }) - .buffered(std::cmp::min(3, limit)) - // Hack to unwrap the Option and sieve out broken links - // Broken links return None, and Stream::filter_map skips Nones. - .try_filter_map(|post: Option<serde_json::Value>| async move { Ok(post) }) - .try_filter_map(|post| async move { Ok(filter_post(post, user)) }) - .and_then(|mut post| async move { - hydrate_author(&mut post, user, self).await; - Ok(post) - }) - .take(limit); - - match posts.try_collect::<Vec<serde_json::Value>>().await { - Ok(posts) => feed["children"] = serde_json::json!(posts), - Err(err) => { - return Err(StorageError::with_source( - ErrorKind::Other, - Cow::Owned(format!("Feed assembly error: {}", &err)), - Box::new(err), - )); + if let Some(mut feed) = self.get_post(url).await? { + if feed["children"].is_array() { + // Take this out of the MF2-JSON document to save memory + // + // This uses a clever match with enum destructuring + // to extract the underlying Vec without cloning it + let children: Vec<serde_json::Value> = match feed["children"].take() { + serde_json::Value::Array(children) => children, + // We've already checked it's an array + _ => unreachable!() + }; + tracing::debug!("Full children array: {:#}", serde_json::Value::Array(children.clone())); + let mut posts_iter = children + .into_iter() + .map(|s: serde_json::Value| s.as_str().unwrap().to_string()); + // Note: we can't actually use `skip_while` here because we end up emitting `after`. + // This imperative snippet consumes after instead of emitting it, allowing the + // stream of posts to return only those items that truly come *after* that one. + // If I would implement an Iter combinator like this, I would call it `skip_until` + if let Some(after) = after { + for s in posts_iter.by_ref() { + if &s == after { + break; } } + }; + let posts = stream::iter(posts_iter) + .map(|url: String| async move { self.get_post(&url).await }) + .buffered(std::cmp::min(3, limit)) + // Hack to unwrap the Option and sieve out broken links + // Broken links return None, and Stream::filter_map skips Nones. + .try_filter_map(|post: Option<serde_json::Value>| async move { Ok(post) }) + .and_then(|mut post| async move { + hydrate_author(&mut post, user, self).await; + Ok(post) + }) + .take(limit); + + match posts.try_collect::<Vec<serde_json::Value>>().await { + Ok(posts) => feed["children"] = serde_json::json!(posts), + Err(err) => { + return Err(StorageError::with_source( + ErrorKind::Other, + Cow::Owned(format!("Feed assembly error: {}", &err)), + Box::new(err), + )); + } } - hydrate_author(&mut feed, user, self).await; - Ok(Some(feed)) - } else { - Err(StorageError::from_static( - ErrorKind::PermissionDenied, - "specified user cannot access this post", - )) } + hydrate_author(&mut feed, user, self).await; + Ok(Some(feed)) } else { Ok(None) } |