From a06d09c101620bddaa056f18f9dcd543be74d9c0 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 28 May 2026 02:30:22 +0200 Subject: [PATCH] feat(worker): add FederationManagementHandler and wire into event loop --- crates/worker/src/factory.rs | 13 +++++++++++-- crates/worker/src/handlers.rs | 14 +++++++++++++- crates/worker/src/main.rs | 7 ++++++- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/crates/worker/src/factory.rs b/crates/worker/src/factory.rs index b78ca9a..3cdefef 100644 --- a/crates/worker/src/factory.rs +++ b/crates/worker/src/factory.rs @@ -5,17 +5,20 @@ use std::sync::Arc; use activitypub::{ActivityPubRepository, OutboundFederationPort}; use activitypub::{ApFederationAdapter, ThoughtsObjectHandler}; -use application::services::{FederationEventService, NotificationEventService}; +use application::services::{ + FederationEventService, FederationManagementEventService, NotificationEventService, +}; use domain::ports::EventPublisher; use k_ap::ActivityPubService; use postgres::activitypub::PgActivityPubRepository; use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository}; -use crate::handlers::{FederationHandler, NotificationHandler}; +use crate::handlers::{FederationHandler, FederationManagementHandler, NotificationHandler}; pub struct WorkerHandlers { pub notification: NotificationHandler, pub federation: FederationHandler, + pub federation_management: FederationManagementHandler, } pub struct WorkerInfra { @@ -82,6 +85,9 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker base_url: base_url.to_string(), ap_repo: ap_repo_worker, }); + let federation_management_svc = Arc::new(FederationManagementEventService { + federation: ap_service.clone() as Arc, + }); // Thin handlers let handlers = WorkerHandlers { @@ -91,6 +97,9 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker federation: FederationHandler { service: federation_svc, }, + federation_management: FederationManagementHandler { + service: federation_management_svc, + }, }; // DLQ store diff --git a/crates/worker/src/handlers.rs b/crates/worker/src/handlers.rs index 2adb2a4..e733ccf 100644 --- a/crates/worker/src/handlers.rs +++ b/crates/worker/src/handlers.rs @@ -1,4 +1,6 @@ -use application::services::{FederationEventService, NotificationEventService}; +use application::services::{ + FederationEventService, FederationManagementEventService, NotificationEventService, +}; use domain::{errors::DomainError, events::DomainEvent}; use std::sync::Arc; @@ -21,3 +23,13 @@ impl FederationHandler { self.service.process(event).await } } + +pub struct FederationManagementHandler { + pub service: Arc, +} + +impl FederationManagementHandler { + pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { + self.service.process(event).await + } +} diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 953b395..399566b 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -52,8 +52,9 @@ async fn main() { let n = infra.handlers.notification.handle(event).await; let f = infra.handlers.federation.handle(event).await; + let fm = infra.handlers.federation_management.handle(event).await; - if n.is_ok() && f.is_ok() { + if n.is_ok() && f.is_ok() && fm.is_ok() { (envelope.ack)(); tracing::info!(event_type, "event handled ok"); } else { @@ -63,6 +64,9 @@ async fn main() { if let Err(e) = &f { tracing::error!("federation handler: {e}"); } + if let Err(e) = &fm { + tracing::error!("federation management handler: {e}"); + } // Last delivery attempt -> move to DLQ then ack. // Earlier attempts -> nack so NATS retries. @@ -70,6 +74,7 @@ async fn main() { let error_msg = n .err() .or(f.err()) + .or(fm.err()) .map(|e| e.to_string()) .unwrap_or_else(|| "unknown error".into());