about summary refs log tree commit diff
path: root/kittybox-rs/src/webmentions/mod.rs
diff options
context:
space:
mode:
authorVika <vika@fireburn.ru>2023-07-29 21:59:56 +0300
committerVika <vika@fireburn.ru>2023-07-29 21:59:56 +0300
commit0617663b249f9ca488e5de652108b17d67fbaf45 (patch)
tree11564b6c8fa37bf9203a0a4cc1c4e9cc088cb1a5 /kittybox-rs/src/webmentions/mod.rs
parent26c2b79f6a6380ae3224e9309b9f3352f5717bd7 (diff)
downloadkittybox-0617663b249f9ca488e5de652108b17d67fbaf45.tar.zst
Moved the entire Kittybox tree into the root
Diffstat (limited to 'kittybox-rs/src/webmentions/mod.rs')
-rw-r--r--kittybox-rs/src/webmentions/mod.rs195
1 files changed, 0 insertions, 195 deletions
diff --git a/kittybox-rs/src/webmentions/mod.rs b/kittybox-rs/src/webmentions/mod.rs
deleted file mode 100644
index 95ea870..0000000
--- a/kittybox-rs/src/webmentions/mod.rs
+++ /dev/null
@@ -1,195 +0,0 @@
-use axum::{Form, response::{IntoResponse, Response}, Extension};
-use axum::http::StatusCode;
-use tracing::error;
-
-use crate::database::{Storage, StorageError};
-use self::queue::JobQueue;
-pub mod queue;
-
-#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
-#[cfg_attr(feature = "sqlx", derive(sqlx::FromRow))]
-pub struct Webmention {
-    source: String,
-    target: String,
-}
-
-impl queue::JobItem for Webmention {}
-impl queue::PostgresJobItem for Webmention {
-    const DATABASE_NAME: &'static str = "kittybox_webmention.incoming_webmention_queue";
-    const NOTIFICATION_CHANNEL: &'static str = "incoming_webmention";
-}
-
-async fn accept_webmention<Q: JobQueue<Webmention>>(
-    Extension(queue): Extension<Q>,
-    Form(webmention): Form<Webmention>,
-) -> Response {
-    if let Err(err) = webmention.source.parse::<url::Url>() {
-        return (StatusCode::BAD_REQUEST, err.to_string()).into_response()
-    }
-    if let Err(err) = webmention.target.parse::<url::Url>() {
-        return (StatusCode::BAD_REQUEST, err.to_string()).into_response()
-    }
-
-    match queue.put(&webmention).await {
-        Ok(id) => (StatusCode::ACCEPTED, [
-            ("Location", format!("/.kittybox/webmention/{id}"))
-        ]).into_response(),
-        Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, [
-            ("Content-Type", "text/plain")
-        ], err.to_string()).into_response()
-    }
-}
-
-pub fn router<Q: JobQueue<Webmention>, S: Storage + 'static>(
-    queue: Q, db: S, http: reqwest::Client,
-    cancellation_token: tokio_util::sync::CancellationToken
-) -> (axum::Router, SupervisedTask) {
-    // Automatically spawn a background task to handle webmentions
-    let bgtask_handle = supervised_webmentions_task(queue.clone(), db, http, cancellation_token);
-
-    let router = axum::Router::new()
-        .route("/.kittybox/webmention",
-            axum::routing::post(accept_webmention::<Q>)
-        )
-        .layer(Extension(queue));
-
-    (router, bgtask_handle)
-}
-
-#[derive(thiserror::Error, Debug)]
-pub enum SupervisorError {
-    #[error("the task was explicitly cancelled")]
-    Cancelled
-}
-
-pub type SupervisedTask = tokio::task::JoinHandle<Result<std::convert::Infallible, SupervisorError>>;
-
-pub fn supervisor<E, A, F>(mut f: F, cancellation_token: tokio_util::sync::CancellationToken) -> SupervisedTask
-where
-    E: std::error::Error + std::fmt::Debug + Send + 'static,
-    A: std::future::Future<Output = Result<std::convert::Infallible, E>> + Send + 'static,
-    F: FnMut() -> A + Send + 'static
-{
-
-    let supervisor_future = async move {
-        loop {
-            // Don't spawn the task if we are already cancelled, but
-            // have somehow missed it (probably because the task
-            // crashed and we immediately received a cancellation
-            // request after noticing the crashed task)
-            if cancellation_token.is_cancelled() {
-                return Err(SupervisorError::Cancelled)
-            }
-            let task = tokio::task::spawn(f());
-            tokio::select! {
-                _ = cancellation_token.cancelled() => {
-                    tracing::info!("Shutdown of background task {:?} requested.", std::any::type_name::<A>());
-                    return Err(SupervisorError::Cancelled)
-                }
-                task_result = task => match task_result {
-                    Err(e) => tracing::error!("background task {:?} exited unexpectedly: {}", std::any::type_name::<A>(), e),
-                    Ok(Err(e)) => tracing::error!("background task {:?} returned error: {}", std::any::type_name::<A>(), e),
-                    Ok(Ok(_)) => unreachable!("task's Ok is Infallible")
-                }
-            }
-            tracing::debug!("Sleeping for a little while to back-off...");
-            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
-        }
-    };
-    #[cfg(not(tokio_unstable))]
-    return tokio::task::spawn(supervisor_future);
-    #[cfg(tokio_unstable)]
-    return tokio::task::Builder::new()
-        .name(format!("supervisor for background task {}", std::any::type_name::<A>()).as_str())
-        .spawn(supervisor_future)
-        .unwrap();
-}
-
-mod check;
-
-#[derive(thiserror::Error, Debug)]
-enum Error<Q: std::error::Error + std::fmt::Debug + Send + 'static> {
-    #[error("queue error: {0}")]
-    Queue(#[from] Q),
-    #[error("storage error: {0}")]
-    Storage(StorageError)
-}
-
-async fn process_webmentions_from_queue<Q: JobQueue<Webmention>, S: Storage + 'static>(queue: Q, db: S, http: reqwest::Client) -> Result<std::convert::Infallible, Error<Q::Error>> {
-    use futures_util::StreamExt;
-    use self::queue::Job;
-
-    let mut stream = queue.into_stream().await?;
-    while let Some(item) = stream.next().await.transpose()? {
-        let job = item.job();
-        let (source, target) = (
-            job.source.parse::<url::Url>().unwrap(),
-            job.target.parse::<url::Url>().unwrap()
-        );
-
-        let (code, text) = match http.get(source.clone()).send().await {
-            Ok(response) => {
-                let code = response.status();
-                if ![StatusCode::OK, StatusCode::GONE].iter().any(|i| i == &code) {
-                    error!("error processing webmention: webpage fetch returned {}", code);
-                    continue;
-                }
-                match response.text().await {
-                    Ok(text) => (code, text),
-                    Err(err) => {
-                        error!("error processing webmention: error fetching webpage text: {}", err);
-                        continue
-                    }
-                }
-            }
-            Err(err) => {
-                error!("error processing webmention: error requesting webpage: {}", err);
-                continue
-            }
-        };
-
-        if code == StatusCode::GONE {
-            todo!("removing webmentions is not implemented yet");
-            // db.remove_webmention(target.as_str(), source.as_str()).await.map_err(Error::<Q::Error>::Storage)?;
-        } else {
-            // Verify webmention
-            let (mention_type, mut mention) = match tokio::task::block_in_place({
-                || check::check_mention(text, &source, &target)
-            }) {
-                Ok(Some(mention_type)) => mention_type,
-                Ok(None) => {
-                    error!("webmention {} -> {} invalid, rejecting", source, target);
-                    item.done().await?;
-                    continue;
-                }
-                Err(err) => {
-                    error!("error processing webmention: error checking webmention: {}", err);
-                    continue;
-                }
-            };
-
-            {
-                mention["type"] = serde_json::json!(["h-cite"]);
-
-                if !mention["properties"].as_object().unwrap().contains_key("uid") {
-                    let url = mention["properties"]["url"][0].as_str().unwrap_or_else(|| target.as_str()).to_owned();
-                    let props = mention["properties"].as_object_mut().unwrap();
-                    props.insert("uid".to_owned(), serde_json::Value::Array(
-                        vec![serde_json::Value::String(url)])
-                    );
-                }
-            }
-
-            db.add_or_update_webmention(target.as_str(), mention_type, mention).await.map_err(Error::<Q::Error>::Storage)?;
-        }
-    }
-    unreachable!()
-}
-
-fn supervised_webmentions_task<Q: JobQueue<Webmention>, S: Storage + 'static>(
-    queue: Q, db: S,
-    http: reqwest::Client,
-    cancellation_token: tokio_util::sync::CancellationToken
-) -> SupervisedTask {
-    supervisor::<Error<Q::Error>, _, _>(move || process_webmentions_from_queue(queue.clone(), db.clone(), http.clone()), cancellation_token)
-}