From e559259686f984fdcce5669f0ab8c6dc27d76077 Mon Sep 17 00:00:00 2001 From: Vika Date: Wed, 27 Oct 2021 06:27:13 +0300 Subject: Deprecated Redis backend and added a database migration tool (untested, beware) --- Cargo.lock | 40 ++--------------- Cargo.toml | 19 +++++---- src/bin/kittybox_database_converter.rs | 78 ++++++++++++++++++++++++++++++++++ src/bin/pyindieblog_to_kittybox.rs | 4 +- src/database/file/mod.rs | 2 +- src/database/mod.rs | 42 ++++++++++++------ src/database/redis/mod.rs | 26 +++++++++--- src/lib.rs | 13 +++--- src/main.rs | 18 +------- 9 files changed, 152 insertions(+), 90 deletions(-) create mode 100644 src/bin/kittybox_database_converter.rs diff --git a/Cargo.lock b/Cargo.lock index 290e416..659cae5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1004,12 +1004,6 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d3d00f4eddb73e498a54394f228cd55853bdf059259e8e7bc6e69d408892e99" -[[package]] -name = "futures-timer" -version = "3.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" - [[package]] name = "futures-util" version = "0.3.17" @@ -1277,12 +1271,11 @@ dependencies = [ "log 0.4.14", "markdown", "markup", - "mobc", - "mobc-redis", "mockito", "newbase60", "paste", "prometheus", + "redis", "relative-path", "retainer", "serde", @@ -1469,32 +1462,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "mobc" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f76d2f2e2dcbb00a8d3b2b09f026a74a82693ea52cd071647aa6cfa7f1ff37e" -dependencies = [ - "async-std", - "async-trait", - "futures-channel", - "futures-core", - "futures-timer", - "futures-util", - "log 0.4.14", - "tokio", -] - -[[package]] -name = "mobc-redis" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7b5db77b37c9224d5b9949b214041ea3e1c15b6b1e5dd24a5acb8e73975d6d6" -dependencies = [ - "mobc", - "redis", -] - [[package]] name = "mockito" version = "0.30.0" @@ -1966,9 +1933,9 @@ dependencies = [ [[package]] name = "redis" -version = "0.19.0" +version = "0.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a6ddfecac9391fed21cce10e83c65fa4abafd77df05c98b1c647c65374ce9b3" +checksum = "dd71bdb3d0d6e9183e675c977f652fbf8abc3b63fcb722e9abb42f82ef839b65" dependencies = [ "async-std", "async-trait", @@ -2663,7 +2630,6 @@ dependencies = [ "autocfg", "bytes", "memchr 2.4.1", - "num_cpus", "pin-project-lite 0.2.7", ] diff --git a/Cargo.toml b/Cargo.toml index 66e9043..cb289c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,9 +5,8 @@ authors = ["Vika "] edition = "2018" [features] -default = ["util"] +default = ["util", "redis"] util = ["anyhow"] -redis = ["mobc", "mobc-redis"] [[bin]] name = "kittybox-bulk-import" @@ -18,6 +17,12 @@ required-features = ["util"] name = "pyindieblog-export" path = "src/bin/pyindieblog_to_kittybox.rs" required-features = ["util", "redis"] + +[[bin]] +name = "kittybox-database-converter" +path = "src/bin/kittybox_database_converter.rs" +required-features = ["util", "redis"] + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dev-dependencies] @@ -55,14 +60,10 @@ features = ["attributes", "unstable"] [dependencies.chrono] # Date and time library for Rust version = "^0.4.19" features = ["serde"] -[dependencies.mobc] # A generic connection pool with async/await support -version = "^0.7.2" -optional = true -[dependencies.mobc-redis] # Redis support for the mobc connection pool -version = "^0.7.0" +[dependencies.redis] +version = "^0.21.3" optional = true -features = ["async-std-comp"] -default-features = false +features = ["aio", "async-std-comp"] [dependencies.prometheus] # Prometheus instrumentation library for Rust applications version = "^0.12.0" features = ["process"] diff --git a/src/bin/kittybox_database_converter.rs b/src/bin/kittybox_database_converter.rs new file mode 100644 index 0000000..ff28d49 --- /dev/null +++ b/src/bin/kittybox_database_converter.rs @@ -0,0 +1,78 @@ +use anyhow::{anyhow, Context}; +use redis::{self, AsyncCommands}; +use futures_util::StreamExt; +use kittybox::database::Storage; +use kittybox::database::FileStorage; +use std::collections::HashMap; + +/// Convert from a Redis storage to a new storage new_storage. +async fn convert_from_redis(from: String, new_storage: S) -> anyhow::Result<()> { + let db = redis::Client::open(from).context("Failed to open the Redis connection")?; + + let mut conn = db.get_async_std_connection().await.context("Failed to connect to Redis")?; + + // Rebinding to convince the borrow checker we're not smuggling stuff outta scope + let storage = &new_storage; + + conn.hscan::<_, String>("posts").await?.map(|it| async move { + let json = serde_json::from_str(&it); + (it, json) + }).for_each_concurrent(8, |it| async move { + let (orig, res): (String, serde_json::Result) = it.await; + match res { + Ok(json) => { + // XXX this assumes a trusted database that was created with Kittybox that checks for UID matching user token's prefix + let user = &(url::Url::parse(json["properties"]["uid"][0].as_str().unwrap()).unwrap().origin().ascii_serialization().clone() + "/"); + if let Err(err) = storage.clone().put_post(&json, user).await { + eprintln!("Error saving post: {}", err); + } + }, + Err(err) => { + eprintln!("Error: {} (rejected JSON follows below on stderr)", err); + eprintln!("{}", orig); + } + } + }).await; + + let mut stream: redis::AsyncIter = conn.scan_match("settings_*").await?; + while let Some(key) = stream.next_item().await { + let mut conn = db.get_async_std_connection().await.context("Failed to connect to Redis")?; + let user = key.strip_prefix("settings_").unwrap(); + match conn.hgetall::<&str, HashMap>(&key).await.context(format!("Failed getting settings from key {}", key)) { + Ok(settings) => { + for (k, v) in settings.iter() { + if let Err(e) = storage.set_setting(k, &user, v).await.with_context(|| format!("Failed setting {} for {}", k, user)) { + eprintln!("{}", e); + } + } + }, + Err(e) => { + eprintln!("{}", e); + } + } + } + + return Ok(()); +} + +#[async_std::main] +async fn main() -> anyhow::Result<()> { + let mut args = std::env::args(); + args.next(); // skip argv[0] + let old_uri = args.next().ok_or_else(|| anyhow!("No import source is provided."))?; + let new_uri = args.next().ok_or_else(|| anyhow!("No import destination is provided."))?; + + let storage = if new_uri.starts_with("file:") { + let folder = new_uri.strip_prefix("file://").unwrap(); + let path = std::path::PathBuf::from(folder); + Box::new(FileStorage::new(path).await.context("Failed to construct the file storage")?) + } else { + anyhow::bail!("Cannot construct the storage abstraction for destination storage. Check the storage type?"); + }; + + if old_uri.starts_with("redis") { + convert_from_redis(old_uri, *storage).await? + } + + Ok(()) +} diff --git a/src/bin/pyindieblog_to_kittybox.rs b/src/bin/pyindieblog_to_kittybox.rs index b4e2b97..303ca56 100644 --- a/src/bin/pyindieblog_to_kittybox.rs +++ b/src/bin/pyindieblog_to_kittybox.rs @@ -1,6 +1,6 @@ use anyhow::{anyhow, Context, Result}; -use mobc_redis::redis; -use mobc_redis::redis::AsyncCommands; +use redis; +use redis::AsyncCommands; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fs::File; diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs index f58317e..1e0102a 100644 --- a/src/database/file/mod.rs +++ b/src/database/file/mod.rs @@ -402,7 +402,7 @@ impl Storage for FileStorage { } } let posts = stream::iter(posts_iter) - .map(|url| async move { + .map(|url: String| async move { self.get_post(&url).await }) .buffered(std::cmp::min(3, limit)) diff --git a/src/database/mod.rs b/src/database/mod.rs index 7c67e42..4e74c8f 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -3,32 +3,46 @@ use crate::indieauth::User; use async_trait::async_trait; use serde::{Deserialize, Serialize}; -#[cfg(redis)] -mod redis; -#[cfg(redis)] -pub use crate::database::redis::RedisStorage; -#[cfg(all(redis, test))] -pub use redis::tests::{get_redis_instance, RedisInstance}; +//#[cfg(feature="redis")] +//mod redis; +//#[cfg(feature="redis")] +//pub use crate::database::redis::RedisStorage; +//#[cfg(all(redis, test))] +//pub use redis::tests::{get_redis_instance, RedisInstance}; mod file; pub use crate::database::file::FileStorage; +/// Data structure representing a Micropub channel in the ?q=channels output. #[derive(Serialize, Deserialize, PartialEq, Debug)] pub struct MicropubChannel { + /// The channel's UID. It is usually also a publically accessible permalink URL. pub uid: String, + /// The channel's user-friendly name used to recognize it in lists. pub name: String, } +/// Enum representing different errors that might occur during the database query. #[derive(Debug, Clone, Copy)] pub enum ErrorKind { + /// Backend error (e.g. database connection error) Backend, + /// Error due to insufficient contextual permissions for the query PermissionDenied, + /// Error due to the database being unable to parse JSON returned from the backing storage. + /// Usually indicative of someone fiddling with the database manually instead of using proper tools. JsonParsing, + /// - ErrorKind::NotFound - equivalent to a 404 error. Note, some requests return an Option, + /// in which case None is also equivalent to a 404. NotFound, + /// The user's query or request to the database was malformed. Used whenever the database processes + /// the user's query directly, such as when editing posts inside of the database (e.g. Redis backend) BadRequest, + /// - ErrorKind::Other - when something so weird happens that it becomes undescribable. Other, } +/// Error signalled from the database. #[derive(Debug)] pub struct StorageError { msg: String, @@ -115,6 +129,7 @@ impl StorageError { pub fn kind(&self) -> ErrorKind { self.kind } + /// Get the message as a string slice. pub fn msg(&self) -> &str { &self.msg } @@ -238,8 +253,8 @@ pub trait Storage: Clone + Send + Sync { #[cfg(test)] mod tests { - #[cfg(redis)] - use super::redis::tests::get_redis_instance; + //#[cfg(feature="redis")] + //use super::redis::tests::get_redis_instance; use super::{MicropubChannel, Storage}; use serde_json::json; use paste::paste; @@ -404,10 +419,11 @@ mod tests { "Vika's Hideout" ); } - macro_rules! redis_test { + + /*macro_rules! redis_test { ($func_name:expr) => { paste! { - #[cfg(redis)] + #[cfg(feature="redis")] #[async_std::test] async fn [] () { test_logger::ensure_env_logger_initialized(); @@ -419,7 +435,7 @@ mod tests { } } } - } + }*/ macro_rules! file_test { ($func_name:expr) => { @@ -435,10 +451,10 @@ mod tests { } } - redis_test!(test_backend_basic_operations); + /*redis_test!(test_backend_basic_operations); redis_test!(test_backend_get_channel_list); redis_test!(test_backend_settings); - redis_test!(test_backend_update); + redis_test!(test_backend_update);*/ file_test!(test_backend_basic_operations); file_test!(test_backend_get_channel_list); file_test!(test_backend_settings); diff --git a/src/database/redis/mod.rs b/src/database/redis/mod.rs index f1724b7..eeaa3f2 100644 --- a/src/database/redis/mod.rs +++ b/src/database/redis/mod.rs @@ -2,6 +2,7 @@ use async_trait::async_trait; use futures::stream; use futures_util::FutureExt; use futures_util::StreamExt; +use futures_util::TryStream; use futures_util::TryStreamExt; use lazy_static::lazy_static; use log::error; @@ -225,9 +226,13 @@ impl Storage for RedisStorage { } } } + async fn fetch_post_for_feed(url: String) -> Option { + return Some(serde_json::json!({})); + } let posts = stream::iter(posts_iter) - .map(|url| async move { - match self.redis.get().await { + .map(|url: String| async move { + return Ok(fetch_post_for_feed(url).await); + /*match self.redis.get().await { Ok(mut conn) => { match conn.hget::<&str, &str, Option>("posts", &url).await { Ok(post) => match post { @@ -241,18 +246,21 @@ impl Storage for RedisStorage { } } Err(err) => Err(StorageError::with_source(ErrorKind::Backend, "Error getting a connection from the pool", Box::new(err))) - } + }*/ }) // TODO: determine the optimal value for this buffer // It will probably depend on how often can you encounter a private post on the page // It shouldn't be too large, or we'll start fetching too many posts from the database // It MUST NOT be larger than the typical page size // It MUST NOT be a significant amount of the connection pool size - .buffered(std::cmp::min(3, limit)) + //.buffered(std::cmp::min(3, limit)) // Hack to unwrap the Option and sieve out broken links // Broken links return None, and Stream::filter_map skips all Nones. - .try_filter_map(|post: Option| async move { Ok(post) }) - .try_filter_map(|post| async move { Ok(filter_post(post, user)) }) + // I wonder if one can use try_flatten() here somehow akin to iters + .try_filter_map(|post| async move { Ok(post) }) + .try_filter_map(|post| async move { + Ok(filter_post(post, user)) + }) .take(limit); match posts.try_collect::>().await { Ok(posts) => feed["children"] = json!(posts), @@ -312,6 +320,12 @@ impl RedisStorage { Err(e) => Err(e.into()), } } + + pub async fn conn(&self) -> Result> { + self.redis.get().await.map_err(|e| StorageError::with_source( + ErrorKind::Backend, "Error getting a connection from the pool", Box::new(e) + )) + } } #[cfg(test)] diff --git a/src/lib.rs b/src/lib.rs index dfcfe6d..108a42f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ use tide::{Request, Response}; -mod database; +/// Database abstraction layer for Kittybox, allowing the CMS to work with any kind of database. +pub mod database; mod frontend; mod indieauth; mod micropub; @@ -35,7 +36,7 @@ where .with(IndieAuthMiddleware::new()) .get(micropub::get_handler) .post(micropub::post_handler); - // The Micropub client. It'll start small, but could grow into something full-featured! + // The Micropub client. It'll start small, but could grow into something full-featured app.at("/micropub/client").get(|_: Request<_>| async move { Ok(Response::builder(200) .body(MICROPUB_CLIENT) @@ -67,7 +68,7 @@ where app } -#[cfg(redis)] +/*#[cfg(feature="redis")] pub async fn get_app_with_redis( token_endpoint: surf::Url, authorization_endpoint: surf::Url, @@ -85,7 +86,7 @@ pub async fn get_app_with_redis( }); equip_app(app) -} +}*/ pub async fn get_app_with_file( token_endpoint: surf::Url, @@ -128,7 +129,7 @@ pub async fn get_app_with_test_file(token_endpoint: surf::Url) -> ( (tempdir, backend, equip_app(app)) } -#[cfg(all(redis, test))] +/*#[cfg(all(redis, test))] pub async fn get_app_with_test_redis( token_endpoint: surf::Url, ) -> ( @@ -151,7 +152,7 @@ pub async fn get_app_with_test_redis( http_client: surf::Client::new(), }); (redis_instance, backend, equip_app(app)) -} +}*/ #[cfg(test)] #[allow(unused_variables)] diff --git a/src/main.rs b/src/main.rs index e91476b..3130411 100644 --- a/src/main.rs +++ b/src/main.rs @@ -65,22 +65,8 @@ async fn main() -> Result<(), std::io::Error> { .unwrap_or_else(|| "0.0.0.0:8080".to_string()); if backend_uri.starts_with("redis") { - #[cfg(redis)] - { - let app = kittybox::get_app_with_redis( - token_endpoint, - authorization_endpoint, - backend_uri, - media_endpoint, - internal_token, - ).await; - app.listen(host).await - } - #[cfg(not(redis))] - { - println!("The Redis backend was disabled at build-time. Please recompile the package with --features=redis."); - std::process::exit(1); - } + println!("The Redis backend is deprecated."); + std::process::exit(1); } else if backend_uri.starts_with("file") { let app = kittybox::get_app_with_file( token_endpoint, -- cgit 1.4.1