feat(worker): consumer loop binary connecting NATS to handlers
This commit is contained in:
@@ -1 +1,56 @@
|
|||||||
fn main() {}
|
mod handlers;
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
use futures::StreamExt;
|
||||||
|
use sqlx::PgPool;
|
||||||
|
use domain::ports::EventConsumer;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
dotenvy::dotenv().ok();
|
||||||
|
tracing_subscriber::fmt()
|
||||||
|
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||||
|
.init();
|
||||||
|
|
||||||
|
let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL required");
|
||||||
|
let nats_url = std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into());
|
||||||
|
|
||||||
|
tracing::info!("Connecting to postgres...");
|
||||||
|
let pool = PgPool::connect(&database_url).await.expect("DB connect failed");
|
||||||
|
|
||||||
|
tracing::info!("Connecting to NATS at {nats_url}...");
|
||||||
|
let nats_client = async_nats::connect(&nats_url).await.expect("NATS connect failed");
|
||||||
|
let consumer = nats::NatsEventConsumer::new(nats_client);
|
||||||
|
|
||||||
|
let notification_handler = handlers::NotificationHandler {
|
||||||
|
thoughts: Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone())),
|
||||||
|
notifications: Arc::new(postgres::notification::PgNotificationRepository::new(pool.clone())),
|
||||||
|
};
|
||||||
|
let federation_handler = handlers::FederationHandler;
|
||||||
|
|
||||||
|
tracing::info!("Worker started, consuming events...");
|
||||||
|
|
||||||
|
let mut stream = consumer.consume();
|
||||||
|
while let Some(result) = stream.next().await {
|
||||||
|
match result {
|
||||||
|
Ok(envelope) => {
|
||||||
|
let event = &envelope.event;
|
||||||
|
tracing::debug!(?event, "received event");
|
||||||
|
|
||||||
|
let n_result = notification_handler.handle(event).await;
|
||||||
|
let f_result = federation_handler.handle(event).await;
|
||||||
|
|
||||||
|
if n_result.is_ok() && f_result.is_ok() {
|
||||||
|
(envelope.ack)();
|
||||||
|
} else {
|
||||||
|
if let Err(e) = n_result { tracing::error!("notification handler error: {e}"); }
|
||||||
|
if let Err(e) = f_result { tracing::error!("federation handler error: {e}"); }
|
||||||
|
(envelope.nack)();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("consumer error: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user