From 5218889b43c4ee3eafee1c9871dbeba2887ded83 Mon Sep 17 00:00:00 2001 From: Vika Date: Mon, 26 Aug 2024 13:33:37 +0300 Subject: FileStorage: appease clippy and use block_in_place for rewinding feeds Using `tokio::task::block_in_place` downgrades the thread temporarily to avoid starvation. --- src/database/file/mod.rs | 23 ++++++++++++++--------- src/database/mod.rs | 2 +- 2 files changed, 15 insertions(+), 10 deletions(-) (limited to 'src/database') diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs index ba8201f..117ba17 100644 --- a/src/database/file/mod.rs +++ b/src/database/file/mod.rs @@ -66,8 +66,8 @@ fn path_relative_from(path: &Path, base: &Path) -> Option { } (None, _) => comps.push(Component::ParentDir), (Some(a), Some(b)) if comps.is_empty() && a == b => (), - (Some(a), Some(b)) if b == Component::CurDir => comps.push(a), - (Some(_), Some(b)) if b == Component::ParentDir => return None, + (Some(a), Some(Component::CurDir)) => comps.push(a), + (Some(_), Some(Component::ParentDir)) => return None, (Some(a), Some(_)) => { comps.push(Component::ParentDir); for _ in itb { @@ -322,7 +322,7 @@ impl Storage for FileStorage { .as_str() .expect("Tried to save a post without UID"); let path = url_to_path(&self.root_dir, key); - let tempfile = (&path).with_extension("tmp"); + let tempfile = path.with_extension("tmp"); debug!("Creating {:?}", path); let parent = path @@ -395,7 +395,7 @@ impl Storage for FileStorage { let channel_name = post["properties"]["name"][0] .as_str() .map(|s| s.to_string()) - .unwrap_or_else(String::default); + .unwrap_or_default(); let key = key.to_string(); tracing::debug!("Opening temporary file to modify chnanels..."); let mut tempfile = OpenOptions::new() @@ -576,13 +576,18 @@ impl Storage for FileStorage { // This imperative snippet consumes after instead of emitting it, allowing the // stream of posts to return only those items that truly come *after* that one. // If I would implement an Iter combinator like this, I would call it `skip_until` + // + // Uses `tokio::task::block_in_place` to prevent starvation in case of rewinding + // incredibly long feeds. if let Some(after) = after { - for s in posts_iter.by_ref() { - if &s == after { - break; + tokio::task::block_in_place(|| { + for s in posts_iter.by_ref() { + if s == after { + break; + } } - } - }; + }) + } let posts = stream::iter(posts_iter) .map(|url: String| async move { self.get_post(&url).await }) .buffered(std::cmp::min(3, limit)) diff --git a/src/database/mod.rs b/src/database/mod.rs index d60ac05..7b50196 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -757,7 +757,7 @@ mod tests { } macro_rules! file_test { ($func_name:ident) => { - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] #[tracing_test::traced_test] async fn $func_name() { let tempdir = tempfile::tempdir().expect("Failed to create tempdir"); -- cgit 1.4.1