diff options
Diffstat (limited to 'src/database/file')
-rw-r--r-- | src/database/file/mod.rs | 159 |
1 files changed, 97 insertions, 62 deletions
diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs index b9f27b2..5c93beb 100644 --- a/src/database/file/mod.rs +++ b/src/database/file/mod.rs @@ -1,6 +1,6 @@ //#![warn(clippy::unwrap_used)] -use crate::database::{ErrorKind, Result, settings, Storage, StorageError}; -use crate::micropub::{MicropubUpdate, MicropubPropertyDeletion}; +use crate::database::{settings, ErrorKind, Result, Storage, StorageError}; +use crate::micropub::{MicropubPropertyDeletion, MicropubUpdate}; use futures::{stream, StreamExt, TryStreamExt}; use kittybox_util::MentionType; use serde_json::json; @@ -247,7 +247,9 @@ async fn hydrate_author<S: Storage>( impl Storage for FileStorage { async fn new(url: &'_ url::Url) -> Result<Self> { // TODO: sanity check - Ok(Self { root_dir: PathBuf::from(url.path()) }) + Ok(Self { + root_dir: PathBuf::from(url.path()), + }) } #[tracing::instrument(skip(self))] async fn categories(&self, url: &str) -> Result<Vec<String>> { @@ -259,7 +261,7 @@ impl Storage for FileStorage { // perform well. Err(std::io::Error::new( std::io::ErrorKind::Unsupported, - "?q=category queries are not implemented due to resource constraints" + "?q=category queries are not implemented due to resource constraints", ))? } @@ -340,7 +342,10 @@ impl Storage for FileStorage { file.sync_all().await?; drop(file); tokio::fs::rename(&tempfile, &path).await?; - tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; + tokio::fs::File::open(path.parent().unwrap()) + .await? + .sync_all() + .await?; if let Some(urls) = post["properties"]["url"].as_array() { for url in urls.iter().map(|i| i.as_str().unwrap()) { @@ -350,8 +355,8 @@ impl Storage for FileStorage { "{}{}", url.host_str().unwrap(), url.port() - .map(|port| format!(":{}", port)) - .unwrap_or_default() + .map(|port| format!(":{}", port)) + .unwrap_or_default() ) }; if url != key && url_domain == user.authority() { @@ -410,26 +415,24 @@ impl Storage for FileStorage { .create(false) .open(&path) .await - { - Err(err) if err.kind() == std::io::ErrorKind::NotFound => { - Vec::default() - } - Err(err) => { - // Propagate the error upwards - return Err(err.into()); - } - Ok(mut file) => { - let mut content = String::new(); - file.read_to_string(&mut content).await?; - drop(file); - - if !content.is_empty() { - serde_json::from_str(&content)? - } else { - Vec::default() - } - } - } + { + Err(err) if err.kind() == std::io::ErrorKind::NotFound => Vec::default(), + Err(err) => { + // Propagate the error upwards + return Err(err.into()); + } + Ok(mut file) => { + let mut content = String::new(); + file.read_to_string(&mut content).await?; + drop(file); + + if !content.is_empty() { + serde_json::from_str(&content)? + } else { + Vec::default() + } + } + } }; channels.push(super::MicropubChannel { @@ -444,7 +447,10 @@ impl Storage for FileStorage { tempfile.sync_all().await?; drop(tempfile); tokio::fs::rename(tempfilename, &path).await?; - tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; + tokio::fs::File::open(path.parent().unwrap()) + .await? + .sync_all() + .await?; } Ok(()) } @@ -476,7 +482,10 @@ impl Storage for FileStorage { temp.sync_all().await?; drop(temp); tokio::fs::rename(tempfilename, &path).await?; - tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; + tokio::fs::File::open(path.parent().unwrap()) + .await? + .sync_all() + .await?; (json, new_json) }; @@ -486,7 +495,9 @@ impl Storage for FileStorage { #[tracing::instrument(skip(self, f), fields(f = std::any::type_name::<F>()))] async fn update_with<F: FnOnce(&mut serde_json::Value) + Send>( - &self, url: &str, f: F + &self, + url: &str, + f: F, ) -> Result<(serde_json::Value, serde_json::Value)> { todo!("update_with is not yet implemented due to special requirements of the file backend") } @@ -526,25 +537,25 @@ impl Storage for FileStorage { url: &'_ str, cursor: Option<&'_ str>, limit: usize, - user: Option<&url::Url> + user: Option<&url::Url>, ) -> Result<Option<(serde_json::Value, Option<String>)>> { #[allow(deprecated)] - Ok(self.read_feed_with_limit( - url, - cursor, - limit, - user - ).await? + Ok(self + .read_feed_with_limit(url, cursor, limit, user) + .await? .map(|feed| { - tracing::debug!("Feed: {:#}", serde_json::Value::Array( - feed["children"] - .as_array() - .map(|v| v.as_slice()) - .unwrap_or_default() - .iter() - .map(|mf2| mf2["properties"]["uid"][0].clone()) - .collect::<Vec<_>>() - )); + tracing::debug!( + "Feed: {:#}", + serde_json::Value::Array( + feed["children"] + .as_array() + .map(|v| v.as_slice()) + .unwrap_or_default() + .iter() + .map(|mf2| mf2["properties"]["uid"][0].clone()) + .collect::<Vec<_>>() + ) + ); let cursor: Option<String> = feed["children"] .as_array() .map(|v| v.as_slice()) @@ -553,8 +564,7 @@ impl Storage for FileStorage { .map(|v| v["properties"]["uid"][0].as_str().unwrap().to_owned()); tracing::debug!("Extracted the cursor: {:?}", cursor); (feed, cursor) - }) - ) + })) } #[tracing::instrument(skip(self))] @@ -574,9 +584,12 @@ impl Storage for FileStorage { let children: Vec<serde_json::Value> = match feed["children"].take() { serde_json::Value::Array(children) => children, // We've already checked it's an array - _ => unreachable!() + _ => unreachable!(), }; - tracing::debug!("Full children array: {:#}", serde_json::Value::Array(children.clone())); + tracing::debug!( + "Full children array: {:#}", + serde_json::Value::Array(children.clone()) + ); let mut posts_iter = children .into_iter() .map(|s: serde_json::Value| s.as_str().unwrap().to_string()); @@ -589,7 +602,7 @@ impl Storage for FileStorage { // incredibly long feeds. if let Some(after) = after { tokio::task::block_in_place(|| { - for s in posts_iter.by_ref() { + for s in posts_iter.by_ref() { if s == after { break; } @@ -655,12 +668,19 @@ impl Storage for FileStorage { let settings: HashMap<&str, serde_json::Value> = serde_json::from_str(&content)?; match settings.get(S::ID) { Some(value) => Ok(serde_json::from_value::<S>(value.clone())?), - None => Err(StorageError::from_static(ErrorKind::Backend, "Setting not set")) + None => Err(StorageError::from_static( + ErrorKind::Backend, + "Setting not set", + )), } } #[tracing::instrument(skip(self))] - async fn set_setting<S: settings::Setting>(&self, user: &url::Url, value: S::Data) -> Result<()> { + async fn set_setting<S: settings::Setting>( + &self, + user: &url::Url, + value: S::Data, + ) -> Result<()> { let mut path = relative_path::RelativePathBuf::new(); path.push(user.authority()); path.push("settings"); @@ -704,20 +724,28 @@ impl Storage for FileStorage { tempfile.sync_all().await?; drop(tempfile); tokio::fs::rename(temppath, &path).await?; - tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; + tokio::fs::File::open(path.parent().unwrap()) + .await? + .sync_all() + .await?; Ok(()) } #[tracing::instrument(skip(self))] - async fn add_or_update_webmention(&self, target: &str, mention_type: MentionType, mention: serde_json::Value) -> Result<()> { + async fn add_or_update_webmention( + &self, + target: &str, + mention_type: MentionType, + mention: serde_json::Value, + ) -> Result<()> { let path = url_to_path(&self.root_dir, target); let tempfilename = path.with_extension("tmp"); let mut temp = OpenOptions::new() - .write(true) - .create_new(true) - .open(&tempfilename) - .await?; + .write(true) + .create_new(true) + .open(&tempfilename) + .await?; let mut file = OpenOptions::new().read(true).open(&path).await?; let mut post: serde_json::Value = { @@ -752,13 +780,20 @@ impl Storage for FileStorage { temp.sync_all().await?; drop(temp); tokio::fs::rename(tempfilename, &path).await?; - tokio::fs::File::open(path.parent().unwrap()).await?.sync_all().await?; + tokio::fs::File::open(path.parent().unwrap()) + .await? + .sync_all() + .await?; Ok(()) } - async fn all_posts<'this>(&'this self, user: &url::Url) -> Result<impl futures::Stream<Item = serde_json::Value> + Send + 'this> { + async fn all_posts<'this>( + &'this self, + user: &url::Url, + ) -> Result<impl futures::Stream<Item = serde_json::Value> + Send + 'this> { todo!(); - #[allow(unreachable_code)] Ok(futures::stream::empty()) // for type inference + #[allow(unreachable_code)] + Ok(futures::stream::empty()) // for type inference } } |