about summary refs log tree commit diff
path: root/src/database
diff options
context:
space:
mode:
authorVika <vika@fireburn.ru>2021-09-27 17:10:54 +0300
committerVika <vika@fireburn.ru>2021-09-27 17:10:54 +0300
commitf894d1746b94d60cd22260b933948f4169ece9ae (patch)
treeba6343dade34182d81f1483a56685d17f4fe3ba4 /src/database
parent5d635d9e9ae466ca52d1923fafdc74487030e975 (diff)
downloadkittybox-f894d1746b94d60cd22260b933948f4169ece9ae.tar.zst
Implemented support for channels
Diffstat (limited to 'src/database')
-rw-r--r--src/database/file/mod.rs117
-rw-r--r--src/database/mod.rs49
-rw-r--r--src/database/redis/mod.rs71
3 files changed, 172 insertions, 65 deletions
diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs
index dff527f..82987b5 100644
--- a/src/database/file/mod.rs
+++ b/src/database/file/mod.rs
@@ -3,7 +3,10 @@ use async_std::io::{ErrorKind as IOErrorKind, BufReader};
 use async_std::io::prelude::*;
 use async_std::task::spawn_blocking;
 use async_trait::async_trait;
-use crate::database::{ErrorKind, Result, Storage, StorageError};
+use futures::stream;
+use futures_util::StreamExt;
+use futures_util::TryStreamExt;
+use crate::database::{ErrorKind, Result, Storage, StorageError, filter_post};
 use fd_lock::RwLock;
 use log::debug;
 use std::path::{Path, PathBuf};
@@ -274,6 +277,44 @@ impl Storage for FileStorage {
             }
         }
 
+        if post["type"].as_array().unwrap().iter().any(|s| s.as_str() == Some("h-feed")) {
+            println!("Adding to channel list...");
+            // Add the h-feed to the channel list
+            let mut path = relative_path::RelativePathBuf::new();
+            path.push(user);
+            path.push("channels");
+
+            let path = path.to_path(&self.root_dir);
+            let file = OpenOptions::new()
+                .read(true)
+                .write(true)
+                .truncate(false)
+                .create(true)
+                .open(&path).await?;
+            let mut lock = get_lockable_file(file).await;
+            let mut guard = lock.write()?;
+
+            let mut content = String::new();
+            guard.read_to_string(&mut content).await?;
+            let mut channels: Vec<super::MicropubChannel>;
+            if content.len() > 0 {
+                channels = serde_json::from_str(&content)?;
+            } else {
+                channels = Vec::default();
+            }
+
+            channels.push(super::MicropubChannel {
+                uid: key.to_string(),
+                name: post["properties"]["name"][0]
+                    .as_str()
+                    .map(|s| s.to_string())
+                    .unwrap_or_else(|| String::default())
+            });
+            guard.seek(std::io::SeekFrom::Start(0)).await?;
+            guard.set_len(0).await?;
+            guard.write_all(serde_json::to_string(&channels)?.as_bytes()).await?;
+        }
+
         Ok(())
     }
 
@@ -308,7 +349,33 @@ impl Storage for FileStorage {
         &self,
         user: &crate::indieauth::User,
     ) -> Result<Vec<super::MicropubChannel>> {
-        todo!()
+        let mut path = relative_path::RelativePathBuf::new();
+        path.push(&user.me.to_string());
+        path.push("channels");
+
+        let path = path.to_path(&self.root_dir);
+        match File::open(&path).await {
+            Ok(f) => {
+                let lock = get_lockable_file(f).await;
+                let guard = lock.read()?;
+
+                let mut content = String::new();
+                (&mut &*guard).read_to_string(&mut content).await?;
+                // This should not happen, but if it does, let's handle it gracefully instead of failing.
+                if content.len() == 0 {
+                    return Ok(vec![])
+                }
+                let channels: Vec<super::MicropubChannel> = serde_json::from_str(&content)?;
+                Ok(channels)
+            },
+            Err(e) => {
+                if e.kind() == IOErrorKind::NotFound {
+                    Ok(vec![])
+                } else {
+                    Err(e.into())
+                }
+            }
+        }
     }
 
     async fn read_feed_with_limit<'a>(
@@ -318,7 +385,51 @@ impl Storage for FileStorage {
         limit: usize,
         user: &'a Option<String>,
     ) -> Result<Option<serde_json::Value>> {
-        todo!()
+        match self.get_post(url).await? {
+            Some(feed) => match filter_post(feed, user) {
+                Some(mut feed) => {
+                    if feed["children"].is_array() {
+                        let children = feed["children"].as_array().unwrap().clone();
+                        let mut posts_iter = children.into_iter()
+                            .map(|s: serde_json::Value| s.as_str().unwrap().to_string());
+                        if after.is_some() {
+                            loop {
+                                let i = posts_iter.next();
+                                if &i == after {
+                                    break;
+                                }
+                            }
+                        }
+                        let posts = stream::iter(posts_iter)
+                            .map(|url| async move {
+                                self.get_post(&url).await
+                            })
+                            .buffered(std::cmp::min(3, limit))
+                            // Hack to unwrap the Option and sieve out broken links
+                            // Broken links return None, and Stream::filter_map skips Nones.
+                            .try_filter_map(|post: Option<serde_json::Value>| async move { Ok(post) })
+                            .try_filter_map(|post| async move { Ok(filter_post(post, user)) })
+                            .take(limit);
+
+                        match posts.try_collect::<Vec<serde_json::Value>>().await {
+                            Ok(posts) => feed["children"] = serde_json::json!(posts),
+                            Err(err) => {
+                                return Err(StorageError::with_source(
+                                    ErrorKind::Other, "Feed assembly error",
+                                    Box::new(err)
+                                ));
+                            }
+                        }
+                    }
+                    Ok(Some(feed))
+                },
+                None => Err(StorageError::new(
+                    ErrorKind::PermissionDenied,
+                    "specified user cannot access this post",
+                ))
+            },
+            None => Ok(None)
+        }
     }
 
     async fn delete_post<'a>(&self, url: &'a str) -> Result<()> {
diff --git a/src/database/mod.rs b/src/database/mod.rs
index a1f5861..0b97c24 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -122,6 +122,55 @@ impl StorageError {
 /// A special Result type for the Micropub backing storage.
 pub type Result<T> = std::result::Result<T, StorageError>;
 
+pub fn filter_post(mut post: serde_json::Value, user: &'_ Option<String>) -> Option<serde_json::Value> {
+    if post["properties"]["deleted"][0].is_string() {
+        return Some(serde_json::json!({
+            "type": post["type"],
+            "properties": {
+                "deleted": post["properties"]["deleted"]
+            }
+        }));
+    }
+    let empty_vec: Vec<serde_json::Value> = vec![];
+    let author = post["properties"]["author"]
+        .as_array()
+        .unwrap_or(&empty_vec)
+        .iter()
+        .map(|i| i.as_str().unwrap().to_string());
+    let visibility = post["properties"]["visibility"][0]
+        .as_str()
+        .unwrap_or("public");
+    let mut audience = author.chain(
+        post["properties"]["audience"]
+            .as_array()
+            .unwrap_or(&empty_vec)
+            .iter()
+            .map(|i| i.as_str().unwrap().to_string()),
+    );
+    if (visibility == "private" && !audience.any(|i| Some(i) == *user))
+        || (visibility == "protected" && user.is_none())
+    {
+        return None;
+    }
+    if post["properties"]["location"].is_array() {
+        let location_visibility = post["properties"]["location-visibility"][0]
+            .as_str()
+            .unwrap_or("private");
+        let mut author = post["properties"]["author"]
+            .as_array()
+            .unwrap_or(&empty_vec)
+            .iter()
+            .map(|i| i.as_str().unwrap().to_string());
+        if location_visibility == "private" && !author.any(|i| Some(i) == *user) {
+            post["properties"]
+                .as_object_mut()
+                .unwrap()
+                .remove("location");
+        }
+    }
+    Some(post)
+}
+
 /// A storage backend for the Micropub server.
 ///
 /// Implementations should note that all methods listed on this trait MUST be fully atomic
diff --git a/src/database/redis/mod.rs b/src/database/redis/mod.rs
index 5b3506a..f1724b7 100644
--- a/src/database/redis/mod.rs
+++ b/src/database/redis/mod.rs
@@ -12,7 +12,7 @@ use mobc_redis::RedisConnectionManager;
 use serde_json::json;
 use std::time::Duration;
 
-use crate::database::{ErrorKind, MicropubChannel, Result, Storage, StorageError};
+use crate::database::{ErrorKind, MicropubChannel, Result, Storage, StorageError, filter_post};
 use crate::indieauth::User;
 
 struct RedisScripts {
@@ -56,55 +56,6 @@ pub struct RedisStorage {
     redis: mobc::Pool<RedisConnectionManager>,
 }
 
-fn filter_post(mut post: serde_json::Value, user: &'_ Option<String>) -> Option<serde_json::Value> {
-    if post["properties"]["deleted"][0].is_string() {
-        return Some(json!({
-            "type": post["type"],
-            "properties": {
-                "deleted": post["properties"]["deleted"]
-            }
-        }));
-    }
-    let empty_vec: Vec<serde_json::Value> = vec![];
-    let author = post["properties"]["author"]
-        .as_array()
-        .unwrap_or(&empty_vec)
-        .iter()
-        .map(|i| i.as_str().unwrap().to_string());
-    let visibility = post["properties"]["visibility"][0]
-        .as_str()
-        .unwrap_or("public");
-    let mut audience = author.chain(
-        post["properties"]["audience"]
-            .as_array()
-            .unwrap_or(&empty_vec)
-            .iter()
-            .map(|i| i.as_str().unwrap().to_string()),
-    );
-    if (visibility == "private" && !audience.any(|i| Some(i) == *user))
-        || (visibility == "protected" && user.is_none())
-    {
-        return None;
-    }
-    if post["properties"]["location"].is_array() {
-        let location_visibility = post["properties"]["location-visibility"][0]
-            .as_str()
-            .unwrap_or("private");
-        let mut author = post["properties"]["author"]
-            .as_array()
-            .unwrap_or(&empty_vec)
-            .iter()
-            .map(|i| i.as_str().unwrap().to_string());
-        if location_visibility == "private" && !author.any(|i| Some(i) == *user) {
-            post["properties"]
-                .as_object_mut()
-                .unwrap()
-                .remove("location");
-        }
-    }
-    Some(post)
-}
-
 #[async_trait]
 impl Storage for RedisStorage {
     async fn get_setting<'a>(&self, setting: &'a str, user: &'a str) -> Result<String> {
@@ -265,18 +216,14 @@ impl Storage for RedisStorage {
         }
         if feed["children"].is_array() {
             let children = feed["children"].as_array().unwrap();
-            let posts_iter: Box<dyn std::iter::Iterator<Item = String> + Send>;
-            // TODO: refactor this to apply the skip on the &mut iterator
-            if let Some(after) = after {
-                posts_iter = Box::new(
-                    children
-                        .iter()
-                        .map(|i| i.as_str().unwrap().to_string())
-                        .skip_while(move |i| i != after)
-                        .skip(1),
-                );
-            } else {
-                posts_iter = Box::new(children.iter().map(|i| i.as_str().unwrap().to_string()));
+            let mut posts_iter = children.iter().map(|i| i.as_str().unwrap().to_string());
+            if after.is_some() {
+                loop {
+                    let i = posts_iter.next();
+                    if &i == after {
+                        break;
+                    }
+                }
             }
             let posts = stream::iter(posts_iter)
                 .map(|url| async move {