From f894d1746b94d60cd22260b933948f4169ece9ae Mon Sep 17 00:00:00 2001 From: Vika Date: Mon, 27 Sep 2021 17:10:54 +0300 Subject: Implemented support for channels --- src/database/file/mod.rs | 117 ++++++++++++++++++++++++++++++++++++++++++++-- src/database/mod.rs | 49 +++++++++++++++++++ src/database/redis/mod.rs | 71 ++++------------------------ 3 files changed, 172 insertions(+), 65 deletions(-) diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs index dff527f..82987b5 100644 --- a/src/database/file/mod.rs +++ b/src/database/file/mod.rs @@ -3,7 +3,10 @@ use async_std::io::{ErrorKind as IOErrorKind, BufReader}; use async_std::io::prelude::*; use async_std::task::spawn_blocking; use async_trait::async_trait; -use crate::database::{ErrorKind, Result, Storage, StorageError}; +use futures::stream; +use futures_util::StreamExt; +use futures_util::TryStreamExt; +use crate::database::{ErrorKind, Result, Storage, StorageError, filter_post}; use fd_lock::RwLock; use log::debug; use std::path::{Path, PathBuf}; @@ -274,6 +277,44 @@ impl Storage for FileStorage { } } + if post["type"].as_array().unwrap().iter().any(|s| s.as_str() == Some("h-feed")) { + println!("Adding to channel list..."); + // Add the h-feed to the channel list + let mut path = relative_path::RelativePathBuf::new(); + path.push(user); + path.push("channels"); + + let path = path.to_path(&self.root_dir); + let file = OpenOptions::new() + .read(true) + .write(true) + .truncate(false) + .create(true) + .open(&path).await?; + let mut lock = get_lockable_file(file).await; + let mut guard = lock.write()?; + + let mut content = String::new(); + guard.read_to_string(&mut content).await?; + let mut channels: Vec; + if content.len() > 0 { + channels = serde_json::from_str(&content)?; + } else { + channels = Vec::default(); + } + + channels.push(super::MicropubChannel { + uid: key.to_string(), + name: post["properties"]["name"][0] + .as_str() + .map(|s| s.to_string()) + .unwrap_or_else(|| String::default()) + }); + guard.seek(std::io::SeekFrom::Start(0)).await?; + guard.set_len(0).await?; + guard.write_all(serde_json::to_string(&channels)?.as_bytes()).await?; + } + Ok(()) } @@ -308,7 +349,33 @@ impl Storage for FileStorage { &self, user: &crate::indieauth::User, ) -> Result> { - todo!() + let mut path = relative_path::RelativePathBuf::new(); + path.push(&user.me.to_string()); + path.push("channels"); + + let path = path.to_path(&self.root_dir); + match File::open(&path).await { + Ok(f) => { + let lock = get_lockable_file(f).await; + let guard = lock.read()?; + + let mut content = String::new(); + (&mut &*guard).read_to_string(&mut content).await?; + // This should not happen, but if it does, let's handle it gracefully instead of failing. + if content.len() == 0 { + return Ok(vec![]) + } + let channels: Vec = serde_json::from_str(&content)?; + Ok(channels) + }, + Err(e) => { + if e.kind() == IOErrorKind::NotFound { + Ok(vec![]) + } else { + Err(e.into()) + } + } + } } async fn read_feed_with_limit<'a>( @@ -318,7 +385,51 @@ impl Storage for FileStorage { limit: usize, user: &'a Option, ) -> Result> { - todo!() + match self.get_post(url).await? { + Some(feed) => match filter_post(feed, user) { + Some(mut feed) => { + if feed["children"].is_array() { + let children = feed["children"].as_array().unwrap().clone(); + let mut posts_iter = children.into_iter() + .map(|s: serde_json::Value| s.as_str().unwrap().to_string()); + if after.is_some() { + loop { + let i = posts_iter.next(); + if &i == after { + break; + } + } + } + let posts = stream::iter(posts_iter) + .map(|url| async move { + self.get_post(&url).await + }) + .buffered(std::cmp::min(3, limit)) + // Hack to unwrap the Option and sieve out broken links + // Broken links return None, and Stream::filter_map skips Nones. + .try_filter_map(|post: Option| async move { Ok(post) }) + .try_filter_map(|post| async move { Ok(filter_post(post, user)) }) + .take(limit); + + match posts.try_collect::>().await { + Ok(posts) => feed["children"] = serde_json::json!(posts), + Err(err) => { + return Err(StorageError::with_source( + ErrorKind::Other, "Feed assembly error", + Box::new(err) + )); + } + } + } + Ok(Some(feed)) + }, + None => Err(StorageError::new( + ErrorKind::PermissionDenied, + "specified user cannot access this post", + )) + }, + None => Ok(None) + } } async fn delete_post<'a>(&self, url: &'a str) -> Result<()> { diff --git a/src/database/mod.rs b/src/database/mod.rs index a1f5861..0b97c24 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -122,6 +122,55 @@ impl StorageError { /// A special Result type for the Micropub backing storage. pub type Result = std::result::Result; +pub fn filter_post(mut post: serde_json::Value, user: &'_ Option) -> Option { + if post["properties"]["deleted"][0].is_string() { + return Some(serde_json::json!({ + "type": post["type"], + "properties": { + "deleted": post["properties"]["deleted"] + } + })); + } + let empty_vec: Vec = vec![]; + let author = post["properties"]["author"] + .as_array() + .unwrap_or(&empty_vec) + .iter() + .map(|i| i.as_str().unwrap().to_string()); + let visibility = post["properties"]["visibility"][0] + .as_str() + .unwrap_or("public"); + let mut audience = author.chain( + post["properties"]["audience"] + .as_array() + .unwrap_or(&empty_vec) + .iter() + .map(|i| i.as_str().unwrap().to_string()), + ); + if (visibility == "private" && !audience.any(|i| Some(i) == *user)) + || (visibility == "protected" && user.is_none()) + { + return None; + } + if post["properties"]["location"].is_array() { + let location_visibility = post["properties"]["location-visibility"][0] + .as_str() + .unwrap_or("private"); + let mut author = post["properties"]["author"] + .as_array() + .unwrap_or(&empty_vec) + .iter() + .map(|i| i.as_str().unwrap().to_string()); + if location_visibility == "private" && !author.any(|i| Some(i) == *user) { + post["properties"] + .as_object_mut() + .unwrap() + .remove("location"); + } + } + Some(post) +} + /// A storage backend for the Micropub server. /// /// Implementations should note that all methods listed on this trait MUST be fully atomic diff --git a/src/database/redis/mod.rs b/src/database/redis/mod.rs index 5b3506a..f1724b7 100644 --- a/src/database/redis/mod.rs +++ b/src/database/redis/mod.rs @@ -12,7 +12,7 @@ use mobc_redis::RedisConnectionManager; use serde_json::json; use std::time::Duration; -use crate::database::{ErrorKind, MicropubChannel, Result, Storage, StorageError}; +use crate::database::{ErrorKind, MicropubChannel, Result, Storage, StorageError, filter_post}; use crate::indieauth::User; struct RedisScripts { @@ -56,55 +56,6 @@ pub struct RedisStorage { redis: mobc::Pool, } -fn filter_post(mut post: serde_json::Value, user: &'_ Option) -> Option { - if post["properties"]["deleted"][0].is_string() { - return Some(json!({ - "type": post["type"], - "properties": { - "deleted": post["properties"]["deleted"] - } - })); - } - let empty_vec: Vec = vec![]; - let author = post["properties"]["author"] - .as_array() - .unwrap_or(&empty_vec) - .iter() - .map(|i| i.as_str().unwrap().to_string()); - let visibility = post["properties"]["visibility"][0] - .as_str() - .unwrap_or("public"); - let mut audience = author.chain( - post["properties"]["audience"] - .as_array() - .unwrap_or(&empty_vec) - .iter() - .map(|i| i.as_str().unwrap().to_string()), - ); - if (visibility == "private" && !audience.any(|i| Some(i) == *user)) - || (visibility == "protected" && user.is_none()) - { - return None; - } - if post["properties"]["location"].is_array() { - let location_visibility = post["properties"]["location-visibility"][0] - .as_str() - .unwrap_or("private"); - let mut author = post["properties"]["author"] - .as_array() - .unwrap_or(&empty_vec) - .iter() - .map(|i| i.as_str().unwrap().to_string()); - if location_visibility == "private" && !author.any(|i| Some(i) == *user) { - post["properties"] - .as_object_mut() - .unwrap() - .remove("location"); - } - } - Some(post) -} - #[async_trait] impl Storage for RedisStorage { async fn get_setting<'a>(&self, setting: &'a str, user: &'a str) -> Result { @@ -265,18 +216,14 @@ impl Storage for RedisStorage { } if feed["children"].is_array() { let children = feed["children"].as_array().unwrap(); - let posts_iter: Box + Send>; - // TODO: refactor this to apply the skip on the &mut iterator - if let Some(after) = after { - posts_iter = Box::new( - children - .iter() - .map(|i| i.as_str().unwrap().to_string()) - .skip_while(move |i| i != after) - .skip(1), - ); - } else { - posts_iter = Box::new(children.iter().map(|i| i.as_str().unwrap().to_string())); + let mut posts_iter = children.iter().map(|i| i.as_str().unwrap().to_string()); + if after.is_some() { + loop { + let i = posts_iter.next(); + if &i == after { + break; + } + } } let posts = stream::iter(posts_iter) .map(|url| async move { -- cgit 1.4.1