use sqlx::PgPool; use std::sync::Arc; use activitypub::ThoughtsObjectHandler; use activitypub_base::ActivityPubService; use application::services::{FederationEventService, NotificationEventService}; use postgres::activitypub::PgActivityPubRepository; use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository}; use crate::handlers::{FederationHandler, NotificationHandler}; pub struct WorkerHandlers { pub notification: NotificationHandler, pub federation: FederationHandler, } pub async fn build( database_url: &str, base_url: &str, nats_url: &str, ) -> ( event_transport::EventConsumerAdapter, WorkerHandlers, ) { let pool = PgPool::connect(database_url) .await .expect("DB connect failed"); // Repos let thoughts = Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone())); let users = Arc::new(postgres::user::PgUserRepository::new(pool.clone())); let notifications = Arc::new(postgres::notification::PgNotificationRepository::new( pool.clone(), )); // ActivityPub service (for federation fan-out) let ap_service: Arc = Arc::new( ActivityPubService::new( Arc::new(PostgresFederationRepository::new(pool.clone())), Arc::new(PostgresApUserRepository::new( pool.clone(), base_url.to_string(), )), Arc::new(ThoughtsObjectHandler::new( Arc::new(PgActivityPubRepository::new(pool.clone())), base_url, )), base_url.to_string(), false, "thoughts".to_string(), false, None, ) .await .expect("ActivityPubService build failed"), ); // Application services let notification_svc = Arc::new(NotificationEventService { thoughts: thoughts.clone(), notifications, }); let federation_svc = Arc::new(FederationEventService { thoughts, users, ap: ap_service, base_url: base_url.to_string(), }); // Thin handlers let handlers = WorkerHandlers { notification: NotificationHandler { service: notification_svc, }, federation: FederationHandler { service: federation_svc, }, }; // NATS consumer let nats_client = async_nats::connect(nats_url) .await .expect("NATS connect failed"); nats::ensure_stream(&nats_client) .await .expect("JetStream stream setup failed"); let consumer = event_transport::EventConsumerAdapter::new(nats::NatsMessageSource::new(nats_client)); (consumer, handlers) }