From 9e4c4551a786830bf34d74c4ef111a8ed292fa9f Mon Sep 17 00:00:00 2001 From: Vika Date: Tue, 15 Feb 2022 02:44:33 +0300 Subject: WIP: convert to Tokio and Warp Warp allows requests to be applied as "filters", allowing to flexibly split up logic and have it work in a functional style, similar to pipes. Tokio is just an alternative runtime. I thought that maybe switching runtimes and refactoring the code might allow me to fish out that pesky bug with the whole application hanging after a certain amount of requests... --- src/database/file/mod.rs | 51 ++++++++++++++++++++++++------------------------ 1 file changed, 25 insertions(+), 26 deletions(-) (limited to 'src/database/file') diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs index 3717023..6cbe3c6 100644 --- a/src/database/file/mod.rs +++ b/src/database/file/mod.rs @@ -2,11 +2,10 @@ use crate::database::{filter_post, ErrorKind, Result, Storage, StorageError}; use std::fs::{File, OpenOptions}; use std::io::{ErrorKind as IOErrorKind, Seek, SeekFrom, Read, Write}; use std::time::Duration; -use async_std::future::TimeoutError; -use async_std::task::spawn_blocking; +use tokio::task::spawn_blocking; use async_trait::async_trait; use fd_lock::RwLock; -use futures::stream; +use futures_util::stream; use futures_util::StreamExt; use futures_util::TryStreamExt; use log::debug; @@ -27,8 +26,8 @@ impl From for StorageError { } } -impl From for StorageError { - fn from(source: TimeoutError) -> Self { +impl From for StorageError { + fn from(source: tokio::time::error::Elapsed) -> Self { Self::with_source( ErrorKind::Backend, "timeout on I/O operation", @@ -259,14 +258,14 @@ impl Storage for FileStorage { async fn post_exists(&self, url: &str) -> Result { let path = url_to_path(&self.root_dir, url); debug!("Checking if {:?} exists...", path); - Ok(spawn_blocking(move || path.is_file()).await) + Ok(spawn_blocking(move || path.is_file()).await.unwrap()) } async fn get_post(&self, url: &str) -> Result> { let path = url_to_path(&self.root_dir, url); debug!("Opening {:?}", path); // Use exclusively synchronous operations to never transfer a lock over an await boundary - async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { match File::open(&path) { Ok(file) => { let lock = RwLock::new(file); @@ -289,7 +288,7 @@ impl Storage for FileStorage { } } } - })).await? + })).await?.unwrap() } async fn put_post<'a>(&self, post: &'a serde_json::Value, user: &'a str) -> Result<()> { @@ -303,7 +302,7 @@ impl Storage for FileStorage { let post_json = post.to_string(); let post_path = path.clone(); // Use exclusively synchronous operations to never transfer a lock over an await boundary - async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let parent = post_path.parent().unwrap().to_owned(); if !parent.is_dir() { std::fs::create_dir_all(post_path.parent().unwrap())?; @@ -323,7 +322,7 @@ impl Storage for FileStorage { drop(guard); Result::Ok(()) - })).await??; + })).await?.unwrap()?; if post["properties"]["url"].is_array() { for url in post["properties"]["url"] @@ -345,7 +344,7 @@ impl Storage for FileStorage { })?; let relative = path_relative_from(&orig, basedir).unwrap(); println!("{:?} - {:?} = {:?}", &orig, &basedir, &relative); - async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { println!("Created a symlink at {:?}", &link); let symlink_result; #[cfg(unix)] @@ -362,7 +361,7 @@ impl Storage for FileStorage { } else { Result::Ok(()) } - })).await??; + })).await?.unwrap()?; } } } @@ -386,7 +385,7 @@ impl Storage for FileStorage { .unwrap_or_else(String::default); let key = key.to_string(); drop(post); - async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let file = OpenOptions::new() .read(true) .write(true) @@ -417,15 +416,15 @@ impl Storage for FileStorage { (*guard).write_all(serde_json::to_string(&channels)?.as_bytes())?; Result::Ok(()) - })).await??; + })).await?.unwrap()?; } Ok(()) } async fn update_post<'a>(&self, url: &'a str, update: serde_json::Value) -> Result<()> { let path = url_to_path(&self.root_dir, url); - - let (old_json, new_json) = async_std::future::timeout( + #[allow(unused_variables)] + let (old_json, new_json) = tokio::time::timeout( Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let f = OpenOptions::new() @@ -450,7 +449,7 @@ impl Storage for FileStorage { Result::Ok((json, new_json)) }) - ).await??; + ).await?.unwrap()?; // TODO check if URLs changed between old and new JSON Ok(()) } @@ -461,7 +460,7 @@ impl Storage for FileStorage { path.push("channels"); let path = path.to_path(&self.root_dir); - async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { match File::open(&path) { Ok(f) => { let lock = RwLock::new(f); @@ -484,7 +483,7 @@ impl Storage for FileStorage { } } } - })).await? + })).await?.unwrap() } async fn read_feed_with_limit<'a>( @@ -548,7 +547,7 @@ impl Storage for FileStorage { async fn delete_post<'a>(&self, url: &'a str) -> Result<()> { let path = url_to_path(&self.root_dir, url); - if let Err(e) = async_std::fs::remove_file(path).await { + if let Err(e) = tokio::fs::remove_file(path).await { Err(e.into()) } else { // TODO check for dangling references in the channel list @@ -565,7 +564,7 @@ impl Storage for FileStorage { let path = path.to_path(&self.root_dir); let setting = setting.to_string(); - async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let lock = RwLock::new(File::open(path)?); let guard = lock.read()?; @@ -579,7 +578,7 @@ impl Storage for FileStorage { .get(&setting) .cloned() .ok_or_else(|| StorageError::new(ErrorKind::Backend, "Setting not set")) - })).await? + })).await?.unwrap() } async fn set_setting<'a>(&self, setting: &'a str, user: &'a str, value: &'a str) -> Result<()> { @@ -591,13 +590,13 @@ impl Storage for FileStorage { let path = path.to_path(&self.root_dir); let parent = path.parent().unwrap().to_owned(); - if !spawn_blocking(move || parent.is_dir()).await { - async_std::fs::create_dir_all(path.parent().unwrap()).await?; + if !spawn_blocking(move || parent.is_dir()).await.unwrap() { + tokio::fs::create_dir_all(path.parent().unwrap()).await?; } let (setting, value) = (setting.to_string(), value.to_string()); - async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let file = OpenOptions::new() .write(true) .read(true) @@ -622,6 +621,6 @@ impl Storage for FileStorage { (&mut *guard).set_len(0)?; (&mut *guard).write_all(serde_json::to_string(&settings)?.as_bytes())?; Result::Ok(()) - })).await? + })).await?.unwrap() } } -- cgit 1.4.1