From 7f23ec84bc05c236c1bf40c2f0d72412af711516 Mon Sep 17 00:00:00 2001 From: Vika Date: Thu, 7 Jul 2022 00:32:33 +0300 Subject: treewide: rewrite using Axum Axum has streaming bodies and allows to write simpler code. It also helps enforce stronger types and looks much more neat. This allows me to progress on the media endpoint and add streaming reads and writes to the MediaStore trait. Metrics are temporarily not implemented. Everything else was preserved, and the tests still pass, after adjusting for new calling conventions. TODO: create method routers for protocol endpoints --- kittybox-rs/src/database/memory.rs | 138 ++++++++++++++++++++++++++++--------- 1 file changed, 104 insertions(+), 34 deletions(-) (limited to 'kittybox-rs/src/database/memory.rs') diff --git a/kittybox-rs/src/database/memory.rs b/kittybox-rs/src/database/memory.rs index 786466c..c8cc125 100644 --- a/kittybox-rs/src/database/memory.rs +++ b/kittybox-rs/src/database/memory.rs @@ -1,26 +1,26 @@ #![allow(clippy::todo)] use async_trait::async_trait; +use futures_util::FutureExt; +use serde_json::json; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; -use futures_util::FutureExt; -use serde_json::json; -use crate::database::{Storage, Result, StorageError, ErrorKind, MicropubChannel, Settings}; +use crate::database::{ErrorKind, MicropubChannel, Result, Settings, Storage, StorageError}; #[derive(Clone, Debug)] pub struct MemoryStorage { pub mapping: Arc>>, - pub channels: Arc>>> + pub channels: Arc>>>, } #[async_trait] impl Storage for MemoryStorage { async fn post_exists(&self, url: &str) -> Result { - return Ok(self.mapping.read().await.contains_key(url)) + return Ok(self.mapping.read().await.contains_key(url)); } - async fn get_post(&self, url: &str) ->Result> { + async fn get_post(&self, url: &str) -> Result> { let mapping = self.mapping.read().await; match mapping.get(url) { Some(val) => { @@ -36,8 +36,8 @@ impl Storage for MemoryStorage { } else { Ok(Some(val.clone())) } - }, - _ => Ok(None) + } + _ => Ok(None), } } @@ -45,20 +45,45 @@ impl Storage for MemoryStorage { let mapping = &mut self.mapping.write().await; let key: &str = match post["properties"]["uid"][0].as_str() { Some(uid) => uid, - None => return Err(StorageError::new(ErrorKind::Other, "post doesn't have a UID")) + None => { + return Err(StorageError::new( + ErrorKind::Other, + "post doesn't have a UID", + )) + } }; mapping.insert(key.to_string(), post.clone()); if post["properties"]["url"].is_array() { - for url in post["properties"]["url"].as_array().unwrap().iter().map(|i| i.as_str().unwrap().to_string()) { + for url in post["properties"]["url"] + .as_array() + .unwrap() + .iter() + .map(|i| i.as_str().unwrap().to_string()) + { if url != key { - mapping.insert(url, json!({"see_other": key})); + mapping.insert(url, json!({ "see_other": key })); } } } - if post["type"].as_array().unwrap().iter().any(|i| i == "h-feed") { + if post["type"] + .as_array() + .unwrap() + .iter() + .any(|i| i == "h-feed") + { // This is a feed. Add it to the channels array if it's not already there. println!("{:#}", post); - self.channels.write().await.entry(post["properties"]["author"][0].as_str().unwrap().to_string()).or_insert_with(Vec::new).push(key.to_string()) + self.channels + .write() + .await + .entry( + post["properties"]["author"][0] + .as_str() + .unwrap() + .to_string(), + ) + .or_insert_with(Vec::new) + .push(key.to_string()) } Ok(()) } @@ -69,13 +94,24 @@ impl Storage for MemoryStorage { let mut remove_values: HashMap> = HashMap::new(); if let Some(delete) = update["delete"].as_array() { - remove_keys.extend(delete.iter().filter_map(|v| v.as_str()).map(|v| v.to_string())); + remove_keys.extend( + delete + .iter() + .filter_map(|v| v.as_str()) + .map(|v| v.to_string()), + ); } else if let Some(delete) = update["delete"].as_object() { for (k, v) in delete { if let Some(v) = v.as_array() { - remove_values.entry(k.to_string()).or_default().extend(v.clone()); + remove_values + .entry(k.to_string()) + .or_default() + .extend(v.clone()); } else { - return Err(StorageError::new(ErrorKind::BadRequest, "Malformed update object")); + return Err(StorageError::new( + ErrorKind::BadRequest, + "Malformed update object", + )); } } } @@ -84,7 +120,10 @@ impl Storage for MemoryStorage { if v.is_array() { add_keys.insert(k.to_string(), v.clone()); } else { - return Err(StorageError::new(ErrorKind::BadRequest, "Malformed update object")); + return Err(StorageError::new( + ErrorKind::BadRequest, + "Malformed update object", + )); } } } @@ -100,7 +139,10 @@ impl Storage for MemoryStorage { if let Some(new_post) = mapping.get(url) { post = new_post } else { - return Err(StorageError::new(ErrorKind::NotFound, "The post you have requested is not found in the database.")); + return Err(StorageError::new( + ErrorKind::NotFound, + "The post you have requested is not found in the database.", + )); } } let mut post = post.clone(); @@ -131,7 +173,12 @@ impl Storage for MemoryStorage { let k = &k; if let Some(prop) = props[k].as_array_mut() { if k == "children" { - v.as_array().unwrap().iter().cloned().rev().for_each(|v| prop.insert(0, v)); + v.as_array() + .unwrap() + .iter() + .cloned() + .rev() + .for_each(|v| prop.insert(0, v)); } else { prop.extend(v.as_array().unwrap().iter().cloned()); } @@ -139,32 +186,55 @@ impl Storage for MemoryStorage { post["properties"][k] = v } } - mapping.insert(post["properties"]["uid"][0].as_str().unwrap().to_string(), post); + mapping.insert( + post["properties"]["uid"][0].as_str().unwrap().to_string(), + post, + ); } else { - return Err(StorageError::new(ErrorKind::NotFound, "The designated post wasn't found in the database.")); + return Err(StorageError::new( + ErrorKind::NotFound, + "The designated post wasn't found in the database.", + )); } Ok(()) } async fn get_channels(&self, user: &'_ str) -> Result> { match self.channels.read().await.get(user) { - Some(channels) => Ok(futures_util::future::join_all(channels.iter() - .map(|channel| self.get_post(channel) - .map(|result| result.unwrap()) - .map(|post: Option| { - post.map(|post| MicropubChannel { - uid: post["properties"]["uid"][0].as_str().unwrap().to_string(), - name: post["properties"]["name"][0].as_str().unwrap().to_string() - }) + Some(channels) => Ok(futures_util::future::join_all( + channels + .iter() + .map(|channel| { + self.get_post(channel).map(|result| result.unwrap()).map( + |post: Option| { + post.map(|post| MicropubChannel { + uid: post["properties"]["uid"][0].as_str().unwrap().to_string(), + name: post["properties"]["name"][0] + .as_str() + .unwrap() + .to_string(), + }) + }, + ) }) - ).collect::>()).await.into_iter().flatten().collect::>()), - None => Ok(vec![]) + .collect::>(), + ) + .await + .into_iter() + .flatten() + .collect::>()), + None => Ok(vec![]), } - } #[allow(unused_variables)] - async fn read_feed_with_limit(&self, url: &'_ str, after: &'_ Option, limit: usize, user: &'_ Option) -> Result> { + async fn read_feed_with_limit( + &self, + url: &'_ str, + after: &'_ Option, + limit: usize, + user: &'_ Option, + ) -> Result> { todo!() } @@ -194,7 +264,7 @@ impl MemoryStorage { pub fn new() -> Self { Self { mapping: Arc::new(RwLock::new(HashMap::new())), - channels: Arc::new(RwLock::new(HashMap::new())) + channels: Arc::new(RwLock::new(HashMap::new())), } } } -- cgit 1.4.1