about summary refs log tree commit diff
path: root/src/database
diff options
context:
space:
mode:
authorVika <vika@fireburn.ru>2021-08-04 13:28:24 +0300
committerVika <vika@fireburn.ru>2021-08-04 13:28:24 +0300
commitbfef058f199d195bb2c16d791c3f1bfeccb5614e (patch)
tree3b8a6ffac36b81be4df53943659a56c2a883713a /src/database
parent1589475ebefa72c2981d1b50611531545c1dd32a (diff)
downloadkittybox-bfef058f199d195bb2c16d791c3f1bfeccb5614e.tar.zst
Fixed a VERY WRONG way to handle stream errors
for future reference: stream operations returning Result satisfy
conditions for the futures::stream::TryStreamExt trait, allowing you to
use `TryStreamExt::try_collect::<T>()` and receive a Result<T>.
Diffstat (limited to 'src/database')
-rw-r--r--src/database/mod.rs10
-rw-r--r--src/database/redis/mod.rs46
2 files changed, 25 insertions, 31 deletions
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 27c0025..7b144f8 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -90,12 +90,20 @@ impl serde::Serialize for StorageError {
 impl StorageError {
     /// Create a new StorageError of an ErrorKind with a message.
     fn new(kind: ErrorKind, msg: &str) -> Self {
-        StorageError {
+        Self {
             msg: msg.to_string(),
             source: None,
             kind,
         }
     }
+    /// Create a StorageError using another arbitrary Error as a source.
+    fn with_source(kind: ErrorKind, msg: &str, source: Box<dyn std::error::Error>) -> Self {
+        Self {
+            msg: msg.to_string(),
+            source: Some(source),
+            kind
+        }
+    }
     /// Get the kind of an error.
     pub fn kind(&self) -> ErrorKind {
         self.kind
diff --git a/src/database/redis/mod.rs b/src/database/redis/mod.rs
index c331e47..205af76 100644
--- a/src/database/redis/mod.rs
+++ b/src/database/redis/mod.rs
@@ -2,6 +2,7 @@ use async_trait::async_trait;
 use futures::stream;
 use futures_util::FutureExt;
 use futures_util::StreamExt;
+use futures_util::TryStreamExt;
 use lazy_static::lazy_static;
 use log::error;
 use mobc::Pool;
@@ -283,31 +284,17 @@ impl Storage for RedisStorage {
                                 Ok(post) => match post {
                                     Some(post) => {
                                         match serde_json::from_str::<serde_json::Value>(&post) {
-                                            Ok(post) => Some(post),
-                                            Err(err) => {
-                                                let err = StorageError::from(err);
-                                                error!("{}", err);
-                                                panic!("{}", err)
-                                            }
+                                            Ok(post) => Ok(Some(post)),
+                                            Err(err) => Err(StorageError::from(err))
                                         }
                                     }
                                     // Happens because of a broken link (result of an improper deletion?)
-                                    None => None,
+                                    None => Ok(None),
                                 },
-                                Err(err) => {
-                                    let err = StorageError::from(err);
-                                    error!("{}", err);
-                                    panic!("{}", err)
-                                }
+                                Err(err) => Err(StorageError::from(err))
                             }
                         }
-                        // TODO: Instead of causing a panic, investigate how can you fail the whole stream
-                        // Somehow fuse it maybe?
-                        Err(err) => {
-                            let err = StorageError::from(err);
-                            error!("{}", err);
-                            panic!("{}", err)
-                        }
+                        Err(err) => Err(StorageError::from(err))
                     }
                 })
                 // TODO: determine the optimal value for this buffer
@@ -318,20 +305,19 @@ impl Storage for RedisStorage {
                 .buffered(std::cmp::min(3, limit))
                 // Hack to unwrap the Option and sieve out broken links
                 // Broken links return None, and Stream::filter_map skips all Nones.
-                .filter_map(|post: Option<serde_json::Value>| async move { post })
-                .filter_map(|post| async move { filter_post(post, user) })
+                .try_filter_map(|post: Option<serde_json::Value>| async move { Ok(post) })
+                .try_filter_map(|post| async move { Ok(filter_post(post, user)) })
                 .take(limit);
-            // TODO: Instead of catching panics, find a way to make the whole stream fail with Result<Vec<serde_json::Value>>
-            match std::panic::AssertUnwindSafe(posts.collect::<Vec<serde_json::Value>>())
-                .catch_unwind()
-                .await
-            {
+            match posts.try_collect::<Vec<serde_json::Value>>().await {
                 Ok(posts) => feed["children"] = json!(posts),
-                Err(_) => {
-                    return Err(StorageError::new(
+                Err(err) => {
+                    let e = StorageError::with_source(
                         ErrorKind::Other,
-                        "Unknown error encountered while assembling feed, see logs for more info",
-                    ))
+                        "An error was encountered while processing the feed",
+                        Box::new(err)
+                    );
+                    error!("Error while assembling feed: {}", e);
+                    return Err(e);
                 }
             }
         }