1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
#[deny(missing_docs)]
mod watchdog;
pub use watchdog::Watchdog;
/// A Tower layer to send a signal if there haven't been any new
/// requests in a while.
///
/// It resets a timer at the beginning of every single incoming
/// request. Wait on the watchdog to begin. If no new requests have
/// arrived in a while, the corresponding paired [`Watchdog`]'s
/// [`wait`][Watchdog::wait] future will resolve. This is a signal to
/// gracefully shut down a server.
pub struct WatchdogLayer {
    /// Handle that is cloned into every service built by this layer.
    pet: watchdog::Pet,
}

impl WatchdogLayer {
    /// Builds a [`Watchdog`] together with the layer paired with it.
    ///
    /// `timeout` is handed unchanged to `watchdog::watchdog`, which
    /// produces both halves of the pair: the returned [`Watchdog`] and
    /// the `Pet` handle stored in the returned layer. The exact meaning
    /// of `timeout` is defined by the `watchdog` module (presumably the
    /// idle period after which [`Watchdog::wait`] resolves).
    pub fn new(timeout: std::time::Duration) -> (watchdog::Watchdog, WatchdogLayer) {
        let (watcher, pet) = watchdog::watchdog(timeout);
        let layer = Self { pet };
        (watcher, layer)
    }
}
impl<S> tower_layer::Layer<S> for WatchdogLayer {
type Service = WatchdogService<S>;
fn layer(&self, inner: S) -> Self::Service {
Self::Service {
pet: self.pet.clone(),
inner,
}
}
}
pub struct WatchdogService<S> {
pet: watchdog::Pet,
inner: S,
}
impl<S: tower_service::Service<Request> + Clone + 'static, Request: std::fmt::Debug + 'static>
tower_service::Service<Request> for WatchdogService<S>
{
type Response = S::Response;
type Error = S::Error;
type Future = std::pin::Pin<
Box<
futures::future::Then<
std::pin::Pin<
Box<
dyn std::future::Future<
Output = Result<(), tokio::sync::mpsc::error::SendError<()>>,
> + Send,
>,
>,
std::pin::Pin<Box<S::Future>>,
Box<
dyn FnOnce(
Result<(), tokio::sync::mpsc::error::SendError<()>>,
) -> std::pin::Pin<Box<S::Future>>,
>,
>,
>,
>;
fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, request: Request) -> Self::Future {
use futures::FutureExt;
// We need to get the service that we just polled. For this,
// we clone the service, leave in the clone and take the
// original.
//
// Don't ask me why this is needed.
let mut inner = self.inner.clone();
std::mem::swap(&mut self.inner, &mut inner);
let pet = self.pet.clone();
Box::pin(
pet.pet_owned()
.boxed()
.then(Box::new(move |_| Box::pin(inner.call(request)))),
)
}
}
#[cfg(test)]
mod tests {
    use futures::FutureExt;

    /// Drives a mock service through the layer with steadily growing
    /// pauses between requests. With a 1s timeout, pauses of 100ms up to
    /// 999ms must not let the watchdog resolve; a final pause of 1001ms
    /// must.
    #[tokio::test(start_paused = true)]
    async fn test_watchdog_layer() {
        use std::time::Duration;

        let (watchdog, layer) = super::WatchdogLayer::new(Duration::from_secs(1));
        let (mut mock, mut handle) = tower_test::mock::spawn_layer::<(), (), _>(layer);
        // One permit per step below: 900 in-time requests plus the final
        // late one.
        handle.allow((100..1000).count() as u64 + 1);

        // What the inner service does is irrelevant here; it simply
        // answers every request with `()`.
        let responder = tokio::task::spawn(async move {
            while let Some(((), reply)) = handle.next_request().await {
                reply.send_response(())
            }
        });

        let mut watchdog_future = Box::pin(watchdog.wait().fuse());

        // Every one of these requests has to finish before the watchdog does.
        for gap_ms in 100..1_000 {
            assert!(mock.poll_ready().is_ready());
            let request = Box::pin(
                tokio::time::sleep(Duration::from_millis(gap_ms)).then(|()| mock.call(())),
            );
            tokio::select! {
                _ = &mut watchdog_future => panic!("Watchdog called earlier than response!"),
                _ = request => {},
            };
        }

        // Final step: wait one millisecond longer than the timeout. The
        // extra `+ 1` is needed because the watchdog behavior is subject
        // to a data race if a request arrives in the same tick.
        assert!(mock.poll_ready().is_ready());
        let late_request = Box::pin(
            tokio::time::sleep(Duration::from_millis(1_000 + 1)).then(|()| mock.call(())),
        );
        tokio::select! {
            _ = &mut watchdog_future => {},
            _ = late_request => panic!("Watchdog didn't fire!"),
        };

        responder.abort();
    }
}
|