diff options
author | Vika <vika@fireburn.ru> | 2021-08-04 13:28:24 +0300 |
---|---|---|
committer | Vika <vika@fireburn.ru> | 2021-08-04 13:28:24 +0300 |
commit | bfef058f199d195bb2c16d791c3f1bfeccb5614e (patch) | |
tree | 3b8a6ffac36b81be4df53943659a56c2a883713a /src/database/redis | |
parent | 1589475ebefa72c2981d1b50611531545c1dd32a (diff) | |
download | kittybox-bfef058f199d195bb2c16d791c3f1bfeccb5614e.tar.zst |
Fixed a VERY WRONG way to handle stream errors
for future reference: stream operations returning Result satisfy conditions for the futures::stream::TryStreamExt trait, allowing you to use `TryStreamExt::try_collect::<T>()` and receive a Result<T>.
Diffstat (limited to 'src/database/redis')
-rw-r--r-- | src/database/redis/mod.rs | 46 |
1 files changed, 16 insertions, 30 deletions
diff --git a/src/database/redis/mod.rs b/src/database/redis/mod.rs index c331e47..205af76 100644 --- a/src/database/redis/mod.rs +++ b/src/database/redis/mod.rs @@ -2,6 +2,7 @@ use async_trait::async_trait; use futures::stream; use futures_util::FutureExt; use futures_util::StreamExt; +use futures_util::TryStreamExt; use lazy_static::lazy_static; use log::error; use mobc::Pool; @@ -283,31 +284,17 @@ impl Storage for RedisStorage { Ok(post) => match post { Some(post) => { match serde_json::from_str::<serde_json::Value>(&post) { - Ok(post) => Some(post), - Err(err) => { - let err = StorageError::from(err); - error!("{}", err); - panic!("{}", err) - } + Ok(post) => Ok(Some(post)), + Err(err) => Err(StorageError::from(err)) } } // Happens because of a broken link (result of an improper deletion?) - None => None, + None => Ok(None), }, - Err(err) => { - let err = StorageError::from(err); - error!("{}", err); - panic!("{}", err) - } + Err(err) => Err(StorageError::from(err)) } } - // TODO: Instead of causing a panic, investigate how can you fail the whole stream - // Somehow fuse it maybe? - Err(err) => { - let err = StorageError::from(err); - error!("{}", err); - panic!("{}", err) - } + Err(err) => Err(StorageError::from(err)) } }) // TODO: determine the optimal value for this buffer @@ -318,20 +305,19 @@ impl Storage for RedisStorage { .buffered(std::cmp::min(3, limit)) // Hack to unwrap the Option and sieve out broken links // Broken links return None, and Stream::filter_map skips all Nones. - .filter_map(|post: Option<serde_json::Value>| async move { post }) - .filter_map(|post| async move { filter_post(post, user) }) + .try_filter_map(|post: Option<serde_json::Value>| async move { Ok(post) }) + .try_filter_map(|post| async move { Ok(filter_post(post, user)) }) .take(limit); - // TODO: Instead of catching panics, find a way to make the whole stream fail with Result<Vec<serde_json::Value>> - match std::panic::AssertUnwindSafe(posts.collect::<Vec<serde_json::Value>>()) - .catch_unwind() - .await - { + match posts.try_collect::<Vec<serde_json::Value>>().await { Ok(posts) => feed["children"] = json!(posts), - Err(_) => { - return Err(StorageError::new( + Err(err) => { + let e = StorageError::with_source( ErrorKind::Other, - "Unknown error encountered while assembling feed, see logs for more info", - )) + "An error was encountered while processing the feed", + Box::new(err) + ); + error!("Error while assembling feed: {}", e); + return Err(e); } } } |