From a9f88a35c0f49579ce7e6da855b834d2975b1bf0 Mon Sep 17 00:00:00 2001 From: Vika Date: Fri, 31 Dec 2021 06:45:55 +0300 Subject: FileBackend: introduce timeouts on operations This is to prevent spinning in a loop waiting for a lock. This hangs often, though I suspect this should have been fixed in the previous commit. --- src/database/file/mod.rs | 85 +++++++++++++++++++++++++++++------------------- 1 file changed, 51 insertions(+), 34 deletions(-) (limited to 'src') diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs index 5d6ff47..3717023 100644 --- a/src/database/file/mod.rs +++ b/src/database/file/mod.rs @@ -1,6 +1,8 @@ use crate::database::{filter_post, ErrorKind, Result, Storage, StorageError}; use std::fs::{File, OpenOptions}; use std::io::{ErrorKind as IOErrorKind, Seek, SeekFrom, Read, Write}; +use std::time::Duration; +use async_std::future::TimeoutError; use async_std::task::spawn_blocking; use async_trait::async_trait; use fd_lock::RwLock; @@ -25,6 +27,16 @@ impl From for StorageError { } } +impl From for StorageError { + fn from(source: TimeoutError) -> Self { + Self::with_source( + ErrorKind::Backend, + "timeout on I/O operation", + Box::new(source) + ) + } +} + // Copied from https://stackoverflow.com/questions/39340924 // This routine is adapted from the *old* Path's `path_relative_from` // function, which works differently from the new `relative_from` function. @@ -240,6 +252,8 @@ async fn hydrate_author( } } +const IO_TIMEOUT: u64 = 3; + #[async_trait] impl Storage for FileStorage { async fn post_exists(&self, url: &str) -> Result { @@ -252,7 +266,7 @@ impl Storage for FileStorage { let path = url_to_path(&self.root_dir, url); debug!("Opening {:?}", path); // Use exclusively synchronous operations to never transfer a lock over an await boundary - spawn_blocking(move || { + async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { match File::open(&path) { Ok(file) => { let lock = RwLock::new(file); @@ -275,7 +289,7 @@ impl Storage for FileStorage { } } } - }).await + })).await? } async fn put_post<'a>(&self, post: &'a serde_json::Value, user: &'a str) -> Result<()> { @@ -289,7 +303,7 @@ impl Storage for FileStorage { let post_json = post.to_string(); let post_path = path.clone(); // Use exclusively synchronous operations to never transfer a lock over an await boundary - spawn_blocking(move || { + async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let parent = post_path.parent().unwrap().to_owned(); if !parent.is_dir() { std::fs::create_dir_all(post_path.parent().unwrap())?; @@ -309,7 +323,7 @@ impl Storage for FileStorage { drop(guard); Result::Ok(()) - }).await?; + })).await??; if post["properties"]["url"].is_array() { for url in post["properties"]["url"] @@ -331,7 +345,7 @@ impl Storage for FileStorage { })?; let relative = path_relative_from(&orig, basedir).unwrap(); println!("{:?} - {:?} = {:?}", &orig, &basedir, &relative); - spawn_blocking(move || { + async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { println!("Created a symlink at {:?}", &link); let symlink_result; #[cfg(unix)] @@ -348,7 +362,7 @@ impl Storage for FileStorage { } else { Result::Ok(()) } - }).await?; + })).await??; } } } @@ -372,7 +386,7 @@ impl Storage for FileStorage { .unwrap_or_else(String::default); let key = key.to_string(); drop(post); - spawn_blocking(move || { + async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let file = OpenOptions::new() .read(true) .write(true) @@ -403,7 +417,7 @@ impl Storage for FileStorage { (*guard).write_all(serde_json::to_string(&channels)?.as_bytes())?; Result::Ok(()) - }).await?; + })).await??; } Ok(()) } @@ -411,29 +425,32 @@ impl Storage for FileStorage { async fn update_post<'a>(&self, url: &'a str, update: serde_json::Value) -> Result<()> { let path = url_to_path(&self.root_dir, url); - let (old_json, new_json) = spawn_blocking(move || { - let f = OpenOptions::new() - .write(true) - .read(true) - .truncate(false) - .open(&path)?; + let (old_json, new_json) = async_std::future::timeout( + Duration::from_secs(IO_TIMEOUT), + spawn_blocking(move || { + let f = OpenOptions::new() + .write(true) + .read(true) + .truncate(false) + .open(&path)?; - let mut lock = RwLock::new(f); - let mut guard = lock.write()?; + let mut lock = RwLock::new(f); + let mut guard = lock.write()?; - let mut content = String::new(); - (*guard).read_to_string(&mut content)?; - let json: serde_json::Value = serde_json::from_str(&content)?; - // Apply the editing algorithms - let new_json = modify_post(&json, &update)?; - - (*guard).set_len(0)?; - (*guard).seek(SeekFrom::Start(0))?; - (*guard).write_all(new_json.to_string().as_bytes())?; - (*guard).flush()?; + let mut content = String::new(); + (*guard).read_to_string(&mut content)?; + let json: serde_json::Value = serde_json::from_str(&content)?; + // Apply the editing algorithms + let new_json = modify_post(&json, &update)?; + + (*guard).set_len(0)?; + (*guard).seek(SeekFrom::Start(0))?; + (*guard).write_all(new_json.to_string().as_bytes())?; + (*guard).flush()?; - Result::Ok((json, new_json)) - }).await?; + Result::Ok((json, new_json)) + }) + ).await??; // TODO check if URLs changed between old and new JSON Ok(()) } @@ -444,7 +461,7 @@ impl Storage for FileStorage { path.push("channels"); let path = path.to_path(&self.root_dir); - spawn_blocking(move || { + async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { match File::open(&path) { Ok(f) => { let lock = RwLock::new(f); @@ -467,7 +484,7 @@ impl Storage for FileStorage { } } } - }).await + })).await? } async fn read_feed_with_limit<'a>( @@ -548,7 +565,7 @@ impl Storage for FileStorage { let path = path.to_path(&self.root_dir); let setting = setting.to_string(); - spawn_blocking(move || { + async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let lock = RwLock::new(File::open(path)?); let guard = lock.read()?; @@ -562,7 +579,7 @@ impl Storage for FileStorage { .get(&setting) .cloned() .ok_or_else(|| StorageError::new(ErrorKind::Backend, "Setting not set")) - }).await + })).await? } async fn set_setting<'a>(&self, setting: &'a str, user: &'a str, value: &'a str) -> Result<()> { @@ -580,7 +597,7 @@ impl Storage for FileStorage { let (setting, value) = (setting.to_string(), value.to_string()); - spawn_blocking(move || { + async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let file = OpenOptions::new() .write(true) .read(true) @@ -605,6 +622,6 @@ impl Storage for FileStorage { (&mut *guard).set_len(0)?; (&mut *guard).write_all(serde_json::to_string(&settings)?.as_bytes())?; Result::Ok(()) - }).await + })).await? } } -- cgit 1.4.1