feat(worker): add FederationManagementHandler and wire into event loop

This commit is contained in:
2026-05-28 02:30:22 +02:00
parent 0dce4fbe64
commit a06d09c101
3 changed files with 30 additions and 4 deletions

View File

@@ -5,17 +5,20 @@ use std::sync::Arc;
use activitypub::{ActivityPubRepository, OutboundFederationPort}; use activitypub::{ActivityPubRepository, OutboundFederationPort};
use activitypub::{ApFederationAdapter, ThoughtsObjectHandler}; use activitypub::{ApFederationAdapter, ThoughtsObjectHandler};
use application::services::{FederationEventService, NotificationEventService}; use application::services::{
FederationEventService, FederationManagementEventService, NotificationEventService,
};
use domain::ports::EventPublisher; use domain::ports::EventPublisher;
use k_ap::ActivityPubService; use k_ap::ActivityPubService;
use postgres::activitypub::PgActivityPubRepository; use postgres::activitypub::PgActivityPubRepository;
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository}; use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository};
use crate::handlers::{FederationHandler, NotificationHandler}; use crate::handlers::{FederationHandler, FederationManagementHandler, NotificationHandler};
pub struct WorkerHandlers { pub struct WorkerHandlers {
pub notification: NotificationHandler, pub notification: NotificationHandler,
pub federation: FederationHandler, pub federation: FederationHandler,
pub federation_management: FederationManagementHandler,
} }
pub struct WorkerInfra { pub struct WorkerInfra {
@@ -82,6 +85,9 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker
base_url: base_url.to_string(), base_url: base_url.to_string(),
ap_repo: ap_repo_worker, ap_repo: ap_repo_worker,
}); });
let federation_management_svc = Arc::new(FederationManagementEventService {
federation: ap_service.clone() as Arc<dyn domain::ports::FederationActionPort>,
});
// Thin handlers // Thin handlers
let handlers = WorkerHandlers { let handlers = WorkerHandlers {
@@ -91,6 +97,9 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker
federation: FederationHandler { federation: FederationHandler {
service: federation_svc, service: federation_svc,
}, },
federation_management: FederationManagementHandler {
service: federation_management_svc,
},
}; };
// DLQ store // DLQ store

View File

@@ -1,4 +1,6 @@
use application::services::{FederationEventService, NotificationEventService}; use application::services::{
FederationEventService, FederationManagementEventService, NotificationEventService,
};
use domain::{errors::DomainError, events::DomainEvent}; use domain::{errors::DomainError, events::DomainEvent};
use std::sync::Arc; use std::sync::Arc;
@@ -21,3 +23,13 @@ impl FederationHandler {
self.service.process(event).await self.service.process(event).await
} }
} }
pub struct FederationManagementHandler {
pub service: Arc<FederationManagementEventService>,
}
impl FederationManagementHandler {
pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
self.service.process(event).await
}
}

View File

@@ -52,8 +52,9 @@ async fn main() {
let n = infra.handlers.notification.handle(event).await; let n = infra.handlers.notification.handle(event).await;
let f = infra.handlers.federation.handle(event).await; let f = infra.handlers.federation.handle(event).await;
let fm = infra.handlers.federation_management.handle(event).await;
if n.is_ok() && f.is_ok() { if n.is_ok() && f.is_ok() && fm.is_ok() {
(envelope.ack)(); (envelope.ack)();
tracing::info!(event_type, "event handled ok"); tracing::info!(event_type, "event handled ok");
} else { } else {
@@ -63,6 +64,9 @@ async fn main() {
if let Err(e) = &f { if let Err(e) = &f {
tracing::error!("federation handler: {e}"); tracing::error!("federation handler: {e}");
} }
if let Err(e) = &fm {
tracing::error!("federation management handler: {e}");
}
// Last delivery attempt -> move to DLQ then ack. // Last delivery attempt -> move to DLQ then ack.
// Earlier attempts -> nack so NATS retries. // Earlier attempts -> nack so NATS retries.
@@ -70,6 +74,7 @@ async fn main() {
let error_msg = n let error_msg = n
.err() .err()
.or(f.err()) .or(f.err())
.or(fm.err())
.map(|e| e.to_string()) .map(|e| e.to_string())
.unwrap_or_else(|| "unknown error".into()); .unwrap_or_else(|| "unknown error".into());