From 42506c91065cd69bd03ce68715662dd898e2181f Mon Sep 17 00:00:00 2001 From: Vika Date: Wed, 13 Nov 2024 06:41:45 +0300 Subject: tower-watchdog: init at 1.0.0 Wait, is this my first self-contained crate? Not bad. I like this. Maybe I'll go publish it to crates.io? Change-Id: I340d0839746ff1cfbcc4c82c230ae2adff2a92f7 --- Cargo.lock | 68 ++++++++++++++++++++++++-- Cargo.toml | 2 +- tower-watchdog/Cargo.toml | 18 +++++++ tower-watchdog/src/lib.rs | 108 +++++++++++++++++++++++++++++++++++++++++ tower-watchdog/src/watchdog.rs | 45 +++++++++++++++++ 5 files changed, 236 insertions(+), 5 deletions(-) create mode 100644 tower-watchdog/Cargo.toml create mode 100644 tower-watchdog/src/lib.rs create mode 100644 tower-watchdog/src/watchdog.rs diff --git a/Cargo.lock b/Cargo.lock index 05f96dd..c9f8ac2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -249,6 +249,28 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.72", +] + [[package]] name = "async-trait" version = "0.1.81" @@ -4125,6 +4147,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-test" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-util" version = "0.7.11" @@ -4174,15 +4209,40 @@ dependencies = [ [[package]] name = "tower-layer" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-service" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" + +[[package]] +name = "tower-test" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4546773ffeab9e4ea02b8872faa49bb616a80a7da66afc2f32688943f97efa7" +dependencies = [ + "futures-util", + "pin-project", + "tokio", + "tokio-test", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-watchdog" +version = "1.0.0" +dependencies = [ + "futures", + "tokio", + "tower-layer", + "tower-service", + "tower-test", +] [[package]] name = "tracing" diff --git a/Cargo.toml b/Cargo.toml index 6d4056d..001f2b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,7 @@ path = "examples/sql.rs" required-features = ["sqlparser"] [workspace] -members = [".", "./util", "./templates", "./indieauth", "./templates-neo"] +members = [".", "./util", "./templates", "./indieauth", "./templates-neo", "./tower-watchdog"] default-members = [".", "./util", "./templates", "./indieauth"] [dependencies.kittybox-util] version = "0.2.0" diff --git a/tower-watchdog/Cargo.toml b/tower-watchdog/Cargo.toml new file mode 100644 index 0000000..24cc0fe --- /dev/null +++ b/tower-watchdog/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "tower-watchdog" +version = "1.0.0" +edition = "2021" + +[dev-dependencies] +tower-test = "0.4.0" +tokio = { version = "1.29.1", features = [ "macros", "rt" ] } +[dependencies] +[dependencies.tokio] +version = "^1.29.1" +features = ["time", "sync"] +[dependencies.tower-layer] +version = "0.3.3" +[dependencies.tower-service] +version = "0.3.3" +[dependencies.futures] +version = "0.3.14" \ No newline at end of file diff --git a/tower-watchdog/src/lib.rs b/tower-watchdog/src/lib.rs new file mode 100644 index 0000000..9a5c609 --- /dev/null +++ b/tower-watchdog/src/lib.rs @@ -0,0 +1,108 @@ +#[deny(missing_docs)] +mod watchdog; +pub use watchdog::Watchdog; + +pub struct WatchdogLayer { + pet: watchdog::Pet, +} + +/// A Tower layer to send a signal if there hasn't been new requests +/// in a while. +/// +/// It resets a timer at the beginning of every single incoming +/// request. Wait on the watchdog to begin. If no new requests haven't +/// arrived in a while, the corresponding paired [Watchdog]'s +/// [wait][Watchdog::wait] future will resolve. This is a signal to +/// gracefully shutdown a server. +impl WatchdogLayer { + pub fn new(timeout: std::time::Duration) -> (watchdog::Watchdog, WatchdogLayer) { + let (watchdog, pet) = watchdog::watchdog(timeout); + (watchdog, WatchdogLayer { pet }) + } +} + +impl tower_layer::Layer for WatchdogLayer { + type Service = WatchdogService; + + fn layer(&self, inner: S) -> Self::Service { + Self::Service { + pet: self.pet.clone(), + inner + } + } +} + +pub struct WatchdogService { + pet: watchdog::Pet, + inner: S +} + +impl + Clone + 'static, Request: std::fmt::Debug + 'static> tower_service::Service for WatchdogService { + type Response = S::Response; + type Error = S::Error; + type Future = std::pin::Pin>> + Send>>, std::pin::Pin>, Box>) -> std::pin::Pin>>>>>; + + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: Request) -> Self::Future { + use futures::FutureExt; + // We need to get the service that we just polled. For this, + // we clone the service, leave in the clone and take the + // original. + // + // Don't ask me why this is needed. + let mut inner = self.inner.clone(); + std::mem::swap(&mut self.inner, &mut inner); + + let pet = self.pet.clone(); + Box::pin(pet.pet_owned().boxed().then(Box::new(move |_| Box::pin(inner.call(request))))) + } +} + +#[cfg(test)] +mod tests { + use futures::FutureExt; + + #[tokio::test(start_paused = true)] + async fn test_watchdog_layer() { + use std::time::Duration; + + let (watchdog, layer) = super::WatchdogLayer::new(Duration::from_secs(1)); + let (mut mock, mut handle) = tower_test::mock::spawn_layer::<(), (), _>(layer); + handle.allow((100..1000).count() as u64 + 1); + // We don't actually care what the service itself does. + let responder = tokio::task::spawn(async move { + while let Some(((), res)) = handle.next_request().await { + res.send_response(()) + } + }); + + let mut watchdog_future = Box::pin(watchdog.wait().fuse()); + + for i in 100..=1_000 { + if i != 1000 { + assert!(mock.poll_ready().is_ready()); + let request = Box::pin(tokio::time::sleep(std::time::Duration::from_millis(i)).then(|()| mock.call(()))); + tokio::select! { + _ = &mut watchdog_future => panic!("Watchdog called earlier than response!"), + _ = request => {}, + }; + } else { + assert!(mock.poll_ready().is_ready()); + // We use `+ 1` here, because the watchdog behavior is + // subject to a data race if a request arrives in the + // same tick. + let request = Box::pin(tokio::time::sleep(std::time::Duration::from_millis(i + 1)).then(|()| mock.call(()))); + tokio::select! { + _ = &mut watchdog_future => { + }, + _ = request => panic!("Watchdog didn't fire!") + }; + } + } + + responder.abort(); + } +} diff --git a/tower-watchdog/src/watchdog.rs b/tower-watchdog/src/watchdog.rs new file mode 100644 index 0000000..4162294 --- /dev/null +++ b/tower-watchdog/src/watchdog.rs @@ -0,0 +1,45 @@ +/// A watchdog timer with a timeout that needs to be regularly pinged +/// to prevent it from firing. +/// +/// **Note**: the behavior of the watchdog is undefined (but will not +/// result in a crash or put the watchdog in an invalid state) if it +/// is pet at the last possible moment, for example, like this: +/// +/// ```no_compile +/// let (watchdog, pet) = /* ... */ +/// tokio::select! { +/// tokio::time::sleep(watchdog.timeout).then(move |()| pet.pet()) = _ => { eprintln!("foo"); }, +/// watchdog.wait() = _ => { eprintln!("bar"); } +/// ``` +pub struct Watchdog { + rx: tokio::sync::mpsc::Receiver<()>, + /// Timeout of this watchdog. + pub timeout: std::time::Duration, +} +#[derive(Clone)] +pub(crate) struct Pet(tokio::sync::mpsc::Sender<()>); + +pub(crate) fn watchdog(timeout: std::time::Duration) -> (Watchdog, Pet) { + let (tx, rx) = tokio::sync::mpsc::channel::<()>(1); + let pet = Pet(tx); + let watchdog = Watchdog { rx, timeout }; + + (watchdog, pet) +} + +impl Watchdog { + /// Wait until the watchdog fires from not being pet in a while. + /// + /// The watchdog doesn't start waiting for pets and scritches + /// until you await the future returned from this method. + pub async fn wait(mut self) { + while let Ok(Some(())) = tokio::time::timeout(self.timeout, self.rx.recv()).await {} + } +} + +impl Pet { + pub(crate) async fn pet_owned(self) -> Result<(), tokio::sync::mpsc::error::SendError<()>> { + let tx = self.0.clone(); + tx.send(()).await + } +} -- cgit 1.4.1