diff options
Diffstat (limited to 'src/database')
-rw-r--r-- | src/database/mod.rs | 150 | ||||
-rw-r--r-- | src/database/redis/mod.rs | 293 |
2 files changed, 316 insertions, 127 deletions
diff --git a/src/database/mod.rs b/src/database/mod.rs index 98c2cae..8579125 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -1,17 +1,17 @@ #![warn(missing_docs)] -use async_trait::async_trait; -use serde::{Serialize,Deserialize}; use crate::indieauth::User; +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; mod redis; pub use crate::database::redis::RedisStorage; #[cfg(test)] -pub use redis::tests::{RedisInstance, get_redis_instance}; +pub use redis::tests::{get_redis_instance, RedisInstance}; #[derive(Serialize, Deserialize, PartialEq, Debug)] pub struct MicropubChannel { pub uid: String, - pub name: String + pub name: String, } #[derive(Debug, Clone, Copy)] @@ -21,14 +21,14 @@ pub enum ErrorKind { JsonParsing, NotFound, BadRequest, - Other + Other, } #[derive(Debug)] pub struct StorageError { msg: String, source: Option<Box<dyn std::error::Error>>, - kind: ErrorKind + kind: ErrorKind, } unsafe impl Send for StorageError {} unsafe impl Sync for StorageError {} @@ -38,14 +38,16 @@ impl From<StorageError> for tide::Response { ErrorKind::BadRequest => 400, ErrorKind::NotFound => 404, _ => 500, - }).body(serde_json::json!({ + }) + .body(serde_json::json!({ "error": match err.kind() { ErrorKind::BadRequest => "invalid_request", ErrorKind::NotFound => "not_found", _ => "database_error" }, "error_description": err - })).build() + })) + .build() } } impl std::error::Error for StorageError { @@ -58,7 +60,7 @@ impl From<serde_json::Error> for StorageError { Self { msg: format!("{}", err), source: Some(Box::new(err)), - kind: ErrorKind::JsonParsing + kind: ErrorKind::JsonParsing, } } } @@ -70,15 +72,18 @@ impl std::fmt::Display for StorageError { ErrorKind::PermissionDenied => write!(f, "permission denied: "), ErrorKind::NotFound => write!(f, "not found: "), ErrorKind::BadRequest => write!(f, "bad request: "), - ErrorKind::Other => write!(f, "generic storage layer error: ") + ErrorKind::Other => write!(f, "generic storage layer error: "), } { Ok(_) => write!(f, "{}", self.msg), - Err(err) => Err(err) + Err(err) => Err(err), } } } impl serde::Serialize for StorageError { - fn serialize<S: serde::Serializer>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> { + fn serialize<S: serde::Serializer>( + &self, + serializer: S, + ) -> std::result::Result<S::Ok, S::Error> { serializer.serialize_str(&self.to_string()) } } @@ -88,7 +93,7 @@ impl StorageError { StorageError { msg: msg.to_string(), source: None, - kind + kind, } } /// Get the kind of an error. @@ -100,12 +105,11 @@ impl StorageError { } } - /// A special Result type for the Micropub backing storage. pub type Result<T> = std::result::Result<T, StorageError>; /// A storage backend for the Micropub server. -/// +/// /// Implementations should note that all methods listed on this trait MUST be fully atomic /// or lock the database so that write conflicts or reading half-written data should not occur. #[async_trait] @@ -117,21 +121,21 @@ pub trait Storage: Clone + Send + Sync { async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>>; /// Save a post to the database as an MF2-JSON structure. - /// + /// /// Note that the `post` object MUST have `post["properties"]["uid"][0]` defined. async fn put_post<'a>(&self, post: &'a serde_json::Value) -> Result<()>; /*/// Save a post and add it to the relevant feeds listed in `post["properties"]["channel"]`. - /// - /// Note that the `post` object MUST have `post["properties"]["uid"][0]` defined + /// + /// Note that the `post` object MUST have `post["properties"]["uid"][0]` defined /// and `post["properties"]["channel"]` defined, even if it's empty. async fn put_and_index_post<'a>(&mut self, post: &'a serde_json::Value) -> Result<()>;*/ - + /// Modify a post using an update object as defined in the Micropub spec. - /// + /// /// Note to implementors: the update operation MUST be atomic OR MUST lock the database /// to prevent two clients overwriting each other's changes. - /// + /// /// You can assume concurrent updates will never contradict each other, since that will be dumb. /// The last update always wins. async fn update_post<'a>(&self, url: &'a str, update: serde_json::Value) -> Result<()>; @@ -142,21 +146,27 @@ pub trait Storage: Clone + Send + Sync { /// Fetch a feed at `url` and return a an h-feed object containing /// `limit` posts after a post by url `after`, filtering the content /// in context of a user specified by `user` (or an anonymous user). - /// + /// /// Specifically, private posts that don't include the user in the audience /// will be elided from the feed, and the posts containing location and not /// specifying post["properties"]["location-visibility"][0] == "public" /// will have their location data (but not check-in data) stripped. - /// + /// /// This function is used as an optimization so the client, whatever it is, /// doesn't have to fetch posts, then realize some of them are private, and /// fetch more posts. - /// + /// /// Note for implementors: if you use streams to fetch posts in parallel /// from the database, preferably make this method use a connection pool /// to reduce overhead of creating a database connection per post for /// parallel fetching. - async fn read_feed_with_limit<'a>(&self, url: &'a str, after: &'a Option<String>, limit: usize, user: &'a Option<String>) -> Result<Option<serde_json::Value>>; + async fn read_feed_with_limit<'a>( + &self, + url: &'a str, + after: &'a Option<String>, + limit: usize, + user: &'a Option<String>, + ) -> Result<Option<serde_json::Value>>; /// Deletes a post from the database irreversibly. 'nuff said. Must be idempotent. async fn delete_post<'a>(&self, url: &'a str) -> Result<()>; @@ -170,9 +180,9 @@ pub trait Storage: Clone + Send + Sync { #[cfg(test)] mod tests { - use super::{Storage, MicropubChannel}; - use serde_json::json; use super::redis::tests::get_redis_instance; + use super::{MicropubChannel, Storage}; + use serde_json::json; async fn test_backend_basic_operations<Backend: Storage>(backend: Backend) { let post: serde_json::Value = json!({ @@ -191,23 +201,47 @@ mod tests { backend.put_post(&post).await.unwrap(); if let Ok(Some(returned_post)) = backend.get_post(&key).await { assert!(returned_post.is_object()); - assert_eq!(returned_post["type"].as_array().unwrap().len(), post["type"].as_array().unwrap().len()); - assert_eq!(returned_post["type"].as_array().unwrap(), post["type"].as_array().unwrap()); - let props: &serde_json::Map<String, serde_json::Value> = post["properties"].as_object().unwrap(); + assert_eq!( + returned_post["type"].as_array().unwrap().len(), + post["type"].as_array().unwrap().len() + ); + assert_eq!( + returned_post["type"].as_array().unwrap(), + post["type"].as_array().unwrap() + ); + let props: &serde_json::Map<String, serde_json::Value> = + post["properties"].as_object().unwrap(); for key in props.keys() { - assert_eq!(returned_post["properties"][key].as_array().unwrap(), post["properties"][key].as_array().unwrap()) + assert_eq!( + returned_post["properties"][key].as_array().unwrap(), + post["properties"][key].as_array().unwrap() + ) } - } else { panic!("For some reason the backend did not return the post.") } + } else { + panic!("For some reason the backend did not return the post.") + } // Check the alternative URL - it should return the same post if let Ok(Some(returned_post)) = backend.get_post(&alt_url).await { assert!(returned_post.is_object()); - assert_eq!(returned_post["type"].as_array().unwrap().len(), post["type"].as_array().unwrap().len()); - assert_eq!(returned_post["type"].as_array().unwrap(), post["type"].as_array().unwrap()); - let props: &serde_json::Map<String, serde_json::Value> = post["properties"].as_object().unwrap(); + assert_eq!( + returned_post["type"].as_array().unwrap().len(), + post["type"].as_array().unwrap().len() + ); + assert_eq!( + returned_post["type"].as_array().unwrap(), + post["type"].as_array().unwrap() + ); + let props: &serde_json::Map<String, serde_json::Value> = + post["properties"].as_object().unwrap(); for key in props.keys() { - assert_eq!(returned_post["properties"][key].as_array().unwrap(), post["properties"][key].as_array().unwrap()) + assert_eq!( + returned_post["properties"][key].as_array().unwrap(), + post["properties"][key].as_array().unwrap() + ) } - } else { panic!("For some reason the backend did not return the post.") } + } else { + panic!("For some reason the backend did not return the post.") + } } async fn test_backend_get_channel_list<Backend: Storage>(backend: Backend) { @@ -221,33 +255,61 @@ mod tests { "children": [] }); backend.put_post(&feed).await.unwrap(); - let chans = backend.get_channels(&crate::indieauth::User::new("https://fireburn.ru/", "https://quill.p3k.io/", "create update media")).await.unwrap(); + let chans = backend + .get_channels(&crate::indieauth::User::new( + "https://fireburn.ru/", + "https://quill.p3k.io/", + "create update media", + )) + .await + .unwrap(); assert_eq!(chans.len(), 1); - assert_eq!(chans[0], MicropubChannel { uid: "https://fireburn.ru/feeds/main".to_string(), name: "Main Page".to_string() }); + assert_eq!( + chans[0], + MicropubChannel { + uid: "https://fireburn.ru/feeds/main".to_string(), + name: "Main Page".to_string() + } + ); } async fn test_backend_settings<Backend: Storage>(backend: Backend) { - backend.set_setting("site_name", "https://fireburn.ru/", "Vika's Hideout").await.unwrap(); - assert_eq!(backend.get_setting("site_name", "https://fireburn.ru/").await.unwrap(), "Vika's Hideout"); + backend + .set_setting("site_name", "https://fireburn.ru/", "Vika's Hideout") + .await + .unwrap(); + assert_eq!( + backend + .get_setting("site_name", "https://fireburn.ru/") + .await + .unwrap(), + "Vika's Hideout" + ); } #[async_std::test] async fn test_redis_storage_basic_operations() { let redis_instance = get_redis_instance().await; - let backend = super::RedisStorage::new(redis_instance.uri().to_string()).await.unwrap(); + let backend = super::RedisStorage::new(redis_instance.uri().to_string()) + .await + .unwrap(); test_backend_basic_operations(backend).await; } #[async_std::test] async fn test_redis_storage_channel_list() { let redis_instance = get_redis_instance().await; - let backend = super::RedisStorage::new(redis_instance.uri().to_string()).await.unwrap(); + let backend = super::RedisStorage::new(redis_instance.uri().to_string()) + .await + .unwrap(); test_backend_get_channel_list(backend).await; } #[async_std::test] async fn test_redis_settings() { let redis_instance = get_redis_instance().await; - let backend = super::RedisStorage::new(redis_instance.uri().to_string()).await.unwrap(); + let backend = super::RedisStorage::new(redis_instance.uri().to_string()) + .await + .unwrap(); test_backend_settings(backend).await; } } diff --git a/src/database/redis/mod.rs b/src/database/redis/mod.rs index 352cece..5a6b70d 100644 --- a/src/database/redis/mod.rs +++ b/src/database/redis/mod.rs @@ -1,20 +1,20 @@ use async_trait::async_trait; +use futures::stream; use futures_util::FutureExt; use futures_util::StreamExt; -use futures::stream; use lazy_static::lazy_static; use log::error; +use mobc::Pool; use mobc_redis::redis; use mobc_redis::redis::AsyncCommands; -use serde_json::json; -use mobc::Pool; use mobc_redis::RedisConnectionManager; +use serde_json::json; -use crate::database::{Storage, Result, StorageError, ErrorKind, MicropubChannel}; +use crate::database::{ErrorKind, MicropubChannel, Result, Storage, StorageError}; use crate::indieauth::User; struct RedisScripts { - edit_post: redis::Script + edit_post: redis::Script, } impl From<mobc_redis::redis::RedisError> for StorageError { @@ -22,7 +22,7 @@ impl From<mobc_redis::redis::RedisError> for StorageError { Self { msg: format!("{}", err), source: Some(Box::new(err)), - kind: ErrorKind::Backend + kind: ErrorKind::Backend, } } } @@ -31,7 +31,7 @@ impl From<mobc::Error<mobc_redis::redis::RedisError>> for StorageError { Self { msg: format!("{}", err), source: Some(Box::new(err)), - kind: ErrorKind::Backend + kind: ErrorKind::Backend, } } } @@ -64,17 +64,40 @@ fn filter_post(mut post: serde_json::Value, user: &'_ Option<String>) -> Option< })); } let empty_vec: Vec<serde_json::Value> = vec![]; - let author = post["properties"]["author"].as_array().unwrap_or(&empty_vec).iter().map(|i| i.as_str().unwrap().to_string()); - let visibility = post["properties"]["visibility"][0].as_str().unwrap_or("public"); - let mut audience = author.chain(post["properties"]["audience"].as_array().unwrap_or(&empty_vec).iter().map(|i| i.as_str().unwrap().to_string())); - if (visibility == "private" && !audience.any(|i| Some(i) == *user)) || (visibility == "protected" && user.is_none()) { - return None + let author = post["properties"]["author"] + .as_array() + .unwrap_or(&empty_vec) + .iter() + .map(|i| i.as_str().unwrap().to_string()); + let visibility = post["properties"]["visibility"][0] + .as_str() + .unwrap_or("public"); + let mut audience = author.chain( + post["properties"]["audience"] + .as_array() + .unwrap_or(&empty_vec) + .iter() + .map(|i| i.as_str().unwrap().to_string()), + ); + if (visibility == "private" && !audience.any(|i| Some(i) == *user)) + || (visibility == "protected" && user.is_none()) + { + return None; } if post["properties"]["location"].is_array() { - let location_visibility = post["properties"]["location-visibility"][0].as_str().unwrap_or("private"); - let mut author = post["properties"]["author"].as_array().unwrap_or(&empty_vec).iter().map(|i| i.as_str().unwrap().to_string()); + let location_visibility = post["properties"]["location-visibility"][0] + .as_str() + .unwrap_or("private"); + let mut author = post["properties"]["author"] + .as_array() + .unwrap_or(&empty_vec) + .iter() + .map(|i| i.as_str().unwrap().to_string()); if location_visibility == "private" && !author.any(|i| Some(i) == *user) { - post["properties"].as_object_mut().unwrap().remove("location"); + post["properties"] + .as_object_mut() + .unwrap() + .remove("location"); } } Some(post) @@ -84,12 +107,16 @@ fn filter_post(mut post: serde_json::Value, user: &'_ Option<String>) -> Option< impl Storage for RedisStorage { async fn get_setting<'a>(&self, setting: &'a str, user: &'a str) -> Result<String> { let mut conn = self.redis.get().await?; - Ok(conn.hget::<String, &str, String>(format!("settings_{}", user), setting).await?) + Ok(conn + .hget::<String, &str, String>(format!("settings_{}", user), setting) + .await?) } async fn set_setting<'a>(&self, setting: &'a str, user: &'a str, value: &'a str) -> Result<()> { let mut conn = self.redis.get().await?; - Ok(conn.hset::<String, &str, &str, ()>(format!("settings_{}", user), setting, value).await?) + Ok(conn + .hset::<String, &str, &str, ()>(format!("settings_{}", user), setting, value) + .await?) } async fn delete_post<'a>(&self, url: &'a str) -> Result<()> { @@ -101,41 +128,63 @@ impl Storage for RedisStorage { let mut conn = self.redis.get().await?; Ok(conn.hexists::<&str, &str, bool>(&"posts", url).await?) } - + async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> { let mut conn = self.redis.get().await?; - match conn.hget::<&str, &str, Option<String>>(&"posts", url).await? { + match conn + .hget::<&str, &str, Option<String>>(&"posts", url) + .await? + { Some(val) => { let parsed = serde_json::from_str::<serde_json::Value>(&val)?; if let Some(new_url) = parsed["see_other"].as_str() { - match conn.hget::<&str, &str, Option<String>>(&"posts", new_url).await? { + match conn + .hget::<&str, &str, Option<String>>(&"posts", new_url) + .await? + { Some(val) => Ok(Some(serde_json::from_str::<serde_json::Value>(&val)?)), - None => Ok(None) + None => Ok(None), } } else { Ok(Some(parsed)) } - }, - None => Ok(None) + } + None => Ok(None), } } async fn get_channels(&self, user: &User) -> Result<Vec<MicropubChannel>> { let mut conn = self.redis.get().await?; - let channels = conn.smembers::<String, Vec<String>>("channels_".to_string() + user.me.as_str()).await?; + let channels = conn + .smembers::<String, Vec<String>>("channels_".to_string() + user.me.as_str()) + .await?; // TODO: use streams here instead of this weird thing... how did I even write this?! - Ok(futures_util::future::join_all(channels.iter() - .map(|channel| self.get_post(channel) - .map(|result| result.unwrap()) - .map(|post: Option<serde_json::Value>| { - if let Some(post) = post { - Some(MicropubChannel { - uid: post["properties"]["uid"][0].as_str().unwrap().to_string(), - name: post["properties"]["name"][0].as_str().unwrap().to_string() - }) - } else { None } + Ok(futures_util::future::join_all( + channels + .iter() + .map(|channel| { + self.get_post(channel).map(|result| result.unwrap()).map( + |post: Option<serde_json::Value>| { + if let Some(post) = post { + Some(MicropubChannel { + uid: post["properties"]["uid"][0].as_str().unwrap().to_string(), + name: post["properties"]["name"][0] + .as_str() + .unwrap() + .to_string(), + }) + } else { + None + } + }, + ) }) - ).collect::<Vec<_>>()).await.into_iter().filter_map(|chan| chan).collect::<Vec<_>>()) + .collect::<Vec<_>>(), + ) + .await + .into_iter() + .filter_map(|chan| chan) + .collect::<Vec<_>>()) } async fn put_post<'a>(&self, post: &'a serde_json::Value) -> Result<()> { @@ -143,72 +192,122 @@ impl Storage for RedisStorage { let key: &str; match post["properties"]["uid"][0].as_str() { Some(uid) => key = uid, - None => return Err(StorageError::new(ErrorKind::BadRequest, "post doesn't have a UID")) + None => { + return Err(StorageError::new( + ErrorKind::BadRequest, + "post doesn't have a UID", + )) + } } - conn.hset::<&str, &str, String, ()>(&"posts", key, post.to_string()).await?; + conn.hset::<&str, &str, String, ()>(&"posts", key, post.to_string()) + .await?; if post["properties"]["url"].is_array() { - for url in post["properties"]["url"].as_array().unwrap().iter().map(|i| i.as_str().unwrap().to_string()) { + for url in post["properties"]["url"] + .as_array() + .unwrap() + .iter() + .map(|i| i.as_str().unwrap().to_string()) + { if url != key { - conn.hset::<&str, &str, String, ()>(&"posts", &url, json!({"see_other": key}).to_string()).await?; + conn.hset::<&str, &str, String, ()>( + &"posts", + &url, + json!({ "see_other": key }).to_string(), + ) + .await?; } } } - if post["type"].as_array().unwrap().iter().any(|i| i == "h-feed") { + if post["type"] + .as_array() + .unwrap() + .iter() + .any(|i| i == "h-feed") + { // This is a feed. Add it to the channels array if it's not already there. - conn.sadd::<String, &str, ()>("channels_".to_string() + post["properties"]["author"][0].as_str().unwrap(), key).await? + conn.sadd::<String, &str, ()>( + "channels_".to_string() + post["properties"]["author"][0].as_str().unwrap(), + key, + ) + .await? } Ok(()) } - async fn read_feed_with_limit<'a>(&self, url: &'a str, after: &'a Option<String>, limit: usize, user: &'a Option<String>) -> Result<Option<serde_json::Value>> { + async fn read_feed_with_limit<'a>( + &self, + url: &'a str, + after: &'a Option<String>, + limit: usize, + user: &'a Option<String>, + ) -> Result<Option<serde_json::Value>> { let mut conn = self.redis.get().await?; let mut feed; - match conn.hget::<&str, &str, Option<String>>(&"posts", url).await? { + match conn + .hget::<&str, &str, Option<String>>(&"posts", url) + .await? + { Some(post) => feed = serde_json::from_str::<serde_json::Value>(&post)?, - None => return Ok(None) + None => return Ok(None), } if feed["see_other"].is_string() { - match conn.hget::<&str, &str, Option<String>>(&"posts", feed["see_other"].as_str().unwrap()).await? { + match conn + .hget::<&str, &str, Option<String>>(&"posts", feed["see_other"].as_str().unwrap()) + .await? + { Some(post) => feed = serde_json::from_str::<serde_json::Value>(&post)?, - None => return Ok(None) + None => return Ok(None), } } if let Some(post) = filter_post(feed, user) { feed = post } else { - return Err(StorageError::new(ErrorKind::PermissionDenied, "specified user cannot access this post")) + return Err(StorageError::new( + ErrorKind::PermissionDenied, + "specified user cannot access this post", + )); } if feed["children"].is_array() { let children = feed["children"].as_array().unwrap(); let posts_iter: Box<dyn std::iter::Iterator<Item = String> + Send>; // TODO: refactor this to apply the skip on the &mut iterator if let Some(after) = after { - posts_iter = Box::new(children.iter().map(|i| i.as_str().unwrap().to_string()).skip_while(move |i| i != after).skip(1)); + posts_iter = Box::new( + children + .iter() + .map(|i| i.as_str().unwrap().to_string()) + .skip_while(move |i| i != after) + .skip(1), + ); } else { posts_iter = Box::new(children.iter().map(|i| i.as_str().unwrap().to_string())); } let posts = stream::iter(posts_iter) .map(|url| async move { match self.redis.get().await { - Ok(mut conn) => match conn.hget::<&str, &str, Option<String>>("posts", &url).await { - Ok(post) => match post { - Some(post) => match serde_json::from_str::<serde_json::Value>(&post) { - Ok(post) => Some(post), - Err(err) => { - let err = StorageError::from(err); - error!("{}", err); - panic!("{}", err) + Ok(mut conn) => { + match conn.hget::<&str, &str, Option<String>>("posts", &url).await { + Ok(post) => match post { + Some(post) => { + match serde_json::from_str::<serde_json::Value>(&post) { + Ok(post) => Some(post), + Err(err) => { + let err = StorageError::from(err); + error!("{}", err); + panic!("{}", err) + } + } } + // Happens because of a broken link (result of an improper deletion?) + None => None, }, - // Happens because of a broken link (result of an improper deletion?) - None => None, - }, - Err(err) => { - let err = StorageError::from(err); - error!("{}", err); - panic!("{}", err) + Err(err) => { + let err = StorageError::from(err); + error!("{}", err); + panic!("{}", err) + } } - }, + } // TODO: Instead of causing a panic, investigate how can you fail the whole stream // Somehow fuse it maybe? Err(err) => { @@ -227,14 +326,20 @@ impl Storage for RedisStorage { // Hack to unwrap the Option and sieve out broken links // Broken links return None, and Stream::filter_map skips all Nones. .filter_map(|post: Option<serde_json::Value>| async move { post }) - .filter_map(|post| async move { - filter_post(post, user) - }) + .filter_map(|post| async move { filter_post(post, user) }) .take(limit); // TODO: Instead of catching panics, find a way to make the whole stream fail with Result<Vec<serde_json::Value>> - match std::panic::AssertUnwindSafe(posts.collect::<Vec<serde_json::Value>>()).catch_unwind().await { + match std::panic::AssertUnwindSafe(posts.collect::<Vec<serde_json::Value>>()) + .catch_unwind() + .await + { Ok(posts) => feed["children"] = json!(posts), - Err(_) => return Err(StorageError::new(ErrorKind::Other, "Unknown error encountered while assembling feed, see logs for more info")) + Err(_) => { + return Err(StorageError::new( + ErrorKind::Other, + "Unknown error encountered while assembling feed, see logs for more info", + )) + } } } return Ok(Some(feed)); @@ -242,39 +347,56 @@ impl Storage for RedisStorage { async fn update_post<'a>(&self, mut url: &'a str, update: serde_json::Value) -> Result<()> { let mut conn = self.redis.get().await?; - if !conn.hexists::<&str, &str, bool>("posts", url).await.unwrap() { - return Err(StorageError::new(ErrorKind::NotFound, "can't edit a non-existent post")) + if !conn + .hexists::<&str, &str, bool>("posts", url) + .await + .unwrap() + { + return Err(StorageError::new( + ErrorKind::NotFound, + "can't edit a non-existent post", + )); } - let post: serde_json::Value = serde_json::from_str(&conn.hget::<&str, &str, String>("posts", url).await?)?; + let post: serde_json::Value = + serde_json::from_str(&conn.hget::<&str, &str, String>("posts", url).await?)?; if let Some(new_url) = post["see_other"].as_str() { url = new_url } - Ok(SCRIPTS.edit_post.key("posts").arg(url).arg(update.to_string()).invoke_async::<_, ()>(&mut conn as &mut redis::aio::Connection).await?) + Ok(SCRIPTS + .edit_post + .key("posts") + .arg(url) + .arg(update.to_string()) + .invoke_async::<_, ()>(&mut conn as &mut redis::aio::Connection) + .await?) } } - impl RedisStorage { /// Create a new RedisDatabase that will connect to Redis at `redis_uri` to store data. pub async fn new(redis_uri: String) -> Result<Self> { match redis::Client::open(redis_uri) { - Ok(client) => Ok(Self { redis: Pool::builder().max_open(20).build(RedisConnectionManager::new(client)) }), - Err(e) => Err(e.into()) + Ok(client) => Ok(Self { + redis: Pool::builder() + .max_open(20) + .build(RedisConnectionManager::new(client)), + }), + Err(e) => Err(e.into()), } } } #[cfg(test)] pub mod tests { + use mobc_redis::redis; use std::process; use std::time::Duration; - use mobc_redis::redis; pub struct RedisInstance { // We just need to hold on to it so it won't get dropped and remove the socket _tempdir: tempdir::TempDir, uri: String, - child: std::process::Child + child: std::process::Child, } impl Drop for RedisInstance { fn drop(&mut self) { @@ -292,11 +414,14 @@ pub mod tests { let socket = tempdir.path().join("redis.sock"); let redis_child = process::Command::new("redis-server") .current_dir(&tempdir) - .arg("--port").arg("0") - .arg("--unixsocket").arg(&socket) + .arg("--port") + .arg("0") + .arg("--unixsocket") + .arg(&socket) .stdout(process::Stdio::null()) .stderr(process::Stdio::null()) - .spawn().expect("Failed to spawn Redis"); + .spawn() + .expect("Failed to spawn Redis"); println!("redis+unix:///{}", socket.to_str().unwrap()); let uri = format!("redis+unix:///{}", socket.to_str().unwrap()); // There should be a slight delay, we need to wait for Redis to spin up @@ -317,7 +442,9 @@ pub mod tests { } return RedisInstance { - uri, child: redis_child, _tempdir: tempdir - } + uri, + child: redis_child, + _tempdir: tempdir, + }; } } |