From dc546640d066dfb7c72d204f3758b103bd415d2c Mon Sep 17 00:00:00 2001 From: Vika Date: Wed, 5 May 2021 16:29:11 +0300 Subject: Added a connection pool to the RedisDatabase --- src/database/redis/mod.rs | 44 +++++++++++++++++++++++++++----------------- 1 file changed, 27 insertions(+), 17 deletions(-) (limited to 'src') diff --git a/src/database/redis/mod.rs b/src/database/redis/mod.rs index b709125..eded583 100644 --- a/src/database/redis/mod.rs +++ b/src/database/redis/mod.rs @@ -4,8 +4,11 @@ use futures_util::StreamExt; use futures::stream; use lazy_static::lazy_static; use log::error; -use redis::AsyncCommands; +use mobc_redis::redis; +use mobc_redis::redis::AsyncCommands; use serde_json::json; +use mobc::Pool; +use mobc_redis::RedisConnectionManager; use crate::database::{Storage, Result, StorageError, ErrorKind, MicropubChannel}; use crate::indieauth::User; @@ -14,8 +17,17 @@ struct RedisScripts { edit_post: redis::Script } -impl From for StorageError { - fn from(err: redis::RedisError) -> Self { +impl From for StorageError { + fn from(err: mobc_redis::redis::RedisError) -> Self { + Self { + msg: format!("{}", err), + source: Some(Box::new(err)), + kind: ErrorKind::Backend + } + } +} +impl From> for StorageError { + fn from(err: mobc::Error) -> Self { Self { msg: format!("{}", err), source: Some(Box::new(err)), @@ -32,8 +44,7 @@ lazy_static! { #[derive(Clone)] pub struct RedisStorage { - // TODO: use mobc crate to create a connection pool and reuse connections for efficiency - redis: redis::Client, + redis: mobc::Pool, } fn filter_post(mut post: serde_json::Value, user: &'_ Option) -> Option { @@ -65,17 +76,17 @@ fn filter_post(mut post: serde_json::Value, user: &'_ Option) -> Option< #[async_trait] impl Storage for RedisStorage { async fn delete_post<'a>(&self, url: &'a str) -> Result<()> { - let mut conn = self.redis.get_async_std_connection().await?; + let mut conn = self.redis.get().await?; Ok(conn.hdel::<&str, &str, ()>("posts", url).await?) } async fn post_exists(&self, url: &str) -> Result { - let mut conn = self.redis.get_async_std_connection().await?; + let mut conn = self.redis.get().await?; Ok(conn.hexists::<&str, &str, bool>(&"posts", url).await?) } async fn get_post(&self, url: &str) -> Result> { - let mut conn = self.redis.get_async_std_connection().await?; + let mut conn = self.redis.get().await?; match conn.hget::<&str, &str, Option>(&"posts", url).await? { Some(val) => { let parsed = serde_json::from_str::(&val)?; @@ -93,7 +104,7 @@ impl Storage for RedisStorage { } async fn get_channels(&self, user: &User) -> Result> { - let mut conn = self.redis.get_async_std_connection().await?; + let mut conn = self.redis.get().await?; let channels = conn.smembers::>("channels_".to_string() + user.me.as_str()).await?; // TODO: use streams here instead of this weird thing... how did I even write this?! Ok(futures_util::future::join_all(channels.iter() @@ -111,7 +122,7 @@ impl Storage for RedisStorage { } async fn put_post<'a>(&self, post: &'a serde_json::Value) -> Result<()> { - let mut conn = self.redis.get_async_std_connection().await?; + let mut conn = self.redis.get().await?; let key: &str; match post["properties"]["uid"][0].as_str() { Some(uid) => key = uid, @@ -133,7 +144,7 @@ impl Storage for RedisStorage { } async fn read_feed_with_limit<'a>(&self, url: &'a str, after: &'a Option, limit: usize, user: &'a Option) -> Result> { - let mut conn = self.redis.get_async_std_connection().await?; + let mut conn = self.redis.get().await?; let mut feed; match conn.hget::<&str, &str, Option>(&"posts", url).await? { Some(post) => feed = serde_json::from_str::(&post)?, @@ -161,9 +172,7 @@ impl Storage for RedisStorage { } let posts = stream::iter(posts_iter) .map(|url| async move { - // Is it rational to use a new connection for every post fetched? - // TODO: Use a connection pool here - match self.redis.get_async_std_connection().await { + match self.redis.get().await { Ok(mut conn) => match conn.hget::<&str, &str, Option>("posts", &url).await { Ok(post) => match post { Some(post) => match serde_json::from_str::(&post) { @@ -196,6 +205,7 @@ impl Storage for RedisStorage { // It will probably depend on how often can you encounter a private post on the page // It shouldn't be too large, or we'll start fetching too many posts from the database // It MUST NOT be larger than the typical page size + // It MUST NOT be a significant amount of the connection pool size .buffered(std::cmp::min(3, limit)) // Hack to unwrap the Option and sieve out broken links // Broken links return None, and Stream::filter_map skips all Nones. @@ -214,7 +224,7 @@ impl Storage for RedisStorage { } async fn update_post<'a>(&self, mut url: &'a str, update: serde_json::Value) -> Result<()> { - let mut conn = self.redis.get_async_std_connection().await?; + let mut conn = self.redis.get().await?; if !conn.hexists::<&str, &str, bool>("posts", url).await.unwrap() { return Err(StorageError::new(ErrorKind::NotFound, "can't edit a non-existent post")) } @@ -222,7 +232,7 @@ impl Storage for RedisStorage { if let Some(new_url) = post["see_other"].as_str() { url = new_url } - Ok(SCRIPTS.edit_post.key("posts").arg(url).arg(update.to_string()).invoke_async::<_, ()>(&mut conn).await?) + Ok(SCRIPTS.edit_post.key("posts").arg(url).arg(update.to_string()).invoke_async::<_, ()>(&mut conn as &mut redis::aio::Connection).await?) } } @@ -231,7 +241,7 @@ impl RedisStorage { /// Create a new RedisDatabase that will connect to Redis at `redis_uri` to store data. pub async fn new(redis_uri: String) -> Result { match redis::Client::open(redis_uri) { - Ok(client) => Ok(Self { redis: client }), + Ok(client) => Ok(Self { redis: Pool::builder().max_open(20).build(RedisConnectionManager::new(client)) }), Err(e) => Err(e.into()) } } -- cgit 1.4.1