1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
use anyhow::{anyhow, Context};
use redis::{self, AsyncCommands};
use futures_util::StreamExt;
use kittybox::database::Storage;
use kittybox::database::FileStorage;
use std::collections::HashMap;
/// Convert from a Redis storage to a new storage new_storage.
async fn convert_from_redis<S: Storage>(from: String, new_storage: S) -> anyhow::Result<()> {
let db = redis::Client::open(from).context("Failed to open the Redis connection")?;
let mut conn = db.get_async_std_connection().await.context("Failed to connect to Redis")?;
// Rebinding to convince the borrow checker we're not smuggling stuff outta scope
let storage = &new_storage;
conn.hscan::<_, String>("posts").await?.map(|it| async move {
let json = serde_json::from_str(&it);
(it, json)
}).for_each_concurrent(8, |it| async move {
let (orig, res): (String, serde_json::Result<serde_json::Value>) = it.await;
match res {
Ok(json) => {
// XXX this assumes a trusted database that was created with Kittybox that checks for UID matching user token's prefix
let user = &(url::Url::parse(json["properties"]["uid"][0].as_str().unwrap()).unwrap().origin().ascii_serialization().clone() + "/");
if let Err(err) = storage.clone().put_post(&json, user).await {
eprintln!("Error saving post: {}", err);
}
},
Err(err) => {
eprintln!("Error: {} (rejected JSON follows below on stderr)", err);
eprintln!("{}", orig);
}
}
}).await;
let mut stream: redis::AsyncIter<String> = conn.scan_match("settings_*").await?;
while let Some(key) = stream.next_item().await {
let mut conn = db.get_async_std_connection().await.context("Failed to connect to Redis")?;
let user = key.strip_prefix("settings_").unwrap();
match conn.hgetall::<&str, HashMap<String, String>>(&key).await.context(format!("Failed getting settings from key {}", key)) {
Ok(settings) => {
for (k, v) in settings.iter() {
if let Err(e) = storage.set_setting(k, &user, v).await.with_context(|| format!("Failed setting {} for {}", k, user)) {
eprintln!("{}", e);
}
}
},
Err(e) => {
eprintln!("{}", e);
}
}
}
return Ok(());
}
/// Entry point: `<program> <source-uri> <destination-uri>`.
///
/// The destination must be a `file://<folder>` URI and the source must start with
/// `redis`. Everything from the source is copied into the destination.
///
/// # Errors
///
/// Returns an error when an argument is missing, when either URI names an
/// unsupported storage type, when the file storage cannot be constructed, or when
/// the conversion itself fails.
#[async_std::main]
async fn main() -> anyhow::Result<()> {
    let mut args = std::env::args().skip(1); // skip argv[0]
    let old_uri = args
        .next()
        .ok_or_else(|| anyhow!("No import source is provided."))?;
    let new_uri = args
        .next()
        .ok_or_else(|| anyhow!("No import destination is provided."))?;

    // Matching on the full `file://` prefix means inputs such as `file:/path` take the
    // error branch instead of panicking on a failed `strip_prefix`.
    let storage = if let Some(folder) = new_uri.strip_prefix("file://") {
        let path = std::path::PathBuf::from(folder);
        FileStorage::new(path)
            .await
            .context("Failed to construct the file storage")?
    } else {
        anyhow::bail!("Cannot construct the storage abstraction for destination storage. Check the storage type?");
    };

    if old_uri.starts_with("redis") {
        convert_from_redis(old_uri, storage).await
    } else {
        // Report an unsupported source rather than exiting successfully with nothing converted.
        anyhow::bail!("Cannot construct the storage abstraction for source storage. Check the storage type?")
    }
}
|