From bfef058f199d195bb2c16d791c3f1bfeccb5614e Mon Sep 17 00:00:00 2001 From: Vika Date: Wed, 4 Aug 2021 13:28:24 +0300 Subject: Fixed a VERY WRONG way to handle stream errors for future reference: stream operations returning Result satisfy conditions for the futures::stream::TryStreamExt trait, allowing you to use `TryStreamExt::try_collect::()` and receive a Result. --- src/database/redis/mod.rs | 46 ++++++++++++++++------------------------------ 1 file changed, 16 insertions(+), 30 deletions(-) (limited to 'src/database/redis/mod.rs') diff --git a/src/database/redis/mod.rs b/src/database/redis/mod.rs index c331e47..205af76 100644 --- a/src/database/redis/mod.rs +++ b/src/database/redis/mod.rs @@ -2,6 +2,7 @@ use async_trait::async_trait; use futures::stream; use futures_util::FutureExt; use futures_util::StreamExt; +use futures_util::TryStreamExt; use lazy_static::lazy_static; use log::error; use mobc::Pool; @@ -283,31 +284,17 @@ impl Storage for RedisStorage { Ok(post) => match post { Some(post) => { match serde_json::from_str::(&post) { - Ok(post) => Some(post), - Err(err) => { - let err = StorageError::from(err); - error!("{}", err); - panic!("{}", err) - } + Ok(post) => Ok(Some(post)), + Err(err) => Err(StorageError::from(err)) } } // Happens because of a broken link (result of an improper deletion?) - None => None, + None => Ok(None), }, - Err(err) => { - let err = StorageError::from(err); - error!("{}", err); - panic!("{}", err) - } + Err(err) => Err(StorageError::from(err)) } } - // TODO: Instead of causing a panic, investigate how can you fail the whole stream - // Somehow fuse it maybe? - Err(err) => { - let err = StorageError::from(err); - error!("{}", err); - panic!("{}", err) - } + Err(err) => Err(StorageError::from(err)) } }) // TODO: determine the optimal value for this buffer @@ -318,20 +305,19 @@ impl Storage for RedisStorage { .buffered(std::cmp::min(3, limit)) // Hack to unwrap the Option and sieve out broken links // Broken links return None, and Stream::filter_map skips all Nones. - .filter_map(|post: Option| async move { post }) - .filter_map(|post| async move { filter_post(post, user) }) + .try_filter_map(|post: Option| async move { Ok(post) }) + .try_filter_map(|post| async move { Ok(filter_post(post, user)) }) .take(limit); - // TODO: Instead of catching panics, find a way to make the whole stream fail with Result> - match std::panic::AssertUnwindSafe(posts.collect::>()) - .catch_unwind() - .await - { + match posts.try_collect::>().await { Ok(posts) => feed["children"] = json!(posts), - Err(_) => { - return Err(StorageError::new( + Err(err) => { + let e = StorageError::with_source( ErrorKind::Other, - "Unknown error encountered while assembling feed, see logs for more info", - )) + "An error was encountered while processing the feed", + Box::new(err) + ); + error!("Error while assembling feed: {}", e); + return Err(e); } } } -- cgit 1.4.1