about summary refs log tree commit diff
diff options
context:
space:
mode:
authorVika <vika@fireburn.ru>2021-05-05 16:29:11 +0300
committerVika <vika@fireburn.ru>2021-05-05 16:30:00 +0300
commitdc546640d066dfb7c72d204f3758b103bd415d2c (patch)
tree8b06c539d0d615bd89112478f896d7828542e5eb
parent488285e460dba2f5fb0901c77226d582d31e2be4 (diff)
downloadkittybox-dc546640d066dfb7c72d204f3758b103bd415d2c.tar.zst
Added a connection pool to the RedisDatabase
-rw-r--r--Cargo.lock112
-rw-r--r--Cargo.toml5
-rw-r--r--src/database/redis/mod.rs44
3 files changed, 32 insertions, 129 deletions
diff --git a/Cargo.lock b/Cargo.lock
index bd3dbb4..87d7ce7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1223,7 +1223,6 @@ dependencies = [
  "mobc-redis",
  "mockito",
  "newbase60",
- "redis 0.20.0",
  "serde",
  "serde_json",
  "serde_urlencoded",
@@ -1368,33 +1367,12 @@ dependencies = [
 ]
 
 [[package]]
-name = "mio"
-version = "0.7.11"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cf80d3e903b34e0bd7282b218398aec54e082c840d9baf8339e0080a0c542956"
-dependencies = [
- "libc",
- "log",
- "miow",
- "ntapi",
- "winapi",
-]
-
-[[package]]
-name = "miow"
-version = "0.3.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21"
-dependencies = [
- "winapi",
-]
-
-[[package]]
 name = "mobc"
 version = "0.7.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "db0dcf244160856f43ccecdebc93d7dd4e6353666003b61beb053b3b09083671"
 dependencies = [
+ "async-std",
  "async-trait",
  "futures-channel",
  "futures-core",
@@ -1411,7 +1389,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e7b5db77b37c9224d5b9949b214041ea3e1c15b6b1e5dd24a5acb8e73975d6d6"
 dependencies = [
  "mobc",
- "redis 0.19.0",
+ "redis",
 ]
 
 [[package]]
@@ -1451,15 +1429,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb"
 
 [[package]]
-name = "ntapi"
-version = "0.3.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44"
-dependencies = [
- "winapi",
-]
-
-[[package]]
 name = "num-integer"
 version = "0.1.44"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1526,31 +1495,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72"
 
 [[package]]
-name = "parking_lot"
-version = "0.11.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb"
-dependencies = [
- "instant",
- "lock_api",
- "parking_lot_core",
-]
-
-[[package]]
-name = "parking_lot_core"
-version = "0.8.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018"
-dependencies = [
- "cfg-if 1.0.0",
- "instant",
- "libc",
- "redox_syscall",
- "smallvec",
- "winapi",
-]
-
-[[package]]
 name = "percent-encoding"
 version = "2.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1727,17 +1671,6 @@ dependencies = [
 ]
 
 [[package]]
-name = "r2d2"
-version = "0.8.9"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "545c5bc2b880973c9c10e4067418407a0ccaa3091781d1671d46eb35107cb26f"
-dependencies = [
- "log",
- "parking_lot",
- "scheduled-thread-pool",
-]
-
-[[package]]
 name = "rand"
 version = "0.4.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1871,26 +1804,6 @@ version = "0.19.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1a6ddfecac9391fed21cce10e83c65fa4abafd77df05c98b1c647c65374ce9b3"
 dependencies = [
- "async-trait",
- "bytes 1.0.1",
- "combine",
- "dtoa",
- "futures-util",
- "itoa",
- "percent-encoding",
- "pin-project-lite 0.2.6",
- "sha1",
- "tokio",
- "tokio-util",
- "url",
-]
-
-[[package]]
-name = "redis"
-version = "0.20.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "eeb8f8d059ead7805e171fc22de8348a3d611c0f985aaa4f5cf6c0dfc7645407"
-dependencies = [
  "async-std",
  "async-trait",
  "bytes 1.0.1",
@@ -1900,7 +1813,6 @@ dependencies = [
  "itoa",
  "percent-encoding",
  "pin-project-lite 0.2.6",
- "r2d2",
  "sha1",
  "tokio",
  "tokio-util",
@@ -1908,15 +1820,6 @@ dependencies = [
 ]
 
 [[package]]
-name = "redox_syscall"
-version = "0.2.8"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "742739e41cd49414de871ea5e549afb7e2a3ac77b589bcbebe8c82fab37147fc"
-dependencies = [
- "bitflags",
-]
-
-[[package]]
 name = "regex"
 version = "1.5.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1974,15 +1877,6 @@ dependencies = [
 ]
 
 [[package]]
-name = "scheduled-thread-pool"
-version = "0.2.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dc6f74fd1204073fa02d5d5d68bec8021be4c38690b61264b2fdb48083d0e7d7"
-dependencies = [
- "parking_lot",
-]
-
-[[package]]
 name = "scopeguard"
 version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2480,9 +2374,7 @@ checksum = "83f0c8e7c0addab50b663055baf787d0af7f413a46e6e7fb9559a4e4db7137a5"
 dependencies = [
  "autocfg",
  "bytes 1.0.1",
- "libc",
  "memchr",
- "mio",
  "num_cpus",
  "pin-project-lite 0.2.6",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index e38cffd..6ce2cbe 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,7 +13,9 @@ tempdir = "0.3.7"           # A library for managing a temporary directory and d
 
 [dependencies]
 # Redis driver for Rust.
-redis = { version = "0.20.0", features = ["aio", "async-std-comp", "r2d2"] }
+#redis = { version = "0.20.0", features = ["aio", "async-std-comp"] }
+# Redis support for the mobc connection pool
+mobc-redis = { version = "0.7.0", features = ["async-std-comp"], default-features = false }
 # A generic serialization/deserialization framework
 serde = { version = "1.0.125", features = ["derive"] }
 # Date and time library for Rust
@@ -36,5 +38,4 @@ newbase60 = "0.1.3"         # A library that implements Tantek Çelik's New Base
 markdown = "0.3.0"          # Native Rust library for parsing Markdown and (outputting HTML)
 easy-scraper = "0.2.0"      # HTML scraping library focused on ease of use
 serde_urlencoded = "0.7.0"  # `x-www-form-urlencoded` meets Serde
-mobc-redis = "0.7.0"        # Redis support for the mobc connection pool
 mobc = "0.7.2"              # A generic connection pool with async/await support
diff --git a/src/database/redis/mod.rs b/src/database/redis/mod.rs
index b709125..eded583 100644
--- a/src/database/redis/mod.rs
+++ b/src/database/redis/mod.rs
@@ -4,8 +4,11 @@ use futures_util::StreamExt;
 use futures::stream;
 use lazy_static::lazy_static;
 use log::error;
-use redis::AsyncCommands;
+use mobc_redis::redis;
+use mobc_redis::redis::AsyncCommands;
 use serde_json::json;
+use mobc::Pool;
+use mobc_redis::RedisConnectionManager;
 
 use crate::database::{Storage, Result, StorageError, ErrorKind, MicropubChannel};
 use crate::indieauth::User;
@@ -14,8 +17,17 @@ struct RedisScripts {
     edit_post: redis::Script
 }
 
-impl From<redis::RedisError> for StorageError {
-    fn from(err: redis::RedisError) -> Self {
+impl From<mobc_redis::redis::RedisError> for StorageError {
+    fn from(err: mobc_redis::redis::RedisError) -> Self {
+        Self {
+            msg: format!("{}", err),
+            source: Some(Box::new(err)),
+            kind: ErrorKind::Backend
+        }
+    }
+}
+impl From<mobc::Error<mobc_redis::redis::RedisError>> for StorageError {
+    fn from(err: mobc::Error<mobc_redis::redis::RedisError>) -> Self {
         Self {
             msg: format!("{}", err),
             source: Some(Box::new(err)),
@@ -32,8 +44,7 @@ lazy_static! {
 
 #[derive(Clone)]
 pub struct RedisStorage {
-    // TODO: use mobc crate to create a connection pool and reuse connections for efficiency
-    redis: redis::Client,
+    redis: mobc::Pool<RedisConnectionManager>,
 }
 
 fn filter_post(mut post: serde_json::Value, user: &'_ Option<String>) -> Option<serde_json::Value> {
@@ -65,17 +76,17 @@ fn filter_post(mut post: serde_json::Value, user: &'_ Option<String>) -> Option<
 #[async_trait]
 impl Storage for RedisStorage {
     async fn delete_post<'a>(&self, url: &'a str) -> Result<()> {
-        let mut conn = self.redis.get_async_std_connection().await?;
+        let mut conn = self.redis.get().await?;
         Ok(conn.hdel::<&str, &str, ()>("posts", url).await?)
     }
 
     async fn post_exists(&self, url: &str) -> Result<bool> {
-        let mut conn = self.redis.get_async_std_connection().await?;
+        let mut conn = self.redis.get().await?;
         Ok(conn.hexists::<&str, &str, bool>(&"posts", url).await?)
     }
     
     async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> {
-        let mut conn = self.redis.get_async_std_connection().await?;
+        let mut conn = self.redis.get().await?;
         match conn.hget::<&str, &str, Option<String>>(&"posts", url).await? {
             Some(val) => {
                 let parsed = serde_json::from_str::<serde_json::Value>(&val)?;
@@ -93,7 +104,7 @@ impl Storage for RedisStorage {
     }
 
     async fn get_channels(&self, user: &User) -> Result<Vec<MicropubChannel>> {
-        let mut conn = self.redis.get_async_std_connection().await?;
+        let mut conn = self.redis.get().await?;
         let channels = conn.smembers::<String, Vec<String>>("channels_".to_string() + user.me.as_str()).await?;
         // TODO: use streams here instead of this weird thing... how did I even write this?!
         Ok(futures_util::future::join_all(channels.iter()
@@ -111,7 +122,7 @@ impl Storage for RedisStorage {
     }
 
     async fn put_post<'a>(&self, post: &'a serde_json::Value) -> Result<()> {
-        let mut conn = self.redis.get_async_std_connection().await?;
+        let mut conn = self.redis.get().await?;
         let key: &str;
         match post["properties"]["uid"][0].as_str() {
             Some(uid) => key = uid,
@@ -133,7 +144,7 @@ impl Storage for RedisStorage {
     }
 
     async fn read_feed_with_limit<'a>(&self, url: &'a str, after: &'a Option<String>, limit: usize, user: &'a Option<String>) -> Result<Option<serde_json::Value>> {
-        let mut conn = self.redis.get_async_std_connection().await?;
+        let mut conn = self.redis.get().await?;
         let mut feed;
         match conn.hget::<&str, &str, Option<String>>(&"posts", url).await? {
             Some(post) => feed = serde_json::from_str::<serde_json::Value>(&post)?,
@@ -161,9 +172,7 @@ impl Storage for RedisStorage {
             }
             let posts = stream::iter(posts_iter)
                 .map(|url| async move {
-                    // Is it rational to use a new connection for every post fetched?
-                    // TODO: Use a connection pool here
-                    match self.redis.get_async_std_connection().await {
+                    match self.redis.get().await {
                         Ok(mut conn) => match conn.hget::<&str, &str, Option<String>>("posts", &url).await {
                             Ok(post) => match post {
                                 Some(post) => match serde_json::from_str::<serde_json::Value>(&post) {
@@ -196,6 +205,7 @@ impl Storage for RedisStorage {
                 // It will probably depend on how often can you encounter a private post on the page
                 // It shouldn't be too large, or we'll start fetching too many posts from the database
                 // It MUST NOT be larger than the typical page size
+                // It MUST NOT be a significant amount of the connection pool size
                 .buffered(std::cmp::min(3, limit))
                 // Hack to unwrap the Option and sieve out broken links
                 // Broken links return None, and Stream::filter_map skips all Nones.
@@ -214,7 +224,7 @@ impl Storage for RedisStorage {
     }
 
     async fn update_post<'a>(&self, mut url: &'a str, update: serde_json::Value) -> Result<()> {
-        let mut conn = self.redis.get_async_std_connection().await?;
+        let mut conn = self.redis.get().await?;
         if !conn.hexists::<&str, &str, bool>("posts", url).await.unwrap() {
             return Err(StorageError::new(ErrorKind::NotFound, "can't edit a non-existent post"))
         }
@@ -222,7 +232,7 @@ impl Storage for RedisStorage {
         if let Some(new_url) = post["see_other"].as_str() {
             url = new_url
         }
-        Ok(SCRIPTS.edit_post.key("posts").arg(url).arg(update.to_string()).invoke_async::<_, ()>(&mut conn).await?)
+        Ok(SCRIPTS.edit_post.key("posts").arg(url).arg(update.to_string()).invoke_async::<_, ()>(&mut conn as &mut redis::aio::Connection).await?)
     }
 }
 
@@ -231,7 +241,7 @@ impl RedisStorage {
     /// Create a new RedisDatabase that will connect to Redis at `redis_uri` to store data.
     pub async fn new(redis_uri: String) -> Result<Self> {
         match redis::Client::open(redis_uri) {
-            Ok(client) => Ok(Self { redis: client }),
+            Ok(client) => Ok(Self { redis: Pool::builder().max_open(20).build(RedisConnectionManager::new(client)) }),
             Err(e) => Err(e.into())
         }
     }