From 9e4c4551a786830bf34d74c4ef111a8ed292fa9f Mon Sep 17 00:00:00 2001 From: Vika Date: Tue, 15 Feb 2022 02:44:33 +0300 Subject: WIP: convert to Tokio and Warp Warp allows requests to be applied as "filters", allowing to flexibly split up logic and have it work in a functional style, similar to pipes. Tokio is just an alternative runtime. I thought that maybe switching runtimes and refactoring the code might allow me to fish out that pesky bug with the whole application hanging after a certain amount of requests... --- src/database/file/mod.rs | 51 ++++++++++++++++++++++++------------------------ src/database/mod.rs | 38 ++++++------------------------------ 2 files changed, 31 insertions(+), 58 deletions(-) (limited to 'src/database') diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs index 3717023..6cbe3c6 100644 --- a/src/database/file/mod.rs +++ b/src/database/file/mod.rs @@ -2,11 +2,10 @@ use crate::database::{filter_post, ErrorKind, Result, Storage, StorageError}; use std::fs::{File, OpenOptions}; use std::io::{ErrorKind as IOErrorKind, Seek, SeekFrom, Read, Write}; use std::time::Duration; -use async_std::future::TimeoutError; -use async_std::task::spawn_blocking; +use tokio::task::spawn_blocking; use async_trait::async_trait; use fd_lock::RwLock; -use futures::stream; +use futures_util::stream; use futures_util::StreamExt; use futures_util::TryStreamExt; use log::debug; @@ -27,8 +26,8 @@ impl From for StorageError { } } -impl From for StorageError { - fn from(source: TimeoutError) -> Self { +impl From for StorageError { + fn from(source: tokio::time::error::Elapsed) -> Self { Self::with_source( ErrorKind::Backend, "timeout on I/O operation", @@ -259,14 +258,14 @@ impl Storage for FileStorage { async fn post_exists(&self, url: &str) -> Result { let path = url_to_path(&self.root_dir, url); debug!("Checking if {:?} exists...", path); - Ok(spawn_blocking(move || path.is_file()).await) + Ok(spawn_blocking(move || path.is_file()).await.unwrap()) } async fn get_post(&self, url: &str) -> Result> { let path = url_to_path(&self.root_dir, url); debug!("Opening {:?}", path); // Use exclusively synchronous operations to never transfer a lock over an await boundary - async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { match File::open(&path) { Ok(file) => { let lock = RwLock::new(file); @@ -289,7 +288,7 @@ impl Storage for FileStorage { } } } - })).await? + })).await?.unwrap() } async fn put_post<'a>(&self, post: &'a serde_json::Value, user: &'a str) -> Result<()> { @@ -303,7 +302,7 @@ impl Storage for FileStorage { let post_json = post.to_string(); let post_path = path.clone(); // Use exclusively synchronous operations to never transfer a lock over an await boundary - async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let parent = post_path.parent().unwrap().to_owned(); if !parent.is_dir() { std::fs::create_dir_all(post_path.parent().unwrap())?; @@ -323,7 +322,7 @@ impl Storage for FileStorage { drop(guard); Result::Ok(()) - })).await??; + })).await?.unwrap()?; if post["properties"]["url"].is_array() { for url in post["properties"]["url"] @@ -345,7 +344,7 @@ impl Storage for FileStorage { })?; let relative = path_relative_from(&orig, basedir).unwrap(); println!("{:?} - {:?} = {:?}", &orig, &basedir, &relative); - async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { println!("Created a symlink at {:?}", &link); let symlink_result; #[cfg(unix)] @@ -362,7 +361,7 @@ impl Storage for FileStorage { } else { Result::Ok(()) } - })).await??; + })).await?.unwrap()?; } } } @@ -386,7 +385,7 @@ impl Storage for FileStorage { .unwrap_or_else(String::default); let key = key.to_string(); drop(post); - async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let file = OpenOptions::new() .read(true) .write(true) @@ -417,15 +416,15 @@ impl Storage for FileStorage { (*guard).write_all(serde_json::to_string(&channels)?.as_bytes())?; Result::Ok(()) - })).await??; + })).await?.unwrap()?; } Ok(()) } async fn update_post<'a>(&self, url: &'a str, update: serde_json::Value) -> Result<()> { let path = url_to_path(&self.root_dir, url); - - let (old_json, new_json) = async_std::future::timeout( + #[allow(unused_variables)] + let (old_json, new_json) = tokio::time::timeout( Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let f = OpenOptions::new() @@ -450,7 +449,7 @@ impl Storage for FileStorage { Result::Ok((json, new_json)) }) - ).await??; + ).await?.unwrap()?; // TODO check if URLs changed between old and new JSON Ok(()) } @@ -461,7 +460,7 @@ impl Storage for FileStorage { path.push("channels"); let path = path.to_path(&self.root_dir); - async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { match File::open(&path) { Ok(f) => { let lock = RwLock::new(f); @@ -484,7 +483,7 @@ impl Storage for FileStorage { } } } - })).await? + })).await?.unwrap() } async fn read_feed_with_limit<'a>( @@ -548,7 +547,7 @@ impl Storage for FileStorage { async fn delete_post<'a>(&self, url: &'a str) -> Result<()> { let path = url_to_path(&self.root_dir, url); - if let Err(e) = async_std::fs::remove_file(path).await { + if let Err(e) = tokio::fs::remove_file(path).await { Err(e.into()) } else { // TODO check for dangling references in the channel list @@ -565,7 +564,7 @@ impl Storage for FileStorage { let path = path.to_path(&self.root_dir); let setting = setting.to_string(); - async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let lock = RwLock::new(File::open(path)?); let guard = lock.read()?; @@ -579,7 +578,7 @@ impl Storage for FileStorage { .get(&setting) .cloned() .ok_or_else(|| StorageError::new(ErrorKind::Backend, "Setting not set")) - })).await? + })).await?.unwrap() } async fn set_setting<'a>(&self, setting: &'a str, user: &'a str, value: &'a str) -> Result<()> { @@ -591,13 +590,13 @@ impl Storage for FileStorage { let path = path.to_path(&self.root_dir); let parent = path.parent().unwrap().to_owned(); - if !spawn_blocking(move || parent.is_dir()).await { - async_std::fs::create_dir_all(path.parent().unwrap()).await?; + if !spawn_blocking(move || parent.is_dir()).await.unwrap() { + tokio::fs::create_dir_all(path.parent().unwrap()).await?; } let (setting, value) = (setting.to_string(), value.to_string()); - async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let file = OpenOptions::new() .write(true) .read(true) @@ -622,6 +621,6 @@ impl Storage for FileStorage { (&mut *guard).set_len(0)?; (&mut *guard).write_all(serde_json::to_string(&settings)?.as_bytes())?; Result::Ok(()) - })).await? + })).await?.unwrap() } } diff --git a/src/database/mod.rs b/src/database/mod.rs index c0f9f29..55ab027 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -2,13 +2,6 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; -//#[cfg(feature="redis")] -//mod redis; -//#[cfg(feature="redis")] -//pub use crate::database::redis::RedisStorage; -//#[cfg(all(redis, test))] -//pub use redis::tests::{get_redis_instance, RedisInstance}; - mod file; pub use crate::database::file::FileStorage; @@ -49,7 +42,7 @@ pub struct StorageError { kind: ErrorKind, } -impl From for tide::Response { +/*impl From for tide::Response { fn from(err: StorageError) -> Self { tide::Response::builder(match err.kind() { ErrorKind::BadRequest => 400, @@ -66,7 +59,8 @@ impl From for tide::Response { })) .build() } -} +}*/ + impl std::error::Error for StorageError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { self.source @@ -431,24 +425,7 @@ mod tests { ); } - /*macro_rules! redis_test { - ($func_name:expr) => { - paste! { - #[cfg(feature="redis")] - #[async_std::test] - async fn [] () { - test_logger::ensure_env_logger_initialized(); - let redis_instance = get_redis_instance().await; - let backend = super::RedisStorage::new(redis_instance.uri().to_string()) - .await - .unwrap(); - $func_name(backend).await - } - } - } - }*/ - - macro_rules! file_test { + /*macro_rules! file_test { ($func_name:expr) => { paste! { #[async_std::test] @@ -461,13 +438,10 @@ mod tests { } }; } - - /*redis_test!(test_backend_basic_operations); - redis_test!(test_backend_get_channel_list); - redis_test!(test_backend_settings); - redis_test!(test_backend_update);*/ + file_test!(test_backend_basic_operations); file_test!(test_backend_get_channel_list); file_test!(test_backend_settings); file_test!(test_backend_update); + */ } -- cgit 1.4.1