feat: implement federation post/connections backfill schedulers

schedule_actor_posts_fetch now spawns backfill_outbox in the
background, fetching all pages of a remote outbox and persisting via
accept_note.
schedule_connections_fetch follows the AP collection's first/next
links, resolves profiles, and caches them in the DB. Both were
previously no-ops that only logged a "deferred" debug message.

Add a connections_repo field to ActivityPubService and pass it in from
both factory build functions.
This commit is contained in:
2026-05-17 11:49:53 +02:00
parent 39f7d39232
commit bb48819cad
3 changed files with 118 additions and 12 deletions

View File

@@ -1,5 +1,7 @@
use std::sync::Arc; use std::sync::Arc;
use domain::ports::FederationFetchPort;
use activitypub_federation::{ use activitypub_federation::{
activity_sending::SendActivityTask, fetch::object_id::ObjectId, protocol::context::WithContext, activity_sending::SendActivityTask, fetch::object_id::ObjectId, protocol::context::WithContext,
traits::Actor, traits::Actor,
@@ -154,9 +156,11 @@ pub(crate) async fn send_with_retry(
failures failures
} }
#[derive(Clone)]
pub struct ActivityPubService { pub struct ActivityPubService {
federation_config: ApFederationConfig, federation_config: ApFederationConfig,
base_url: String, base_url: String,
connections_repo: Arc<dyn domain::ports::RemoteActorConnectionRepository>,
} }
impl ActivityPubService { impl ActivityPubService {
@@ -170,6 +174,7 @@ impl ActivityPubService {
software_name: String, software_name: String,
debug: bool, debug: bool,
event_publisher: Option<Arc<dyn domain::ports::EventPublisher>>, event_publisher: Option<Arc<dyn domain::ports::EventPublisher>>,
connections_repo: Arc<dyn domain::ports::RemoteActorConnectionRepository>,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
let data = FederationData::new( let data = FederationData::new(
repo, repo,
@@ -184,6 +189,7 @@ impl ActivityPubService {
Ok(Self { Ok(Self {
federation_config, federation_config,
base_url, base_url,
connections_repo,
}) })
} }
@@ -1586,11 +1592,14 @@ impl domain::ports::FederationSchedulerPort for ActivityPubService {
actor_ap_url: &str, actor_ap_url: &str,
outbox_url: &str, outbox_url: &str,
) -> Result<(), domain::errors::DomainError> { ) -> Result<(), domain::errors::DomainError> {
tracing::debug!( let service = self.clone();
actor = actor_ap_url, let actor = actor_ap_url.to_string();
outbox = outbox_url, let outbox = outbox_url.to_string();
"schedule_actor_posts_fetch: deferred" tokio::spawn(async move {
); if let Err(e) = service.backfill_outbox(&outbox, &actor).await {
tracing::warn!(actor = %actor, error = %e, "posts backfill failed");
}
});
Ok(()) Ok(())
} }
@@ -1601,13 +1610,107 @@ impl domain::ports::FederationSchedulerPort for ActivityPubService {
connection_type: &str, connection_type: &str,
page: u32, page: u32,
) -> Result<(), domain::errors::DomainError> { ) -> Result<(), domain::errors::DomainError> {
tracing::debug!( // Only trigger a full fetch on page 1 to avoid redundant network traffic.
actor = actor_ap_url, if page != 1 {
collection = collection_url, return Ok(());
connection_type, }
page, let service = self.clone();
"schedule_connections_fetch: deferred" let actor = actor_ap_url.to_string();
); let collection = collection_url.to_string();
let conn_type = connection_type.to_string();
let connections_repo = self.connections_repo.clone();
tokio::spawn(async move {
let client = match reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(HTTP_FETCH_TIMEOUT_SECS))
.build()
{
Ok(c) => c,
Err(e) => {
tracing::warn!(error = %e, "connections fetch: failed to build client");
return;
}
};
// Walk the AP collection, following first/next links.
let mut all_urls: Vec<String> = Vec::new();
let mut current_url: Option<String> = Some(collection.clone());
const MAX_ACTORS: usize = 500;
while let Some(url) = current_url.take() {
let val: serde_json::Value = match client
.get(&url)
.header("Accept", "application/activity+json, application/ld+json")
.send()
.await
{
Ok(r) => match r.json().await {
Ok(v) => v,
Err(e) => {
tracing::warn!(error = %e, url = %url, "connections: parse error");
break;
}
},
Err(e) => {
tracing::warn!(error = %e, url = %url, "connections: HTTP error");
break;
}
};
// OrderedCollection root — follow its `first` page.
if val["type"].as_str() == Some("OrderedCollection") {
current_url = val["first"].as_str().map(|s| s.to_string());
continue;
}
// Collect actor URLs from orderedItems (string or {id: ...}).
let empty = vec![];
let items = val["orderedItems"].as_array().unwrap_or(&empty);
for item in items {
let actor_url = item
.as_str()
.or_else(|| item["id"].as_str())
.unwrap_or("");
if !actor_url.is_empty() {
all_urls.push(actor_url.to_string());
}
}
if all_urls.len() >= MAX_ACTORS {
break;
}
current_url = val["next"].as_str().map(|s| s.to_string());
if current_url.is_some() {
tokio::time::sleep(std::time::Duration::from_millis(BATCH_FETCH_SLEEP_MS))
.await;
}
}
if all_urls.is_empty() {
tracing::debug!(actor = %actor, connection_type = %conn_type, "connections: empty collection");
return;
}
// Resolve profiles and cache in pages of PAGE_SIZE.
const PAGE_SIZE: usize = 20;
for (idx, chunk) in all_urls.chunks(PAGE_SIZE).enumerate() {
let page_num = (idx + 1) as u32;
let chunk_urls: Vec<String> = chunk.to_vec();
let resolved = service.resolve_actor_profiles(chunk_urls).await;
if let Err(e) = connections_repo
.upsert_connections(&actor, &conn_type, page_num, &resolved)
.await
{
tracing::warn!(error = %e, "connections: upsert failed");
}
}
tracing::debug!(
actor = %actor,
connection_type = %conn_type,
count = all_urls.len(),
"connections fetch complete"
);
});
Ok(()) Ok(())
} }
} }

View File

@@ -86,6 +86,7 @@ pub async fn build(cfg: &Config) -> Infrastructure {
"thoughts".to_string(), "thoughts".to_string(),
cfg.debug, cfg.debug,
None, None,
Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())),
) )
.await .await
.expect("Failed to build ActivityPubService"), .expect("Failed to build ActivityPubService"),

View File

@@ -1,4 +1,5 @@
use postgres::failed_event::PgFailedEventStore; use postgres::failed_event::PgFailedEventStore;
use postgres::remote_actor_connections::PgRemoteActorConnectionRepository;
use sqlx::PgPool; use sqlx::PgPool;
use std::sync::Arc; use std::sync::Arc;
@@ -56,6 +57,7 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker
"thoughts".to_string(), "thoughts".to_string(),
false, false,
None, None,
Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())),
) )
.await .await
.expect("ActivityPubService build failed"), .expect("ActivityPubService build failed"),