diff --git a/Cargo.lock b/Cargo.lock
index 05f96dd..c9f8ac2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -250,6 +250,28 @@ dependencies = [
]
[[package]]
+name = "async-stream"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
+dependencies = [
+ "async-stream-impl",
+ "futures-core",
+ "pin-project-lite",
+]
+
+[[package]]
+name = "async-stream-impl"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.72",
+]
+
+[[package]]
name = "async-trait"
version = "0.1.81"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -4126,6 +4148,19 @@ dependencies = [
]
[[package]]
+name = "tokio-test"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7"
+dependencies = [
+ "async-stream",
+ "bytes",
+ "futures-core",
+ "tokio",
+ "tokio-stream",
+]
+
+[[package]]
name = "tokio-util"
version = "0.7.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -4174,15 +4209,40 @@ dependencies = [
[[package]]
name = "tower-layer"
-version = "0.3.2"
+version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0"
+checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
[[package]]
name = "tower-service"
-version = "0.3.2"
+version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
+checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
+
+[[package]]
+name = "tower-test"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a4546773ffeab9e4ea02b8872faa49bb616a80a7da66afc2f32688943f97efa7"
+dependencies = [
+ "futures-util",
+ "pin-project",
+ "tokio",
+ "tokio-test",
+ "tower-layer",
+ "tower-service",
+]
+
+[[package]]
+name = "tower-watchdog"
+version = "1.0.0"
+dependencies = [
+ "futures",
+ "tokio",
+ "tower-layer",
+ "tower-service",
+ "tower-test",
+]
[[package]]
name = "tracing"
diff --git a/Cargo.toml b/Cargo.toml
index 6d4056d..001f2b1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -54,7 +54,7 @@ path = "examples/sql.rs"
required-features = ["sqlparser"]
[workspace]
-members = [".", "./util", "./templates", "./indieauth", "./templates-neo"]
+members = [".", "./util", "./templates", "./indieauth", "./templates-neo", "./tower-watchdog"]
default-members = [".", "./util", "./templates", "./indieauth"]
[dependencies.kittybox-util]
version = "0.2.0"
diff --git a/tower-watchdog/Cargo.toml b/tower-watchdog/Cargo.toml
new file mode 100644
index 0000000..24cc0fe
--- /dev/null
+++ b/tower-watchdog/Cargo.toml
@@ -0,0 +1,18 @@
+[package]
+name = "tower-watchdog"
+version = "1.0.0"
+edition = "2021"
+
+[dev-dependencies]
+tower-test = "0.4.0"
+tokio = { version = "1.29.1", features = [ "macros", "rt" ] }
+[dependencies]
+[dependencies.tokio]
+version = "^1.29.1"
+features = ["time", "sync"]
+[dependencies.tower-layer]
+version = "0.3.3"
+[dependencies.tower-service]
+version = "0.3.3"
+[dependencies.futures]
+version = "0.3.14"
\ No newline at end of file
diff --git a/tower-watchdog/src/lib.rs b/tower-watchdog/src/lib.rs
new file mode 100644
index 0000000..9a5c609
--- /dev/null
+++ b/tower-watchdog/src/lib.rs
@@ -0,0 +1,108 @@
+#[deny(missing_docs)]
+mod watchdog;
+pub use watchdog::Watchdog;
+
+pub struct WatchdogLayer {
+ pet: watchdog::Pet,
+}
+
+/// A Tower layer to send a signal if there hasn't been new requests
+/// in a while.
+///
+/// It resets a timer at the beginning of every single incoming
+/// request. Wait on the watchdog to begin. If no new requests haven't
+/// arrived in a while, the corresponding paired [Watchdog]'s
+/// [wait][Watchdog::wait] future will resolve. This is a signal to
+/// gracefully shutdown a server.
+impl WatchdogLayer {
+ pub fn new(timeout: std::time::Duration) -> (watchdog::Watchdog, WatchdogLayer) {
+ let (watchdog, pet) = watchdog::watchdog(timeout);
+ (watchdog, WatchdogLayer { pet })
+ }
+}
+
+impl<S> tower_layer::Layer<S> for WatchdogLayer {
+ type Service = WatchdogService<S>;
+
+ fn layer(&self, inner: S) -> Self::Service {
+ Self::Service {
+ pet: self.pet.clone(),
+ inner
+ }
+ }
+}
+
+pub struct WatchdogService<S> {
+ pet: watchdog::Pet,
+ inner: S
+}
+
+impl<S: tower_service::Service<Request> + Clone + 'static, Request: std::fmt::Debug + 'static> tower_service::Service<Request> for WatchdogService<S> {
+ type Response = S::Response;
+ type Error = S::Error;
+ type Future = std::pin::Pin<Box<futures::future::Then<std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), tokio::sync::mpsc::error::SendError<()>>> + Send>>, std::pin::Pin<Box<S::Future>>, Box<dyn FnOnce(Result<(), tokio::sync::mpsc::error::SendError<()>>) -> std::pin::Pin<Box<S::Future>>>>>>;
+
+ fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
+ self.inner.poll_ready(cx)
+ }
+
+ fn call(&mut self, request: Request) -> Self::Future {
+ use futures::FutureExt;
+ // We need to get the service that we just polled. For this,
+ // we clone the service, leave in the clone and take the
+ // original.
+ //
+ // Don't ask me why this is needed.
+ let mut inner = self.inner.clone();
+ std::mem::swap(&mut self.inner, &mut inner);
+
+ let pet = self.pet.clone();
+ Box::pin(pet.pet_owned().boxed().then(Box::new(move |_| Box::pin(inner.call(request)))))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use futures::FutureExt;
+
+ #[tokio::test(start_paused = true)]
+ async fn test_watchdog_layer() {
+ use std::time::Duration;
+
+ let (watchdog, layer) = super::WatchdogLayer::new(Duration::from_secs(1));
+ let (mut mock, mut handle) = tower_test::mock::spawn_layer::<(), (), _>(layer);
+ handle.allow((100..1000).count() as u64 + 1);
+ // We don't actually care what the service itself does.
+ let responder = tokio::task::spawn(async move {
+ while let Some(((), res)) = handle.next_request().await {
+ res.send_response(())
+ }
+ });
+
+ let mut watchdog_future = Box::pin(watchdog.wait().fuse());
+
+ for i in 100..=1_000 {
+ if i != 1000 {
+ assert!(mock.poll_ready().is_ready());
+ let request = Box::pin(tokio::time::sleep(std::time::Duration::from_millis(i)).then(|()| mock.call(())));
+ tokio::select! {
+ _ = &mut watchdog_future => panic!("Watchdog called earlier than response!"),
+ _ = request => {},
+ };
+ } else {
+ assert!(mock.poll_ready().is_ready());
+ // We use `+ 1` here, because the watchdog behavior is
+ // subject to a data race if a request arrives in the
+ // same tick.
+ let request = Box::pin(tokio::time::sleep(std::time::Duration::from_millis(i + 1)).then(|()| mock.call(())));
+ tokio::select! {
+ _ = &mut watchdog_future => {
+ },
+ _ = request => panic!("Watchdog didn't fire!")
+ };
+ }
+ }
+
+ responder.abort();
+ }
+}
diff --git a/tower-watchdog/src/watchdog.rs b/tower-watchdog/src/watchdog.rs
new file mode 100644
index 0000000..4162294
--- /dev/null
+++ b/tower-watchdog/src/watchdog.rs
@@ -0,0 +1,45 @@
+/// A watchdog timer with a timeout that needs to be regularly pinged
+/// to prevent it from firing.
+///
+/// **Note**: the behavior of the watchdog is undefined (but will not
+/// result in a crash or put the watchdog in an invalid state) if it
+/// is pet at the last possible moment, for example, like this:
+///
+/// ```no_compile
+/// let (watchdog, pet) = /* ... */
+/// tokio::select! {
+/// tokio::time::sleep(watchdog.timeout).then(move |()| pet.pet()) = _ => { eprintln!("foo"); },
+/// watchdog.wait() = _ => { eprintln!("bar"); }
+/// ```
+pub struct Watchdog {
+ rx: tokio::sync::mpsc::Receiver<()>,
+ /// Timeout of this watchdog.
+ pub timeout: std::time::Duration,
+}
+#[derive(Clone)]
+pub(crate) struct Pet(tokio::sync::mpsc::Sender<()>);
+
+pub(crate) fn watchdog(timeout: std::time::Duration) -> (Watchdog, Pet) {
+ let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
+ let pet = Pet(tx);
+ let watchdog = Watchdog { rx, timeout };
+
+ (watchdog, pet)
+}
+
+impl Watchdog {
+ /// Wait until the watchdog fires from not being pet in a while.
+ ///
+ /// The watchdog doesn't start waiting for pets and scritches
+ /// until you await the future returned from this method.
+ pub async fn wait(mut self) {
+ while let Ok(Some(())) = tokio::time::timeout(self.timeout, self.rx.recv()).await {}
+ }
+}
+
+impl Pet {
+ pub(crate) async fn pet_owned(self) -> Result<(), tokio::sync::mpsc::error::SendError<()>> {
+ let tx = self.0.clone();
+ tx.send(()).await
+ }
+}
|