From e92c6789d99b6f94258ca669aff272b889788930 Mon Sep 17 00:00:00 2001
From: Gabriel Kaszewski
Date: Wed, 13 May 2026 01:32:14 +0200
Subject: [PATCH] feat: add backfill_outbox to ActivityPubService and ActivityPubPort

---
NOTE(review): the port.rs context lines that read `anyhow::Result>` appear to
have lost their generic arguments in transit; restore them from the target
file before applying. The blob ids on the `index` lines predate the review
edits to the added lines below.

 .../adapters/activitypub-base/src/service.rs | 107 +++++++++++++++++++
 crates/adapters/activitypub/src/port.rs       |   9 ++
 2 files changed, 116 insertions(+)

diff --git a/crates/adapters/activitypub-base/src/service.rs b/crates/adapters/activitypub-base/src/service.rs
index 992ff62..4fbc841 100644
--- a/crates/adapters/activitypub-base/src/service.rs
+++ b/crates/adapters/activitypub-base/src/service.rs
@@ -975,6 +975,113 @@ impl ActivityPubService {
         Ok(())
     }
 
+    /// Backfills remote content by walking the pages of an ActivityPub outbox.
+    ///
+    /// Fetches `outbox_url`, follows its `first` link and then each page's `next`
+    /// link. For every `Create` or `Add` activity in a page's `orderedItems` whose
+    /// `object` is an embedded JSON object with a parseable `id` URL, that object
+    /// is passed to `object_handler.on_create` together with the parsed
+    /// `actor_url`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the HTTP client cannot be built, `actor_url` is not a
+    /// valid URL, or the outbox root cannot be fetched (including a non-success
+    /// HTTP status) or parsed as JSON. A page that fails to download or parse is
+    /// logged and ends the walk; an item rejected by `on_create` is logged and
+    /// skipped. Neither of those is reported to the caller.
+    pub async fn backfill_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()> {
+        let client = reqwest::Client::builder()
+            .timeout(std::time::Duration::from_secs(30))
+            .build()?;
+        let data = self.federation_config.to_request_data();
+        let actor = url::Url::parse(actor_url)?;
+
+        let root: serde_json::Value = client
+            .get(outbox_url)
+            .header("Accept", "application/activity+json")
+            .send()
+            .await?
+            // Fail on 4xx/5xx rather than parsing an error body as the outbox.
+            .error_for_status()?
+            .json()
+            .await?;
+
+        let first = match root.get("first").and_then(|v| v.as_str()) {
+            Some(url) => url.to_string(),
+            None => {
+                tracing::debug!(outbox = %outbox_url, "outbox has no first page, nothing to backfill");
+                return Ok(());
+            }
+        };
+
+        let mut current_url = first;
+        // Page URLs already requested; stops the walk if `next` links cycle.
+        // NOTE(review): a remote that returns a new `next` URL on every page is
+        // not bounded by this set - consider adding a page limit.
+        let mut visited = std::collections::HashSet::new();
+
+        loop {
+            if !visited.insert(current_url.clone()) {
+                tracing::warn!(url = %current_url, "backfill: loop detected, stopping");
+                break;
+            }
+
+            let response = client
+                .get(&current_url)
+                .header("Accept", "application/activity+json")
+                .send()
+                .await
+                // Report 4xx/5xx as a failed request instead of parsing the body.
+                .and_then(|resp| resp.error_for_status());
+            let page: serde_json::Value = match response {
+                Ok(resp) => match resp.json().await {
+                    Ok(v) => v,
+                    Err(e) => {
+                        tracing::error!(error = %e, url = %current_url, "backfill: failed to parse page JSON");
+                        break;
+                    }
+                },
+                Err(e) => {
+                    tracing::error!(error = %e, url = %current_url, "backfill: HTTP request failed");
+                    break;
+                }
+            };
+
+            if let Some(items) = page.get("orderedItems").and_then(|v| v.as_array()) {
+                for item in items {
+                    let activity_type = item.get("type").and_then(|v| v.as_str()).unwrap_or("");
+                    if activity_type != "Create" && activity_type != "Add" {
+                        continue;
+                    }
+                    let object = match item.get("object") {
+                        Some(o) if o.is_object() => o.clone(),
+                        _ => continue,
+                    };
+                    let ap_id = match object
+                        .get("id")
+                        .and_then(|v| v.as_str())
+                        .and_then(|s| url::Url::parse(s).ok())
+                    {
+                        Some(u) => u,
+                        None => continue,
+                    };
+                    if let Err(e) = data.object_handler.on_create(&ap_id, &actor, object).await {
+                        tracing::warn!(ap_id = %ap_id, error = %e, "backfill: failed to process item, skipping");
+                    }
+                }
+            }
+
+            match page.get("next").and_then(|v| v.as_str()) {
+                Some(next) => current_url = next.to_string(),
+                None => break,
+            }
+        }
+
+        tracing::info!(outbox = %outbox_url, pages = visited.len(), "backfill complete");
+        Ok(())
+    }
+
     fn spawn_backfill(&self, owner_user_id: uuid::Uuid, follower_inbox_url: String) {
         let config = self.federation_config.clone();
         let base_url = self.base_url.clone();
diff --git a/crates/adapters/activitypub/src/port.rs b/crates/adapters/activitypub/src/port.rs
index dea8b31..0fa8296 100644
--- a/crates/adapters/activitypub/src/port.rs
+++ b/crates/adapters/activitypub/src/port.rs
@@ -31,6 +31,8 @@ pub trait ActivityPubPort: Send + Sync {
     async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> anyhow::Result<()>;
     async fn remove_blocked_domain(&self, domain: &str) -> anyhow::Result<()>;
     async fn get_blocked_domains(&self) -> anyhow::Result>;
+    /// Backfills content from the outbox at `outbox_url` for the actor `actor_url`.
+    async fn backfill_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()>;
 }
 
 #[async_trait]
@@ -97,6 +99,9 @@ impl ActivityPubPort for ActivityPubService {
     async fn get_blocked_domains(&self) -> anyhow::Result> {
         self.get_blocked_domains().await
     }
+    async fn backfill_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()> {
+        self.backfill_outbox(outbox_url, actor_url).await
+    }
 }
 
 pub struct NoopActivityPubService;
@@ -154,4 +159,8 @@ impl ActivityPubPort for NoopActivityPubService {
     async fn get_blocked_domains(&self) -> anyhow::Result> {
         Ok(vec![])
     }
+    async fn backfill_outbox(&self, _: &str, _: &str) -> anyhow::Result<()> {
+        // Intentionally does nothing and reports success.
+        Ok(())
+    }
 }