about summary refs log tree commit diff
path: root/src/database/file/mod.rs
diff options
context:
space:
mode:
authorVika <vika@fireburn.ru>2021-12-31 06:45:55 +0300
committerVika <vika@fireburn.ru>2021-12-31 06:45:55 +0300
commita9f88a35c0f49579ce7e6da855b834d2975b1bf0 (patch)
tree78923c7ac9afd34bee3aedb1467861ecff3b8c84 /src/database/file/mod.rs
parent6cc2bd0856b946415f8f6e6e113dbcff7c48dc72 (diff)
downloadkittybox-a9f88a35c0f49579ce7e6da855b834d2975b1bf0.tar.zst
FileBackend: introduce timeouts on operations
This is to prevent spinning in a loop waiting for a lock. This hangs
often, though I suspect this should have been fixed in the previous
commit.
Diffstat (limited to 'src/database/file/mod.rs')
-rw-r--r--src/database/file/mod.rs85
1 files changed, 51 insertions, 34 deletions
diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs
index 5d6ff47..3717023 100644
--- a/src/database/file/mod.rs
+++ b/src/database/file/mod.rs
@@ -1,6 +1,8 @@
 use crate::database::{filter_post, ErrorKind, Result, Storage, StorageError};
 use std::fs::{File, OpenOptions};
 use std::io::{ErrorKind as IOErrorKind, Seek, SeekFrom, Read, Write};
+use std::time::Duration;
+use async_std::future::TimeoutError;
 use async_std::task::spawn_blocking;
 use async_trait::async_trait;
 use fd_lock::RwLock;
@@ -25,6 +27,16 @@ impl From<std::io::Error> for StorageError {
     }
 }
 
+impl From<TimeoutError> for StorageError {
+    fn from(source: TimeoutError) -> Self {
+        Self::with_source(
+            ErrorKind::Backend,
+            "timeout on I/O operation",
+            Box::new(source)
+        )
+    }
+}
+
 // Copied from https://stackoverflow.com/questions/39340924
 // This routine is adapted from the *old* Path's `path_relative_from`
 // function, which works differently from the new `relative_from` function.
@@ -240,6 +252,8 @@ async fn hydrate_author<S: Storage>(
     }
 }
 
+const IO_TIMEOUT: u64 = 3;
+
 #[async_trait]
 impl Storage for FileStorage {
     async fn post_exists(&self, url: &str) -> Result<bool> {
@@ -252,7 +266,7 @@ impl Storage for FileStorage {
         let path = url_to_path(&self.root_dir, url);
         debug!("Opening {:?}", path);
         // Use exclusively synchronous operations to never transfer a lock over an await boundary
-        spawn_blocking(move || {
+        async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || {
             match File::open(&path) {
                 Ok(file) => {
                     let lock = RwLock::new(file);
@@ -275,7 +289,7 @@ impl Storage for FileStorage {
                     }
                 }
             }
-        }).await
+        })).await?
     }
 
     async fn put_post<'a>(&self, post: &'a serde_json::Value, user: &'a str) -> Result<()> {
@@ -289,7 +303,7 @@ impl Storage for FileStorage {
         let post_json = post.to_string();
         let post_path = path.clone();
         // Use exclusively synchronous operations to never transfer a lock over an await boundary
-        spawn_blocking(move || {
+        async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || {
             let parent = post_path.parent().unwrap().to_owned();
             if !parent.is_dir() {
                 std::fs::create_dir_all(post_path.parent().unwrap())?;
@@ -309,7 +323,7 @@ impl Storage for FileStorage {
             drop(guard);
 
             Result::Ok(())
-        }).await?;
+        })).await??;
 
         if post["properties"]["url"].is_array() {
             for url in post["properties"]["url"]
@@ -331,7 +345,7 @@ impl Storage for FileStorage {
                     })?;
                     let relative = path_relative_from(&orig, basedir).unwrap();
                     println!("{:?} - {:?} = {:?}", &orig, &basedir, &relative);
-                    spawn_blocking(move || {
+                    async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || {
                         println!("Created a symlink at {:?}", &link);
                         let symlink_result;
                         #[cfg(unix)]
@@ -348,7 +362,7 @@ impl Storage for FileStorage {
                         } else {
                             Result::Ok(())
                         }
-                    }).await?;
+                    })).await??;
                 }
             }
         }
@@ -372,7 +386,7 @@ impl Storage for FileStorage {
                 .unwrap_or_else(String::default);
             let key = key.to_string();
             drop(post);
-            spawn_blocking(move || {
+            async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || {
                 let file = OpenOptions::new()
                     .read(true)
                     .write(true)
@@ -403,7 +417,7 @@ impl Storage for FileStorage {
                 (*guard).write_all(serde_json::to_string(&channels)?.as_bytes())?;
 
                 Result::Ok(())
-            }).await?;
+            })).await??;
         }
         Ok(())
     }
@@ -411,29 +425,32 @@ impl Storage for FileStorage {
     async fn update_post<'a>(&self, url: &'a str, update: serde_json::Value) -> Result<()> {
         let path = url_to_path(&self.root_dir, url);
 
-        let (old_json, new_json) = spawn_blocking(move || {
-            let f = OpenOptions::new()
-                .write(true)
-                .read(true)
-                .truncate(false)
-                .open(&path)?;
+        let (old_json, new_json) = async_std::future::timeout(
+            Duration::from_secs(IO_TIMEOUT),
+            spawn_blocking(move || {
+                let f = OpenOptions::new()
+                    .write(true)
+                    .read(true)
+                    .truncate(false)
+                    .open(&path)?;
 
-            let mut lock = RwLock::new(f);
-            let mut guard = lock.write()?;
+                let mut lock = RwLock::new(f);
+                let mut guard = lock.write()?;
 
-            let mut content = String::new();
-            (*guard).read_to_string(&mut content)?;
-            let json: serde_json::Value = serde_json::from_str(&content)?;
-            // Apply the editing algorithms
-            let new_json = modify_post(&json, &update)?;
-
-            (*guard).set_len(0)?;
-            (*guard).seek(SeekFrom::Start(0))?;
-            (*guard).write_all(new_json.to_string().as_bytes())?;
-            (*guard).flush()?;
+                let mut content = String::new();
+                (*guard).read_to_string(&mut content)?;
+                let json: serde_json::Value = serde_json::from_str(&content)?;
+                // Apply the editing algorithms
+                let new_json = modify_post(&json, &update)?;
+
+                (*guard).set_len(0)?;
+                (*guard).seek(SeekFrom::Start(0))?;
+                (*guard).write_all(new_json.to_string().as_bytes())?;
+                (*guard).flush()?;
 
-            Result::Ok((json, new_json))
-        }).await?;
+                Result::Ok((json, new_json))
+            })
+        ).await??;
         // TODO check if URLs changed between old and new JSON
         Ok(())
     }
@@ -444,7 +461,7 @@ impl Storage for FileStorage {
         path.push("channels");
 
         let path = path.to_path(&self.root_dir);
-        spawn_blocking(move || {
+        async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || {
             match File::open(&path) {
                 Ok(f) => {
                     let lock = RwLock::new(f);
@@ -467,7 +484,7 @@ impl Storage for FileStorage {
                     }
                 }
             }
-        }).await
+        })).await?
     }
 
     async fn read_feed_with_limit<'a>(
@@ -548,7 +565,7 @@ impl Storage for FileStorage {
 
         let path = path.to_path(&self.root_dir);
         let setting = setting.to_string();
-        spawn_blocking(move || {
+        async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || {
             let lock = RwLock::new(File::open(path)?);
             let guard = lock.read()?;
 
@@ -562,7 +579,7 @@ impl Storage for FileStorage {
                 .get(&setting)
                 .cloned()
                 .ok_or_else(|| StorageError::new(ErrorKind::Backend, "Setting not set"))
-        }).await
+        })).await?
     }
 
     async fn set_setting<'a>(&self, setting: &'a str, user: &'a str, value: &'a str) -> Result<()> {
@@ -580,7 +597,7 @@ impl Storage for FileStorage {
 
         let (setting, value) = (setting.to_string(), value.to_string());
 
-        spawn_blocking(move || {
+        async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || {
             let file = OpenOptions::new()
                 .write(true)
                 .read(true)
@@ -605,6 +622,6 @@ impl Storage for FileStorage {
             (&mut *guard).set_len(0)?;
             (&mut *guard).write_all(serde_json::to_string(&settings)?.as_bytes())?;
             Result::Ok(())
-        }).await
+        })).await?
     }
 }