use activitypub_federation::{activity_sending::SendActivityTask, fetch::object_id::ObjectId, protocol::context::WithContext}; use url::Url; use crate::{ activities::CreateActivity, actors::get_local_actor, federation::ApFederationConfig, }; use super::{ActivityPubService, delivery::send_with_retry}; impl ActivityPubService { pub async fn backfill_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()> { let client = reqwest::Client::builder() .timeout(std::time::Duration::from_secs(super::HTTP_FETCH_TIMEOUT_SECS)) .build()?; let data = self.federation_config.to_request_data(); let actor = url::Url::parse(actor_url)?; let root: serde_json::Value = client.get(outbox_url).header("Accept", "application/activity+json").send().await?.json().await?; let first = match root.get("first").and_then(|v| v.as_str()) { Some(url) => url.to_string(), None => { tracing::debug!(outbox = %outbox_url, "outbox has no first page"); return Ok(()); } }; let mut current_url = first; let mut visited = std::collections::HashSet::new(); loop { if !visited.insert(current_url.clone()) { tracing::warn!(url = %current_url, "backfill: loop detected, stopping"); break; } let page: serde_json::Value = match client.get(¤t_url).header("Accept", "application/activity+json").send().await { Ok(resp) => match resp.json().await { Ok(v) => v, Err(e) => { tracing::error!(error = %e, "backfill: failed to parse page JSON"); break; } }, Err(e) => { tracing::error!(error = %e, "backfill: HTTP request failed"); break; } }; if let Some(items) = page.get("orderedItems").and_then(|v| v.as_array()) { for item in items { let activity_type = item.get("type").and_then(|v| v.as_str()).unwrap_or(""); if activity_type != "Create" && activity_type != "Add" { continue; } let Some(object) = item.get("object").filter(|o| o.is_object()).cloned() else { continue }; let Some(ap_id) = object.get("id").and_then(|v| v.as_str()).and_then(|s| url::Url::parse(s).ok()) else { continue }; if let Err(e) = data.object_handler.on_create(&ap_id, &actor, object).await { tracing::warn!(ap_id = %ap_id, error = %e, "backfill: failed to process item"); } } } match page.get("next").and_then(|v| v.as_str()) { Some(next) => current_url = next.to_string(), None => break, } } tracing::info!(outbox = %outbox_url, pages = visited.len(), "backfill complete"); Ok(()) } /// Spawns the backfill task in the background. /// `pub(crate)` so `service::follow` can call it from `accept_follower`. pub(crate) fn spawn_backfill(&self, owner_user_id: uuid::Uuid, follower_inbox_url: String) { let config = self.federation_config.clone(); let base_url = self.base_url.clone(); let max_attempts = self.delivery_max_attempts; let initial_delay = self.delivery_initial_delay_secs; tokio::spawn(async move { if let Err(e) = ActivityPubService::run_backfill(config, base_url, owner_user_id, follower_inbox_url, max_attempts, initial_delay).await { tracing::warn!(error = %e, "backfill: task failed"); } }); } async fn run_backfill( config: ApFederationConfig, base_url: String, owner_user_id: uuid::Uuid, follower_inbox_url: String, max_attempts: u32, initial_delay: u64, ) -> anyhow::Result<()> { const BATCH_SIZE: usize = 20; let data = config.to_request_data(); let local_actor = get_local_actor(owner_user_id, &data).await.map_err(|e| anyhow::anyhow!("{e}"))?; let inbox = Url::parse(&follower_inbox_url)?; let mut objects = data.object_handler.get_local_objects_for_user(owner_user_id).await?; objects.reverse(); let total = objects.len(); let (mut success_count, mut failure_count) = (0usize, 0usize); for chunk in objects.chunks(BATCH_SIZE) { for (ap_id, object_json) in chunk { let create_id = Url::parse(&format!( "{}/activities/create/{}", base_url, uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_URL, ap_id.as_str().as_bytes()) ))?; let create = CreateActivity { id: create_id, kind: Default::default(), actor: ObjectId::from(local_actor.ap_id.clone()), object: object_json.clone(), to: vec![], cc: vec![], bto: vec![], bcc: vec![], }; let sends = SendActivityTask::prepare(&WithContext::new_default(create), &local_actor, vec![inbox.clone()], &data).await?; if send_with_retry(sends, &data, max_attempts, initial_delay).await.is_empty() { success_count += 1; } else { failure_count += 1; } } tokio::time::sleep(std::time::Duration::from_millis(super::BATCH_FETCH_SLEEP_MS)).await; } tracing::info!(user_id = %owner_user_id, follower = %follower_inbox_url, sent = success_count, failed = failure_count, total = total, "backfill complete"); Ok(()) } }