about summary refs log tree commit diff
path: root/src/database/redis/mod.rs
diff options
context:
space:
mode:
authorVika <vika@fireburn.ru>2022-05-24 17:18:30 +0300
committerVika <vika@fireburn.ru>2022-05-24 17:18:30 +0300
commit5610a5f0bf1a9df02bd3d5b55e2cdebef2440360 (patch)
tree8394bcf1dcc204043d7adeb8dde2e2746977606e /src/database/redis/mod.rs
parent2f93873122b47e42f7ee1c38f1f04d052a63599c (diff)
downloadkittybox-5610a5f0bf1a9df02bd3d5b55e2cdebef2440360.tar.zst
flake.nix: reorganize
 - Kittybox's source code is moved to a subfolder
   - This improves build caching by Nix since it doesn't take changes
     to other files into account
 - Package and test definitions were spun into separate files
   - This makes my flake.nix much easier to navigate
   - This also makes it somewhat possible to use without flakes (but
     it is still not easy, so use flakes!)
 - Some attributes were moved in compliance with Nix 2.8's changes to
   flake schema
Diffstat (limited to 'src/database/redis/mod.rs')
-rw-r--r--src/database/redis/mod.rs392
1 files changed, 0 insertions, 392 deletions
diff --git a/src/database/redis/mod.rs b/src/database/redis/mod.rs
deleted file mode 100644
index eeaa3f2..0000000
--- a/src/database/redis/mod.rs
+++ /dev/null
@@ -1,392 +0,0 @@
-use async_trait::async_trait;
-use futures::stream;
-use futures_util::FutureExt;
-use futures_util::StreamExt;
-use futures_util::TryStream;
-use futures_util::TryStreamExt;
-use lazy_static::lazy_static;
-use log::error;
-use mobc::Pool;
-use mobc_redis::redis;
-use mobc_redis::redis::AsyncCommands;
-use mobc_redis::RedisConnectionManager;
-use serde_json::json;
-use std::time::Duration;
-
-use crate::database::{ErrorKind, MicropubChannel, Result, Storage, StorageError, filter_post};
-use crate::indieauth::User;
-
-struct RedisScripts {
-    edit_post: redis::Script,
-}
-
-impl From<mobc_redis::redis::RedisError> for StorageError {
-    fn from(err: mobc_redis::redis::RedisError) -> Self {
-        Self {
-            msg: format!("{}", err),
-            source: Some(Box::new(err)),
-            kind: ErrorKind::Backend,
-        }
-    }
-}
-impl From<mobc::Error<mobc_redis::redis::RedisError>> for StorageError {
-    fn from(err: mobc::Error<mobc_redis::redis::RedisError>) -> Self {
-        Self {
-            msg: format!("{}", err),
-            source: Some(Box::new(err)),
-            kind: ErrorKind::Backend,
-        }
-    }
-}
-
-lazy_static! {
-    static ref SCRIPTS: RedisScripts = RedisScripts {
-        edit_post: redis::Script::new(include_str!("./edit_post.lua"))
-    };
-}
-
-#[derive(Clone)]
-pub struct RedisStorage {
-    // note to future Vika:
-    // mobc::Pool is actually a fancy name for an Arc
-    // around a shared connection pool with a manager
-    // which makes it safe to implement [`Clone`] and
-    // not worry about new pools being suddenly made
-    //
-    // stop worrying and start coding, you dum-dum
-    redis: mobc::Pool<RedisConnectionManager>,
-}
-
-#[async_trait]
-impl Storage for RedisStorage {
-    async fn get_setting<'a>(&self, setting: &'a str, user: &'a str) -> Result<String> {
-        let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?;
-        Ok(conn
-            .hget::<String, &str, String>(format!("settings_{}", user), setting)
-            .await?)
-    }
-
-    async fn set_setting<'a>(&self, setting: &'a str, user: &'a str, value: &'a str) -> Result<()> {
-        let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?;
-        Ok(conn
-            .hset::<String, &str, &str, ()>(format!("settings_{}", user), setting, value)
-            .await?)
-    }
-
-    async fn delete_post<'a>(&self, url: &'a str) -> Result<()> {
-        let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?;
-        Ok(conn.hdel::<&str, &str, ()>("posts", url).await?)
-    }
-
-    async fn post_exists(&self, url: &str) -> Result<bool> {
-        let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?;
-        Ok(conn.hexists::<&str, &str, bool>("posts", url).await?)
-    }
-
-    async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> {
-        let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?;
-        match conn
-            .hget::<&str, &str, Option<String>>("posts", url)
-            .await?
-        {
-            Some(val) => {
-                let parsed = serde_json::from_str::<serde_json::Value>(&val)?;
-                if let Some(new_url) = parsed["see_other"].as_str() {
-                    match conn
-                        .hget::<&str, &str, Option<String>>("posts", new_url)
-                        .await?
-                    {
-                        Some(val) => Ok(Some(serde_json::from_str::<serde_json::Value>(&val)?)),
-                        None => Ok(None),
-                    }
-                } else {
-                    Ok(Some(parsed))
-                }
-            }
-            None => Ok(None),
-        }
-    }
-
-    async fn get_channels(&self, user: &User) -> Result<Vec<MicropubChannel>> {
-        let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?;
-        let channels = conn
-            .smembers::<String, Vec<String>>("channels_".to_string() + user.me.as_str())
-            .await?;
-        // TODO: use streams here instead of this weird thing... how did I even write this?!
-        Ok(futures_util::future::join_all(
-            channels
-                .iter()
-                .map(|channel| {
-                    self.get_post(channel).map(|result| result.unwrap()).map(
-                        |post: Option<serde_json::Value>| {
-                            post.map(|post| MicropubChannel {
-                                uid: post["properties"]["uid"][0].as_str().unwrap().to_string(),
-                                name: post["properties"]["name"][0].as_str().unwrap().to_string(),
-                            })
-                        },
-                    )
-                })
-                .collect::<Vec<_>>(),
-        )
-        .await
-        .into_iter()
-        .flatten()
-        .collect::<Vec<_>>())
-    }
-
-    async fn put_post<'a>(&self, post: &'a serde_json::Value, user: &'a str) -> Result<()> {
-        let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?;
-        let key: &str;
-        match post["properties"]["uid"][0].as_str() {
-            Some(uid) => key = uid,
-            None => {
-                return Err(StorageError::new(
-                    ErrorKind::BadRequest,
-                    "post doesn't have a UID",
-                ))
-            }
-        }
-        conn.hset::<&str, &str, String, ()>("posts", key, post.to_string())
-            .await?;
-        if post["properties"]["url"].is_array() {
-            for url in post["properties"]["url"]
-                .as_array()
-                .unwrap()
-                .iter()
-                .map(|i| i.as_str().unwrap().to_string())
-            {
-                if url != key && url.starts_with(user) {
-                    conn.hset::<&str, &str, String, ()>(
-                        "posts",
-                        &url,
-                        json!({ "see_other": key }).to_string(),
-                    )
-                    .await?;
-                }
-            }
-        }
-        if post["type"]
-            .as_array()
-            .unwrap()
-            .iter()
-            .any(|i| i == "h-feed")
-        {
-            // This is a feed. Add it to the channels array if it's not already there.
-            conn.sadd::<String, &str, ()>(
-                "channels_".to_string() + post["properties"]["author"][0].as_str().unwrap(),
-                key,
-            )
-            .await?
-        }
-        Ok(())
-    }
-
-    async fn read_feed_with_limit<'a>(
-        &self,
-        url: &'a str,
-        after: &'a Option<String>,
-        limit: usize,
-        user: &'a Option<String>,
-    ) -> Result<Option<serde_json::Value>> {
-        let mut conn = self.redis.get().await?;
-        let mut feed;
-        match conn
-            .hget::<&str, &str, Option<String>>("posts", url)
-            .await
-            .map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?
-        {
-            Some(post) => feed = serde_json::from_str::<serde_json::Value>(&post)?,
-            None => return Ok(None),
-        }
-        if feed["see_other"].is_string() {
-            match conn
-                .hget::<&str, &str, Option<String>>("posts", feed["see_other"].as_str().unwrap())
-                .await?
-            {
-                Some(post) => feed = serde_json::from_str::<serde_json::Value>(&post)?,
-                None => return Ok(None),
-            }
-        }
-        if let Some(post) = filter_post(feed, user) {
-            feed = post
-        } else {
-            return Err(StorageError::new(
-                ErrorKind::PermissionDenied,
-                "specified user cannot access this post",
-            ));
-        }
-        if feed["children"].is_array() {
-            let children = feed["children"].as_array().unwrap();
-            let mut posts_iter = children.iter().map(|i| i.as_str().unwrap().to_string());
-            if after.is_some() {
-                loop {
-                    let i = posts_iter.next();
-                    if &i == after {
-                        break;
-                    }
-                }
-            }
-            async fn fetch_post_for_feed(url: String) -> Option<serde_json::Value> {
-                return Some(serde_json::json!({}));
-            }
-            let posts = stream::iter(posts_iter)
-                .map(|url: String| async move {
-                    return Ok(fetch_post_for_feed(url).await);
-                    /*match self.redis.get().await {
-                        Ok(mut conn) => {
-                            match conn.hget::<&str, &str, Option<String>>("posts", &url).await {
-                                Ok(post) => match post {
-                                    Some(post) => {
-                                        Ok(Some(serde_json::from_str(&post)?))
-                                    }
-                                    // Happens because of a broken link (result of an improper deletion?)
-                                    None => Ok(None),
-                                },
-                                Err(err) => Err(StorageError::with_source(ErrorKind::Backend, "Error executing a Redis command", Box::new(err)))
-                            }
-                        }
-                        Err(err) => Err(StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(err)))
-                    }*/
-                })
-                // TODO: determine the optimal value for this buffer
-                // It will probably depend on how often can you encounter a private post on the page
-                // It shouldn't be too large, or we'll start fetching too many posts from the database
-                // It MUST NOT be larger than the typical page size
-                // It MUST NOT be a significant amount of the connection pool size
-                //.buffered(std::cmp::min(3, limit))
-                // Hack to unwrap the Option and sieve out broken links
-                // Broken links return None, and Stream::filter_map skips all Nones.
-                // I wonder if one can use try_flatten() here somehow akin to iters
-                .try_filter_map(|post| async move { Ok(post) })
-                .try_filter_map(|post| async move {
-                    Ok(filter_post(post, user))
-                })
-                .take(limit);
-            match posts.try_collect::<Vec<serde_json::Value>>().await {
-                Ok(posts) => feed["children"] = json!(posts),
-                Err(err) => {
-                    let e = StorageError::with_source(
-                        ErrorKind::Other,
-                        "An error was encountered while processing the feed",
-                        Box::new(err)
-                    );
-                    error!("Error while assembling feed: {}", e);
-                    return Err(e);
-                }
-            }
-        }
-        return Ok(Some(feed));
-    }
-
-    async fn update_post<'a>(&self, mut url: &'a str, update: serde_json::Value) -> Result<()> {
-        let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?;
-        if !conn
-            .hexists::<&str, &str, bool>("posts", url)
-            .await
-            .unwrap()
-        {
-            return Err(StorageError::new(
-                ErrorKind::NotFound,
-                "can't edit a non-existent post",
-            ));
-        }
-        let post: serde_json::Value =
-            serde_json::from_str(&conn.hget::<&str, &str, String>("posts", url).await?)?;
-        if let Some(new_url) = post["see_other"].as_str() {
-            url = new_url
-        }
-        Ok(SCRIPTS
-            .edit_post
-            .key("posts")
-            .arg(url)
-            .arg(update.to_string())
-            .invoke_async::<_, ()>(&mut conn as &mut redis::aio::Connection)
-            .await?)
-    }
-}
-
-impl RedisStorage {
-    /// Create a new RedisDatabase that will connect to Redis at `redis_uri` to store data.
-    pub async fn new(redis_uri: String) -> Result<Self> {
-        match redis::Client::open(redis_uri) {
-            Ok(client) => Ok(Self {
-                redis: Pool::builder()
-                    .max_open(20)
-                    .max_idle(5)
-                    .get_timeout(Some(Duration::from_secs(3)))
-                    .max_lifetime(Some(Duration::from_secs(120)))
-                    .build(RedisConnectionManager::new(client)),
-            }),
-            Err(e) => Err(e.into()),
-        }
-    }
-
-    pub async fn conn(&self) -> Result<mobc::Connection<mobc_redis::RedisConnectionManager>> {
-        self.redis.get().await.map_err(|e| StorageError::with_source(
-            ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)
-        ))
-    }
-}
-
-#[cfg(test)]
-pub mod tests {
-    use mobc_redis::redis;
-    use std::process;
-    use std::time::Duration;
-
-    pub struct RedisInstance {
-        // We just need to hold on to it so it won't get dropped and remove the socket
-        _tempdir: tempdir::TempDir,
-        uri: String,
-        child: std::process::Child,
-    }
-    impl Drop for RedisInstance {
-        fn drop(&mut self) {
-            self.child.kill().expect("Failed to kill the child!");
-        }
-    }
-    impl RedisInstance {
-        pub fn uri(&self) -> &str {
-            &self.uri
-        }
-    }
-
-    pub async fn get_redis_instance() -> RedisInstance {
-        let tempdir = tempdir::TempDir::new("redis").expect("failed to create tempdir");
-        let socket = tempdir.path().join("redis.sock");
-        let redis_child = process::Command::new("redis-server")
-            .current_dir(&tempdir)
-            .arg("--port")
-            .arg("0")
-            .arg("--unixsocket")
-            .arg(&socket)
-            .stdout(process::Stdio::null())
-            .stderr(process::Stdio::null())
-            .spawn()
-            .expect("Failed to spawn Redis");
-        println!("redis+unix:///{}", socket.to_str().unwrap());
-        let uri = format!("redis+unix:///{}", socket.to_str().unwrap());
-        // There should be a slight delay, we need to wait for Redis to spin up
-        let client = redis::Client::open(uri.clone()).unwrap();
-        let millisecond = Duration::from_millis(1);
-        let mut retries: usize = 0;
-        const MAX_RETRIES: usize = 60 * 1000/*ms*/;
-        while let Err(err) = client.get_connection() {
-            if err.is_connection_refusal() {
-                async_std::task::sleep(millisecond).await;
-                retries += 1;
-                if retries > MAX_RETRIES {
-                    panic!("Timeout waiting for Redis, last error: {}", err);
-                }
-            } else {
-                panic!("Could not connect: {}", err);
-            }
-        }
-
-        RedisInstance {
-            uri,
-            child: redis_child,
-            _tempdir: tempdir,
-        }
-    }
-}