From 9e4c4551a786830bf34d74c4ef111a8ed292fa9f Mon Sep 17 00:00:00 2001 From: Vika Date: Tue, 15 Feb 2022 02:44:33 +0300 Subject: WIP: convert to Tokio and Warp Warp allows requests to be applied as "filters", allowing to flexibly split up logic and have it work in a functional style, similar to pipes. Tokio is just an alternative runtime. I thought that maybe switching runtimes and refactoring the code might allow me to fish out that pesky bug with the whole application hanging after a certain amount of requests... --- src/database/file/mod.rs | 51 +++++---- src/database/mod.rs | 38 ++----- src/lib.rs | 270 +++++++++++++++++++++++++++++------------------ src/main.rs | 118 ++++++++++++++++++--- src/metrics.rs | 23 +++- src/micropub/mod.rs | 128 +++++++++++++++++----- 6 files changed, 416 insertions(+), 212 deletions(-) (limited to 'src') diff --git a/src/database/file/mod.rs b/src/database/file/mod.rs index 3717023..6cbe3c6 100644 --- a/src/database/file/mod.rs +++ b/src/database/file/mod.rs @@ -2,11 +2,10 @@ use crate::database::{filter_post, ErrorKind, Result, Storage, StorageError}; use std::fs::{File, OpenOptions}; use std::io::{ErrorKind as IOErrorKind, Seek, SeekFrom, Read, Write}; use std::time::Duration; -use async_std::future::TimeoutError; -use async_std::task::spawn_blocking; +use tokio::task::spawn_blocking; use async_trait::async_trait; use fd_lock::RwLock; -use futures::stream; +use futures_util::stream; use futures_util::StreamExt; use futures_util::TryStreamExt; use log::debug; @@ -27,8 +26,8 @@ impl From for StorageError { } } -impl From for StorageError { - fn from(source: TimeoutError) -> Self { +impl From for StorageError { + fn from(source: tokio::time::error::Elapsed) -> Self { Self::with_source( ErrorKind::Backend, "timeout on I/O operation", @@ -259,14 +258,14 @@ impl Storage for FileStorage { async fn post_exists(&self, url: &str) -> Result { let path = url_to_path(&self.root_dir, url); debug!("Checking if {:?} exists...", path); - Ok(spawn_blocking(move || path.is_file()).await) + Ok(spawn_blocking(move || path.is_file()).await.unwrap()) } async fn get_post(&self, url: &str) -> Result> { let path = url_to_path(&self.root_dir, url); debug!("Opening {:?}", path); // Use exclusively synchronous operations to never transfer a lock over an await boundary - async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { match File::open(&path) { Ok(file) => { let lock = RwLock::new(file); @@ -289,7 +288,7 @@ impl Storage for FileStorage { } } } - })).await? + })).await?.unwrap() } async fn put_post<'a>(&self, post: &'a serde_json::Value, user: &'a str) -> Result<()> { @@ -303,7 +302,7 @@ impl Storage for FileStorage { let post_json = post.to_string(); let post_path = path.clone(); // Use exclusively synchronous operations to never transfer a lock over an await boundary - async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let parent = post_path.parent().unwrap().to_owned(); if !parent.is_dir() { std::fs::create_dir_all(post_path.parent().unwrap())?; @@ -323,7 +322,7 @@ impl Storage for FileStorage { drop(guard); Result::Ok(()) - })).await??; + })).await?.unwrap()?; if post["properties"]["url"].is_array() { for url in post["properties"]["url"] @@ -345,7 +344,7 @@ impl Storage for FileStorage { })?; let relative = path_relative_from(&orig, basedir).unwrap(); println!("{:?} - {:?} = {:?}", &orig, &basedir, &relative); - async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { println!("Created a symlink at {:?}", &link); let symlink_result; #[cfg(unix)] @@ -362,7 +361,7 @@ impl Storage for FileStorage { } else { Result::Ok(()) } - })).await??; + })).await?.unwrap()?; } } } @@ -386,7 +385,7 @@ impl Storage for FileStorage { .unwrap_or_else(String::default); let key = key.to_string(); drop(post); - async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let file = OpenOptions::new() .read(true) .write(true) @@ -417,15 +416,15 @@ impl Storage for FileStorage { (*guard).write_all(serde_json::to_string(&channels)?.as_bytes())?; Result::Ok(()) - })).await??; + })).await?.unwrap()?; } Ok(()) } async fn update_post<'a>(&self, url: &'a str, update: serde_json::Value) -> Result<()> { let path = url_to_path(&self.root_dir, url); - - let (old_json, new_json) = async_std::future::timeout( + #[allow(unused_variables)] + let (old_json, new_json) = tokio::time::timeout( Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let f = OpenOptions::new() @@ -450,7 +449,7 @@ impl Storage for FileStorage { Result::Ok((json, new_json)) }) - ).await??; + ).await?.unwrap()?; // TODO check if URLs changed between old and new JSON Ok(()) } @@ -461,7 +460,7 @@ impl Storage for FileStorage { path.push("channels"); let path = path.to_path(&self.root_dir); - async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { match File::open(&path) { Ok(f) => { let lock = RwLock::new(f); @@ -484,7 +483,7 @@ impl Storage for FileStorage { } } } - })).await? + })).await?.unwrap() } async fn read_feed_with_limit<'a>( @@ -548,7 +547,7 @@ impl Storage for FileStorage { async fn delete_post<'a>(&self, url: &'a str) -> Result<()> { let path = url_to_path(&self.root_dir, url); - if let Err(e) = async_std::fs::remove_file(path).await { + if let Err(e) = tokio::fs::remove_file(path).await { Err(e.into()) } else { // TODO check for dangling references in the channel list @@ -565,7 +564,7 @@ impl Storage for FileStorage { let path = path.to_path(&self.root_dir); let setting = setting.to_string(); - async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let lock = RwLock::new(File::open(path)?); let guard = lock.read()?; @@ -579,7 +578,7 @@ impl Storage for FileStorage { .get(&setting) .cloned() .ok_or_else(|| StorageError::new(ErrorKind::Backend, "Setting not set")) - })).await? + })).await?.unwrap() } async fn set_setting<'a>(&self, setting: &'a str, user: &'a str, value: &'a str) -> Result<()> { @@ -591,13 +590,13 @@ impl Storage for FileStorage { let path = path.to_path(&self.root_dir); let parent = path.parent().unwrap().to_owned(); - if !spawn_blocking(move || parent.is_dir()).await { - async_std::fs::create_dir_all(path.parent().unwrap()).await?; + if !spawn_blocking(move || parent.is_dir()).await.unwrap() { + tokio::fs::create_dir_all(path.parent().unwrap()).await?; } let (setting, value) = (setting.to_string(), value.to_string()); - async_std::future::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { + tokio::time::timeout(Duration::from_secs(IO_TIMEOUT), spawn_blocking(move || { let file = OpenOptions::new() .write(true) .read(true) @@ -622,6 +621,6 @@ impl Storage for FileStorage { (&mut *guard).set_len(0)?; (&mut *guard).write_all(serde_json::to_string(&settings)?.as_bytes())?; Result::Ok(()) - })).await? + })).await?.unwrap() } } diff --git a/src/database/mod.rs b/src/database/mod.rs index c0f9f29..55ab027 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -2,13 +2,6 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; -//#[cfg(feature="redis")] -//mod redis; -//#[cfg(feature="redis")] -//pub use crate::database::redis::RedisStorage; -//#[cfg(all(redis, test))] -//pub use redis::tests::{get_redis_instance, RedisInstance}; - mod file; pub use crate::database::file::FileStorage; @@ -49,7 +42,7 @@ pub struct StorageError { kind: ErrorKind, } -impl From for tide::Response { +/*impl From for tide::Response { fn from(err: StorageError) -> Self { tide::Response::builder(match err.kind() { ErrorKind::BadRequest => 400, @@ -66,7 +59,8 @@ impl From for tide::Response { })) .build() } -} +}*/ + impl std::error::Error for StorageError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { self.source @@ -431,24 +425,7 @@ mod tests { ); } - /*macro_rules! redis_test { - ($func_name:expr) => { - paste! { - #[cfg(feature="redis")] - #[async_std::test] - async fn [] () { - test_logger::ensure_env_logger_initialized(); - let redis_instance = get_redis_instance().await; - let backend = super::RedisStorage::new(redis_instance.uri().to_string()) - .await - .unwrap(); - $func_name(backend).await - } - } - } - }*/ - - macro_rules! file_test { + /*macro_rules! file_test { ($func_name:expr) => { paste! { #[async_std::test] @@ -461,13 +438,10 @@ mod tests { } }; } - - /*redis_test!(test_backend_basic_operations); - redis_test!(test_backend_get_channel_list); - redis_test!(test_backend_settings); - redis_test!(test_backend_update);*/ + file_test!(test_backend_basic_operations); file_test!(test_backend_get_channel_list); file_test!(test_backend_settings); file_test!(test_backend_update); + */ } diff --git a/src/lib.rs b/src/lib.rs index 2b4d1cc..2585227 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,87 +1,172 @@ -use tide::{Request, Response}; - -/// Database abstraction layer for Kittybox, allowing the CMS to work with any kind of database. -pub mod database; +//use tide::{Request, Response}; +use warp::Filter; +/*pub mod database; mod frontend; mod indieauth; -mod metrics; -mod micropub; +mod micropub;*/ +pub mod metrics; +/// Database abstraction layer for Kittybox, allowing the CMS to work with any kind of database. +pub mod database; +pub mod micropub; +//pub mod indieauth; -use crate::indieauth::IndieAuthMiddleware; -use crate::micropub::CORSMiddleware; +/*use crate::indieauth::IndieAuthMiddleware; +use crate::micropub::CORSMiddleware;*/ -#[derive(Clone)] -pub struct ApplicationState -where - StorageBackend: database::Storage + Send + Sync + 'static, -{ - token_endpoint: surf::Url, - authorization_endpoint: surf::Url, - media_endpoint: Option, - internal_token: Option, - cookie_secret: String, - http_client: surf::Client, - storage: StorageBackend, +pub mod rejections { + #[derive(Debug)] + pub struct UnacceptableContentType; + impl warp::reject::Reject for UnacceptableContentType {} + + #[derive(Debug)] + pub struct HostHeaderUnset; + impl warp::reject::Reject for HostHeaderUnset {} } -type App = tide::Server>; - -static MICROPUB_CLIENT: &[u8] = include_bytes!("./index.html"); - -fn equip_app(mut app: App) -> App -where - Storage: database::Storage + Send + Sync + Clone, -{ - app.at("/micropub") - .with(CORSMiddleware {}) - .with(IndieAuthMiddleware::new()) - .get(micropub::get_handler) - .post(micropub::post_handler); - // The Micropub client. It'll start small, but could grow into something full-featured - app.at("/micropub/client").get(|_: Request<_>| async move { - Ok(Response::builder(200) - .body(MICROPUB_CLIENT) - .content_type("text/html") - .build()) - }); - app.at("/") - .with(CORSMiddleware {}) - .with(frontend::ErrorHandlerMiddleware {}) - .get(frontend::mainpage) - .post(frontend::onboarding_receiver); - app.at("/login") - .with(frontend::ErrorHandlerMiddleware {}) - .get(frontend::login::form) - .post(frontend::login::handler); - app.at("/login/callback") - .with(frontend::ErrorHandlerMiddleware {}) - .get(frontend::login::callback); - app.at("/static/*path") - .with(frontend::ErrorHandlerMiddleware {}) - .get(frontend::handle_static); - app.at("/*path") - .with(frontend::ErrorHandlerMiddleware {}) - .get(frontend::render_post); - app.at("/coffee") - .with(frontend::ErrorHandlerMiddleware {}) - .get(frontend::coffee); - // TODO make sure the health check actually checks the backend or something - // otherwise it'll get false-negatives for application faults like resource - // exhaustion - app.at("/health").get(|_| async { Ok("OK") }); - app.at("/metrics").get(metrics::gather); - - app.with(metrics::InstrumentationMiddleware {}); - app.with( - tide::sessions::SessionMiddleware::new( - tide::sessions::CookieStore::new(), - app.state().cookie_secret.as_bytes(), - ) - .with_cookie_name("kittybox_session") - .without_save_unchanged(), - ); - app +pub static MICROPUB_CLIENT: &[u8] = include_bytes!("./index.html"); + +pub mod util { + use warp::{Filter, host::Authority}; + use super::rejections; + + pub fn require_host() -> impl Filter + Copy { + warp::host::optional() + .and_then(|authority: Option| async move { + authority.ok_or_else(|| warp::reject::custom(rejections::HostHeaderUnset)) + }) + } + + pub fn template( + template: R + ) -> impl warp::Reply + where + R: markup::Render + std::fmt::Display + { + warp::reply::html(template.to_string()) + } + + pub fn parse_accept() -> impl Filter + Copy { + warp::header::value("Accept").and_then(|accept: warp::http::HeaderValue| async move { + let mut accept: http_types::content::Accept = { + // This is unneccesarily complicated because I want to reuse some http-types parsing + // and http-types has constructor for Headers private so I need to construct + // a mock Request to reason about headers... this is so dumb wtf + let bytes: &[u8] = accept.as_bytes(); + let value = http_types::headers::HeaderValue::from_bytes(bytes.to_vec()).unwrap(); + let values: http_types::headers::HeaderValues = vec![value].into(); + let mut request = http_types::Request::new(http_types::Method::Get, "http://example.com/"); + request.append_header("Accept".parse::().unwrap(), &values); + http_types::content::Accept::from_headers(&request).unwrap().unwrap() + }; + + // This code is INCREDIBLY dumb, honestly... + // why did I even try to use it? + // TODO vendor this stuff in so I can customize it + match accept.negotiate(&[ + "text/html; encoding=\"utf-8\"".into(), + "application/json; encoding=\"utf-8\"".into(), + "text/html".into(), + "application/json".into(), + + ]) { + Ok(mime) => { + Ok(http_types::Mime::from(mime.value().as_str())) + }, + Err(err) => { + log::error!("Content-Type negotiation error: {:?}, accepting: {:?}", err, accept); + Err(warp::reject::custom(rejections::UnacceptableContentType)) + } + } + }) + } + + mod tests { + #[tokio::test] + async fn test_require_host_with_host() { + use super::require_host; + + let filter = require_host(); + + let res = warp::test::request() + .path("/") + .header("Host", "localhost:8080") + .filter(&filter) + .await + .unwrap(); + + assert_eq!(res, "localhost:8080"); + + } + + #[tokio::test] + async fn test_require_host_no_host() { + use super::require_host; + + let filter = require_host(); + + let res = warp::test::request() + .path("/") + .filter(&filter) + .await; + + assert!(res.is_err()); + } + } } +// fn equip_app(mut app: App) -> App +// where +// Storage: database::Storage + Send + Sync + Clone, +// { +// app.at("/micropub") +// .with(CORSMiddleware {}) +// .with(IndieAuthMiddleware::new()) +// .get(micropub::get_handler) +// .post(micropub::post_handler); +// // The Micropub client. It'll start small, but could grow into something full-featured +// app.at("/micropub/client").get(|_: Request<_>| async move { +// Ok(Response::builder(200) +// .body(MICROPUB_CLIENT) +// .content_type("text/html") +// .build()) +// }); +// app.at("/") +// .with(CORSMiddleware {}) +// .with(frontend::ErrorHandlerMiddleware {}) +// .get(frontend::mainpage) +// .post(frontend::onboarding_receiver); +// app.at("/login") +// .with(frontend::ErrorHandlerMiddleware {}) +// .get(frontend::login::form) +// .post(frontend::login::handler); +// app.at("/login/callback") +// .with(frontend::ErrorHandlerMiddleware {}) +// .get(frontend::login::callback); +// app.at("/static/*path") +// .with(frontend::ErrorHandlerMiddleware {}) +// .get(frontend::handle_static); +// app.at("/*path") +// .with(frontend::ErrorHandlerMiddleware {}) +// .get(frontend::render_post); +// app.at("/coffee") +// .with(frontend::ErrorHandlerMiddleware {}) +// .get(frontend::coffee); +// // TODO make sure the health check actually checks the backend or something +// // otherwise it'll get false-negatives for application faults like resource +// // exhaustion +// app.at("/health").get(|_| async { Ok("OK") }); +// app.at("/metrics").get(metrics::gather); + +// app.with(metrics::InstrumentationMiddleware {}); +// app.with( +// tide::sessions::SessionMiddleware::new( +// tide::sessions::CookieStore::new(), +// app.state().cookie_secret.as_bytes(), +// ) +// .with_cookie_name("kittybox_session") +// .without_save_unchanged(), +// ); +// app +// } /*#[cfg(feature="redis")] pub async fn get_app_with_redis( @@ -103,30 +188,7 @@ pub async fn get_app_with_redis( equip_app(app) }*/ -pub async fn get_app_with_file( - token_endpoint: surf::Url, - authorization_endpoint: surf::Url, - backend_uri: String, - media_endpoint: Option, - cookie_secret: String, - internal_token: Option, -) -> App { - let folder = backend_uri.strip_prefix("file://").unwrap(); - let path = std::path::PathBuf::from(folder); - let app = tide::with_state(ApplicationState { - token_endpoint, - media_endpoint, - authorization_endpoint, - internal_token, - cookie_secret, - storage: database::FileStorage::new(path).await.unwrap(), - http_client: surf::Client::new(), - }); - - equip_app(app) -} - -#[cfg(test)] +/*#[cfg(test)] pub async fn get_app_with_test_file( token_endpoint: surf::Url, ) -> ( @@ -151,7 +213,7 @@ pub async fn get_app_with_test_file( (tempdir, backend, equip_app(app)) } -/*#[cfg(all(redis, test))] +#[cfg(all(redis, test))] pub async fn get_app_with_test_redis( token_endpoint: surf::Url, ) -> ( @@ -176,7 +238,7 @@ pub async fn get_app_with_test_redis( (redis_instance, backend, equip_app(app)) }*/ -#[cfg(test)] +/*#[cfg(test)] #[allow(unused_variables)] mod tests { use super::*; @@ -459,4 +521,4 @@ mod tests { assert_eq!(new_feed["children"][0].as_str().unwrap(), uid); assert_eq!(new_feed["children"][1].as_str().unwrap(), first_uid); } -} +}*/ diff --git a/src/main.rs b/src/main.rs index 79e0cf5..4036d46 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,10 @@ use log::{debug, error, info}; use std::env; -use surf::Url; +use http_types::Url; +use warp::{Filter, host::Authority, path::FullPath}; -#[async_std::main] -async fn main() -> Result<(), std::io::Error> { +#[tokio::main] +async fn main() -> Result<(), kittybox::database::StorageError> { // TODO json logging in the future? let logger_env = env_logger::Env::new().filter_or("RUST_LOG", "info"); env_logger::init_from_env(logger_env); @@ -64,13 +65,14 @@ async fn main() -> Result<(), std::io::Error> { Some(value) => value, None => { if let Ok(filename) = env::var("COOKIE_SECRET_FILE") { - use async_std::io::ReadExt; + /*use async_std::io::ReadExt; let mut file = async_std::fs::File::open(filename).await?; let mut temp_string = String::new(); file.read_to_string(&mut temp_string).await?; - temp_string + temp_string*/ + todo!() } else { error!("COOKIE_SECRET or COOKIE_SECRET_FILE is not set, will not be able to log in users securely!"); std::process::exit(1); @@ -78,24 +80,106 @@ async fn main() -> Result<(), std::io::Error> { } }; - let host = env::var("SERVE_AT") + let host: std::net::SocketAddr = match env::var("SERVE_AT") .ok() - .unwrap_or_else(|| "0.0.0.0:8080".to_string()); + .unwrap_or_else(|| "0.0.0.0:8080".to_string()) + .parse() { + Ok(addr) => addr, + Err(e) => { + error!("Cannot parse SERVE_AT: {}", e); + std::process::exit(1); + } + }; if backend_uri.starts_with("redis") { println!("The Redis backend is deprecated."); std::process::exit(1); } else if backend_uri.starts_with("file") { - let app = kittybox::get_app_with_file( - token_endpoint, - authorization_endpoint, - backend_uri, - media_endpoint, - cookie_secret, - internal_token, - ) - .await; - app.listen(host).await + + let database = { + let folder = backend_uri.strip_prefix("file://").unwrap(); + let path = std::path::PathBuf::from(folder); + kittybox::database::FileStorage::new(path).await? + }; + + // TODO interpret HEAD + let homepage = kittybox::util::require_host() + .and(warp::get()) + .and(warp::path::end()) + // TODO fetch content from the database + // TODO parse content-type and determine appropriate response + .map(|host| format!("front page for {}!", host)); + + let micropub = warp::path("micropub") + .and(warp::path::end() + .and(warp::get() + .and(kittybox::micropub::query(database)) + .or(warp::post() + .and(kittybox::util::require_host()) + .map(|host| "micropub post!")) + .or(warp::options() + .map(|| warp::reply::json::>(&None)) + // TODO: why doesn't this work? + // .map(warp::reply::with::header("Allow", "GET, POST")) + .map(|reply| warp::reply::with_header(reply, "Allow", "GET, POST")) + )) + .or(warp::get() + .and(warp::path("client")) + .and(warp::path::end()) + .map(|| kittybox::MICROPUB_CLIENT))); + + let media = warp::path("media") + .and(warp::path::end() + .and(kittybox::util::require_host()) + .map(|host| "media endpoint?...") + .or(kittybox::util::require_host() + .and(warp::path::param()) + .map(|host: Authority, path: String| format!("media file {}", path)))); + + // TODO remember how login logic works because I forgor + let login = warp::path("login") + .and(warp::path("callback") + .map(|| "callback!") + // TODO form on GET and handler on POST + .or(warp::path::end().map(|| "login page!"))); + + // TODO prettier error response + let coffee = warp::path("coffee") + .map(|| warp::reply::with_status("I'm a teapot!", warp::http::StatusCode::IM_A_TEAPOT)); + + // TODO interpret HEAD + let static_files = warp::get() + .and(warp::path!("static" / String)) + .map(|path| path); + + // TODO interpret HEAD + let catchall = warp::get() + .and(kittybox::util::require_host()) + .and(warp::path::full()) + .map(|host: Authority, path: FullPath| host.to_string() + path.as_str() + ".json") + // TODO fetch content from the database + // TODO parse content-type and determine appropriate response + ; + + let health = warp::path("health").and(warp::path::end()).map(|| "OK"); + // TODO instrumentation middleware (see metrics.rs for comments) + //let metrics = warp::path("metrics").and(warp::path::end()).map(kittybox::metrics::gather); + let app = homepage + .or(login) + .or(static_files) + .or(coffee) + .or(health) + .or(micropub) + .or(media) + .or(catchall) + ; + + let server = warp::serve(app); + + // TODO use warp::Server::bind_with_graceful_shutdown + info!("Listening on {:?}", host); + server.bind(host).await; + Ok(()) } else { println!("Unknown backend, not starting."); std::process::exit(1); diff --git a/src/metrics.rs b/src/metrics.rs index 9f512dd..7bfa2d2 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,3 +1,4 @@ +#![allow(unused_imports, dead_code)] use async_trait::async_trait; use lazy_static::lazy_static; use prometheus::{ @@ -5,7 +6,7 @@ use prometheus::{ TextEncoder, }; use std::time::{Duration, Instant}; -use tide::{Next, Request, Response, Result}; +//use tide::{Next, Request, Response, Result}; // Copied from https://docs.rs/prometheus/0.12.0/src/prometheus/histogram.rs.html#885-889 #[inline] @@ -29,7 +30,7 @@ lazy_static! { .unwrap(); } -pub struct InstrumentationMiddleware {} +/*pub struct InstrumentationMiddleware {} #[async_trait] impl tide::Middleware for InstrumentationMiddleware @@ -55,9 +56,9 @@ where Ok(res) } -} +}*/ -pub async fn gather(_: Request) -> Result +/*pub async fn gather(_: Request) -> Result where S: Send + Sync + Clone, { @@ -67,4 +68,18 @@ where encoder.encode(&metric_families, &mut buffer).unwrap(); Ok(Response::builder(200).body(buffer).build()) +}*/ + +// TODO metrics middleware +// warp doesn't allow running a filter manually +// so you need to escape into the world of hyper +// to collect metrics on requests + +pub fn gather() -> Vec { + let mut buffer: Vec = vec![]; + let encoder = TextEncoder::new(); + let metric_families = prometheus::gather(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + + buffer } diff --git a/src/micropub/mod.rs b/src/micropub/mod.rs index 23f20c4..95595cf 100644 --- a/src/micropub/mod.rs +++ b/src/micropub/mod.rs @@ -1,31 +1,101 @@ -pub mod get; -pub mod post; - -pub use get::get_handler; -pub use post::normalize_mf2; -pub use post::post_handler; - -pub struct CORSMiddleware {} - -use crate::database; -use crate::ApplicationState; -use async_trait::async_trait; -use tide::{Next, Request, Result}; - -#[async_trait] -impl tide::Middleware> for CORSMiddleware -where - B: database::Storage + Send + Sync + Clone, -{ - async fn handle( - &self, - req: Request>, - next: Next<'_, ApplicationState>, - ) -> Result { - let mut res = next.run(req).await; - - res.insert_header("Access-Control-Allow-Origin", "*"); - - Ok(res) +use warp::http::StatusCode; +use warp::{Filter, Rejection, reject::InvalidQuery}; +use serde_json::{json, Value}; +use serde::{Serialize, Deserialize}; +use crate::database::{MicropubChannel, Storage}; + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +enum QueryType { + Source, + Config, + Channel, + SyndicateTo +} + +#[derive(Serialize, Deserialize)] +struct MicropubQuery { + q: QueryType, + url: Option +} + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +enum ErrorType { + InvalidRequest, + InternalServerError +} + +#[derive(Serialize, Deserialize)] +struct MicropubError { + error: ErrorType, + error_description: String +} + +impl From for StatusCode { + fn from(err: MicropubError) -> Self { + match err.error { + ErrorType::InvalidRequest => StatusCode::BAD_REQUEST, + ErrorType::InternalServerError => StatusCode::INTERNAL_SERVER_ERROR + } + } +} + +impl MicropubError { + fn new(error: ErrorType, error_description: &str) -> Self { + Self { + error, + error_description: error_description.to_owned() + } } } + +pub fn query(db: D) -> impl Filter + Clone { + warp::get() + .map(move || db.clone()) + .and(crate::util::require_host()) + .and(warp::query::()) + .then(|db: D, host: warp::host::Authority, query: MicropubQuery| async move { + match query.q { + QueryType::Config => { + let channels: Vec = match db.get_channels(host.as_str()).await { + Ok(chans) => chans, + Err(err) => return warp::reply::json(&MicropubError::new( + ErrorType::InternalServerError, + &format!("Error fetching channels: {}", err) + )) + }; + + warp::reply::json(json!({ + "q": [ + QueryType::Source, + QueryType::Config, + QueryType::Channel, + QueryType::SyndicateTo + ], + "channels": channels, + "_kittybox_authority": host.as_str() + }).as_object().unwrap()) + }, + _ => { + todo!() + } + } + }) + .recover(|err: Rejection| async move { + let error = if let Some(_) = err.find::() { + MicropubError::new( + ErrorType::InvalidRequest, + "Invalid query parameters sent. Try ?q=config to see what you can do." + ) + } else { + log::error!("Unhandled rejection: {:?}", err); + MicropubError::new( + ErrorType::InternalServerError, + &format!("Unknown error: {:?}", err) + ) + }; + + Ok(warp::reply::with_status(warp::reply::json(&error), error.into())) + }) +} -- cgit 1.4.1