about summary refs log tree commit diff
path: root/src/database
diff options
context:
space:
mode:
Diffstat (limited to 'src/database')
-rw-r--r--src/database/mod.rs150
-rw-r--r--src/database/redis/mod.rs293
2 files changed, 316 insertions, 127 deletions
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 98c2cae..8579125 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -1,17 +1,17 @@
 #![warn(missing_docs)]
-use async_trait::async_trait;
-use serde::{Serialize,Deserialize};
 use crate::indieauth::User;
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
 
 mod redis;
 pub use crate::database::redis::RedisStorage;
 #[cfg(test)]
-pub use redis::tests::{RedisInstance, get_redis_instance};
+pub use redis::tests::{get_redis_instance, RedisInstance};
 
 #[derive(Serialize, Deserialize, PartialEq, Debug)]
 pub struct MicropubChannel {
     pub uid: String,
-    pub name: String
+    pub name: String,
 }
 
 #[derive(Debug, Clone, Copy)]
@@ -21,14 +21,14 @@ pub enum ErrorKind {
     JsonParsing,
     NotFound,
     BadRequest,
-    Other
+    Other,
 }
 
 #[derive(Debug)]
 pub struct StorageError {
     msg: String,
     source: Option<Box<dyn std::error::Error>>,
-    kind: ErrorKind
+    kind: ErrorKind,
 }
 unsafe impl Send for StorageError {}
 unsafe impl Sync for StorageError {}
@@ -38,14 +38,16 @@ impl From<StorageError> for tide::Response {
             ErrorKind::BadRequest => 400,
             ErrorKind::NotFound => 404,
             _ => 500,
-        }).body(serde_json::json!({
+        })
+        .body(serde_json::json!({
             "error": match err.kind() {
                 ErrorKind::BadRequest => "invalid_request",
                 ErrorKind::NotFound => "not_found",
                 _ => "database_error"
             },
             "error_description": err
-        })).build()
+        }))
+        .build()
     }
 }
 impl std::error::Error for StorageError {
@@ -58,7 +60,7 @@ impl From<serde_json::Error> for StorageError {
         Self {
             msg: format!("{}", err),
             source: Some(Box::new(err)),
-            kind: ErrorKind::JsonParsing
+            kind: ErrorKind::JsonParsing,
         }
     }
 }
@@ -70,15 +72,18 @@ impl std::fmt::Display for StorageError {
             ErrorKind::PermissionDenied => write!(f, "permission denied: "),
             ErrorKind::NotFound => write!(f, "not found: "),
             ErrorKind::BadRequest => write!(f, "bad request: "),
-            ErrorKind::Other => write!(f, "generic storage layer error: ")
+            ErrorKind::Other => write!(f, "generic storage layer error: "),
         } {
             Ok(_) => write!(f, "{}", self.msg),
-            Err(err) => Err(err)
+            Err(err) => Err(err),
         }
     }
 }
 impl serde::Serialize for StorageError {
-    fn serialize<S: serde::Serializer>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> {
+    fn serialize<S: serde::Serializer>(
+        &self,
+        serializer: S,
+    ) -> std::result::Result<S::Ok, S::Error> {
         serializer.serialize_str(&self.to_string())
     }
 }
@@ -88,7 +93,7 @@ impl StorageError {
         StorageError {
             msg: msg.to_string(),
             source: None,
-            kind
+            kind,
         }
     }
     /// Get the kind of an error.
@@ -100,12 +105,11 @@ impl StorageError {
     }
 }
 
-
 /// A special Result type for the Micropub backing storage.
 pub type Result<T> = std::result::Result<T, StorageError>;
 
 /// A storage backend for the Micropub server.
-/// 
+///
 /// Implementations should note that all methods listed on this trait MUST be fully atomic
 /// or lock the database so that write conflicts or reading half-written data should not occur.
 #[async_trait]
@@ -117,21 +121,21 @@ pub trait Storage: Clone + Send + Sync {
     async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>>;
 
     /// Save a post to the database as an MF2-JSON structure.
-    /// 
+    ///
     /// Note that the `post` object MUST have `post["properties"]["uid"][0]` defined.
     async fn put_post<'a>(&self, post: &'a serde_json::Value) -> Result<()>;
 
     /*/// Save a post and add it to the relevant feeds listed in `post["properties"]["channel"]`.
-    /// 
-    /// Note that the `post` object MUST have `post["properties"]["uid"][0]` defined 
+    ///
+    /// Note that the `post` object MUST have `post["properties"]["uid"][0]` defined
     /// and `post["properties"]["channel"]` defined, even if it's empty.
     async fn put_and_index_post<'a>(&mut self, post: &'a serde_json::Value) -> Result<()>;*/
-    
+
     /// Modify a post using an update object as defined in the Micropub spec.
-    /// 
+    ///
     /// Note to implementors: the update operation MUST be atomic OR MUST lock the database
     /// to prevent two clients overwriting each other's changes.
-    /// 
+    ///
     /// You can assume concurrent updates will never contradict each other, since that will be dumb.
     /// The last update always wins.
     async fn update_post<'a>(&self, url: &'a str, update: serde_json::Value) -> Result<()>;
@@ -142,21 +146,27 @@ pub trait Storage: Clone + Send + Sync {
     /// Fetch a feed at `url` and return a an h-feed object containing
     /// `limit` posts after a post by url `after`, filtering the content
     /// in context of a user specified by `user` (or an anonymous user).
-    /// 
+    ///
     /// Specifically, private posts that don't include the user in the audience
     /// will be elided from the feed, and the posts containing location and not
     /// specifying post["properties"]["location-visibility"][0] == "public"
     /// will have their location data (but not check-in data) stripped.
-    /// 
+    ///
     /// This function is used as an optimization so the client, whatever it is,
     /// doesn't have to fetch posts, then realize some of them are private, and
     /// fetch more posts.
-    /// 
+    ///
     /// Note for implementors: if you use streams to fetch posts in parallel
     /// from the database, preferably make this method use a connection pool
     /// to reduce overhead of creating a database connection per post for
     /// parallel fetching.
-    async fn read_feed_with_limit<'a>(&self, url: &'a str, after: &'a Option<String>, limit: usize, user: &'a Option<String>) -> Result<Option<serde_json::Value>>;
+    async fn read_feed_with_limit<'a>(
+        &self,
+        url: &'a str,
+        after: &'a Option<String>,
+        limit: usize,
+        user: &'a Option<String>,
+    ) -> Result<Option<serde_json::Value>>;
 
     /// Deletes a post from the database irreversibly. 'nuff said. Must be idempotent.
     async fn delete_post<'a>(&self, url: &'a str) -> Result<()>;
@@ -170,9 +180,9 @@ pub trait Storage: Clone + Send + Sync {
 
 #[cfg(test)]
 mod tests {
-    use super::{Storage, MicropubChannel};
-    use serde_json::json;
     use super::redis::tests::get_redis_instance;
+    use super::{MicropubChannel, Storage};
+    use serde_json::json;
 
     async fn test_backend_basic_operations<Backend: Storage>(backend: Backend) {
         let post: serde_json::Value = json!({
@@ -191,23 +201,47 @@ mod tests {
         backend.put_post(&post).await.unwrap();
         if let Ok(Some(returned_post)) = backend.get_post(&key).await {
             assert!(returned_post.is_object());
-            assert_eq!(returned_post["type"].as_array().unwrap().len(), post["type"].as_array().unwrap().len());
-            assert_eq!(returned_post["type"].as_array().unwrap(), post["type"].as_array().unwrap());
-            let props: &serde_json::Map<String, serde_json::Value> = post["properties"].as_object().unwrap();
+            assert_eq!(
+                returned_post["type"].as_array().unwrap().len(),
+                post["type"].as_array().unwrap().len()
+            );
+            assert_eq!(
+                returned_post["type"].as_array().unwrap(),
+                post["type"].as_array().unwrap()
+            );
+            let props: &serde_json::Map<String, serde_json::Value> =
+                post["properties"].as_object().unwrap();
             for key in props.keys() {
-                assert_eq!(returned_post["properties"][key].as_array().unwrap(), post["properties"][key].as_array().unwrap())
+                assert_eq!(
+                    returned_post["properties"][key].as_array().unwrap(),
+                    post["properties"][key].as_array().unwrap()
+                )
             }
-        } else { panic!("For some reason the backend did not return the post.") }
+        } else {
+            panic!("For some reason the backend did not return the post.")
+        }
         // Check the alternative URL - it should return the same post
         if let Ok(Some(returned_post)) = backend.get_post(&alt_url).await {
             assert!(returned_post.is_object());
-            assert_eq!(returned_post["type"].as_array().unwrap().len(), post["type"].as_array().unwrap().len());
-            assert_eq!(returned_post["type"].as_array().unwrap(), post["type"].as_array().unwrap());
-            let props: &serde_json::Map<String, serde_json::Value> = post["properties"].as_object().unwrap();
+            assert_eq!(
+                returned_post["type"].as_array().unwrap().len(),
+                post["type"].as_array().unwrap().len()
+            );
+            assert_eq!(
+                returned_post["type"].as_array().unwrap(),
+                post["type"].as_array().unwrap()
+            );
+            let props: &serde_json::Map<String, serde_json::Value> =
+                post["properties"].as_object().unwrap();
             for key in props.keys() {
-                assert_eq!(returned_post["properties"][key].as_array().unwrap(), post["properties"][key].as_array().unwrap())
+                assert_eq!(
+                    returned_post["properties"][key].as_array().unwrap(),
+                    post["properties"][key].as_array().unwrap()
+                )
             }
-        } else { panic!("For some reason the backend did not return the post.") }
+        } else {
+            panic!("For some reason the backend did not return the post.")
+        }
     }
 
     async fn test_backend_get_channel_list<Backend: Storage>(backend: Backend) {
@@ -221,33 +255,61 @@ mod tests {
             "children": []
         });
         backend.put_post(&feed).await.unwrap();
-        let chans = backend.get_channels(&crate::indieauth::User::new("https://fireburn.ru/", "https://quill.p3k.io/", "create update media")).await.unwrap();
+        let chans = backend
+            .get_channels(&crate::indieauth::User::new(
+                "https://fireburn.ru/",
+                "https://quill.p3k.io/",
+                "create update media",
+            ))
+            .await
+            .unwrap();
         assert_eq!(chans.len(), 1);
-        assert_eq!(chans[0], MicropubChannel { uid: "https://fireburn.ru/feeds/main".to_string(), name: "Main Page".to_string() });
+        assert_eq!(
+            chans[0],
+            MicropubChannel {
+                uid: "https://fireburn.ru/feeds/main".to_string(),
+                name: "Main Page".to_string()
+            }
+        );
     }
 
     async fn test_backend_settings<Backend: Storage>(backend: Backend) {
-        backend.set_setting("site_name", "https://fireburn.ru/", "Vika's Hideout").await.unwrap();
-        assert_eq!(backend.get_setting("site_name", "https://fireburn.ru/").await.unwrap(), "Vika's Hideout");
+        backend
+            .set_setting("site_name", "https://fireburn.ru/", "Vika's Hideout")
+            .await
+            .unwrap();
+        assert_eq!(
+            backend
+                .get_setting("site_name", "https://fireburn.ru/")
+                .await
+                .unwrap(),
+            "Vika's Hideout"
+        );
     }
 
     #[async_std::test]
     async fn test_redis_storage_basic_operations() {
         let redis_instance = get_redis_instance().await;
-        let backend = super::RedisStorage::new(redis_instance.uri().to_string()).await.unwrap();
+        let backend = super::RedisStorage::new(redis_instance.uri().to_string())
+            .await
+            .unwrap();
         test_backend_basic_operations(backend).await;
     }
     #[async_std::test]
     async fn test_redis_storage_channel_list() {
         let redis_instance = get_redis_instance().await;
-        let backend = super::RedisStorage::new(redis_instance.uri().to_string()).await.unwrap();
+        let backend = super::RedisStorage::new(redis_instance.uri().to_string())
+            .await
+            .unwrap();
         test_backend_get_channel_list(backend).await;
     }
 
     #[async_std::test]
     async fn test_redis_settings() {
         let redis_instance = get_redis_instance().await;
-        let backend = super::RedisStorage::new(redis_instance.uri().to_string()).await.unwrap();
+        let backend = super::RedisStorage::new(redis_instance.uri().to_string())
+            .await
+            .unwrap();
         test_backend_settings(backend).await;
     }
 }
diff --git a/src/database/redis/mod.rs b/src/database/redis/mod.rs
index 352cece..5a6b70d 100644
--- a/src/database/redis/mod.rs
+++ b/src/database/redis/mod.rs
@@ -1,20 +1,20 @@
 use async_trait::async_trait;
+use futures::stream;
 use futures_util::FutureExt;
 use futures_util::StreamExt;
-use futures::stream;
 use lazy_static::lazy_static;
 use log::error;
+use mobc::Pool;
 use mobc_redis::redis;
 use mobc_redis::redis::AsyncCommands;
-use serde_json::json;
-use mobc::Pool;
 use mobc_redis::RedisConnectionManager;
+use serde_json::json;
 
-use crate::database::{Storage, Result, StorageError, ErrorKind, MicropubChannel};
+use crate::database::{ErrorKind, MicropubChannel, Result, Storage, StorageError};
 use crate::indieauth::User;
 
 struct RedisScripts {
-    edit_post: redis::Script
+    edit_post: redis::Script,
 }
 
 impl From<mobc_redis::redis::RedisError> for StorageError {
@@ -22,7 +22,7 @@ impl From<mobc_redis::redis::RedisError> for StorageError {
         Self {
             msg: format!("{}", err),
             source: Some(Box::new(err)),
-            kind: ErrorKind::Backend
+            kind: ErrorKind::Backend,
         }
     }
 }
@@ -31,7 +31,7 @@ impl From<mobc::Error<mobc_redis::redis::RedisError>> for StorageError {
         Self {
             msg: format!("{}", err),
             source: Some(Box::new(err)),
-            kind: ErrorKind::Backend
+            kind: ErrorKind::Backend,
         }
     }
 }
@@ -64,17 +64,40 @@ fn filter_post(mut post: serde_json::Value, user: &'_ Option<String>) -> Option<
         }));
     }
     let empty_vec: Vec<serde_json::Value> = vec![];
-    let author = post["properties"]["author"].as_array().unwrap_or(&empty_vec).iter().map(|i| i.as_str().unwrap().to_string());
-    let visibility = post["properties"]["visibility"][0].as_str().unwrap_or("public");
-    let mut audience = author.chain(post["properties"]["audience"].as_array().unwrap_or(&empty_vec).iter().map(|i| i.as_str().unwrap().to_string()));
-    if (visibility == "private" && !audience.any(|i| Some(i) == *user)) || (visibility == "protected" && user.is_none()) {
-        return None
+    let author = post["properties"]["author"]
+        .as_array()
+        .unwrap_or(&empty_vec)
+        .iter()
+        .map(|i| i.as_str().unwrap().to_string());
+    let visibility = post["properties"]["visibility"][0]
+        .as_str()
+        .unwrap_or("public");
+    let mut audience = author.chain(
+        post["properties"]["audience"]
+            .as_array()
+            .unwrap_or(&empty_vec)
+            .iter()
+            .map(|i| i.as_str().unwrap().to_string()),
+    );
+    if (visibility == "private" && !audience.any(|i| Some(i) == *user))
+        || (visibility == "protected" && user.is_none())
+    {
+        return None;
     }
     if post["properties"]["location"].is_array() {
-        let location_visibility = post["properties"]["location-visibility"][0].as_str().unwrap_or("private");
-        let mut author = post["properties"]["author"].as_array().unwrap_or(&empty_vec).iter().map(|i| i.as_str().unwrap().to_string());
+        let location_visibility = post["properties"]["location-visibility"][0]
+            .as_str()
+            .unwrap_or("private");
+        let mut author = post["properties"]["author"]
+            .as_array()
+            .unwrap_or(&empty_vec)
+            .iter()
+            .map(|i| i.as_str().unwrap().to_string());
         if location_visibility == "private" && !author.any(|i| Some(i) == *user) {
-            post["properties"].as_object_mut().unwrap().remove("location");
+            post["properties"]
+                .as_object_mut()
+                .unwrap()
+                .remove("location");
         }
     }
     Some(post)
@@ -84,12 +107,16 @@ fn filter_post(mut post: serde_json::Value, user: &'_ Option<String>) -> Option<
 impl Storage for RedisStorage {
     async fn get_setting<'a>(&self, setting: &'a str, user: &'a str) -> Result<String> {
         let mut conn = self.redis.get().await?;
-        Ok(conn.hget::<String, &str, String>(format!("settings_{}", user), setting).await?)
+        Ok(conn
+            .hget::<String, &str, String>(format!("settings_{}", user), setting)
+            .await?)
     }
 
     async fn set_setting<'a>(&self, setting: &'a str, user: &'a str, value: &'a str) -> Result<()> {
         let mut conn = self.redis.get().await?;
-        Ok(conn.hset::<String, &str, &str, ()>(format!("settings_{}", user), setting, value).await?)
+        Ok(conn
+            .hset::<String, &str, &str, ()>(format!("settings_{}", user), setting, value)
+            .await?)
     }
 
     async fn delete_post<'a>(&self, url: &'a str) -> Result<()> {
@@ -101,41 +128,63 @@ impl Storage for RedisStorage {
         let mut conn = self.redis.get().await?;
         Ok(conn.hexists::<&str, &str, bool>(&"posts", url).await?)
     }
-    
+
     async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> {
         let mut conn = self.redis.get().await?;
-        match conn.hget::<&str, &str, Option<String>>(&"posts", url).await? {
+        match conn
+            .hget::<&str, &str, Option<String>>(&"posts", url)
+            .await?
+        {
             Some(val) => {
                 let parsed = serde_json::from_str::<serde_json::Value>(&val)?;
                 if let Some(new_url) = parsed["see_other"].as_str() {
-                    match conn.hget::<&str, &str, Option<String>>(&"posts", new_url).await? {
+                    match conn
+                        .hget::<&str, &str, Option<String>>(&"posts", new_url)
+                        .await?
+                    {
                         Some(val) => Ok(Some(serde_json::from_str::<serde_json::Value>(&val)?)),
-                        None => Ok(None)
+                        None => Ok(None),
                     }
                 } else {
                     Ok(Some(parsed))
                 }
-            },
-            None => Ok(None)
+            }
+            None => Ok(None),
         }
     }
 
     async fn get_channels(&self, user: &User) -> Result<Vec<MicropubChannel>> {
         let mut conn = self.redis.get().await?;
-        let channels = conn.smembers::<String, Vec<String>>("channels_".to_string() + user.me.as_str()).await?;
+        let channels = conn
+            .smembers::<String, Vec<String>>("channels_".to_string() + user.me.as_str())
+            .await?;
         // TODO: use streams here instead of this weird thing... how did I even write this?!
-        Ok(futures_util::future::join_all(channels.iter()
-            .map(|channel| self.get_post(channel)
-                .map(|result| result.unwrap())
-                .map(|post: Option<serde_json::Value>| {
-                    if let Some(post) = post {
-                        Some(MicropubChannel {
-                            uid: post["properties"]["uid"][0].as_str().unwrap().to_string(),
-                            name: post["properties"]["name"][0].as_str().unwrap().to_string()
-                        })
-                    } else { None }
+        Ok(futures_util::future::join_all(
+            channels
+                .iter()
+                .map(|channel| {
+                    self.get_post(channel).map(|result| result.unwrap()).map(
+                        |post: Option<serde_json::Value>| {
+                            if let Some(post) = post {
+                                Some(MicropubChannel {
+                                    uid: post["properties"]["uid"][0].as_str().unwrap().to_string(),
+                                    name: post["properties"]["name"][0]
+                                        .as_str()
+                                        .unwrap()
+                                        .to_string(),
+                                })
+                            } else {
+                                None
+                            }
+                        },
+                    )
                 })
-            ).collect::<Vec<_>>()).await.into_iter().filter_map(|chan| chan).collect::<Vec<_>>())
+                .collect::<Vec<_>>(),
+        )
+        .await
+        .into_iter()
+        .filter_map(|chan| chan)
+        .collect::<Vec<_>>())
     }
 
     async fn put_post<'a>(&self, post: &'a serde_json::Value) -> Result<()> {
@@ -143,72 +192,122 @@ impl Storage for RedisStorage {
         let key: &str;
         match post["properties"]["uid"][0].as_str() {
             Some(uid) => key = uid,
-            None => return Err(StorageError::new(ErrorKind::BadRequest, "post doesn't have a UID"))
+            None => {
+                return Err(StorageError::new(
+                    ErrorKind::BadRequest,
+                    "post doesn't have a UID",
+                ))
+            }
         }
-        conn.hset::<&str, &str, String, ()>(&"posts", key, post.to_string()).await?;
+        conn.hset::<&str, &str, String, ()>(&"posts", key, post.to_string())
+            .await?;
         if post["properties"]["url"].is_array() {
-            for url in post["properties"]["url"].as_array().unwrap().iter().map(|i| i.as_str().unwrap().to_string()) {
+            for url in post["properties"]["url"]
+                .as_array()
+                .unwrap()
+                .iter()
+                .map(|i| i.as_str().unwrap().to_string())
+            {
                 if url != key {
-                    conn.hset::<&str, &str, String, ()>(&"posts", &url, json!({"see_other": key}).to_string()).await?;
+                    conn.hset::<&str, &str, String, ()>(
+                        &"posts",
+                        &url,
+                        json!({ "see_other": key }).to_string(),
+                    )
+                    .await?;
                 }
             }
         }
-        if post["type"].as_array().unwrap().iter().any(|i| i == "h-feed") {
+        if post["type"]
+            .as_array()
+            .unwrap()
+            .iter()
+            .any(|i| i == "h-feed")
+        {
             // This is a feed. Add it to the channels array if it's not already there.
-            conn.sadd::<String, &str, ()>("channels_".to_string() + post["properties"]["author"][0].as_str().unwrap(), key).await?
+            conn.sadd::<String, &str, ()>(
+                "channels_".to_string() + post["properties"]["author"][0].as_str().unwrap(),
+                key,
+            )
+            .await?
         }
         Ok(())
     }
 
-    async fn read_feed_with_limit<'a>(&self, url: &'a str, after: &'a Option<String>, limit: usize, user: &'a Option<String>) -> Result<Option<serde_json::Value>> {
+    async fn read_feed_with_limit<'a>(
+        &self,
+        url: &'a str,
+        after: &'a Option<String>,
+        limit: usize,
+        user: &'a Option<String>,
+    ) -> Result<Option<serde_json::Value>> {
         let mut conn = self.redis.get().await?;
         let mut feed;
-        match conn.hget::<&str, &str, Option<String>>(&"posts", url).await? {
+        match conn
+            .hget::<&str, &str, Option<String>>(&"posts", url)
+            .await?
+        {
             Some(post) => feed = serde_json::from_str::<serde_json::Value>(&post)?,
-            None => return Ok(None)
+            None => return Ok(None),
         }
         if feed["see_other"].is_string() {
-            match conn.hget::<&str, &str, Option<String>>(&"posts", feed["see_other"].as_str().unwrap()).await? {
+            match conn
+                .hget::<&str, &str, Option<String>>(&"posts", feed["see_other"].as_str().unwrap())
+                .await?
+            {
                 Some(post) => feed = serde_json::from_str::<serde_json::Value>(&post)?,
-                None => return Ok(None)
+                None => return Ok(None),
             }
         }
         if let Some(post) = filter_post(feed, user) {
             feed = post
         } else {
-            return Err(StorageError::new(ErrorKind::PermissionDenied, "specified user cannot access this post"))
+            return Err(StorageError::new(
+                ErrorKind::PermissionDenied,
+                "specified user cannot access this post",
+            ));
         }
         if feed["children"].is_array() {
             let children = feed["children"].as_array().unwrap();
             let posts_iter: Box<dyn std::iter::Iterator<Item = String> + Send>;
             // TODO: refactor this to apply the skip on the &mut iterator
             if let Some(after) = after {
-                posts_iter = Box::new(children.iter().map(|i| i.as_str().unwrap().to_string()).skip_while(move |i| i != after).skip(1));
+                posts_iter = Box::new(
+                    children
+                        .iter()
+                        .map(|i| i.as_str().unwrap().to_string())
+                        .skip_while(move |i| i != after)
+                        .skip(1),
+                );
             } else {
                 posts_iter = Box::new(children.iter().map(|i| i.as_str().unwrap().to_string()));
             }
             let posts = stream::iter(posts_iter)
                 .map(|url| async move {
                     match self.redis.get().await {
-                        Ok(mut conn) => match conn.hget::<&str, &str, Option<String>>("posts", &url).await {
-                            Ok(post) => match post {
-                                Some(post) => match serde_json::from_str::<serde_json::Value>(&post) {
-                                    Ok(post) => Some(post),
-                                    Err(err) => {
-                                        let err = StorageError::from(err);
-                                        error!("{}", err);
-                                        panic!("{}", err)
+                        Ok(mut conn) => {
+                            match conn.hget::<&str, &str, Option<String>>("posts", &url).await {
+                                Ok(post) => match post {
+                                    Some(post) => {
+                                        match serde_json::from_str::<serde_json::Value>(&post) {
+                                            Ok(post) => Some(post),
+                                            Err(err) => {
+                                                let err = StorageError::from(err);
+                                                error!("{}", err);
+                                                panic!("{}", err)
+                                            }
+                                        }
                                     }
+                                    // Happens because of a broken link (result of an improper deletion?)
+                                    None => None,
                                 },
-                                // Happens because of a broken link (result of an improper deletion?)
-                                None => None,
-                            },
-                            Err(err) => {
-                                let err = StorageError::from(err);
-                                error!("{}", err);
-                                panic!("{}", err)
+                                Err(err) => {
+                                    let err = StorageError::from(err);
+                                    error!("{}", err);
+                                    panic!("{}", err)
+                                }
                             }
-                        },
+                        }
                         // TODO: Instead of causing a panic, investigate how can you fail the whole stream
                         // Somehow fuse it maybe?
                         Err(err) => {
@@ -227,14 +326,20 @@ impl Storage for RedisStorage {
                 // Hack to unwrap the Option and sieve out broken links
                 // Broken links return None, and Stream::filter_map skips all Nones.
                 .filter_map(|post: Option<serde_json::Value>| async move { post })
-                .filter_map(|post| async move {
-                    filter_post(post, user)
-                })
+                .filter_map(|post| async move { filter_post(post, user) })
                 .take(limit);
             // TODO: Instead of catching panics, find a way to make the whole stream fail with Result<Vec<serde_json::Value>>
-            match std::panic::AssertUnwindSafe(posts.collect::<Vec<serde_json::Value>>()).catch_unwind().await {
+            match std::panic::AssertUnwindSafe(posts.collect::<Vec<serde_json::Value>>())
+                .catch_unwind()
+                .await
+            {
                 Ok(posts) => feed["children"] = json!(posts),
-                Err(_) => return Err(StorageError::new(ErrorKind::Other, "Unknown error encountered while assembling feed, see logs for more info"))
+                Err(_) => {
+                    return Err(StorageError::new(
+                        ErrorKind::Other,
+                        "Unknown error encountered while assembling feed, see logs for more info",
+                    ))
+                }
             }
         }
         return Ok(Some(feed));
@@ -242,39 +347,56 @@ impl Storage for RedisStorage {
 
     async fn update_post<'a>(&self, mut url: &'a str, update: serde_json::Value) -> Result<()> {
         let mut conn = self.redis.get().await?;
-        if !conn.hexists::<&str, &str, bool>("posts", url).await.unwrap() {
-            return Err(StorageError::new(ErrorKind::NotFound, "can't edit a non-existent post"))
+        if !conn
+            .hexists::<&str, &str, bool>("posts", url)
+            .await
+            .unwrap()
+        {
+            return Err(StorageError::new(
+                ErrorKind::NotFound,
+                "can't edit a non-existent post",
+            ));
         }
-        let post: serde_json::Value = serde_json::from_str(&conn.hget::<&str, &str, String>("posts", url).await?)?;
+        let post: serde_json::Value =
+            serde_json::from_str(&conn.hget::<&str, &str, String>("posts", url).await?)?;
         if let Some(new_url) = post["see_other"].as_str() {
             url = new_url
         }
-        Ok(SCRIPTS.edit_post.key("posts").arg(url).arg(update.to_string()).invoke_async::<_, ()>(&mut conn as &mut redis::aio::Connection).await?)
+        Ok(SCRIPTS
+            .edit_post
+            .key("posts")
+            .arg(url)
+            .arg(update.to_string())
+            .invoke_async::<_, ()>(&mut conn as &mut redis::aio::Connection)
+            .await?)
     }
 }
 
-
 impl RedisStorage {
     /// Create a new RedisDatabase that will connect to Redis at `redis_uri` to store data.
     pub async fn new(redis_uri: String) -> Result<Self> {
         match redis::Client::open(redis_uri) {
-            Ok(client) => Ok(Self { redis: Pool::builder().max_open(20).build(RedisConnectionManager::new(client)) }),
-            Err(e) => Err(e.into())
+            Ok(client) => Ok(Self {
+                redis: Pool::builder()
+                    .max_open(20)
+                    .build(RedisConnectionManager::new(client)),
+            }),
+            Err(e) => Err(e.into()),
         }
     }
 }
 
 #[cfg(test)]
 pub mod tests {
+    use mobc_redis::redis;
     use std::process;
     use std::time::Duration;
-    use mobc_redis::redis;
 
     pub struct RedisInstance {
         // We just need to hold on to it so it won't get dropped and remove the socket
         _tempdir: tempdir::TempDir,
         uri: String,
-        child: std::process::Child
+        child: std::process::Child,
     }
     impl Drop for RedisInstance {
         fn drop(&mut self) {
@@ -292,11 +414,14 @@ pub mod tests {
         let socket = tempdir.path().join("redis.sock");
         let redis_child = process::Command::new("redis-server")
             .current_dir(&tempdir)
-            .arg("--port").arg("0")
-            .arg("--unixsocket").arg(&socket)
+            .arg("--port")
+            .arg("0")
+            .arg("--unixsocket")
+            .arg(&socket)
             .stdout(process::Stdio::null())
             .stderr(process::Stdio::null())
-            .spawn().expect("Failed to spawn Redis");
+            .spawn()
+            .expect("Failed to spawn Redis");
         println!("redis+unix:///{}", socket.to_str().unwrap());
         let uri = format!("redis+unix:///{}", socket.to_str().unwrap());
         // There should be a slight delay, we need to wait for Redis to spin up
@@ -317,7 +442,9 @@ pub mod tests {
         }
 
         return RedisInstance {
-            uri, child: redis_child, _tempdir: tempdir
-        }
+            uri,
+            child: redis_child,
+            _tempdir: tempdir,
+        };
     }
 }