about summary refs log tree commit diff
path: root/kittybox-rs/src/webmentions/mod.rs
blob: becddbcf39805426b757b535e443eb66f1ad6b4f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use axum::{Form, response::{IntoResponse, Response}, Extension};
use axum::http::StatusCode;

use self::queue::JobQueue;
pub mod queue;

#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[cfg_attr(feature = "sqlx", derive(sqlx::FromRow))]
pub struct Webmention {
    source: String,
    target: String,
}

impl queue::JobItem for Webmention {}
impl queue::PostgresJobItem for Webmention {
    const DATABASE_NAME: &'static str = "kittybox.incoming_webmention_queue";
    const NOTIFICATION_CHANNEL: &'static str = "incoming_webmention";
}

async fn accept_webmention<Q: JobQueue<Webmention>>(
    Extension(queue): Extension<Q>,
    Form(webmention): Form<Webmention>,
) -> Response {
    if let Err(err) = webmention.source.parse::<url::Url>() {
        return (StatusCode::BAD_REQUEST, err.to_string()).into_response()
    }
    if let Err(err) = webmention.target.parse::<url::Url>() {
        return (StatusCode::BAD_REQUEST, err.to_string()).into_response()
    }

    match queue.put(&webmention).await {
        Ok(id) => (StatusCode::ACCEPTED, [
            ("Location", format!("/.kittybox/webmention/{id}"))
        ]).into_response(),
        Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, [
            ("Content-Type", "text/plain")
        ], err.to_string()).into_response()
    }
}

pub fn router<Q: JobQueue<Webmention>>(queue: Q, cancellation_token: tokio_util::sync::CancellationToken) -> (axum::Router, SupervisedTask) {
    // Automatically spawn a background task to handle webmentions
    let bgtask_handle = supervised_webmentions_task(queue.clone(), cancellation_token);

    let router = axum::Router::new()
        .route("/.kittybox/webmention",
            axum::routing::post(accept_webmention::<Q>)
        )
        .layer(Extension(queue));

    (router, bgtask_handle)
}

#[derive(thiserror::Error, Debug)]
pub enum SupervisorError {
    #[error("the task was explicitly cancelled")]
    Cancelled
}

pub type SupervisedTask = tokio::task::JoinHandle<Result<std::convert::Infallible, SupervisorError>>;

pub fn supervisor<E, A, F>(mut f: F, cancellation_token: tokio_util::sync::CancellationToken) -> SupervisedTask
where
    E: std::error::Error + std::fmt::Debug + Send + 'static,
    A: std::future::Future<Output = Result<std::convert::Infallible, E>> + Send + 'static,
    F: FnMut() -> A + Send + 'static
{

    let supervisor_future = async move {
        loop {
            // Don't spawn the task if we are already cancelled, but
            // have somehow missed it (probably because the task
            // crashed and we immediately received a cancellation
            // request after noticing the crashed task)
            if cancellation_token.is_cancelled() {
                return Err(SupervisorError::Cancelled)
            }
            let task = tokio::task::spawn(f());
            tokio::select! {
                _ = cancellation_token.cancelled() => {
                    tracing::info!("Shutdown of background task {:?} requested.", std::any::type_name::<A>());
                    return Err(SupervisorError::Cancelled)
                }
                task_result = task => match task_result {
                    Err(e) => tracing::error!("background task {:?} exited unexpectedly: {}", std::any::type_name::<A>(), e),
                    Ok(Err(e)) => tracing::error!("background task {:?} returned error: {}", std::any::type_name::<A>(), e),
                    Ok(Ok(_)) => unreachable!("task's Ok is Infallible")
                }
            }
            tracing::debug!("Sleeping for a little while to back-off...");
            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
        }
    };
    #[cfg(not(tokio_unstable))]
    return tokio::task::spawn(supervisor_future);
    #[cfg(tokio_unstable)]
    return tokio::task::Builder::new()
        .name(format!("supervisor for background task {}", std::any::type_name::<A>()).as_str())
        .spawn(supervisor_future)
        .unwrap();
}

async fn process_webmentions_from_queue<Q: JobQueue<Webmention>>(queue: Q) -> Result<std::convert::Infallible, Q::Error> {
    use futures_util::StreamExt;
    use self::queue::Job;

    let mut stream = queue.into_stream().await?;
    while let Some(item) = stream.next().await.transpose()? {
        let job = item.job();
        let (source, target) = (
            job.source.parse::<url::Url>().unwrap(),
            job.target.parse::<url::Url>().unwrap()
        );

        todo!()
    }
    unreachable!()
}

fn supervised_webmentions_task<Q: JobQueue<Webmention>>(queue: Q, cancellation_token: tokio_util::sync::CancellationToken) -> SupervisedTask {
    supervisor::<Q::Error, _, _>(move || process_webmentions_from_queue(queue.clone()), cancellation_token)
}