From 38a13ad6413de0f144535faeec07c6d962a641f6 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Fri, 15 May 2026 00:40:34 +0200 Subject: [PATCH] =?UTF-8?q?feat(worker):=20handle=20FetchActorConnections?= =?UTF-8?q?=20=E2=80=94=20resolve=20and=20cache=20remote=20actor=20connect?= =?UTF-8?q?ions?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/services/federation_event.rs | 63 +++++++++++++++++++ crates/worker/src/factory.rs | 4 ++ 2 files changed, 67 insertions(+) diff --git a/crates/application/src/services/federation_event.rs b/crates/application/src/services/federation_event.rs index c698e89..784716a 100644 --- a/crates/application/src/services/federation_event.rs +++ b/crates/application/src/services/federation_event.rs @@ -14,6 +14,7 @@ pub struct FederationEventService { pub base_url: String, pub federation_action: Arc, pub ap_repo: Arc, + pub remote_actor_connections: Arc, } impl FederationEventService { @@ -157,6 +158,52 @@ impl FederationEventService { Ok(()) } + DomainEvent::FetchActorConnections { + actor_ap_url, + collection_url, + connection_type, + page, + } => { + let urls = match self + .federation_action + .fetch_actor_urls_from_collection(collection_url) + .await + { + Ok(u) => u, + Err(e) => { + tracing::warn!( + collection_url, + error = %e, + "failed to fetch actor connections collection" + ); + return Ok(()); + } + }; + + if urls.is_empty() { + return Ok(()); + } + + let summaries = self.federation_action.resolve_actor_profiles(urls).await; + + if summaries.is_empty() { + return Ok(()); + } + + tracing::info!( + count = summaries.len(), + connection_type, + actor = actor_ap_url, + "caching actor connections" + ); + + self.remote_actor_connections + .upsert_connections(actor_ap_url, connection_type, *page, &summaries) + .await?; + + Ok(()) + } + _ => Ok(()), } } @@ -255,6 +302,7 @@ mod tests { base_url: "https://example.com".to_string(), federation_action: Arc::new(store.clone()), ap_repo: Arc::new(store.clone()), + remote_actor_connections: Arc::new(store.clone()), } } @@ -592,4 +640,19 @@ mod tests { .unwrap(); // TestStore.fetch_outbox_page returns Ok(vec![]) — no notes, no error } + + #[tokio::test] + async fn fetch_actor_connections_is_noop_when_collection_empty() { + let store = TestStore::default(); + let spy = Arc::new(SpyPort::default()); + svc(&store, spy.clone()) + .process(&DomainEvent::FetchActorConnections { + actor_ap_url: "https://mastodon.social/users/alice".into(), + collection_url: "https://mastodon.social/users/alice/followers".into(), + connection_type: "followers".into(), + page: 1, + }) + .await + .unwrap(); + } } diff --git a/crates/worker/src/factory.rs b/crates/worker/src/factory.rs index 8ff0ced..a2bf1f7 100644 --- a/crates/worker/src/factory.rs +++ b/crates/worker/src/factory.rs @@ -6,6 +6,7 @@ use activitypub_base::ActivityPubService; use application::services::{FederationEventService, NotificationEventService}; use domain::ports::{ActivityPubRepository, FederationActionPort, OutboundFederationPort}; use postgres::activitypub::PgActivityPubRepository; +use postgres::remote_actor_connections::PgRemoteActorConnectionRepository; use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository}; use crate::handlers::{FederationHandler, NotificationHandler}; @@ -59,6 +60,8 @@ pub async fn build( let ap_federation = ap_service.clone() as Arc; let ap_repo_worker = Arc::new(PgActivityPubRepository::new(pool.clone())) as Arc; + let actor_connections = Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())) + as Arc; // Application services let notification_svc = Arc::new(NotificationEventService { @@ -72,6 +75,7 @@ pub async fn build( base_url: base_url.to_string(), federation_action: ap_federation, ap_repo: ap_repo_worker, + remote_actor_connections: actor_connections, }); // Thin handlers