From f894d1746b94d60cd22260b933948f4169ece9ae Mon Sep 17 00:00:00 2001 From: Vika Date: Mon, 27 Sep 2021 17:10:54 +0300 Subject: Implemented support for channels --- src/database/file/mod.rs | 117 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 114 insertions(+), 3 deletions(-) (limited to 'src/database/file/mod.rs') diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs index dff527f..82987b5 100644 --- a/src/database/file/mod.rs +++ b/src/database/file/mod.rs @@ -3,7 +3,10 @@ use async_std::io::{ErrorKind as IOErrorKind, BufReader}; use async_std::io::prelude::*; use async_std::task::spawn_blocking; use async_trait::async_trait; -use crate::database::{ErrorKind, Result, Storage, StorageError}; +use futures::stream; +use futures_util::StreamExt; +use futures_util::TryStreamExt; +use crate::database::{ErrorKind, Result, Storage, StorageError, filter_post}; use fd_lock::RwLock; use log::debug; use std::path::{Path, PathBuf}; @@ -274,6 +277,44 @@ impl Storage for FileStorage { } } + if post["type"].as_array().unwrap().iter().any(|s| s.as_str() == Some("h-feed")) { + println!("Adding to channel list..."); + // Add the h-feed to the channel list + let mut path = relative_path::RelativePathBuf::new(); + path.push(user); + path.push("channels"); + + let path = path.to_path(&self.root_dir); + let file = OpenOptions::new() + .read(true) + .write(true) + .truncate(false) + .create(true) + .open(&path).await?; + let mut lock = get_lockable_file(file).await; + let mut guard = lock.write()?; + + let mut content = String::new(); + guard.read_to_string(&mut content).await?; + let mut channels: Vec; + if content.len() > 0 { + channels = serde_json::from_str(&content)?; + } else { + channels = Vec::default(); + } + + channels.push(super::MicropubChannel { + uid: key.to_string(), + name: post["properties"]["name"][0] + .as_str() + .map(|s| s.to_string()) + .unwrap_or_else(|| String::default()) + }); + guard.seek(std::io::SeekFrom::Start(0)).await?; + guard.set_len(0).await?; + guard.write_all(serde_json::to_string(&channels)?.as_bytes()).await?; + } + Ok(()) } @@ -308,7 +349,33 @@ impl Storage for FileStorage { &self, user: &crate::indieauth::User, ) -> Result> { - todo!() + let mut path = relative_path::RelativePathBuf::new(); + path.push(&user.me.to_string()); + path.push("channels"); + + let path = path.to_path(&self.root_dir); + match File::open(&path).await { + Ok(f) => { + let lock = get_lockable_file(f).await; + let guard = lock.read()?; + + let mut content = String::new(); + (&mut &*guard).read_to_string(&mut content).await?; + // This should not happen, but if it does, let's handle it gracefully instead of failing. + if content.len() == 0 { + return Ok(vec![]) + } + let channels: Vec = serde_json::from_str(&content)?; + Ok(channels) + }, + Err(e) => { + if e.kind() == IOErrorKind::NotFound { + Ok(vec![]) + } else { + Err(e.into()) + } + } + } } async fn read_feed_with_limit<'a>( @@ -318,7 +385,51 @@ impl Storage for FileStorage { limit: usize, user: &'a Option, ) -> Result> { - todo!() + match self.get_post(url).await? { + Some(feed) => match filter_post(feed, user) { + Some(mut feed) => { + if feed["children"].is_array() { + let children = feed["children"].as_array().unwrap().clone(); + let mut posts_iter = children.into_iter() + .map(|s: serde_json::Value| s.as_str().unwrap().to_string()); + if after.is_some() { + loop { + let i = posts_iter.next(); + if &i == after { + break; + } + } + } + let posts = stream::iter(posts_iter) + .map(|url| async move { + self.get_post(&url).await + }) + .buffered(std::cmp::min(3, limit)) + // Hack to unwrap the Option and sieve out broken links + // Broken links return None, and Stream::filter_map skips Nones. + .try_filter_map(|post: Option| async move { Ok(post) }) + .try_filter_map(|post| async move { Ok(filter_post(post, user)) }) + .take(limit); + + match posts.try_collect::>().await { + Ok(posts) => feed["children"] = serde_json::json!(posts), + Err(err) => { + return Err(StorageError::with_source( + ErrorKind::Other, "Feed assembly error", + Box::new(err) + )); + } + } + } + Ok(Some(feed)) + }, + None => Err(StorageError::new( + ErrorKind::PermissionDenied, + "specified user cannot access this post", + )) + }, + None => Ok(None) + } } async fn delete_post<'a>(&self, url: &'a str) -> Result<()> { -- cgit 1.4.1