From 2e702c64cc56961b86b31795668a1be9f4a8ce45 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 14 May 2026 09:58:33 +0200 Subject: [PATCH] feat(worker): consumer loop binary connecting NATS to handlers --- crates/worker/src/main.rs | 57 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 56 insertions(+), 1 deletion(-) diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index f328e4d..c7405fa 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -1 +1,56 @@ -fn main() {} +mod handlers; + +use std::sync::Arc; +use futures::StreamExt; +use sqlx::PgPool; +use domain::ports::EventConsumer; + +#[tokio::main] +async fn main() { + dotenvy::dotenv().ok(); + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL required"); + let nats_url = std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into()); + + tracing::info!("Connecting to postgres..."); + let pool = PgPool::connect(&database_url).await.expect("DB connect failed"); + + tracing::info!("Connecting to NATS at {nats_url}..."); + let nats_client = async_nats::connect(&nats_url).await.expect("NATS connect failed"); + let consumer = nats::NatsEventConsumer::new(nats_client); + + let notification_handler = handlers::NotificationHandler { + thoughts: Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone())), + notifications: Arc::new(postgres::notification::PgNotificationRepository::new(pool.clone())), + }; + let federation_handler = handlers::FederationHandler; + + tracing::info!("Worker started, consuming events..."); + + let mut stream = consumer.consume(); + while let Some(result) = stream.next().await { + match result { + Ok(envelope) => { + let event = &envelope.event; + tracing::debug!(?event, "received event"); + + let n_result = notification_handler.handle(event).await; + let f_result = federation_handler.handle(event).await; + + if n_result.is_ok() && f_result.is_ok() { + (envelope.ack)(); + } else { + if let Err(e) = n_result { tracing::error!("notification handler error: {e}"); } + if let Err(e) = f_result { tracing::error!("federation handler error: {e}"); } + (envelope.nack)(); + } + } + Err(e) => { + tracing::error!("consumer error: {e}"); + } + } + } +}