diff options
author | Vika <vika@fireburn.ru> | 2021-05-05 16:29:11 +0300 |
---|---|---|
committer | Vika <vika@fireburn.ru> | 2021-05-05 16:30:00 +0300 |
commit | dc546640d066dfb7c72d204f3758b103bd415d2c (patch) | |
tree | 8b06c539d0d615bd89112478f896d7828542e5eb | |
parent | 488285e460dba2f5fb0901c77226d582d31e2be4 (diff) | |
download | kittybox-dc546640d066dfb7c72d204f3758b103bd415d2c.tar.zst |
Added a connection pool to the RedisDatabase
-rw-r--r-- | Cargo.lock | 112 | ||||
-rw-r--r-- | Cargo.toml | 5 | ||||
-rw-r--r-- | src/database/redis/mod.rs | 44 |
3 files changed, 32 insertions, 129 deletions
diff --git a/Cargo.lock b/Cargo.lock index bd3dbb4..87d7ce7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1223,7 +1223,6 @@ dependencies = [ "mobc-redis", "mockito", "newbase60", - "redis 0.20.0", "serde", "serde_json", "serde_urlencoded", @@ -1368,33 +1367,12 @@ dependencies = [ ] [[package]] -name = "mio" -version = "0.7.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf80d3e903b34e0bd7282b218398aec54e082c840d9baf8339e0080a0c542956" -dependencies = [ - "libc", - "log", - "miow", - "ntapi", - "winapi", -] - -[[package]] -name = "miow" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" -dependencies = [ - "winapi", -] - -[[package]] name = "mobc" version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db0dcf244160856f43ccecdebc93d7dd4e6353666003b61beb053b3b09083671" dependencies = [ + "async-std", "async-trait", "futures-channel", "futures-core", @@ -1411,7 +1389,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7b5db77b37c9224d5b9949b214041ea3e1c15b6b1e5dd24a5acb8e73975d6d6" dependencies = [ "mobc", - "redis 0.19.0", + "redis", ] [[package]] @@ -1451,15 +1429,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb" [[package]] -name = "ntapi" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" -dependencies = [ - "winapi", -] - -[[package]] name = "num-integer" version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1526,31 +1495,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" [[package]] -name = "parking_lot" -version = "0.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb" -dependencies = [ - "instant", - "lock_api", - "parking_lot_core", -] - -[[package]] -name = "parking_lot_core" -version = "0.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018" -dependencies = [ - "cfg-if 1.0.0", - "instant", - "libc", - "redox_syscall", - "smallvec", - "winapi", -] - -[[package]] name = "percent-encoding" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1727,17 +1671,6 @@ dependencies = [ ] [[package]] -name = "r2d2" -version = "0.8.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "545c5bc2b880973c9c10e4067418407a0ccaa3091781d1671d46eb35107cb26f" -dependencies = [ - "log", - "parking_lot", - "scheduled-thread-pool", -] - -[[package]] name = "rand" version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1871,26 +1804,6 @@ version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a6ddfecac9391fed21cce10e83c65fa4abafd77df05c98b1c647c65374ce9b3" dependencies = [ - "async-trait", - "bytes 1.0.1", - "combine", - "dtoa", - "futures-util", - "itoa", - "percent-encoding", - "pin-project-lite 0.2.6", - "sha1", - "tokio", - "tokio-util", - "url", -] - -[[package]] -name = "redis" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eeb8f8d059ead7805e171fc22de8348a3d611c0f985aaa4f5cf6c0dfc7645407" -dependencies = [ "async-std", "async-trait", "bytes 1.0.1", @@ -1900,7 +1813,6 @@ dependencies = [ "itoa", "percent-encoding", "pin-project-lite 0.2.6", - "r2d2", "sha1", "tokio", "tokio-util", @@ -1908,15 +1820,6 @@ dependencies = [ ] [[package]] -name = "redox_syscall" -version = "0.2.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "742739e41cd49414de871ea5e549afb7e2a3ac77b589bcbebe8c82fab37147fc" -dependencies = [ - "bitflags", -] - -[[package]] name = "regex" version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1974,15 +1877,6 @@ dependencies = [ ] [[package]] -name = "scheduled-thread-pool" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc6f74fd1204073fa02d5d5d68bec8021be4c38690b61264b2fdb48083d0e7d7" -dependencies = [ - "parking_lot", -] - -[[package]] name = "scopeguard" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2480,9 +2374,7 @@ checksum = "83f0c8e7c0addab50b663055baf787d0af7f413a46e6e7fb9559a4e4db7137a5" dependencies = [ "autocfg", "bytes 1.0.1", - "libc", "memchr", - "mio", "num_cpus", "pin-project-lite 0.2.6", ] diff --git a/Cargo.toml b/Cargo.toml index e38cffd..6ce2cbe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,9 @@ tempdir = "0.3.7" # A library for managing a temporary directory and d [dependencies] # Redis driver for Rust. -redis = { version = "0.20.0", features = ["aio", "async-std-comp", "r2d2"] } +#redis = { version = "0.20.0", features = ["aio", "async-std-comp"] } +# Redis support for the mobc connection pool +mobc-redis = { version = "0.7.0", features = ["async-std-comp"], default-features = false } # A generic serialization/deserialization framework serde = { version = "1.0.125", features = ["derive"] } # Date and time library for Rust @@ -36,5 +38,4 @@ newbase60 = "0.1.3" # A library that implements Tantek Çelik's New Base markdown = "0.3.0" # Native Rust library for parsing Markdown and (outputting HTML) easy-scraper = "0.2.0" # HTML scraping library focused on ease of use serde_urlencoded = "0.7.0" # `x-www-form-urlencoded` meets Serde -mobc-redis = "0.7.0" # Redis support for the mobc connection pool mobc = "0.7.2" # A generic connection pool with async/await support diff --git a/src/database/redis/mod.rs b/src/database/redis/mod.rs index b709125..eded583 100644 --- a/src/database/redis/mod.rs +++ b/src/database/redis/mod.rs @@ -4,8 +4,11 @@ use futures_util::StreamExt; use futures::stream; use lazy_static::lazy_static; use log::error; -use redis::AsyncCommands; +use mobc_redis::redis; +use mobc_redis::redis::AsyncCommands; use serde_json::json; +use mobc::Pool; +use mobc_redis::RedisConnectionManager; use crate::database::{Storage, Result, StorageError, ErrorKind, MicropubChannel}; use crate::indieauth::User; @@ -14,8 +17,17 @@ struct RedisScripts { edit_post: redis::Script } -impl From<redis::RedisError> for StorageError { - fn from(err: redis::RedisError) -> Self { +impl From<mobc_redis::redis::RedisError> for StorageError { + fn from(err: mobc_redis::redis::RedisError) -> Self { + Self { + msg: format!("{}", err), + source: Some(Box::new(err)), + kind: ErrorKind::Backend + } + } +} +impl From<mobc::Error<mobc_redis::redis::RedisError>> for StorageError { + fn from(err: mobc::Error<mobc_redis::redis::RedisError>) -> Self { Self { msg: format!("{}", err), source: Some(Box::new(err)), @@ -32,8 +44,7 @@ lazy_static! { #[derive(Clone)] pub struct RedisStorage { - // TODO: use mobc crate to create a connection pool and reuse connections for efficiency - redis: redis::Client, + redis: mobc::Pool<RedisConnectionManager>, } fn filter_post(mut post: serde_json::Value, user: &'_ Option<String>) -> Option<serde_json::Value> { @@ -65,17 +76,17 @@ fn filter_post(mut post: serde_json::Value, user: &'_ Option<String>) -> Option< #[async_trait] impl Storage for RedisStorage { async fn delete_post<'a>(&self, url: &'a str) -> Result<()> { - let mut conn = self.redis.get_async_std_connection().await?; + let mut conn = self.redis.get().await?; Ok(conn.hdel::<&str, &str, ()>("posts", url).await?) } async fn post_exists(&self, url: &str) -> Result<bool> { - let mut conn = self.redis.get_async_std_connection().await?; + let mut conn = self.redis.get().await?; Ok(conn.hexists::<&str, &str, bool>(&"posts", url).await?) } async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> { - let mut conn = self.redis.get_async_std_connection().await?; + let mut conn = self.redis.get().await?; match conn.hget::<&str, &str, Option<String>>(&"posts", url).await? { Some(val) => { let parsed = serde_json::from_str::<serde_json::Value>(&val)?; @@ -93,7 +104,7 @@ impl Storage for RedisStorage { } async fn get_channels(&self, user: &User) -> Result<Vec<MicropubChannel>> { - let mut conn = self.redis.get_async_std_connection().await?; + let mut conn = self.redis.get().await?; let channels = conn.smembers::<String, Vec<String>>("channels_".to_string() + user.me.as_str()).await?; // TODO: use streams here instead of this weird thing... how did I even write this?! Ok(futures_util::future::join_all(channels.iter() @@ -111,7 +122,7 @@ impl Storage for RedisStorage { } async fn put_post<'a>(&self, post: &'a serde_json::Value) -> Result<()> { - let mut conn = self.redis.get_async_std_connection().await?; + let mut conn = self.redis.get().await?; let key: &str; match post["properties"]["uid"][0].as_str() { Some(uid) => key = uid, @@ -133,7 +144,7 @@ impl Storage for RedisStorage { } async fn read_feed_with_limit<'a>(&self, url: &'a str, after: &'a Option<String>, limit: usize, user: &'a Option<String>) -> Result<Option<serde_json::Value>> { - let mut conn = self.redis.get_async_std_connection().await?; + let mut conn = self.redis.get().await?; let mut feed; match conn.hget::<&str, &str, Option<String>>(&"posts", url).await? { Some(post) => feed = serde_json::from_str::<serde_json::Value>(&post)?, @@ -161,9 +172,7 @@ impl Storage for RedisStorage { } let posts = stream::iter(posts_iter) .map(|url| async move { - // Is it rational to use a new connection for every post fetched? - // TODO: Use a connection pool here - match self.redis.get_async_std_connection().await { + match self.redis.get().await { Ok(mut conn) => match conn.hget::<&str, &str, Option<String>>("posts", &url).await { Ok(post) => match post { Some(post) => match serde_json::from_str::<serde_json::Value>(&post) { @@ -196,6 +205,7 @@ impl Storage for RedisStorage { // It will probably depend on how often can you encounter a private post on the page // It shouldn't be too large, or we'll start fetching too many posts from the database // It MUST NOT be larger than the typical page size + // It MUST NOT be a significant amount of the connection pool size .buffered(std::cmp::min(3, limit)) // Hack to unwrap the Option and sieve out broken links // Broken links return None, and Stream::filter_map skips all Nones. @@ -214,7 +224,7 @@ impl Storage for RedisStorage { } async fn update_post<'a>(&self, mut url: &'a str, update: serde_json::Value) -> Result<()> { - let mut conn = self.redis.get_async_std_connection().await?; + let mut conn = self.redis.get().await?; if !conn.hexists::<&str, &str, bool>("posts", url).await.unwrap() { return Err(StorageError::new(ErrorKind::NotFound, "can't edit a non-existent post")) } @@ -222,7 +232,7 @@ impl Storage for RedisStorage { if let Some(new_url) = post["see_other"].as_str() { url = new_url } - Ok(SCRIPTS.edit_post.key("posts").arg(url).arg(update.to_string()).invoke_async::<_, ()>(&mut conn).await?) + Ok(SCRIPTS.edit_post.key("posts").arg(url).arg(update.to_string()).invoke_async::<_, ()>(&mut conn as &mut redis::aio::Connection).await?) } } @@ -231,7 +241,7 @@ impl RedisStorage { /// Create a new RedisDatabase that will connect to Redis at `redis_uri` to store data. pub async fn new(redis_uri: String) -> Result<Self> { match redis::Client::open(redis_uri) { - Ok(client) => Ok(Self { redis: client }), + Ok(client) => Ok(Self { redis: Pool::builder().max_open(20).build(RedisConnectionManager::new(client)) }), Err(e) => Err(e.into()) } } |