about summary refs log tree commit diff
path: root/src/database
diff options
context:
space:
mode:
authorVika <vika@fireburn.ru>2021-12-31 06:23:20 +0300
committerVika <vika@fireburn.ru>2021-12-31 06:23:20 +0300
commit6cc2bd0856b946415f8f6e6e113dbcff7c48dc72 (patch)
tree1d79d9abf9ebac08464d2c7542bcd51b5be8be5f /src/database
parent2727e8bdd0122e367a6b155f96829fec26e536f9 (diff)
downloadkittybox-6cc2bd0856b946415f8f6e6e113dbcff7c48dc72.tar.zst
FileBackend: don't transfer locks over async boundaries
This may or may not be the cause for the app hanging while waiting for
a lock. Now the operations with locks are never performed over an
async boundary, excluding any shenanigans that can happen when
accidentally leaving a file locked over async boundaries.
Diffstat (limited to 'src/database')
-rw-r--r--src/database/file/mod.rs357
1 files changed, 188 insertions, 169 deletions
diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs
index ee7d30b..5d6ff47 100644
--- a/src/database/file/mod.rs
+++ b/src/database/file/mod.rs
@@ -1,7 +1,6 @@
 use crate::database::{filter_post, ErrorKind, Result, Storage, StorageError};
-use async_std::fs::{File, OpenOptions};
-use async_std::io::prelude::*;
-use async_std::io::ErrorKind as IOErrorKind;
+use std::fs::{File, OpenOptions};
+use std::io::{ErrorKind as IOErrorKind, Seek, SeekFrom, Read, Write};
 use async_std::task::spawn_blocking;
 use async_trait::async_trait;
 use fd_lock::RwLock;
@@ -26,11 +25,6 @@ impl From<std::io::Error> for StorageError {
     }
 }
 
-async fn get_lockable_file(file: File) -> RwLock<File> {
-    debug!("Trying to create a file lock");
-    spawn_blocking(move || RwLock::new(file)).await
-}
-
 // Copied from https://stackoverflow.com/questions/39340924
 // This routine is adapted from the *old* Path's `path_relative_from`
 // function, which works differently from the new `relative_from` function.
@@ -257,29 +251,31 @@ impl Storage for FileStorage {
     async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> {
         let path = url_to_path(&self.root_dir, url);
         debug!("Opening {:?}", path);
-        // We have to special-case in here because the function should return Ok(None) on 404
-        match File::open(path).await {
-            Ok(f) => {
-                let lock = get_lockable_file(f).await;
-                let guard = lock.read()?;
-
-                let mut content = String::new();
-                // Apparently this typechecks. Somehow.
-                // I can take &mut for a &File because File is not a real type
-                // The operating system guards it using something
-                // that looks like a mutex to Rust's runtime, allowing me
-                // to grab a mutable reference from an immutable reference
-                (&mut &*guard).read_to_string(&mut content).await?;
-                Ok(Some(serde_json::from_str(&content)?))
-            }
-            Err(err) => {
-                if err.kind() == IOErrorKind::NotFound {
-                    Ok(None)
-                } else {
-                    Err(err.into())
+        // Use exclusively synchronous operations to never transfer a lock over an await boundary
+        spawn_blocking(move || {
+            match File::open(&path) {
+                Ok(file) => {
+                    let lock = RwLock::new(file);
+                    debug!("Trying to get a lock for file {:?}", &path);
+                    let guard = lock.read()?;
+
+                    let mut content = String::new();
+                    // Typechecks because OS magic acts on references
+                    // to FDs as if they were behind a mutex
+                    (&mut &*guard).read_to_string(&mut content)?;
+                    Ok(Some(serde_json::from_str(&content)?))
+                }
+                Err(err) => {
+                    // We have to special-case in here because
+                    // the function should return Ok(None) on 404
+                    if err.kind() == IOErrorKind::NotFound {
+                        Ok(None)
+                    } else {
+                        Err(err.into())
+                    }
                 }
             }
-        }
+        }).await
     }
 
     async fn put_post<'a>(&self, post: &'a serde_json::Value, user: &'a str) -> Result<()> {
@@ -289,24 +285,31 @@ impl Storage for FileStorage {
         let path = url_to_path(&self.root_dir, key);
 
         debug!("Creating {:?}", path);
+        // To move these into the closure, we have to clone values
+        let post_json = post.to_string();
+        let post_path = path.clone();
+        // Use exclusively synchronous operations to never transfer a lock over an await boundary
+        spawn_blocking(move || {
+            let parent = post_path.parent().unwrap().to_owned();
+            if !parent.is_dir() {
+                std::fs::create_dir_all(post_path.parent().unwrap())?;
+            }
 
-        let parent = path.parent().unwrap().to_owned();
-        if !spawn_blocking(move || parent.is_dir()).await {
-            async_std::fs::create_dir_all(path.parent().unwrap()).await?;
-        }
+            let f = OpenOptions::new()
+                .write(true)
+                .create_new(true)
+                .open(&post_path)?;
 
-        let f = OpenOptions::new()
-            .write(true)
-            .create_new(true)
-            .open(&path)
-            .await?;
+            let mut lock = RwLock::new(f);
+            debug!("Waiting for lock on {:?}", &post_path);
+            let mut guard = lock.write()?;
 
-        let mut lock = get_lockable_file(f).await;
-        let mut guard = lock.write()?;
+            (*guard).write_all(post_json.as_bytes())?;
+            (*guard).flush()?;
+            drop(guard);
 
-        (*guard).write_all(post.to_string().as_bytes()).await?;
-        (*guard).flush().await?;
-        drop(guard);
+            Result::Ok(())
+        }).await?;
 
         if post["properties"]["url"].is_array() {
             for url in post["properties"]["url"]
@@ -319,33 +322,33 @@ impl Storage for FileStorage {
                     let link = url_to_path(&self.root_dir, url);
                     debug!("Creating a symlink at {:?}", link);
                     let orig = path.clone();
-                    spawn_blocking::<_, Result<()>>(move || {
-                        // We're supposed to have a parent here.
-                        let basedir = link.parent().ok_or_else(|| {
-                            StorageError::new(
-                                ErrorKind::Backend,
-                                "Failed to calculate parent directory when creating a symlink",
-                            )
-                        })?;
-                        let relative = path_relative_from(&orig, basedir).unwrap();
-                        println!("{:?} - {:?} = {:?}", &orig, &basedir, &relative);
+                    // We're supposed to have a parent here.
+                    let basedir = link.parent().ok_or_else(|| {
+                        StorageError::new(
+                            ErrorKind::Backend,
+                            "Failed to calculate parent directory when creating a symlink",
+                        )
+                    })?;
+                    let relative = path_relative_from(&orig, basedir).unwrap();
+                    println!("{:?} - {:?} = {:?}", &orig, &basedir, &relative);
+                    spawn_blocking(move || {
                         println!("Created a symlink at {:?}", &link);
                         let symlink_result;
                         #[cfg(unix)]
                         {
                             symlink_result = std::os::unix::fs::symlink(relative, link);
                         }
-                        // Wow it even supports windows. Not sure if I need it to run on Windows but oh well
+                        // Wow it even supports windows. Windows is weird
                         #[cfg(windows)]
                         {
                             symlink_result = std::os::windows::fs::symlink_file(relative, link);
                         }
-                        match symlink_result {
-                            Ok(()) => Ok(()),
-                            Err(e) => Err(e.into()),
+                        if let Err(e) = symlink_result {
+                            Err(e.into())
+                        } else {
+                            Result::Ok(())
                         }
-                    })
-                    .await?;
+                    }).await?;
                 }
             }
         }
@@ -363,65 +366,74 @@ impl Storage for FileStorage {
             path.push("channels");
 
             let path = path.to_path(&self.root_dir);
-            let file = OpenOptions::new()
-                .read(true)
-                .write(true)
-                .truncate(false)
-                .create(true)
-                .open(&path)
-                .await?;
-            let mut lock = get_lockable_file(file).await;
-            let mut guard = lock.write()?;
+            let channel_name = post["properties"]["name"][0]
+                .as_str()
+                .map(|s| s.to_string())
+                .unwrap_or_else(String::default);
+            let key = key.to_string();
+            drop(post);
+            spawn_blocking(move || {
+                let file = OpenOptions::new()
+                    .read(true)
+                    .write(true)
+                    .truncate(false)
+                    .create(true)
+                    .open(&path)?;
+                let mut lock = RwLock::new(file);
+                debug!("Trying to lock feed {:?}", &path);
+                let mut guard = lock.write()?;
 
-            let mut content = String::new();
-            guard.read_to_string(&mut content).await?;
-            let mut channels: Vec<super::MicropubChannel>;
-            if !content.is_empty() {
-                channels = serde_json::from_str(&content)?;
-            } else {
-                channels = Vec::default();
-            }
+                let mut content = String::new();
 
-            channels.push(super::MicropubChannel {
-                uid: key.to_string(),
-                name: post["properties"]["name"][0]
-                    .as_str()
-                    .map(|s| s.to_string())
-                    .unwrap_or_else(String::default),
-            });
-            guard.seek(std::io::SeekFrom::Start(0)).await?;
-            guard.set_len(0).await?;
-            guard
-                .write_all(serde_json::to_string(&channels)?.as_bytes())
-                .await?;
-        }
+                (*guard).read_to_string(&mut content)?;
+
+                let mut channels: Vec<super::MicropubChannel>;
+                if !content.is_empty() {
+                    channels = serde_json::from_str(&content)?;
+                } else {
+                    channels = Vec::default();
+                }
+
+                channels.push(super::MicropubChannel {
+                    uid: key.to_string(),
+                    name: channel_name,
+                });
+                (*guard).seek(SeekFrom::Start(0))?;
+                (*guard).set_len(0)?;
+                (*guard).write_all(serde_json::to_string(&channels)?.as_bytes())?;
 
+                Result::Ok(())
+            }).await?;
+        }
         Ok(())
     }
 
     async fn update_post<'a>(&self, url: &'a str, update: serde_json::Value) -> Result<()> {
         let path = url_to_path(&self.root_dir, url);
-        let f = OpenOptions::new()
-            .write(true)
-            .read(true)
-            .truncate(false)
-            .open(&path)
-            .await?;
-
-        let mut lock = get_lockable_file(f).await;
-        let mut guard = lock.write()?;
-
-        let mut content = String::new();
-        guard.read_to_string(&mut content).await?;
-        let json: serde_json::Value = serde_json::from_str(&content)?;
-        // Apply the editing algorithms
-        let new_json = modify_post(&json, &update)?;
-
-        (*guard).set_len(0).await?;
-        (*guard).seek(std::io::SeekFrom::Start(0)).await?;
-        (*guard).write_all(new_json.to_string().as_bytes()).await?;
-        (*guard).flush().await?;
-        drop(guard);
+
+        let (old_json, new_json) = spawn_blocking(move || {
+            let f = OpenOptions::new()
+                .write(true)
+                .read(true)
+                .truncate(false)
+                .open(&path)?;
+
+            let mut lock = RwLock::new(f);
+            let mut guard = lock.write()?;
+
+            let mut content = String::new();
+            (*guard).read_to_string(&mut content)?;
+            let json: serde_json::Value = serde_json::from_str(&content)?;
+            // Apply the editing algorithms
+            let new_json = modify_post(&json, &update)?;
+
+            (*guard).set_len(0)?;
+            (*guard).seek(SeekFrom::Start(0))?;
+            (*guard).write_all(new_json.to_string().as_bytes())?;
+            (*guard).flush()?;
+
+            Result::Ok((json, new_json))
+        }).await?;
         // TODO check if URLs changed between old and new JSON
         Ok(())
     }
@@ -432,28 +444,30 @@ impl Storage for FileStorage {
         path.push("channels");
 
         let path = path.to_path(&self.root_dir);
-        match File::open(&path).await {
-            Ok(f) => {
-                let lock = get_lockable_file(f).await;
-                let guard = lock.read()?;
-
-                let mut content = String::new();
-                (&mut &*guard).read_to_string(&mut content).await?;
-                // This should not happen, but if it does, let's handle it gracefully instead of failing.
-                if content.is_empty() {
-                    return Ok(vec![]);
+        spawn_blocking(move || {
+            match File::open(&path) {
+                Ok(f) => {
+                    let lock = RwLock::new(f);
+                    let guard = lock.read()?;
+
+                    let mut content = String::new();
+                    (&mut &*guard).read_to_string(&mut content)?;
+                    // This should not happen, but if it does, handle it gracefully
+                    if content.is_empty() {
+                        return Ok(vec![]);
+                    }
+                    let channels: Vec<super::MicropubChannel> = serde_json::from_str(&content)?;
+                    Ok(channels)
                 }
-                let channels: Vec<super::MicropubChannel> = serde_json::from_str(&content)?;
-                Ok(channels)
-            }
-            Err(e) => {
-                if e.kind() == IOErrorKind::NotFound {
-                    Ok(vec![])
-                } else {
-                    Err(e.into())
+                Err(e) => {
+                    if e.kind() == IOErrorKind::NotFound {
+                        Ok(vec![])
+                    } else {
+                        Err(e.into())
+                    }
                 }
             }
-        }
+        }).await
     }
 
     async fn read_feed_with_limit<'a>(
@@ -520,6 +534,7 @@ impl Storage for FileStorage {
         if let Err(e) = async_std::fs::remove_file(path).await {
             Err(e.into())
         } else {
+            // TODO check for dangling references in the channel list
             Ok(())
         }
     }
@@ -532,19 +547,22 @@ impl Storage for FileStorage {
         path.push("settings");
 
         let path = path.to_path(&self.root_dir);
-        let lock = get_lockable_file(File::open(path).await?).await;
-        let guard = lock.read()?;
-
-        let mut content = String::new();
-        (&mut &*guard).read_to_string(&mut content).await?;
-        drop(guard);
-        let settings: HashMap<String, String> = serde_json::from_str(&content)?;
-        // XXX consider returning string slices instead of cloning a string every time
-        // it might come with a performance hit and/or memory usage inflation
-        settings
-            .get(setting)
-            .cloned()
-            .ok_or_else(|| StorageError::new(ErrorKind::Backend, "Setting not set"))
+        let setting = setting.to_string();
+        spawn_blocking(move || {
+            let lock = RwLock::new(File::open(path)?);
+            let guard = lock.read()?;
+
+            let mut content = String::new();
+            (&mut &*guard).read_to_string(&mut content)?;
+            drop(guard);
+            let settings: HashMap<String, String> = serde_json::from_str(&content)?;
+            // XXX consider returning string slices instead of cloning a string every time
+            // it might come with a performance hit and/or memory usage inflation
+            settings
+                .get(&setting)
+                .cloned()
+                .ok_or_else(|| StorageError::new(ErrorKind::Backend, "Setting not set"))
+        }).await
     }
 
     async fn set_setting<'a>(&self, setting: &'a str, user: &'a str, value: &'a str) -> Result<()> {
@@ -560,32 +578,33 @@ impl Storage for FileStorage {
             async_std::fs::create_dir_all(path.parent().unwrap()).await?;
         }
 
-        let file = OpenOptions::new()
-            .write(true)
-            .read(true)
-            .truncate(false)
-            .create(true)
-            .open(&path)
-            .await?;
-        let mut lock = get_lockable_file(file).await;
-        log::debug!("Created a lock. Locking for writing...");
-        let mut guard = lock.write()?;
-
-        log::debug!("Locked. Writing.");
-        let mut content = String::new();
-        guard.read_to_string(&mut content).await?;
-        let mut settings: HashMap<String, String> = if content.is_empty() {
-            HashMap::default()
-        } else {
-            serde_json::from_str(&content)?
-        };
-
-        settings.insert(setting.to_string(), value.to_string());
-        guard.seek(std::io::SeekFrom::Start(0)).await?;
-        guard.set_len(0).await?;
-        guard
-            .write_all(serde_json::to_string(&settings)?.as_bytes())
-            .await?;
-        Ok(())
+        let (setting, value) = (setting.to_string(), value.to_string());
+
+        spawn_blocking(move || {
+            let file = OpenOptions::new()
+                .write(true)
+                .read(true)
+                .truncate(false)
+                .create(true)
+                .open(&path)?;
+            let mut lock = RwLock::new(file);
+            log::debug!("Created a lock. Locking for writing...");
+            let mut guard = lock.write()?;
+
+            log::debug!("Locked. Writing.");
+            let mut content = String::new();
+            (&mut &*guard).read_to_string(&mut content)?;
+            let mut settings: HashMap<String, String> = if content.is_empty() {
+                HashMap::default()
+            } else {
+                serde_json::from_str(&content)?
+            };
+
+            settings.insert(setting.to_string(), value.to_string());
+            (&mut *guard).seek(SeekFrom::Start(0))?;
+            (&mut *guard).set_len(0)?;
+            (&mut *guard).write_all(serde_json::to_string(&settings)?.as_bytes())?;
+            Result::Ok(())
+        }).await
     }
 }