Files
thoughts/crates/worker/src/factory.rs

103 lines
3.4 KiB
Rust

use sqlx::PgPool;
use std::sync::Arc;
use activitypub::ThoughtsObjectHandler;
use activitypub_base::ActivityPubService;
use application::services::{FederationEventService, NotificationEventService};
use domain::ports::{ActivityPubRepository, FederationActionPort, OutboundFederationPort};
use postgres::activitypub::PgActivityPubRepository;
use postgres::remote_actor_connections::PgRemoteActorConnectionRepository;
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository};
use crate::handlers::{FederationHandler, NotificationHandler};
pub struct WorkerHandlers {
pub notification: NotificationHandler,
pub federation: FederationHandler,
}
pub async fn build(
database_url: &str,
base_url: &str,
nats_url: &str,
) -> (
event_transport::EventConsumerAdapter<nats::NatsMessageSource>,
WorkerHandlers,
) {
let pool = PgPool::connect(database_url)
.await
.expect("DB connect failed");
// Repos
let thoughts = Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone()));
let users = Arc::new(postgres::user::PgUserRepository::new(pool.clone()));
let notifications = Arc::new(postgres::notification::PgNotificationRepository::new(
pool.clone(),
));
// ActivityPub service (for federation fan-out)
let ap_service = Arc::new(
ActivityPubService::new(
Arc::new(PostgresFederationRepository::new(pool.clone())),
Arc::new(PostgresApUserRepository::new(
pool.clone(),
base_url.to_string(),
)),
Arc::new(ThoughtsObjectHandler::new(
Arc::new(PgActivityPubRepository::new(pool.clone())),
base_url,
)),
base_url.to_string(),
false,
"thoughts".to_string(),
false,
None,
)
.await
.expect("ActivityPubService build failed"),
);
let ap_outbound = ap_service.clone() as Arc<dyn OutboundFederationPort>;
let ap_federation = ap_service.clone() as Arc<dyn FederationActionPort>;
let ap_repo_worker =
Arc::new(PgActivityPubRepository::new(pool.clone())) as Arc<dyn ActivityPubRepository>;
let actor_connections = Arc::new(PgRemoteActorConnectionRepository::new(pool.clone()))
as Arc<dyn domain::ports::RemoteActorConnectionRepository>;
// Application services
let notification_svc = Arc::new(NotificationEventService {
thoughts: thoughts.clone(),
notifications,
});
let federation_svc = Arc::new(FederationEventService {
thoughts,
users,
ap: ap_outbound,
base_url: base_url.to_string(),
federation_action: ap_federation,
ap_repo: ap_repo_worker,
remote_actor_connections: actor_connections,
});
// Thin handlers
let handlers = WorkerHandlers {
notification: NotificationHandler {
service: notification_svc,
},
federation: FederationHandler {
service: federation_svc,
},
};
// NATS consumer
let nats_client = async_nats::connect(nats_url)
.await
.expect("NATS connect failed");
nats::ensure_stream(&nats_client)
.await
.expect("JetStream stream setup failed");
let consumer =
event_transport::EventConsumerAdapter::new(nats::NatsMessageSource::new(nats_client));
(consumer, handlers)
}