diff options
author | Vika <vika@fireburn.ru> | 2021-12-31 06:23:20 +0300 |
---|---|---|
committer | Vika <vika@fireburn.ru> | 2021-12-31 06:23:20 +0300 |
commit | 6cc2bd0856b946415f8f6e6e113dbcff7c48dc72 (patch) | |
tree | 1d79d9abf9ebac08464d2c7542bcd51b5be8be5f /src/database/file/mod.rs | |
parent | 2727e8bdd0122e367a6b155f96829fec26e536f9 (diff) | |
download | kittybox-6cc2bd0856b946415f8f6e6e113dbcff7c48dc72.tar.zst |
FileBackend: don't transfer locks over async boundaries
This may or may not be the cause for the app hanging while waiting for a lock. Now the operations with locks are never performed over an async boundary, excluding any shenanigans that can happen when accidentally leaving a file locked over async boundaries.
Diffstat (limited to 'src/database/file/mod.rs')
-rw-r--r-- | src/database/file/mod.rs | 357 |
1 files changed, 188 insertions, 169 deletions
diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs index ee7d30b..5d6ff47 100644 --- a/src/database/file/mod.rs +++ b/src/database/file/mod.rs @@ -1,7 +1,6 @@ use crate::database::{filter_post, ErrorKind, Result, Storage, StorageError}; -use async_std::fs::{File, OpenOptions}; -use async_std::io::prelude::*; -use async_std::io::ErrorKind as IOErrorKind; +use std::fs::{File, OpenOptions}; +use std::io::{ErrorKind as IOErrorKind, Seek, SeekFrom, Read, Write}; use async_std::task::spawn_blocking; use async_trait::async_trait; use fd_lock::RwLock; @@ -26,11 +25,6 @@ impl From<std::io::Error> for StorageError { } } -async fn get_lockable_file(file: File) -> RwLock<File> { - debug!("Trying to create a file lock"); - spawn_blocking(move || RwLock::new(file)).await -} - // Copied from https://stackoverflow.com/questions/39340924 // This routine is adapted from the *old* Path's `path_relative_from` // function, which works differently from the new `relative_from` function. @@ -257,29 +251,31 @@ impl Storage for FileStorage { async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> { let path = url_to_path(&self.root_dir, url); debug!("Opening {:?}", path); - // We have to special-case in here because the function should return Ok(None) on 404 - match File::open(path).await { - Ok(f) => { - let lock = get_lockable_file(f).await; - let guard = lock.read()?; - - let mut content = String::new(); - // Apparently this typechecks. Somehow. - // I can take &mut for a &File because File is not a real type - // The operating system guards it using something - // that looks like a mutex to Rust's runtime, allowing me - // to grab a mutable reference from an immutable reference - (&mut &*guard).read_to_string(&mut content).await?; - Ok(Some(serde_json::from_str(&content)?)) - } - Err(err) => { - if err.kind() == IOErrorKind::NotFound { - Ok(None) - } else { - Err(err.into()) + // Use exclusively synchronous operations to never transfer a lock over an await boundary + spawn_blocking(move || { + match File::open(&path) { + Ok(file) => { + let lock = RwLock::new(file); + debug!("Trying to get a lock for file {:?}", &path); + let guard = lock.read()?; + + let mut content = String::new(); + // Typechecks because OS magic acts on references + // to FDs as if they were behind a mutex + (&mut &*guard).read_to_string(&mut content)?; + Ok(Some(serde_json::from_str(&content)?)) + } + Err(err) => { + // We have to special-case in here because + // the function should return Ok(None) on 404 + if err.kind() == IOErrorKind::NotFound { + Ok(None) + } else { + Err(err.into()) + } } } - } + }).await } async fn put_post<'a>(&self, post: &'a serde_json::Value, user: &'a str) -> Result<()> { @@ -289,24 +285,31 @@ impl Storage for FileStorage { let path = url_to_path(&self.root_dir, key); debug!("Creating {:?}", path); + // To move these into the closure, we have to clone values + let post_json = post.to_string(); + let post_path = path.clone(); + // Use exclusively synchronous operations to never transfer a lock over an await boundary + spawn_blocking(move || { + let parent = post_path.parent().unwrap().to_owned(); + if !parent.is_dir() { + std::fs::create_dir_all(post_path.parent().unwrap())?; + } - let parent = path.parent().unwrap().to_owned(); - if !spawn_blocking(move || parent.is_dir()).await { - async_std::fs::create_dir_all(path.parent().unwrap()).await?; - } + let f = OpenOptions::new() + .write(true) + .create_new(true) + .open(&post_path)?; - let f = OpenOptions::new() - .write(true) - .create_new(true) - .open(&path) - .await?; + let mut lock = RwLock::new(f); + debug!("Waiting for lock on {:?}", &post_path); + let mut guard = lock.write()?; - let mut lock = get_lockable_file(f).await; - let mut guard = lock.write()?; + (*guard).write_all(post_json.as_bytes())?; + (*guard).flush()?; + drop(guard); - (*guard).write_all(post.to_string().as_bytes()).await?; - (*guard).flush().await?; - drop(guard); + Result::Ok(()) + }).await?; if post["properties"]["url"].is_array() { for url in post["properties"]["url"] @@ -319,33 +322,33 @@ impl Storage for FileStorage { let link = url_to_path(&self.root_dir, url); debug!("Creating a symlink at {:?}", link); let orig = path.clone(); - spawn_blocking::<_, Result<()>>(move || { - // We're supposed to have a parent here. - let basedir = link.parent().ok_or_else(|| { - StorageError::new( - ErrorKind::Backend, - "Failed to calculate parent directory when creating a symlink", - ) - })?; - let relative = path_relative_from(&orig, basedir).unwrap(); - println!("{:?} - {:?} = {:?}", &orig, &basedir, &relative); + // We're supposed to have a parent here. + let basedir = link.parent().ok_or_else(|| { + StorageError::new( + ErrorKind::Backend, + "Failed to calculate parent directory when creating a symlink", + ) + })?; + let relative = path_relative_from(&orig, basedir).unwrap(); + println!("{:?} - {:?} = {:?}", &orig, &basedir, &relative); + spawn_blocking(move || { println!("Created a symlink at {:?}", &link); let symlink_result; #[cfg(unix)] { symlink_result = std::os::unix::fs::symlink(relative, link); } - // Wow it even supports windows. Not sure if I need it to run on Windows but oh well + // Wow it even supports windows. Windows is weird #[cfg(windows)] { symlink_result = std::os::windows::fs::symlink_file(relative, link); } - match symlink_result { - Ok(()) => Ok(()), - Err(e) => Err(e.into()), + if let Err(e) = symlink_result { + Err(e.into()) + } else { + Result::Ok(()) } - }) - .await?; + }).await?; } } } @@ -363,65 +366,74 @@ impl Storage for FileStorage { path.push("channels"); let path = path.to_path(&self.root_dir); - let file = OpenOptions::new() - .read(true) - .write(true) - .truncate(false) - .create(true) - .open(&path) - .await?; - let mut lock = get_lockable_file(file).await; - let mut guard = lock.write()?; + let channel_name = post["properties"]["name"][0] + .as_str() + .map(|s| s.to_string()) + .unwrap_or_else(String::default); + let key = key.to_string(); + drop(post); + spawn_blocking(move || { + let file = OpenOptions::new() + .read(true) + .write(true) + .truncate(false) + .create(true) + .open(&path)?; + let mut lock = RwLock::new(file); + debug!("Trying to lock feed {:?}", &path); + let mut guard = lock.write()?; - let mut content = String::new(); - guard.read_to_string(&mut content).await?; - let mut channels: Vec<super::MicropubChannel>; - if !content.is_empty() { - channels = serde_json::from_str(&content)?; - } else { - channels = Vec::default(); - } + let mut content = String::new(); - channels.push(super::MicropubChannel { - uid: key.to_string(), - name: post["properties"]["name"][0] - .as_str() - .map(|s| s.to_string()) - .unwrap_or_else(String::default), - }); - guard.seek(std::io::SeekFrom::Start(0)).await?; - guard.set_len(0).await?; - guard - .write_all(serde_json::to_string(&channels)?.as_bytes()) - .await?; - } + (*guard).read_to_string(&mut content)?; + + let mut channels: Vec<super::MicropubChannel>; + if !content.is_empty() { + channels = serde_json::from_str(&content)?; + } else { + channels = Vec::default(); + } + + channels.push(super::MicropubChannel { + uid: key.to_string(), + name: channel_name, + }); + (*guard).seek(SeekFrom::Start(0))?; + (*guard).set_len(0)?; + (*guard).write_all(serde_json::to_string(&channels)?.as_bytes())?; + Result::Ok(()) + }).await?; + } Ok(()) } async fn update_post<'a>(&self, url: &'a str, update: serde_json::Value) -> Result<()> { let path = url_to_path(&self.root_dir, url); - let f = OpenOptions::new() - .write(true) - .read(true) - .truncate(false) - .open(&path) - .await?; - - let mut lock = get_lockable_file(f).await; - let mut guard = lock.write()?; - - let mut content = String::new(); - guard.read_to_string(&mut content).await?; - let json: serde_json::Value = serde_json::from_str(&content)?; - // Apply the editing algorithms - let new_json = modify_post(&json, &update)?; - - (*guard).set_len(0).await?; - (*guard).seek(std::io::SeekFrom::Start(0)).await?; - (*guard).write_all(new_json.to_string().as_bytes()).await?; - (*guard).flush().await?; - drop(guard); + + let (old_json, new_json) = spawn_blocking(move || { + let f = OpenOptions::new() + .write(true) + .read(true) + .truncate(false) + .open(&path)?; + + let mut lock = RwLock::new(f); + let mut guard = lock.write()?; + + let mut content = String::new(); + (*guard).read_to_string(&mut content)?; + let json: serde_json::Value = serde_json::from_str(&content)?; + // Apply the editing algorithms + let new_json = modify_post(&json, &update)?; + + (*guard).set_len(0)?; + (*guard).seek(SeekFrom::Start(0))?; + (*guard).write_all(new_json.to_string().as_bytes())?; + (*guard).flush()?; + + Result::Ok((json, new_json)) + }).await?; // TODO check if URLs changed between old and new JSON Ok(()) } @@ -432,28 +444,30 @@ impl Storage for FileStorage { path.push("channels"); let path = path.to_path(&self.root_dir); - match File::open(&path).await { - Ok(f) => { - let lock = get_lockable_file(f).await; - let guard = lock.read()?; - - let mut content = String::new(); - (&mut &*guard).read_to_string(&mut content).await?; - // This should not happen, but if it does, let's handle it gracefully instead of failing. - if content.is_empty() { - return Ok(vec![]); + spawn_blocking(move || { + match File::open(&path) { + Ok(f) => { + let lock = RwLock::new(f); + let guard = lock.read()?; + + let mut content = String::new(); + (&mut &*guard).read_to_string(&mut content)?; + // This should not happen, but if it does, handle it gracefully + if content.is_empty() { + return Ok(vec![]); + } + let channels: Vec<super::MicropubChannel> = serde_json::from_str(&content)?; + Ok(channels) } - let channels: Vec<super::MicropubChannel> = serde_json::from_str(&content)?; - Ok(channels) - } - Err(e) => { - if e.kind() == IOErrorKind::NotFound { - Ok(vec![]) - } else { - Err(e.into()) + Err(e) => { + if e.kind() == IOErrorKind::NotFound { + Ok(vec![]) + } else { + Err(e.into()) + } } } - } + }).await } async fn read_feed_with_limit<'a>( @@ -520,6 +534,7 @@ impl Storage for FileStorage { if let Err(e) = async_std::fs::remove_file(path).await { Err(e.into()) } else { + // TODO check for dangling references in the channel list Ok(()) } } @@ -532,19 +547,22 @@ impl Storage for FileStorage { path.push("settings"); let path = path.to_path(&self.root_dir); - let lock = get_lockable_file(File::open(path).await?).await; - let guard = lock.read()?; - - let mut content = String::new(); - (&mut &*guard).read_to_string(&mut content).await?; - drop(guard); - let settings: HashMap<String, String> = serde_json::from_str(&content)?; - // XXX consider returning string slices instead of cloning a string every time - // it might come with a performance hit and/or memory usage inflation - settings - .get(setting) - .cloned() - .ok_or_else(|| StorageError::new(ErrorKind::Backend, "Setting not set")) + let setting = setting.to_string(); + spawn_blocking(move || { + let lock = RwLock::new(File::open(path)?); + let guard = lock.read()?; + + let mut content = String::new(); + (&mut &*guard).read_to_string(&mut content)?; + drop(guard); + let settings: HashMap<String, String> = serde_json::from_str(&content)?; + // XXX consider returning string slices instead of cloning a string every time + // it might come with a performance hit and/or memory usage inflation + settings + .get(&setting) + .cloned() + .ok_or_else(|| StorageError::new(ErrorKind::Backend, "Setting not set")) + }).await } async fn set_setting<'a>(&self, setting: &'a str, user: &'a str, value: &'a str) -> Result<()> { @@ -560,32 +578,33 @@ impl Storage for FileStorage { async_std::fs::create_dir_all(path.parent().unwrap()).await?; } - let file = OpenOptions::new() - .write(true) - .read(true) - .truncate(false) - .create(true) - .open(&path) - .await?; - let mut lock = get_lockable_file(file).await; - log::debug!("Created a lock. Locking for writing..."); - let mut guard = lock.write()?; - - log::debug!("Locked. Writing."); - let mut content = String::new(); - guard.read_to_string(&mut content).await?; - let mut settings: HashMap<String, String> = if content.is_empty() { - HashMap::default() - } else { - serde_json::from_str(&content)? - }; - - settings.insert(setting.to_string(), value.to_string()); - guard.seek(std::io::SeekFrom::Start(0)).await?; - guard.set_len(0).await?; - guard - .write_all(serde_json::to_string(&settings)?.as_bytes()) - .await?; - Ok(()) + let (setting, value) = (setting.to_string(), value.to_string()); + + spawn_blocking(move || { + let file = OpenOptions::new() + .write(true) + .read(true) + .truncate(false) + .create(true) + .open(&path)?; + let mut lock = RwLock::new(file); + log::debug!("Created a lock. Locking for writing..."); + let mut guard = lock.write()?; + + log::debug!("Locked. Writing."); + let mut content = String::new(); + (&mut &*guard).read_to_string(&mut content)?; + let mut settings: HashMap<String, String> = if content.is_empty() { + HashMap::default() + } else { + serde_json::from_str(&content)? + }; + + settings.insert(setting.to_string(), value.to_string()); + (&mut *guard).seek(SeekFrom::Start(0))?; + (&mut *guard).set_len(0)?; + (&mut *guard).write_all(serde_json::to_string(&settings)?.as_bytes())?; + Result::Ok(()) + }).await } } |