about summary refs log tree commit diff
path: root/src/database/file/mod.rs
diff options
context:
space:
mode:
authorVika <vika@fireburn.ru>2022-02-15 02:44:33 +0300
committerVika <vika@fireburn.ru>2022-02-15 02:46:24 +0300
commit9e4c4551a786830bf34d74c4ef111a8ed292fa9f (patch)
tree7796d7e529c89f22bccfbba4566b6bf5efca8071 /src/database/file/mod.rs
parentd1327ed6b28a49770aa5d9b06245aa063b406f78 (diff)
downloadkittybox-9e4c4551a786830bf34d74c4ef111a8ed292fa9f.tar.zst
WIP: convert to Tokio and Warp
Warp allows requests to be applied as "filters", allowing to flexibly
split up logic and have it work in a functional style, similar to
pipes.

Tokio is just an alternative runtime. I thought that maybe switching
runtimes and refactoring the code might allow me to fish out that
pesky bug with the whole application hanging after a certain amount of
requests...
Diffstat (limited to 'src/database/file/mod.rs')
-rw-r--r--src/database/file/mod.rs51
1 files changed, 25 insertions, 26 deletions
diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs
index 3717023..6cbe3c6 100644
--- a/src/database/file/mod.rs
+++ b/src/database/file/mod.rs
@@ -2,11 +2,10 @@ use crate::database::{filter_post, ErrorKind, Result, Storage, StorageError};
 use std::fs::{File, OpenOptions};
 use std::io::{ErrorKind as IOErrorKind, Seek, SeekFrom, Read, Write};
 use std::time::Duration;
-use async_std::future::TimeoutError;
-use async_std::task::spawn_blocking;
+use tokio::task::spawn_blocking;
 use async_trait::async_trait;
 use fd_lock::RwLock;
-use futures::stream;
+use futures_util::stream;
 use futures_util::StreamExt;
 use futures_util::TryStreamExt;
 use log::debug;
@@ -27,8 +26,8 @@ impl From<std::io::Error> for StorageError {
     }
 }
 
-impl From<TimeoutError> for StorageError {
-    fn from(source: TimeoutError) -> Self {
+impl From<tokio::time::error::Elapsed> for StorageError {
+    fn from(source: tokio::time::error::Elapsed) -> Self {
         Self::with_source(
             ErrorKind::Backend,
             "timeout on I/O operation",
@@ -259,14 +258,14 @@ impl Storage for FileStorage {
     async fn post_exists(&self, url: &str) -> Result<bool> {
         let path = url_to_path(&self.root_dir, url);
         debug!("Checking if {:?} exists...", path);
-        Ok(spawn_blocking(move || path.is_file()).await)
+        Ok(spawn_blocking(move || path.is_file()).await.unwrap())
     }
 
     async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> {
         let path = url_to_path(&self.root_dir, url);
         debug!("Opening {:?}", path);
         // Use exclusively synchronous operations to never transfer a lock over an await boundary
-        async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || {
+        tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || {
             match File::open(&path) {
                 Ok(file) => {
                     let lock = RwLock::new(file);
@@ -289,7 +288,7 @@ impl Storage for FileStorage {
                     }
                 }
             }
-        })).await?
+        })).await?.unwrap()
     }
 
     async fn put_post<'a>(&self, post: &'a serde_json::Value, user: &'a str) -> Result<()> {
@@ -303,7 +302,7 @@ impl Storage for FileStorage {
         let post_json = post.to_string();
         let post_path = path.clone();
         // Use exclusively synchronous operations to never transfer a lock over an await boundary
-        async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || {
+        tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || {
             let parent = post_path.parent().unwrap().to_owned();
             if !parent.is_dir() {
                 std::fs::create_dir_all(post_path.parent().unwrap())?;
@@ -323,7 +322,7 @@ impl Storage for FileStorage {
             drop(guard);
 
             Result::Ok(())
-        })).await??;
+        })).await?.unwrap()?;
 
         if post["properties"]["url"].is_array() {
             for url in post["properties"]["url"]
@@ -345,7 +344,7 @@ impl Storage for FileStorage {
                     })?;
                     let relative = path_relative_from(&orig, basedir).unwrap();
                     println!("{:?} - {:?} = {:?}", &orig, &basedir, &relative);
-                    async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || {
+                    tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || {
                         println!("Created a symlink at {:?}", &link);
                         let symlink_result;
                         #[cfg(unix)]
@@ -362,7 +361,7 @@ impl Storage for FileStorage {
                         } else {
                             Result::Ok(())
                         }
-                    })).await??;
+                    })).await?.unwrap()?;
                 }
             }
         }
@@ -386,7 +385,7 @@ impl Storage for FileStorage {
                 .unwrap_or_else(String::default);
             let key = key.to_string();
             drop(post);
-            async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || {
+            tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || {
                 let file = OpenOptions::new()
                     .read(true)
                     .write(true)
@@ -417,15 +416,15 @@ impl Storage for FileStorage {
                 (*guard).write_all(serde_json::to_string(&channels)?.as_bytes())?;
 
                 Result::Ok(())
-            })).await??;
+            })).await?.unwrap()?;
         }
         Ok(())
     }
 
     async fn update_post<'a>(&self, url: &'a str, update: serde_json::Value) -> Result<()> {
         let path = url_to_path(&self.root_dir, url);
-
-        let (old_json, new_json) = async_std::future::timeout(
+        #[allow(unused_variables)]
+        let (old_json, new_json) = tokio::time::timeout(
             Duration::from_secs(IO_TIMEOUT),
             spawn_blocking(move || {
                 let f = OpenOptions::new()
@@ -450,7 +449,7 @@ impl Storage for FileStorage {
 
                 Result::Ok((json, new_json))
             })
-        ).await??;
+        ).await?.unwrap()?;
         // TODO check if URLs changed between old and new JSON
         Ok(())
     }
@@ -461,7 +460,7 @@ impl Storage for FileStorage {
         path.push("channels");
 
         let path = path.to_path(&self.root_dir);
-        async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || {
+        tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || {
             match File::open(&path) {
                 Ok(f) => {
                     let lock = RwLock::new(f);
@@ -484,7 +483,7 @@ impl Storage for FileStorage {
                     }
                 }
             }
-        })).await?
+        })).await?.unwrap()
     }
 
     async fn read_feed_with_limit<'a>(
@@ -548,7 +547,7 @@ impl Storage for FileStorage {
 
     async fn delete_post<'a>(&self, url: &'a str) -> Result<()> {
         let path = url_to_path(&self.root_dir, url);
-        if let Err(e) = async_std::fs::remove_file(path).await {
+        if let Err(e) = tokio::fs::remove_file(path).await {
             Err(e.into())
         } else {
             // TODO check for dangling references in the channel list
@@ -565,7 +564,7 @@ impl Storage for FileStorage {
 
         let path = path.to_path(&self.root_dir);
         let setting = setting.to_string();
-        async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || {
+        tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || {
             let lock = RwLock::new(File::open(path)?);
             let guard = lock.read()?;
 
@@ -579,7 +578,7 @@ impl Storage for FileStorage {
                 .get(&setting)
                 .cloned()
                 .ok_or_else(|| StorageError::new(ErrorKind::Backend, "Setting not set"))
-        })).await?
+        })).await?.unwrap()
     }
 
     async fn set_setting<'a>(&self, setting: &'a str, user: &'a str, value: &'a str) -> Result<()> {
@@ -591,13 +590,13 @@ impl Storage for FileStorage {
         let path = path.to_path(&self.root_dir);
 
         let parent = path.parent().unwrap().to_owned();
-        if !spawn_blocking(move || parent.is_dir()).await {
-            async_std::fs::create_dir_all(path.parent().unwrap()).await?;
+        if !spawn_blocking(move || parent.is_dir()).await.unwrap() {
+            tokio::fs::create_dir_all(path.parent().unwrap()).await?;
         }
 
         let (setting, value) = (setting.to_string(), value.to_string());
 
-        async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || {
+        tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || {
             let file = OpenOptions::new()
                 .write(true)
                 .read(true)
@@ -622,6 +621,6 @@ impl Storage for FileStorage {
             (&mut *guard).set_len(0)?;
             (&mut *guard).write_all(serde_json::to_string(&settings)?.as_bytes())?;
             Result::Ok(())
-        })).await?
+        })).await?.unwrap()
     }
 }