1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
use axum::{Form, response::{IntoResponse, Response}, Extension};
use axum::http::StatusCode;
use self::queue::JobQueue;
pub mod queue;
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[cfg_attr(feature = "sqlx", derive(sqlx::FromRow))]
pub struct Webmention {
source: String,
target: String,
}
impl queue::JobItem for Webmention {
const DATABASE_NAME: &'static str = "kittybox.incoming_webmention_queue";
}
async fn accept_webmention<Q: JobQueue<Webmention>>(
Form(webmention): Form<Webmention>,
Extension(queue): Extension<Q>
) -> Response {
if let Err(err) = webmention.source.parse::<url::Url>() {
return (StatusCode::BAD_REQUEST, err.to_string()).into_response()
}
if let Err(err) = webmention.target.parse::<url::Url>() {
return (StatusCode::BAD_REQUEST, err.to_string()).into_response()
}
match queue.put(&webmention).await {
Ok(id) => (StatusCode::ACCEPTED, [
("Location", format!("/.kittybox/webmention/{id}"))
]).into_response(),
Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, [
("Content-Type", "text/plain")
], err.to_string()).into_response()
}
}
pub fn router<Q: JobQueue<Webmention>>(queue: Q) -> axum::Router {
// Automatically spawn a background task to handle webmentions
tokio::task::spawn(supervised_webmentions_task(queue.clone()));
axum::Router::new()
.route("/.kittybox/webmention",
axum::routing::post(accept_webmention::<Q>)
)
.layer(Extension(queue))
}
async fn supervisor<E: std::error::Error + std::fmt::Debug + Send + 'static, A: futures_util::Future<Output = Result<std::convert::Infallible, E>> + Send + 'static, F: FnMut() -> A>(mut f: F) -> std::convert::Infallible {
loop {
match tokio::task::spawn(f()).await {
Err(e) => tracing::error!("background task exited unexpectedly: {}", e),
Ok(Err(e)) => tracing::error!("background task returned error: {}", e),
Ok(Ok(_)) => unreachable!("task's Ok is Infallible")
}
}
}
async fn process_webmentions_from_queue<Q: JobQueue<Webmention>>(queue: Q) -> Result<std::convert::Infallible, Q::Error> {
use futures_util::StreamExt;
use self::queue::Job;
let mut stream = queue.into_stream().await?;
while let Some(item) = stream.next().await.transpose()? {
let job = item.job();
let (source, target) = (
job.source.parse::<url::Url>().unwrap(),
job.target.parse::<url::Url>().unwrap()
);
todo!()
}
unreachable!()
}
async fn supervised_webmentions_task<Q: JobQueue<Webmention>>(queue: Q) {
supervisor::<Q::Error, _, _>(|| process_webmentions_from_queue(queue.clone())).await;
}
|