about summary refs log tree commit diff
path: root/src/database/file/mod.rs
diff options
context:
space:
mode:
authorVika <vika@fireburn.ru>2022-05-01 04:35:16 +0300
committerVika <vika@fireburn.ru>2022-05-01 04:35:16 +0300
commit122361795b3b1376c6ba03ed6b160e9b89da93d7 (patch)
tree71fe8dc080a74952c29da04d57689c2643e7f211 /src/database/file/mod.rs
parente2bc26e907c10def259f52401804f7f6d00c498c (diff)
downloadkittybox-122361795b3b1376c6ba03ed6b160e9b89da93d7.tar.zst
FileStorage: lockless reads and atomic writes
  - Reads don't lock anymore. At all.
  - Writes create a temporary file and use `rename(2)` to atomically
    replace it
  - since OpenOptions::create_new(true) is used, tempfile creation is
    atomic (and since tempfile names are per-post, a post can only be
    edited by one request at a time)
  - Since written files get atomically replaced, readers can't read a
    corrupted file

Potential pitfalls:
1. This approach is not covered by unit tests (yet)
2. Stale tempfiles can prevent editing posts (can be solved by
throwing out tempfiles that are older than, say, a day)
3. Crashed edits can leave stale tempfiles (honestly that sounds
better than corrupting the whole database, doesn't sound like a bug to
me at all!)
Diffstat (limited to 'src/database/file/mod.rs')
-rw-r--r--src/database/file/mod.rs377
1 files changed, 157 insertions, 220 deletions
diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs
index b3856a6..0336a80 100644
--- a/src/database/file/mod.rs
+++ b/src/database/file/mod.rs
@@ -1,14 +1,14 @@
 //#![warn(clippy::unwrap_used)]
 use crate::database::{filter_post, ErrorKind, Result, Storage, StorageError, Settings};
-use std::fs::{File, OpenOptions};
-use std::io::{ErrorKind as IOErrorKind, Seek, SeekFrom, Read, Write};
-use std::time::Duration;
+use std::io::ErrorKind as IOErrorKind;
+use tokio::fs::{File, OpenOptions};
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tokio::task::spawn_blocking;
 use async_trait::async_trait;
-use fd_lock::RwLock;
-use futures_util::stream;
+/*use futures_util::stream;
 use futures_util::StreamExt;
-use futures_util::TryStreamExt;
+use futures_util::TryStreamExt;*/
+use futures::{stream, StreamExt, TryStreamExt};
 use log::debug;
 use serde_json::json;
 use std::collections::HashMap;
@@ -19,6 +19,7 @@ impl From<std::io::Error> for StorageError {
         Self::with_source(
             match source.kind() {
                 IOErrorKind::NotFound => ErrorKind::NotFound,
+                IOErrorKind::AlreadyExists => ErrorKind::Conflict,
                 _ => ErrorKind::Backend,
             },
             "file I/O error",
@@ -271,13 +272,23 @@ async fn hydrate_author<S: Storage>(
     }
 }
 
-const IO_TIMEOUT: u64 = 3;
-
 #[async_trait]
 impl Storage for FileStorage {
     async fn post_exists(&self, url: &str) -> Result<bool> {
         let path = url_to_path(&self.root_dir, url);
         debug!("Checking if {:?} exists...", path);
+        /*let result = match tokio::fs::metadata(path).await {
+            Ok(metadata) => {
+                Ok(true)
+            },
+            Err(err) => {
+                if err.kind() == IOErrorKind::NotFound {
+                    Ok(false)
+                } else {
+                    Err(err.into())
+                }
+            }
+        };*/
         #[allow(clippy::unwrap_used)] // JoinHandle captures panics, this closure shouldn't panic
         Ok(spawn_blocking(move || path.is_file()).await.unwrap())
     }
@@ -288,36 +299,24 @@ impl Storage for FileStorage {
         // it's not like you CAN access someone else's private posts with it
         // so it's not exactly a security issue, but it's still not good
         debug!("Opening {:?}", path);
-        // Use exclusively synchronous operations to never transfer a lock over an await boundary
-        #[allow(clippy::unwrap_used)] // JoinHandle captures panics, this closure shouldn't panic
-        tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(
-            #[warn(clippy::unwrap_used, clippy::expect_used, clippy::panic)] // can't panic here
-            move || {
-                match File::open(&path) {
-                    Ok(file) => {
-                        let lock = RwLock::new(file);
-                        debug!("Trying to get a lock for file {:?}", &path);
-                        let guard = lock.read()?;
-                        debug!("Lock for {:?} acquired", &path);
-                        let mut content = String::new();
-                        // Typechecks because OS magic acts on references
-                        // to FDs as if they were behind a mutex
-                        (&mut &*guard).read_to_string(&mut content)?;
-                        debug!("Read {} bytes successfully from {:?}", content.as_bytes().len(), &path);
-                        Ok(Some(serde_json::from_str(&content)?))
-                    }
-                    Err(err) => {
-                        // We have to special-case in here because
-                        // the function should return Ok(None) on 404
-                        if err.kind() == IOErrorKind::NotFound {
-                            Ok(None)
-                        } else {
-                            Err(err.into())
-                        }
-                    }
+
+        match File::open(&path).await {
+            Ok(mut file) => {
+                let mut content = String::new();
+                // Typechecks because OS magic acts on references
+                // to FDs as if they were behind a mutex
+                AsyncReadExt::read_to_string(&mut file, &mut content).await?;
+                debug!("Read {} bytes successfully from {:?}", content.as_bytes().len(), &path);
+                Ok(Some(serde_json::from_str(&content)?))
+            },
+            Err(err) => {
+                if err.kind() == IOErrorKind::NotFound {
+                    Ok(None)
+                } else {
+                    Err(err.into())
                 }
             }
-        )).await?.unwrap()
+        }
     }
 
     async fn put_post(&self, post: &'_ serde_json::Value, user: &'_ str) -> Result<()> {
@@ -325,37 +324,23 @@ impl Storage for FileStorage {
             .as_str()
             .expect("Tried to save a post without UID");
         let path = url_to_path(&self.root_dir, key);
-
+        let tempfile = (&path).with_extension("tmp");
         debug!("Creating {:?}", path);
-        // To move these into the closure, we have to clone values
-        let post_json = post.to_string();
-        let post_path = path.clone();
-        // Use exclusively synchronous operations to never transfer a lock over an await boundary
-        #[allow(clippy::unwrap_used)] // JoinHandle captures panics, this closure shouldn't panic
-        tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(
-            #[warn(clippy::unwrap_used, clippy::expect_used, clippy::panic)] // can't panic here
-            move || {
-                let parent = post_path.parent().expect("Parent for this directory should always exist").to_owned();
-                if !parent.is_dir() {
-                    std::fs::create_dir_all(post_path.parent().unwrap())?;
-                }
-
-                let f = OpenOptions::new()
-                    .write(true)
-                    .create_new(true)
-                    .open(&post_path)?;
 
-                let mut lock = RwLock::new(f);
-                debug!("Waiting for lock on {:?}", &post_path);
-                let mut guard = lock.write()?;
+        let parent = path.parent().expect("Parent for this directory should always exist").to_owned();
+        if !parent.is_dir() {
+            tokio::fs::create_dir_all(parent).await?;
+        }
 
-                (*guard).write_all(post_json.as_bytes())?;
-                (*guard).flush()?;
-                drop(guard);
+        let mut file = tokio::fs::OpenOptions::new()
+            .write(true)
+            .create_new(true)
+            .open(&tempfile).await?;
 
-                Result::Ok(())
-            }
-        )).await?.unwrap()?;
+        file.write_all(post.to_string().as_bytes()).await?;
+        file.flush().await?;
+        drop(file);
+        tokio::fs::rename(&tempfile, &path).await?;
 
         if let Some(urls) = post["properties"]["url"].as_array() {
             for url in urls
@@ -375,28 +360,10 @@ impl Storage for FileStorage {
                     })?;
                     let relative = path_relative_from(&orig, basedir).unwrap();
                     println!("{:?} - {:?} = {:?}", &orig, &basedir, &relative);
-                    tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || {
-                        println!("Created a symlink at {:?}", &link);
-                        let symlink_result;
-                        #[cfg(unix)]
-                        {
-                            symlink_result = std::os::unix::fs::symlink(relative, link);
-                        }
-                        // Wow it even supports windows. Windows is weird
-                        #[cfg(windows)]
-                        {
-                            symlink_result = std::os::windows::fs::symlink_file(relative, link);
-                        }
-                        #[cfg(all(not(unix), not(windows)))]
-                        {
-                            compile_error!("Don't know how to create symlinks on non-unix non-windows platform");
-                        }
-                        if let Err(e) = symlink_result {
-                            Err(e.into())
-                        } else {
-                            Result::Ok(())
-                        }
-                    })).await?.unwrap()?;
+                    #[cfg(unix)]
+                    tokio::fs::symlink(relative, link).await?;
+                    #[cfg(not(unix))]
+                    compile_error!("Don't know how to create symlinks on your OS");
                 }
             }
         }
@@ -414,84 +381,76 @@ impl Storage for FileStorage {
             path.push("channels");
 
             let path = path.to_path(&self.root_dir);
+            let tempfilename = (&path).with_extension("tmp");
             let channel_name = post["properties"]["name"][0]
                 .as_str()
                 .map(|s| s.to_string())
                 .unwrap_or_else(String::default);
             let key = key.to_string();
-            #[allow(clippy::drop_ref)] // using drop() to prevent mistakes here
-            drop(post);
-            tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(
-                #[warn(clippy::unwrap_used, clippy::expect_used, clippy::panic)] // can't panic here
-                move || {
-                    let file = OpenOptions::new()
-                        .read(true)
-                        .write(true)
-                        .truncate(false)
-                        .create(true)
-                        .open(&path)?;
-                    let mut lock = RwLock::new(file);
-                    debug!("Trying to lock feed {:?}", &path);
-                    let mut guard = lock.write()?;
-
-                    let mut content = String::new();
-
-                    (*guard).read_to_string(&mut content)?;
-
-                    let mut channels: Vec<super::MicropubChannel>;
-                    if !content.is_empty() {
-                        channels = serde_json::from_str(&content)?;
-                    } else {
-                        channels = Vec::default();
-                    }
 
-                    channels.push(super::MicropubChannel {
-                        uid: key.to_string(),
-                        name: channel_name,
-                    });
-                    (*guard).seek(SeekFrom::Start(0))?;
-                    (*guard).set_len(0)?;
-                    (*guard).write_all(serde_json::to_string(&channels)?.as_bytes())?;
+            let mut tempfile = OpenOptions::new()
+                .write(true)
+                .create_new(true)
+                .open(&tempfilename).await?;
+            let mut file = OpenOptions::new()
+                .read(true)
+                .write(true)
+                .truncate(false)
+                .create(true)
+                .open(&path).await?;
+
+            let mut content = String::new();
+            file.read_to_string(&mut content).await?;
+            drop(file);
+            let mut channels: Vec<super::MicropubChannel>;
+            if !content.is_empty() {
+                channels = serde_json::from_str(&content)?;
+            } else {
+                channels = Vec::default();
+            }
 
-                    Result::Ok(())
-                }
-            )).await?.unwrap()?;
+            channels.push(super::MicropubChannel {
+                uid: key.to_string(),
+                name: channel_name,
+            });
+
+            tempfile.write_all(serde_json::to_string(&channels)?.as_bytes()).await?;
+            tempfile.flush().await?;
+            drop(tempfile);
+            tokio::fs::rename(tempfilename, path).await?;
         }
         Ok(())
     }
 
     async fn update_post(&self, url: &'_ str, update: serde_json::Value) -> Result<()> {
         let path = url_to_path(&self.root_dir, url);
+        let tempfilename = path.with_extension("tmp");
         #[allow(unused_variables)]
-        let (old_json, new_json) = tokio::time::timeout(
-            Duration::from_secs(IO_TIMEOUT),
-            spawn_blocking(
-                #[warn(clippy::unwrap_used, clippy::expect_used, clippy::panic)] // can't panic here
-                move || {
-                    let f = OpenOptions::new()
-                        .write(true)
-                        .read(true)
-                        .truncate(false)
-                        .open(&path)?;
-
-                    let mut lock = RwLock::new(f);
-                    let mut guard = lock.write()?;
-
-                    let mut content = String::new();
-                    (*guard).read_to_string(&mut content)?;
-                    let json: serde_json::Value = serde_json::from_str(&content)?;
-                    // Apply the editing algorithms
-                    let new_json = modify_post(&json, &update)?;
-
-                    (*guard).set_len(0)?;
-                    (*guard).seek(SeekFrom::Start(0))?;
-                    (*guard).write_all(new_json.to_string().as_bytes())?;
-                    (*guard).flush()?;
-
-                    Result::Ok((json, new_json))
-                }
-            )
-        ).await?.unwrap()?;
+        let (old_json, new_json) = {
+            let mut temp = OpenOptions::new()
+                .write(true)
+                .create_new(true)
+                .open(&tempfilename)
+                .await?;
+            let mut file = OpenOptions::new()
+                .read(true)
+                .open(&path)
+                .await?;
+
+            let mut content = String::new();
+            file.read_to_string(&mut content).await?;
+            let json: serde_json::Value = serde_json::from_str(&content)?;
+            drop(file);
+            // Apply the editing algorithms
+            let new_json = modify_post(&json, &update)?;
+
+            temp.write_all(new_json.to_string().as_bytes()).await?;
+            temp.flush().await?;
+            drop(temp);
+            tokio::fs::rename(tempfilename, path).await?;
+
+            (json, new_json)
+        };
         // TODO check if URLs changed between old and new JSON
         Ok(())
     }
@@ -502,33 +461,25 @@ impl Storage for FileStorage {
         path.push("channels");
 
         let path = path.to_path(&self.root_dir);
-        tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(
-            #[warn(clippy::unwrap_used, clippy::expect_used, clippy::panic)] // can't panic here
-            move || {
-                match File::open(&path) {
-                    Ok(f) => {
-                        let lock = RwLock::new(f);
-                        let guard = lock.read()?;
-
-                        let mut content = String::new();
-                        (&mut &*guard).read_to_string(&mut content)?;
-                        // This should not happen, but if it does, handle it gracefully
-                        if content.is_empty() {
-                            return Ok(vec![]);
-                        }
-                        let channels: Vec<super::MicropubChannel> = serde_json::from_str(&content)?;
-                        Ok(channels)
-                    }
-                    Err(e) => {
-                        if e.kind() == IOErrorKind::NotFound {
-                            Ok(vec![])
-                        } else {
-                            Err(e.into())
-                        }
-                    }
+        match File::open(&path).await {
+            Ok(mut f) => {
+                let mut content = String::new();
+                f.read_to_string(&mut content).await?;
+                // This should not happen, but if it does, handle it gracefully
+                if content.is_empty() {
+                    return Ok(vec![]);
+                }
+                let channels: Vec<super::MicropubChannel> = serde_json::from_str(&content)?;
+                Ok(channels)
+            }
+            Err(e) => {
+                if e.kind() == IOErrorKind::NotFound {
+                    Ok(vec![])
+                } else {
+                    Err(e.into())
                 }
             }
-        )).await?.unwrap()
+        }
     }
 
     async fn read_feed_with_limit(
@@ -610,24 +561,17 @@ impl Storage for FileStorage {
         let path = path.to_path(&self.root_dir);
         log::debug!("Getting settings from {:?}", &path);
         let setting = setting.to_string();
-        tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(
-            #[warn(clippy::unwrap_used, clippy::expect_used, clippy::panic)] // can't panic here
-            move || {
-                let lock = RwLock::new(File::open(path)?);
-                let guard = lock.read()?;
-
-                let mut content = String::new();
-                (&mut &*guard).read_to_string(&mut content)?;
-                drop(guard);
-                let settings: HashMap<String, String> = serde_json::from_str(&content)?;
-                // XXX consider returning string slices instead of cloning a string every time
-                // it might come with a performance hit and/or memory usage inflation
-                settings
-                    .get(&setting)
-                    .cloned()
-                    .ok_or_else(|| StorageError::new(ErrorKind::Backend, "Setting not set"))
-            }
-        )).await?.unwrap()
+        let mut file = File::open(path).await?;
+        let mut content = String::new();
+        file.read_to_string(&mut content).await?;
+
+        let settings: HashMap<String, String> = serde_json::from_str(&content)?;
+        // XXX consider returning string slices instead of cloning a string every time
+        // it might come with a performance hit and/or memory usage inflation
+        settings
+            .get(&setting)
+            .cloned()
+            .ok_or_else(|| StorageError::new(ErrorKind::Backend, "Setting not set"))
     }
 
     async fn set_setting(&self, setting: Settings, user: &'_ str, value: &'_ str) -> Result<()> {
@@ -637,6 +581,7 @@ impl Storage for FileStorage {
         path.push("settings");
 
         let path = path.to_path(&self.root_dir);
+        let temppath = path.with_extension("tmp");
 
         let parent = path.parent().unwrap().to_owned();
         if !spawn_blocking(move || parent.is_dir()).await.unwrap() {
@@ -645,34 +590,26 @@ impl Storage for FileStorage {
 
         let (setting, value) = (setting.to_string(), value.to_string());
 
-        tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(
-            #[warn(clippy::unwrap_used, clippy::expect_used, clippy::panic)] // can't panic here
-            move || {
-                let file = OpenOptions::new()
-                    .write(true)
-                    .read(true)
-                    .truncate(false)
-                    .create(true)
-                    .open(&path)?;
-                let mut lock = RwLock::new(file);
-                log::debug!("Created a lock. Locking for writing...");
-                let mut guard = lock.write()?;
-
-                log::debug!("Locked. Writing.");
-                let mut content = String::new();
-                (&mut &*guard).read_to_string(&mut content)?;
-                let mut settings: HashMap<String, String> = if content.is_empty() {
-                    HashMap::default()
-                } else {
-                    serde_json::from_str(&content)?
-                };
-
-                settings.insert(setting, value);
-                (&mut *guard).seek(SeekFrom::Start(0))?;
-                (*guard).set_len(0)?;
-                (&mut *guard).write_all(serde_json::to_string(&settings)?.as_bytes())?;
-                Result::Ok(())
+        let mut tempfile = OpenOptions::new()
+            .write(true)
+            .create_new(true)
+            .open(&temppath)
+            .await?;
+
+        let mut settings: HashMap<String, String> = {
+            let mut f = File::open(&path).await?;
+            let mut content = String::new();
+            f.read_to_string(&mut content).await?;
+            if content.is_empty() {
+                HashMap::default()
+            } else {
+                serde_json::from_str(&content)?
             }
-        )).await?.unwrap()
+        };
+        settings.insert(setting, value);
+        tempfile.write_all(serde_json::to_string(&settings)?.as_bytes()).await?;
+        drop(tempfile);
+        tokio::fs::rename(temppath, path).await?;
+        Result::Ok(())
     }
 }