diff options
author | Vika <vika@fireburn.ru> | 2022-05-24 17:18:30 +0300 |
---|---|---|
committer | Vika <vika@fireburn.ru> | 2022-05-24 17:18:30 +0300 |
commit | 5610a5f0bf1a9df02bd3d5b55e2cdebef2440360 (patch) | |
tree | 8394bcf1dcc204043d7adeb8dde2e2746977606e /src/database/redis/mod.rs | |
parent | 2f93873122b47e42f7ee1c38f1f04d052a63599c (diff) | |
download | kittybox-5610a5f0bf1a9df02bd3d5b55e2cdebef2440360.tar.zst |
flake.nix: reorganize
- Kittybox's source code is moved to a subfolder - This improves build caching by Nix since it doesn't take changes to other files into account - Package and test definitions were spun into separate files - This makes my flake.nix much easier to navigate - This also makes it somewhat possible to use without flakes (but it is still not easy, so use flakes!) - Some attributes were moved in compliance with Nix 2.8's changes to flake schema
Diffstat (limited to 'src/database/redis/mod.rs')
-rw-r--r-- | src/database/redis/mod.rs | 392 |
1 files changed, 0 insertions, 392 deletions
diff --git a/src/database/redis/mod.rs b/src/database/redis/mod.rs deleted file mode 100644 index eeaa3f2..0000000 --- a/src/database/redis/mod.rs +++ /dev/null @@ -1,392 +0,0 @@ -use async_trait::async_trait; -use futures::stream; -use futures_util::FutureExt; -use futures_util::StreamExt; -use futures_util::TryStream; -use futures_util::TryStreamExt; -use lazy_static::lazy_static; -use log::error; -use mobc::Pool; -use mobc_redis::redis; -use mobc_redis::redis::AsyncCommands; -use mobc_redis::RedisConnectionManager; -use serde_json::json; -use std::time::Duration; - -use crate::database::{ErrorKind, MicropubChannel, Result, Storage, StorageError, filter_post}; -use crate::indieauth::User; - -struct RedisScripts { - edit_post: redis::Script, -} - -impl From<mobc_redis::redis::RedisError> for StorageError { - fn from(err: mobc_redis::redis::RedisError) -> Self { - Self { - msg: format!("{}", err), - source: Some(Box::new(err)), - kind: ErrorKind::Backend, - } - } -} -impl From<mobc::Error<mobc_redis::redis::RedisError>> for StorageError { - fn from(err: mobc::Error<mobc_redis::redis::RedisError>) -> Self { - Self { - msg: format!("{}", err), - source: Some(Box::new(err)), - kind: ErrorKind::Backend, - } - } -} - -lazy_static! { - static ref SCRIPTS: RedisScripts = RedisScripts { - edit_post: redis::Script::new(include_str!("./edit_post.lua")) - }; -} - -#[derive(Clone)] -pub struct RedisStorage { - // note to future Vika: - // mobc::Pool is actually a fancy name for an Arc - // around a shared connection pool with a manager - // which makes it safe to implement [`Clone`] and - // not worry about new pools being suddenly made - // - // stop worrying and start coding, you dum-dum - redis: mobc::Pool<RedisConnectionManager>, -} - -#[async_trait] -impl Storage for RedisStorage { - async fn get_setting<'a>(&self, setting: &'a str, user: &'a str) -> Result<String> { - let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; - Ok(conn - .hget::<String, &str, String>(format!("settings_{}", user), setting) - .await?) - } - - async fn set_setting<'a>(&self, setting: &'a str, user: &'a str, value: &'a str) -> Result<()> { - let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; - Ok(conn - .hset::<String, &str, &str, ()>(format!("settings_{}", user), setting, value) - .await?) - } - - async fn delete_post<'a>(&self, url: &'a str) -> Result<()> { - let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; - Ok(conn.hdel::<&str, &str, ()>("posts", url).await?) - } - - async fn post_exists(&self, url: &str) -> Result<bool> { - let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; - Ok(conn.hexists::<&str, &str, bool>("posts", url).await?) - } - - async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> { - let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; - match conn - .hget::<&str, &str, Option<String>>("posts", url) - .await? - { - Some(val) => { - let parsed = serde_json::from_str::<serde_json::Value>(&val)?; - if let Some(new_url) = parsed["see_other"].as_str() { - match conn - .hget::<&str, &str, Option<String>>("posts", new_url) - .await? - { - Some(val) => Ok(Some(serde_json::from_str::<serde_json::Value>(&val)?)), - None => Ok(None), - } - } else { - Ok(Some(parsed)) - } - } - None => Ok(None), - } - } - - async fn get_channels(&self, user: &User) -> Result<Vec<MicropubChannel>> { - let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; - let channels = conn - .smembers::<String, Vec<String>>("channels_".to_string() + user.me.as_str()) - .await?; - // TODO: use streams here instead of this weird thing... how did I even write this?! - Ok(futures_util::future::join_all( - channels - .iter() - .map(|channel| { - self.get_post(channel).map(|result| result.unwrap()).map( - |post: Option<serde_json::Value>| { - post.map(|post| MicropubChannel { - uid: post["properties"]["uid"][0].as_str().unwrap().to_string(), - name: post["properties"]["name"][0].as_str().unwrap().to_string(), - }) - }, - ) - }) - .collect::<Vec<_>>(), - ) - .await - .into_iter() - .flatten() - .collect::<Vec<_>>()) - } - - async fn put_post<'a>(&self, post: &'a serde_json::Value, user: &'a str) -> Result<()> { - let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; - let key: &str; - match post["properties"]["uid"][0].as_str() { - Some(uid) => key = uid, - None => { - return Err(StorageError::new( - ErrorKind::BadRequest, - "post doesn't have a UID", - )) - } - } - conn.hset::<&str, &str, String, ()>("posts", key, post.to_string()) - .await?; - if post["properties"]["url"].is_array() { - for url in post["properties"]["url"] - .as_array() - .unwrap() - .iter() - .map(|i| i.as_str().unwrap().to_string()) - { - if url != key && url.starts_with(user) { - conn.hset::<&str, &str, String, ()>( - "posts", - &url, - json!({ "see_other": key }).to_string(), - ) - .await?; - } - } - } - if post["type"] - .as_array() - .unwrap() - .iter() - .any(|i| i == "h-feed") - { - // This is a feed. Add it to the channels array if it's not already there. - conn.sadd::<String, &str, ()>( - "channels_".to_string() + post["properties"]["author"][0].as_str().unwrap(), - key, - ) - .await? - } - Ok(()) - } - - async fn read_feed_with_limit<'a>( - &self, - url: &'a str, - after: &'a Option<String>, - limit: usize, - user: &'a Option<String>, - ) -> Result<Option<serde_json::Value>> { - let mut conn = self.redis.get().await?; - let mut feed; - match conn - .hget::<&str, &str, Option<String>>("posts", url) - .await - .map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))? - { - Some(post) => feed = serde_json::from_str::<serde_json::Value>(&post)?, - None => return Ok(None), - } - if feed["see_other"].is_string() { - match conn - .hget::<&str, &str, Option<String>>("posts", feed["see_other"].as_str().unwrap()) - .await? - { - Some(post) => feed = serde_json::from_str::<serde_json::Value>(&post)?, - None => return Ok(None), - } - } - if let Some(post) = filter_post(feed, user) { - feed = post - } else { - return Err(StorageError::new( - ErrorKind::PermissionDenied, - "specified user cannot access this post", - )); - } - if feed["children"].is_array() { - let children = feed["children"].as_array().unwrap(); - let mut posts_iter = children.iter().map(|i| i.as_str().unwrap().to_string()); - if after.is_some() { - loop { - let i = posts_iter.next(); - if &i == after { - break; - } - } - } - async fn fetch_post_for_feed(url: String) -> Option<serde_json::Value> { - return Some(serde_json::json!({})); - } - let posts = stream::iter(posts_iter) - .map(|url: String| async move { - return Ok(fetch_post_for_feed(url).await); - /*match self.redis.get().await { - Ok(mut conn) => { - match conn.hget::<&str, &str, Option<String>>("posts", &url).await { - Ok(post) => match post { - Some(post) => { - Ok(Some(serde_json::from_str(&post)?)) - } - // Happens because of a broken link (result of an improper deletion?) - None => Ok(None), - }, - Err(err) => Err(StorageError::with_source(ErrorKind::Backend, "Error executing a Redis command", Box::new(err))) - } - } - Err(err) => Err(StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(err))) - }*/ - }) - // TODO: determine the optimal value for this buffer - // It will probably depend on how often can you encounter a private post on the page - // It shouldn't be too large, or we'll start fetching too many posts from the database - // It MUST NOT be larger than the typical page size - // It MUST NOT be a significant amount of the connection pool size - //.buffered(std::cmp::min(3, limit)) - // Hack to unwrap the Option and sieve out broken links - // Broken links return None, and Stream::filter_map skips all Nones. - // I wonder if one can use try_flatten() here somehow akin to iters - .try_filter_map(|post| async move { Ok(post) }) - .try_filter_map(|post| async move { - Ok(filter_post(post, user)) - }) - .take(limit); - match posts.try_collect::<Vec<serde_json::Value>>().await { - Ok(posts) => feed["children"] = json!(posts), - Err(err) => { - let e = StorageError::with_source( - ErrorKind::Other, - "An error was encountered while processing the feed", - Box::new(err) - ); - error!("Error while assembling feed: {}", e); - return Err(e); - } - } - } - return Ok(Some(feed)); - } - - async fn update_post<'a>(&self, mut url: &'a str, update: serde_json::Value) -> Result<()> { - let mut conn = self.redis.get().await.map_err(|e| StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e)))?; - if !conn - .hexists::<&str, &str, bool>("posts", url) - .await - .unwrap() - { - return Err(StorageError::new( - ErrorKind::NotFound, - "can't edit a non-existent post", - )); - } - let post: serde_json::Value = - serde_json::from_str(&conn.hget::<&str, &str, String>("posts", url).await?)?; - if let Some(new_url) = post["see_other"].as_str() { - url = new_url - } - Ok(SCRIPTS - .edit_post - .key("posts") - .arg(url) - .arg(update.to_string()) - .invoke_async::<_, ()>(&mut conn as &mut redis::aio::Connection) - .await?) - } -} - -impl RedisStorage { - /// Create a new RedisDatabase that will connect to Redis at `redis_uri` to store data. - pub async fn new(redis_uri: String) -> Result<Self> { - match redis::Client::open(redis_uri) { - Ok(client) => Ok(Self { - redis: Pool::builder() - .max_open(20) - .max_idle(5) - .get_timeout(Some(Duration::from_secs(3))) - .max_lifetime(Some(Duration::from_secs(120))) - .build(RedisConnectionManager::new(client)), - }), - Err(e) => Err(e.into()), - } - } - - pub async fn conn(&self) -> Result<mobc::Connection<mobc_redis::RedisConnectionManager>> { - self.redis.get().await.map_err(|e| StorageError::with_source( - ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e) - )) - } -} - -#[cfg(test)] -pub mod tests { - use mobc_redis::redis; - use std::process; - use std::time::Duration; - - pub struct RedisInstance { - // We just need to hold on to it so it won't get dropped and remove the socket - _tempdir: tempdir::TempDir, - uri: String, - child: std::process::Child, - } - impl Drop for RedisInstance { - fn drop(&mut self) { - self.child.kill().expect("Failed to kill the child!"); - } - } - impl RedisInstance { - pub fn uri(&self) -> &str { - &self.uri - } - } - - pub async fn get_redis_instance() -> RedisInstance { - let tempdir = tempdir::TempDir::new("redis").expect("failed to create tempdir"); - let socket = tempdir.path().join("redis.sock"); - let redis_child = process::Command::new("redis-server") - .current_dir(&tempdir) - .arg("--port") - .arg("0") - .arg("--unixsocket") - .arg(&socket) - .stdout(process::Stdio::null()) - .stderr(process::Stdio::null()) - .spawn() - .expect("Failed to spawn Redis"); - println!("redis+unix:///{}", socket.to_str().unwrap()); - let uri = format!("redis+unix:///{}", socket.to_str().unwrap()); - // There should be a slight delay, we need to wait for Redis to spin up - let client = redis::Client::open(uri.clone()).unwrap(); - let millisecond = Duration::from_millis(1); - let mut retries: usize = 0; - const MAX_RETRIES: usize = 60 * 1000/*ms*/; - while let Err(err) = client.get_connection() { - if err.is_connection_refusal() { - async_std::task::sleep(millisecond).await; - retries += 1; - if retries > MAX_RETRIES { - panic!("Timeout waiting for Redis, last error: {}", err); - } - } else { - panic!("Could not connect: {}", err); - } - } - - RedisInstance { - uri, - child: redis_child, - _tempdir: tempdir, - } - } -} |