diff options
author | Vika <vika@fireburn.ru> | 2022-05-01 04:35:16 +0300 |
---|---|---|
committer | Vika <vika@fireburn.ru> | 2022-05-01 04:35:16 +0300 |
commit | 122361795b3b1376c6ba03ed6b160e9b89da93d7 (patch) | |
tree | 71fe8dc080a74952c29da04d57689c2643e7f211 /src/database/file/mod.rs | |
parent | e2bc26e907c10def259f52401804f7f6d00c498c (diff) | |
download | kittybox-122361795b3b1376c6ba03ed6b160e9b89da93d7.tar.zst |
FileStorage: lockless reads and atomic writes
- Reads don't lock anymore. At all. - Writes create a temporary file and use `rename(2)` to atomically replace it - since OpenOptions::create_new(true) is used, tempfile creation is atomic (and since tempfile names are per-post, a post can only be edited by one request at a time) - Since written files get atomically replaced, readers can't read a corrupted file Potential pitfalls: 1. This approach is not covered by unit tests (yet) 2. Stale tempfiles can prevent editing posts (can be solved by throwing out tempfiles that are older than, say, a day) 3. Crashed edits can leave stale tempfiles (honestly that sounds better than corrupting the whole database, doesn't sound like a bug to me at all!)
Diffstat (limited to 'src/database/file/mod.rs')
-rw-r--r-- | src/database/file/mod.rs | 377 |
1 files changed, 157 insertions, 220 deletions
diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs index b3856a6..0336a80 100644 --- a/src/database/file/mod.rs +++ b/src/database/file/mod.rs @@ -1,14 +1,14 @@ //#![warn(clippy::unwrap_used)] use crate::database::{filter_post, ErrorKind, Result, Storage, StorageError, Settings}; -use std::fs::{File, OpenOptions}; -use std::io::{ErrorKind as IOErrorKind, Seek, SeekFrom, Read, Write}; -use std::time::Duration; +use std::io::ErrorKind as IOErrorKind; +use tokio::fs::{File, OpenOptions}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::task::spawn_blocking; use async_trait::async_trait; -use fd_lock::RwLock; -use futures_util::stream; +/*use futures_util::stream; use futures_util::StreamExt; -use futures_util::TryStreamExt; +use futures_util::TryStreamExt;*/ +use futures::{stream, StreamExt, TryStreamExt}; use log::debug; use serde_json::json; use std::collections::HashMap; @@ -19,6 +19,7 @@ impl From<std::io::Error> for StorageError { Self::with_source( match source.kind() { IOErrorKind::NotFound => ErrorKind::NotFound, + IOErrorKind::AlreadyExists => ErrorKind::Conflict, _ => ErrorKind::Backend, }, "file I/O error", @@ -271,13 +272,23 @@ async fn hydrate_author<S: Storage>( } } -const IO_TIMEOUT: u64 = 3; - #[async_trait] impl Storage for FileStorage { async fn post_exists(&self, url: &str) -> Result<bool> { let path = url_to_path(&self.root_dir, url); debug!("Checking if {:?} exists...", path); + /*let result = match tokio::fs::metadata(path).await { + Ok(metadata) => { + Ok(true) + }, + Err(err) => { + if err.kind() == IOErrorKind::NotFound { + Ok(false) + } else { + Err(err.into()) + } + } + };*/ #[allow(clippy::unwrap_used)] // JoinHandle captures panics, this closure shouldn't panic Ok(spawn_blocking(move || path.is_file()).await.unwrap()) } @@ -288,36 +299,24 @@ impl Storage for FileStorage { // it's not like you CAN access someone else's private posts with it // so it's not exactly a security issue, but it's still not good debug!("Opening {:?}", path); - // Use exclusively synchronous operations to never transfer a lock over an await boundary - #[allow(clippy::unwrap_used)] // JoinHandle captures panics, this closure shouldn't panic - tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking( - #[warn(clippy::unwrap_used, clippy::expect_used, clippy::panic)] // can't panic here - move || { - match File::open(&path) { - Ok(file) => { - let lock = RwLock::new(file); - debug!("Trying to get a lock for file {:?}", &path); - let guard = lock.read()?; - debug!("Lock for {:?} acquired", &path); - let mut content = String::new(); - // Typechecks because OS magic acts on references - // to FDs as if they were behind a mutex - (&mut &*guard).read_to_string(&mut content)?; - debug!("Read {} bytes successfully from {:?}", content.as_bytes().len(), &path); - Ok(Some(serde_json::from_str(&content)?)) - } - Err(err) => { - // We have to special-case in here because - // the function should return Ok(None) on 404 - if err.kind() == IOErrorKind::NotFound { - Ok(None) - } else { - Err(err.into()) - } - } + + match File::open(&path).await { + Ok(mut file) => { + let mut content = String::new(); + // Typechecks because OS magic acts on references + // to FDs as if they were behind a mutex + AsyncReadExt::read_to_string(&mut file, &mut content).await?; + debug!("Read {} bytes successfully from {:?}", content.as_bytes().len(), &path); + Ok(Some(serde_json::from_str(&content)?)) + }, + Err(err) => { + if err.kind() == IOErrorKind::NotFound { + Ok(None) + } else { + Err(err.into()) } } - )).await?.unwrap() + } } async fn put_post(&self, post: &'_ serde_json::Value, user: &'_ str) -> Result<()> { @@ -325,37 +324,23 @@ impl Storage for FileStorage { .as_str() .expect("Tried to save a post without UID"); let path = url_to_path(&self.root_dir, key); - + let tempfile = (&path).with_extension("tmp"); debug!("Creating {:?}", path); - // To move these into the closure, we have to clone values - let post_json = post.to_string(); - let post_path = path.clone(); - // Use exclusively synchronous operations to never transfer a lock over an await boundary - #[allow(clippy::unwrap_used)] // JoinHandle captures panics, this closure shouldn't panic - tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking( - #[warn(clippy::unwrap_used, clippy::expect_used, clippy::panic)] // can't panic here - move || { - let parent = post_path.parent().expect("Parent for this directory should always exist").to_owned(); - if !parent.is_dir() { - std::fs::create_dir_all(post_path.parent().unwrap())?; - } - - let f = OpenOptions::new() - .write(true) - .create_new(true) - .open(&post_path)?; - let mut lock = RwLock::new(f); - debug!("Waiting for lock on {:?}", &post_path); - let mut guard = lock.write()?; + let parent = path.parent().expect("Parent for this directory should always exist").to_owned(); + if !parent.is_dir() { + tokio::fs::create_dir_all(parent).await?; + } - (*guard).write_all(post_json.as_bytes())?; - (*guard).flush()?; - drop(guard); + let mut file = tokio::fs::OpenOptions::new() + .write(true) + .create_new(true) + .open(&tempfile).await?; - Result::Ok(()) - } - )).await?.unwrap()?; + file.write_all(post.to_string().as_bytes()).await?; + file.flush().await?; + drop(file); + tokio::fs::rename(&tempfile, &path).await?; if let Some(urls) = post["properties"]["url"].as_array() { for url in urls @@ -375,28 +360,10 @@ impl Storage for FileStorage { })?; let relative = path_relative_from(&orig, basedir).unwrap(); println!("{:?} - {:?} = {:?}", &orig, &basedir, &relative); - tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { - println!("Created a symlink at {:?}", &link); - let symlink_result; - #[cfg(unix)] - { - symlink_result = std::os::unix::fs::symlink(relative, link); - } - // Wow it even supports windows. Windows is weird - #[cfg(windows)] - { - symlink_result = std::os::windows::fs::symlink_file(relative, link); - } - #[cfg(all(not(unix), not(windows)))] - { - compile_error!("Don't know how to create symlinks on non-unix non-windows platform"); - } - if let Err(e) = symlink_result { - Err(e.into()) - } else { - Result::Ok(()) - } - })).await?.unwrap()?; + #[cfg(unix)] + tokio::fs::symlink(relative, link).await?; + #[cfg(not(unix))] + compile_error!("Don't know how to create symlinks on your OS"); } } } @@ -414,84 +381,76 @@ impl Storage for FileStorage { path.push("channels"); let path = path.to_path(&self.root_dir); + let tempfilename = (&path).with_extension("tmp"); let channel_name = post["properties"]["name"][0] .as_str() .map(|s| s.to_string()) .unwrap_or_else(String::default); let key = key.to_string(); - #[allow(clippy::drop_ref)] // using drop() to prevent mistakes here - drop(post); - tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking( - #[warn(clippy::unwrap_used, clippy::expect_used, clippy::panic)] // can't panic here - move || { - let file = OpenOptions::new() - .read(true) - .write(true) - .truncate(false) - .create(true) - .open(&path)?; - let mut lock = RwLock::new(file); - debug!("Trying to lock feed {:?}", &path); - let mut guard = lock.write()?; - - let mut content = String::new(); - - (*guard).read_to_string(&mut content)?; - - let mut channels: Vec<super::MicropubChannel>; - if !content.is_empty() { - channels = serde_json::from_str(&content)?; - } else { - channels = Vec::default(); - } - channels.push(super::MicropubChannel { - uid: key.to_string(), - name: channel_name, - }); - (*guard).seek(SeekFrom::Start(0))?; - (*guard).set_len(0)?; - (*guard).write_all(serde_json::to_string(&channels)?.as_bytes())?; + let mut tempfile = OpenOptions::new() + .write(true) + .create_new(true) + .open(&tempfilename).await?; + let mut file = OpenOptions::new() + .read(true) + .write(true) + .truncate(false) + .create(true) + .open(&path).await?; + + let mut content = String::new(); + file.read_to_string(&mut content).await?; + drop(file); + let mut channels: Vec<super::MicropubChannel>; + if !content.is_empty() { + channels = serde_json::from_str(&content)?; + } else { + channels = Vec::default(); + } - Result::Ok(()) - } - )).await?.unwrap()?; + channels.push(super::MicropubChannel { + uid: key.to_string(), + name: channel_name, + }); + + tempfile.write_all(serde_json::to_string(&channels)?.as_bytes()).await?; + tempfile.flush().await?; + drop(tempfile); + tokio::fs::rename(tempfilename, path).await?; } Ok(()) } async fn update_post(&self, url: &'_ str, update: serde_json::Value) -> Result<()> { let path = url_to_path(&self.root_dir, url); + let tempfilename = path.with_extension("tmp"); #[allow(unused_variables)] - let (old_json, new_json) = tokio::time::timeout( - Duration::from_secs(IO_TIMEOUT), - spawn_blocking( - #[warn(clippy::unwrap_used, clippy::expect_used, clippy::panic)] // can't panic here - move || { - let f = OpenOptions::new() - .write(true) - .read(true) - .truncate(false) - .open(&path)?; - - let mut lock = RwLock::new(f); - let mut guard = lock.write()?; - - let mut content = String::new(); - (*guard).read_to_string(&mut content)?; - let json: serde_json::Value = serde_json::from_str(&content)?; - // Apply the editing algorithms - let new_json = modify_post(&json, &update)?; - - (*guard).set_len(0)?; - (*guard).seek(SeekFrom::Start(0))?; - (*guard).write_all(new_json.to_string().as_bytes())?; - (*guard).flush()?; - - Result::Ok((json, new_json)) - } - ) - ).await?.unwrap()?; + let (old_json, new_json) = { + let mut temp = OpenOptions::new() + .write(true) + .create_new(true) + .open(&tempfilename) + .await?; + let mut file = OpenOptions::new() + .read(true) + .open(&path) + .await?; + + let mut content = String::new(); + file.read_to_string(&mut content).await?; + let json: serde_json::Value = serde_json::from_str(&content)?; + drop(file); + // Apply the editing algorithms + let new_json = modify_post(&json, &update)?; + + temp.write_all(new_json.to_string().as_bytes()).await?; + temp.flush().await?; + drop(temp); + tokio::fs::rename(tempfilename, path).await?; + + (json, new_json) + }; // TODO check if URLs changed between old and new JSON Ok(()) } @@ -502,33 +461,25 @@ impl Storage for FileStorage { path.push("channels"); let path = path.to_path(&self.root_dir); - tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking( - #[warn(clippy::unwrap_used, clippy::expect_used, clippy::panic)] // can't panic here - move || { - match File::open(&path) { - Ok(f) => { - let lock = RwLock::new(f); - let guard = lock.read()?; - - let mut content = String::new(); - (&mut &*guard).read_to_string(&mut content)?; - // This should not happen, but if it does, handle it gracefully - if content.is_empty() { - return Ok(vec![]); - } - let channels: Vec<super::MicropubChannel> = serde_json::from_str(&content)?; - Ok(channels) - } - Err(e) => { - if e.kind() == IOErrorKind::NotFound { - Ok(vec![]) - } else { - Err(e.into()) - } - } + match File::open(&path).await { + Ok(mut f) => { + let mut content = String::new(); + f.read_to_string(&mut content).await?; + // This should not happen, but if it does, handle it gracefully + if content.is_empty() { + return Ok(vec![]); + } + let channels: Vec<super::MicropubChannel> = serde_json::from_str(&content)?; + Ok(channels) + } + Err(e) => { + if e.kind() == IOErrorKind::NotFound { + Ok(vec![]) + } else { + Err(e.into()) } } - )).await?.unwrap() + } } async fn read_feed_with_limit( @@ -610,24 +561,17 @@ impl Storage for FileStorage { let path = path.to_path(&self.root_dir); log::debug!("Getting settings from {:?}", &path); let setting = setting.to_string(); - tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking( - #[warn(clippy::unwrap_used, clippy::expect_used, clippy::panic)] // can't panic here - move || { - let lock = RwLock::new(File::open(path)?); - let guard = lock.read()?; - - let mut content = String::new(); - (&mut &*guard).read_to_string(&mut content)?; - drop(guard); - let settings: HashMap<String, String> = serde_json::from_str(&content)?; - // XXX consider returning string slices instead of cloning a string every time - // it might come with a performance hit and/or memory usage inflation - settings - .get(&setting) - .cloned() - .ok_or_else(|| StorageError::new(ErrorKind::Backend, "Setting not set")) - } - )).await?.unwrap() + let mut file = File::open(path).await?; + let mut content = String::new(); + file.read_to_string(&mut content).await?; + + let settings: HashMap<String, String> = serde_json::from_str(&content)?; + // XXX consider returning string slices instead of cloning a string every time + // it might come with a performance hit and/or memory usage inflation + settings + .get(&setting) + .cloned() + .ok_or_else(|| StorageError::new(ErrorKind::Backend, "Setting not set")) } async fn set_setting(&self, setting: Settings, user: &'_ str, value: &'_ str) -> Result<()> { @@ -637,6 +581,7 @@ impl Storage for FileStorage { path.push("settings"); let path = path.to_path(&self.root_dir); + let temppath = path.with_extension("tmp"); let parent = path.parent().unwrap().to_owned(); if !spawn_blocking(move || parent.is_dir()).await.unwrap() { @@ -645,34 +590,26 @@ impl Storage for FileStorage { let (setting, value) = (setting.to_string(), value.to_string()); - tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking( - #[warn(clippy::unwrap_used, clippy::expect_used, clippy::panic)] // can't panic here - move || { - let file = OpenOptions::new() - .write(true) - .read(true) - .truncate(false) - .create(true) - .open(&path)?; - let mut lock = RwLock::new(file); - log::debug!("Created a lock. Locking for writing..."); - let mut guard = lock.write()?; - - log::debug!("Locked. Writing."); - let mut content = String::new(); - (&mut &*guard).read_to_string(&mut content)?; - let mut settings: HashMap<String, String> = if content.is_empty() { - HashMap::default() - } else { - serde_json::from_str(&content)? - }; - - settings.insert(setting, value); - (&mut *guard).seek(SeekFrom::Start(0))?; - (*guard).set_len(0)?; - (&mut *guard).write_all(serde_json::to_string(&settings)?.as_bytes())?; - Result::Ok(()) + let mut tempfile = OpenOptions::new() + .write(true) + .create_new(true) + .open(&temppath) + .await?; + + let mut settings: HashMap<String, String> = { + let mut f = File::open(&path).await?; + let mut content = String::new(); + f.read_to_string(&mut content).await?; + if content.is_empty() { + HashMap::default() + } else { + serde_json::from_str(&content)? } - )).await?.unwrap() + }; + settings.insert(setting, value); + tempfile.write_all(serde_json::to_string(&settings)?.as_bytes()).await?; + drop(tempfile); + tokio::fs::rename(temppath, path).await?; + Result::Ok(()) } } |