diff options
Diffstat (limited to 'src/database/file/mod.rs')
-rw-r--r-- | src/database/file/mod.rs | 317 |
1 files changed, 168 insertions, 149 deletions
diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs index 841f9c0..af07c0c 100644 --- a/src/database/file/mod.rs +++ b/src/database/file/mod.rs @@ -290,32 +290,34 @@ impl Storage for FileStorage { debug!("Opening {:?}", path); // Use exclusively synchronous operations to never transfer a lock over an await boundary #[allow(clippy::unwrap_used)] // JoinHandle captures panics, this closure shouldn't panic - tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { - #[warn(clippy::unwrap_used)] - match File::open(&path) { - Ok(file) => { - let lock = RwLock::new(file); - debug!("Trying to get a lock for file {:?}", &path); - let guard = lock.read()?; - debug!("Lock for {:?} acquired", &path); - let mut content = String::new(); - // Typechecks because OS magic acts on references - // to FDs as if they were behind a mutex - (&mut &*guard).read_to_string(&mut content)?; - debug!("Read {} bytes successfully from {:?}", content.as_bytes().len(), &path); - Ok(Some(serde_json::from_str(&content)?)) - } - Err(err) => { - // We have to special-case in here because - // the function should return Ok(None) on 404 - if err.kind() == IOErrorKind::NotFound { - Ok(None) - } else { - Err(err.into()) + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking( + #[warn(clippy::unwrap_used, clippy::expect_used, clippy::panic)] // can't panic here + move || { + match File::open(&path) { + Ok(file) => { + let lock = RwLock::new(file); + debug!("Trying to get a lock for file {:?}", &path); + let guard = lock.read()?; + debug!("Lock for {:?} acquired", &path); + let mut content = String::new(); + // Typechecks because OS magic acts on references + // to FDs as if they were behind a mutex + (&mut &*guard).read_to_string(&mut content)?; + debug!("Read {} bytes successfully from {:?}", content.as_bytes().len(), &path); + Ok(Some(serde_json::from_str(&content)?)) + } + Err(err) => { + // We have to special-case in here because + // the function should return Ok(None) on 404 + if err.kind() == IOErrorKind::NotFound { + Ok(None) + } else { + Err(err.into()) + } } } } - })).await?.unwrap() + )).await?.unwrap() } async fn put_post(&self, post: &'_ serde_json::Value, user: &'_ str) -> Result<()> { @@ -330,28 +332,30 @@ impl Storage for FileStorage { let post_path = path.clone(); // Use exclusively synchronous operations to never transfer a lock over an await boundary #[allow(clippy::unwrap_used)] // JoinHandle captures panics, this closure shouldn't panic - tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { - #[warn(clippy::unwrap_used)] - let parent = post_path.parent().expect("Parent for this directory should always exist").to_owned(); - if !parent.is_dir() { - std::fs::create_dir_all(post_path.parent().unwrap())?; - } + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking( + #[warn(clippy::unwrap_used, clippy::expect_used, clippy::panic)] // can't panic here + move || { + let parent = post_path.parent().expect("Parent for this directory should always exist").to_owned(); + if !parent.is_dir() { + std::fs::create_dir_all(post_path.parent().unwrap())?; + } - let f = OpenOptions::new() - .write(true) - .create_new(true) - .open(&post_path)?; + let f = OpenOptions::new() + .write(true) + .create_new(true) + .open(&post_path)?; - let mut lock = RwLock::new(f); - debug!("Waiting for lock on {:?}", &post_path); - let mut guard = lock.write()?; + let mut lock = RwLock::new(f); + debug!("Waiting for lock on {:?}", &post_path); + let mut guard = lock.write()?; - (*guard).write_all(post_json.as_bytes())?; - (*guard).flush()?; - drop(guard); + (*guard).write_all(post_json.as_bytes())?; + (*guard).flush()?; + drop(guard); - Result::Ok(()) - })).await?.unwrap()?; + Result::Ok(()) + } + )).await?.unwrap()?; if let Some(urls) = post["properties"]["url"].as_array() { for url in urls @@ -413,38 +417,41 @@ impl Storage for FileStorage { let key = key.to_string(); #[allow(clippy::drop_ref)] // using drop() to prevent mistakes here drop(post); - tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { - let file = OpenOptions::new() - .read(true) - .write(true) - .truncate(false) - .create(true) - .open(&path)?; - let mut lock = RwLock::new(file); - debug!("Trying to lock feed {:?}", &path); - let mut guard = lock.write()?; + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking( + #[warn(clippy::unwrap_used, clippy::expect_used, clippy::panic)] // can't panic here + move || { + let file = OpenOptions::new() + .read(true) + .write(true) + .truncate(false) + .create(true) + .open(&path)?; + let mut lock = RwLock::new(file); + debug!("Trying to lock feed {:?}", &path); + let mut guard = lock.write()?; - let mut content = String::new(); + let mut content = String::new(); - (*guard).read_to_string(&mut content)?; + (*guard).read_to_string(&mut content)?; - let mut channels: Vec<super::MicropubChannel>; - if !content.is_empty() { - channels = serde_json::from_str(&content)?; - } else { - channels = Vec::default(); - } + let mut channels: Vec<super::MicropubChannel>; + if !content.is_empty() { + channels = serde_json::from_str(&content)?; + } else { + channels = Vec::default(); + } - channels.push(super::MicropubChannel { - uid: key.to_string(), - name: channel_name, - }); - (*guard).seek(SeekFrom::Start(0))?; - (*guard).set_len(0)?; - (*guard).write_all(serde_json::to_string(&channels)?.as_bytes())?; + channels.push(super::MicropubChannel { + uid: key.to_string(), + name: channel_name, + }); + (*guard).seek(SeekFrom::Start(0))?; + (*guard).set_len(0)?; + (*guard).write_all(serde_json::to_string(&channels)?.as_bytes())?; - Result::Ok(()) - })).await?.unwrap()?; + Result::Ok(()) + } + )).await?.unwrap()?; } Ok(()) } @@ -454,29 +461,32 @@ impl Storage for FileStorage { #[allow(unused_variables)] let (old_json, new_json) = tokio::time::timeout( Duration::from_secs(IO_TIMEOUT), - spawn_blocking(move || { - let f = OpenOptions::new() - .write(true) - .read(true) - .truncate(false) - .open(&path)?; + spawn_blocking( + #[warn(clippy::unwrap_used, clippy::expect_used, clippy::panic)] // can't panic here + move || { + let f = OpenOptions::new() + .write(true) + .read(true) + .truncate(false) + .open(&path)?; + + let mut lock = RwLock::new(f); + let mut guard = lock.write()?; - let mut lock = RwLock::new(f); - let mut guard = lock.write()?; - - let mut content = String::new(); - (*guard).read_to_string(&mut content)?; - let json: serde_json::Value = serde_json::from_str(&content)?; - // Apply the editing algorithms - let new_json = modify_post(&json, &update)?; + let mut content = String::new(); + (*guard).read_to_string(&mut content)?; + let json: serde_json::Value = serde_json::from_str(&content)?; + // Apply the editing algorithms + let new_json = modify_post(&json, &update)?; - (*guard).set_len(0)?; - (*guard).seek(SeekFrom::Start(0))?; - (*guard).write_all(new_json.to_string().as_bytes())?; - (*guard).flush()?; + (*guard).set_len(0)?; + (*guard).seek(SeekFrom::Start(0))?; + (*guard).write_all(new_json.to_string().as_bytes())?; + (*guard).flush()?; - Result::Ok((json, new_json)) - }) + Result::Ok((json, new_json)) + } + ) ).await?.unwrap()?; // TODO check if URLs changed between old and new JSON Ok(()) @@ -488,30 +498,33 @@ impl Storage for FileStorage { path.push("channels"); let path = path.to_path(&self.root_dir); - tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { - match File::open(&path) { - Ok(f) => { - let lock = RwLock::new(f); - let guard = lock.read()?; - - let mut content = String::new(); - (&mut &*guard).read_to_string(&mut content)?; - // This should not happen, but if it does, handle it gracefully - if content.is_empty() { - return Ok(vec![]); + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking( + #[warn(clippy::unwrap_used, clippy::expect_used, clippy::panic)] // can't panic here + move || { + match File::open(&path) { + Ok(f) => { + let lock = RwLock::new(f); + let guard = lock.read()?; + + let mut content = String::new(); + (&mut &*guard).read_to_string(&mut content)?; + // This should not happen, but if it does, handle it gracefully + if content.is_empty() { + return Ok(vec![]); + } + let channels: Vec<super::MicropubChannel> = serde_json::from_str(&content)?; + Ok(channels) } - let channels: Vec<super::MicropubChannel> = serde_json::from_str(&content)?; - Ok(channels) - } - Err(e) => { - if e.kind() == IOErrorKind::NotFound { - Ok(vec![]) - } else { - Err(e.into()) + Err(e) => { + if e.kind() == IOErrorKind::NotFound { + Ok(vec![]) + } else { + Err(e.into()) + } } } } - })).await?.unwrap() + )).await?.unwrap() } async fn read_feed_with_limit( @@ -593,21 +606,24 @@ impl Storage for FileStorage { let path = path.to_path(&self.root_dir); log::debug!("Getting settings from {:?}", &path); let setting = setting.to_string(); - tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { - let lock = RwLock::new(File::open(path)?); - let guard = lock.read()?; - - let mut content = String::new(); - (&mut &*guard).read_to_string(&mut content)?; - drop(guard); - let settings: HashMap<String, String> = serde_json::from_str(&content)?; - // XXX consider returning string slices instead of cloning a string every time - // it might come with a performance hit and/or memory usage inflation - settings - .get(&setting) - .cloned() - .ok_or_else(|| StorageError::new(ErrorKind::Backend, "Setting not set")) - })).await?.unwrap() + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking( + #[warn(clippy::unwrap_used, clippy::expect_used, clippy::panic)] // can't panic here + move || { + let lock = RwLock::new(File::open(path)?); + let guard = lock.read()?; + + let mut content = String::new(); + (&mut &*guard).read_to_string(&mut content)?; + drop(guard); + let settings: HashMap<String, String> = serde_json::from_str(&content)?; + // XXX consider returning string slices instead of cloning a string every time + // it might come with a performance hit and/or memory usage inflation + settings + .get(&setting) + .cloned() + .ok_or_else(|| StorageError::new(ErrorKind::Backend, "Setting not set")) + } + )).await?.unwrap() } async fn set_setting(&self, setting: &'_ str, user: &'_ str, value: &'_ str) -> Result<()> { @@ -625,31 +641,34 @@ impl Storage for FileStorage { let (setting, value) = (setting.to_string(), value.to_string()); - tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { - let file = OpenOptions::new() - .write(true) - .read(true) - .truncate(false) - .create(true) - .open(&path)?; - let mut lock = RwLock::new(file); - log::debug!("Created a lock. Locking for writing..."); - let mut guard = lock.write()?; - - log::debug!("Locked. Writing."); - let mut content = String::new(); - (&mut &*guard).read_to_string(&mut content)?; - let mut settings: HashMap<String, String> = if content.is_empty() { - HashMap::default() - } else { - serde_json::from_str(&content)? - }; - - settings.insert(setting, value); - (&mut *guard).seek(SeekFrom::Start(0))?; - (*guard).set_len(0)?; - (&mut *guard).write_all(serde_json::to_string(&settings)?.as_bytes())?; - Result::Ok(()) - })).await?.unwrap() + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking( + #[warn(clippy::unwrap_used, clippy::expect_used, clippy::panic)] // can't panic here + move || { + let file = OpenOptions::new() + .write(true) + .read(true) + .truncate(false) + .create(true) + .open(&path)?; + let mut lock = RwLock::new(file); + log::debug!("Created a lock. Locking for writing..."); + let mut guard = lock.write()?; + + log::debug!("Locked. Writing."); + let mut content = String::new(); + (&mut &*guard).read_to_string(&mut content)?; + let mut settings: HashMap<String, String> = if content.is_empty() { + HashMap::default() + } else { + serde_json::from_str(&content)? + }; + + settings.insert(setting, value); + (&mut *guard).seek(SeekFrom::Start(0))?; + (*guard).set_len(0)?; + (&mut *guard).write_all(serde_json::to_string(&settings)?.as_bytes())?; + Result::Ok(()) + } + )).await?.unwrap() } } |