feat: add backfill_outbox to ActivityPubService and ActivityPubPort

This commit is contained in:
2026-05-13 01:32:14 +02:00
parent ca9a504632
commit e92c6789d9
2 changed files with 92 additions and 0 deletions

View File

@@ -975,6 +975,91 @@ impl ActivityPubService {
Ok(())
}
pub async fn backfill_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()> {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()?;
let data = self.federation_config.to_request_data();
let actor = url::Url::parse(actor_url)?;
let root: serde_json::Value = client
.get(outbox_url)
.header("Accept", "application/activity+json")
.send()
.await?
.json()
.await?;
let first = match root.get("first").and_then(|v| v.as_str()) {
Some(url) => url.to_string(),
None => {
tracing::debug!(outbox = %outbox_url, "outbox has no first page, nothing to backfill");
return Ok(());
}
};
let mut current_url = first;
let mut visited = std::collections::HashSet::new();
loop {
if !visited.insert(current_url.clone()) {
tracing::warn!(url = %current_url, "backfill: loop detected, stopping");
break;
}
let page: serde_json::Value = match client
.get(&current_url)
.header("Accept", "application/activity+json")
.send()
.await
{
Ok(resp) => match resp.json().await {
Ok(v) => v,
Err(e) => {
tracing::error!(error = %e, url = %current_url, "backfill: failed to parse page JSON");
break;
}
},
Err(e) => {
tracing::error!(error = %e, url = %current_url, "backfill: HTTP request failed");
break;
}
};
if let Some(items) = page.get("orderedItems").and_then(|v| v.as_array()) {
for item in items {
let activity_type = item.get("type").and_then(|v| v.as_str()).unwrap_or("");
if activity_type != "Create" && activity_type != "Add" {
continue;
}
let object = match item.get("object") {
Some(o) if o.is_object() => o.clone(),
_ => continue,
};
let ap_id = match object
.get("id")
.and_then(|v| v.as_str())
.and_then(|s| url::Url::parse(s).ok())
{
Some(u) => u,
None => continue,
};
if let Err(e) = data.object_handler.on_create(&ap_id, &actor, object).await {
tracing::warn!(ap_id = %ap_id, error = %e, "backfill: failed to process item, skipping");
}
}
}
match page.get("next").and_then(|v| v.as_str()) {
Some(next) => current_url = next.to_string(),
None => break,
}
}
tracing::info!(outbox = %outbox_url, pages = visited.len(), "backfill complete");
Ok(())
}
fn spawn_backfill(&self, owner_user_id: uuid::Uuid, follower_inbox_url: String) {
let config = self.federation_config.clone();
let base_url = self.base_url.clone();