From bb48819cadb02c872cabb3dd475ed8a302ecbaec Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Sun, 17 May 2026 11:49:53 +0200 Subject: [PATCH] feat: implement federation post/connections backfill schedulers schedule_actor_posts_fetch now spawns backfill_outbox in background, fetching all pages of a remote outbox and persisting via accept_note. schedule_connections_fetch follows AP collection next-links, resolves profiles, and caches them in the DB. Both were no-ops ("deferred"). Add connections_repo field to ActivityPubService; wire both factories. --- .../adapters/activitypub-base/src/service.rs | 127 ++++++++++++++++-- crates/bootstrap/src/factory.rs | 1 + crates/worker/src/factory.rs | 2 + 3 files changed, 118 insertions(+), 12 deletions(-) diff --git a/crates/adapters/activitypub-base/src/service.rs b/crates/adapters/activitypub-base/src/service.rs index ff2f22f..eb77f1d 100644 --- a/crates/adapters/activitypub-base/src/service.rs +++ b/crates/adapters/activitypub-base/src/service.rs @@ -1,5 +1,7 @@ use std::sync::Arc; +use domain::ports::FederationFetchPort; + use activitypub_federation::{ activity_sending::SendActivityTask, fetch::object_id::ObjectId, protocol::context::WithContext, traits::Actor, @@ -154,9 +156,11 @@ pub(crate) async fn send_with_retry( failures } +#[derive(Clone)] pub struct ActivityPubService { federation_config: ApFederationConfig, base_url: String, + connections_repo: Arc, } impl ActivityPubService { @@ -170,6 +174,7 @@ impl ActivityPubService { software_name: String, debug: bool, event_publisher: Option>, + connections_repo: Arc, ) -> anyhow::Result { let data = FederationData::new( repo, @@ -184,6 +189,7 @@ impl ActivityPubService { Ok(Self { federation_config, base_url, + connections_repo, }) } @@ -1586,11 +1592,14 @@ impl domain::ports::FederationSchedulerPort for ActivityPubService { actor_ap_url: &str, outbox_url: &str, ) -> Result<(), domain::errors::DomainError> { - tracing::debug!( - actor = actor_ap_url, - outbox = outbox_url, - "schedule_actor_posts_fetch: deferred" - ); + let service = self.clone(); + let actor = actor_ap_url.to_string(); + let outbox = outbox_url.to_string(); + tokio::spawn(async move { + if let Err(e) = service.backfill_outbox(&outbox, &actor).await { + tracing::warn!(actor = %actor, error = %e, "posts backfill failed"); + } + }); Ok(()) } @@ -1601,13 +1610,107 @@ impl domain::ports::FederationSchedulerPort for ActivityPubService { connection_type: &str, page: u32, ) -> Result<(), domain::errors::DomainError> { - tracing::debug!( - actor = actor_ap_url, - collection = collection_url, - connection_type, - page, - "schedule_connections_fetch: deferred" - ); + // Only trigger a full fetch on page 1 to avoid redundant network traffic. + if page != 1 { + return Ok(()); + } + let service = self.clone(); + let actor = actor_ap_url.to_string(); + let collection = collection_url.to_string(); + let conn_type = connection_type.to_string(); + let connections_repo = self.connections_repo.clone(); + tokio::spawn(async move { + let client = match reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(HTTP_FETCH_TIMEOUT_SECS)) + .build() + { + Ok(c) => c, + Err(e) => { + tracing::warn!(error = %e, "connections fetch: failed to build client"); + return; + } + }; + + // Walk the AP collection, following first/next links. + let mut all_urls: Vec = Vec::new(); + let mut current_url: Option = Some(collection.clone()); + const MAX_ACTORS: usize = 500; + + while let Some(url) = current_url.take() { + let val: serde_json::Value = match client + .get(&url) + .header("Accept", "application/activity+json, application/ld+json") + .send() + .await + { + Ok(r) => match r.json().await { + Ok(v) => v, + Err(e) => { + tracing::warn!(error = %e, url = %url, "connections: parse error"); + break; + } + }, + Err(e) => { + tracing::warn!(error = %e, url = %url, "connections: HTTP error"); + break; + } + }; + + // OrderedCollection root — follow its `first` page. + if val["type"].as_str() == Some("OrderedCollection") { + current_url = val["first"].as_str().map(|s| s.to_string()); + continue; + } + + // Collect actor URLs from orderedItems (string or {id: ...}). + let empty = vec![]; + let items = val["orderedItems"].as_array().unwrap_or(&empty); + for item in items { + let actor_url = item + .as_str() + .or_else(|| item["id"].as_str()) + .unwrap_or(""); + if !actor_url.is_empty() { + all_urls.push(actor_url.to_string()); + } + } + + if all_urls.len() >= MAX_ACTORS { + break; + } + current_url = val["next"].as_str().map(|s| s.to_string()); + if current_url.is_some() { + tokio::time::sleep(std::time::Duration::from_millis(BATCH_FETCH_SLEEP_MS)) + .await; + } + } + + if all_urls.is_empty() { + tracing::debug!(actor = %actor, connection_type = %conn_type, "connections: empty collection"); + return; + } + + // Resolve profiles and cache in pages of PAGE_SIZE. + const PAGE_SIZE: usize = 20; + for (idx, chunk) in all_urls.chunks(PAGE_SIZE).enumerate() { + let page_num = (idx + 1) as u32; + let chunk_urls: Vec = chunk.to_vec(); + let resolved = service.resolve_actor_profiles(chunk_urls).await; + if let Err(e) = connections_repo + .upsert_connections(&actor, &conn_type, page_num, &resolved) + .await + { + tracing::warn!(error = %e, "connections: upsert failed"); + } + } + + tracing::debug!( + actor = %actor, + connection_type = %conn_type, + count = all_urls.len(), + "connections fetch complete" + ); + }); Ok(()) } } diff --git a/crates/bootstrap/src/factory.rs b/crates/bootstrap/src/factory.rs index bc1d553..57041d9 100644 --- a/crates/bootstrap/src/factory.rs +++ b/crates/bootstrap/src/factory.rs @@ -86,6 +86,7 @@ pub async fn build(cfg: &Config) -> Infrastructure { "thoughts".to_string(), cfg.debug, None, + Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())), ) .await .expect("Failed to build ActivityPubService"), diff --git a/crates/worker/src/factory.rs b/crates/worker/src/factory.rs index bc43891..8da2445 100644 --- a/crates/worker/src/factory.rs +++ b/crates/worker/src/factory.rs @@ -1,4 +1,5 @@ use postgres::failed_event::PgFailedEventStore; +use postgres::remote_actor_connections::PgRemoteActorConnectionRepository; use sqlx::PgPool; use std::sync::Arc; @@ -56,6 +57,7 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker "thoughts".to_string(), false, None, + Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())), ) .await .expect("ActivityPubService build failed"),