about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock68
-rw-r--r--Cargo.toml2
-rw-r--r--tower-watchdog/Cargo.toml18
-rw-r--r--tower-watchdog/src/lib.rs108
-rw-r--r--tower-watchdog/src/watchdog.rs45
5 files changed, 236 insertions, 5 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 05f96dd..c9f8ac2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -250,6 +250,28 @@ dependencies = [
 ]
 
 [[package]]
+name = "async-stream"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
+dependencies = [
+ "async-stream-impl",
+ "futures-core",
+ "pin-project-lite",
+]
+
+[[package]]
+name = "async-stream-impl"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.72",
+]
+
+[[package]]
 name = "async-trait"
 version = "0.1.81"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -4126,6 +4148,19 @@ dependencies = [
 ]
 
 [[package]]
+name = "tokio-test"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7"
+dependencies = [
+ "async-stream",
+ "bytes",
+ "futures-core",
+ "tokio",
+ "tokio-stream",
+]
+
+[[package]]
 name = "tokio-util"
 version = "0.7.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -4174,15 +4209,40 @@ dependencies = [
 
 [[package]]
 name = "tower-layer"
-version = "0.3.2"
+version = "0.3.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0"
+checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
 
 [[package]]
 name = "tower-service"
-version = "0.3.2"
+version = "0.3.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
+checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
+
+[[package]]
+name = "tower-test"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a4546773ffeab9e4ea02b8872faa49bb616a80a7da66afc2f32688943f97efa7"
+dependencies = [
+ "futures-util",
+ "pin-project",
+ "tokio",
+ "tokio-test",
+ "tower-layer",
+ "tower-service",
+]
+
+[[package]]
+name = "tower-watchdog"
+version = "1.0.0"
+dependencies = [
+ "futures",
+ "tokio",
+ "tower-layer",
+ "tower-service",
+ "tower-test",
+]
 
 [[package]]
 name = "tracing"
diff --git a/Cargo.toml b/Cargo.toml
index 6d4056d..001f2b1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -54,7 +54,7 @@ path = "examples/sql.rs"
 required-features = ["sqlparser"]
 
 [workspace]
-members = [".", "./util", "./templates", "./indieauth", "./templates-neo"]
+members = [".", "./util", "./templates", "./indieauth", "./templates-neo", "./tower-watchdog"]
 default-members = [".", "./util", "./templates", "./indieauth"]
 [dependencies.kittybox-util]
 version = "0.2.0"
diff --git a/tower-watchdog/Cargo.toml b/tower-watchdog/Cargo.toml
new file mode 100644
index 0000000..24cc0fe
--- /dev/null
+++ b/tower-watchdog/Cargo.toml
@@ -0,0 +1,18 @@
+[package]
+name = "tower-watchdog"
+version = "1.0.0"
+edition = "2021"
+
+[dev-dependencies]
+tower-test = "0.4.0"
+tokio = { version = "1.29.1", features = [ "macros", "rt" ] }
+[dependencies]
+[dependencies.tokio]
+version = "^1.29.1"
+features = ["time", "sync"]
+[dependencies.tower-layer]
+version = "0.3.3"
+[dependencies.tower-service]
+version = "0.3.3"
+[dependencies.futures]
+version = "0.3.14"
\ No newline at end of file
diff --git a/tower-watchdog/src/lib.rs b/tower-watchdog/src/lib.rs
new file mode 100644
index 0000000..9a5c609
--- /dev/null
+++ b/tower-watchdog/src/lib.rs
@@ -0,0 +1,108 @@
+#[deny(missing_docs)]
+mod watchdog;
+pub use watchdog::Watchdog;
+
+pub struct WatchdogLayer {
+    pet: watchdog::Pet,
+}
+
+/// A Tower layer to send a signal if there hasn't been new requests
+/// in a while.
+///
+/// It resets a timer at the beginning of every single incoming
+/// request. Wait on the watchdog to begin. If no new requests haven't
+/// arrived in a while, the corresponding paired [Watchdog]'s
+/// [wait][Watchdog::wait] future will resolve. This is a signal to
+/// gracefully shutdown a server.
+impl WatchdogLayer {
+    pub fn new(timeout: std::time::Duration) -> (watchdog::Watchdog, WatchdogLayer) {
+        let (watchdog, pet) = watchdog::watchdog(timeout);
+        (watchdog, WatchdogLayer { pet })
+    }
+}
+
+impl<S> tower_layer::Layer<S> for WatchdogLayer {
+    type Service = WatchdogService<S>;
+
+    fn layer(&self, inner: S) -> Self::Service {
+        Self::Service {
+            pet: self.pet.clone(),
+            inner
+        }
+    }
+}
+
+pub struct WatchdogService<S> {
+    pet: watchdog::Pet,
+    inner: S
+}
+
+impl<S: tower_service::Service<Request> + Clone + 'static, Request: std::fmt::Debug + 'static> tower_service::Service<Request> for WatchdogService<S> {
+    type Response = S::Response;
+    type Error = S::Error;
+    type Future = std::pin::Pin<Box<futures::future::Then<std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), tokio::sync::mpsc::error::SendError<()>>> + Send>>, std::pin::Pin<Box<S::Future>>, Box<dyn FnOnce(Result<(), tokio::sync::mpsc::error::SendError<()>>) -> std::pin::Pin<Box<S::Future>>>>>>;
+
+    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
+        self.inner.poll_ready(cx)
+    }
+
+    fn call(&mut self, request: Request) -> Self::Future {
+        use futures::FutureExt;
+        // We need to get the service that we just polled. For this,
+        // we clone the service, leave in the clone and take the
+        // original.
+        //
+        // Don't ask me why this is needed.
+        let mut inner = self.inner.clone();
+        std::mem::swap(&mut self.inner, &mut inner);
+
+        let pet = self.pet.clone();
+        Box::pin(pet.pet_owned().boxed().then(Box::new(move |_| Box::pin(inner.call(request)))))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use futures::FutureExt;
+
+    #[tokio::test(start_paused = true)]
+    async fn test_watchdog_layer() {
+        use std::time::Duration;
+
+        let (watchdog, layer) = super::WatchdogLayer::new(Duration::from_secs(1));
+        let (mut mock, mut handle) = tower_test::mock::spawn_layer::<(), (), _>(layer);
+        handle.allow((100..1000).count() as u64 + 1);
+        // We don't actually care what the service itself does.
+        let responder = tokio::task::spawn(async move {
+            while let Some(((), res)) = handle.next_request().await {
+                res.send_response(())
+            }
+        });
+
+        let mut watchdog_future = Box::pin(watchdog.wait().fuse());
+
+        for i in 100..=1_000 {
+            if i != 1000 {
+                assert!(mock.poll_ready().is_ready());
+                let request = Box::pin(tokio::time::sleep(std::time::Duration::from_millis(i)).then(|()| mock.call(())));
+                tokio::select! {
+                    _ = &mut watchdog_future => panic!("Watchdog called earlier than response!"),
+                    _ = request => {},
+                };
+            } else {
+                assert!(mock.poll_ready().is_ready());
+                // We use `+ 1` here, because the watchdog behavior is
+                // subject to a data race if a request arrives in the
+                // same tick.
+                let request = Box::pin(tokio::time::sleep(std::time::Duration::from_millis(i + 1)).then(|()| mock.call(())));
+                tokio::select! {
+                    _ = &mut watchdog_future => {
+                    },
+                    _ = request => panic!("Watchdog didn't fire!")
+                };
+            }
+        }
+
+        responder.abort();
+    }
+}
diff --git a/tower-watchdog/src/watchdog.rs b/tower-watchdog/src/watchdog.rs
new file mode 100644
index 0000000..4162294
--- /dev/null
+++ b/tower-watchdog/src/watchdog.rs
@@ -0,0 +1,45 @@
+/// A watchdog timer with a timeout that needs to be regularly pinged
+/// to prevent it from firing.
+///
+/// **Note**: the behavior of the watchdog is undefined (but will not
+/// result in a crash or put the watchdog in an invalid state) if it
+/// is pet at the last possible moment, for example, like this:
+///
+/// ```no_compile
+/// let (watchdog, pet) = /* ... */
+/// tokio::select! {
+///     tokio::time::sleep(watchdog.timeout).then(move |()| pet.pet()) = _ => { eprintln!("foo"); },
+///     watchdog.wait() = _ => { eprintln!("bar"); }
+/// ```
+pub struct Watchdog {
+    rx: tokio::sync::mpsc::Receiver<()>,
+    /// Timeout of this watchdog.
+    pub timeout: std::time::Duration,
+}
+#[derive(Clone)]
+pub(crate) struct Pet(tokio::sync::mpsc::Sender<()>);
+
+pub(crate) fn watchdog(timeout: std::time::Duration) -> (Watchdog, Pet) {
+    let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
+    let pet = Pet(tx);
+    let watchdog = Watchdog { rx, timeout };
+
+    (watchdog, pet)
+}
+
+impl Watchdog {
+    /// Wait until the watchdog fires from not being pet in a while.
+    ///
+    /// The watchdog doesn't start waiting for pets and scritches
+    /// until you await the future returned from this method.
+    pub async fn wait(mut self) {
+        while let Ok(Some(())) = tokio::time::timeout(self.timeout, self.rx.recv()).await {}
+    }
+}
+
+impl Pet {
+    pub(crate) async fn pet_owned(self) -> Result<(), tokio::sync::mpsc::error::SendError<()>> {
+        let tx = self.0.clone();
+        tx.send(()).await
+    }
+}