From 838ed9a3f88235762c8bb928b4a2dc6c7759a3f0 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Sun, 31 May 2026 11:53:51 +0200 Subject: [PATCH] feat: wire NATS event publisher into bootstrap + worker - Both binaries connect to NATS on startup, ensure JetStream stream - EventPublisherAdapter replaces LogEventPublisher - nats_url config with default nats://localhost:4222 - Deleted bootstrap's LogEventPublisher (no longer needed) --- Cargo.lock | 6 ++++++ crates/adapters/nats/src/lib.rs | 4 +--- crates/bootstrap/Cargo.toml | 5 ++++- crates/bootstrap/src/config.rs | 3 +++ crates/bootstrap/src/factory.rs | 8 ++++++-- crates/bootstrap/src/lib.rs | 1 - crates/bootstrap/src/log_event_publisher.rs | 12 ----------- crates/bootstrap/src/main.rs | 1 - crates/worker/Cargo.toml | 7 +++++-- crates/worker/src/config.rs | 2 ++ crates/worker/src/main.rs | 22 ++++++++------------- 11 files changed, 35 insertions(+), 36 deletions(-) delete mode 100644 crates/bootstrap/src/log_event_publisher.rs diff --git a/Cargo.lock b/Cargo.lock index d860935..8457352 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -309,14 +309,17 @@ name = "bootstrap" version = "0.1.0" dependencies = [ "adapters-auth", + "adapters-nats", "adapters-postgres", "adapters-storage", "anyhow", "application", + "async-nats", "async-trait", "axum", "domain", "dotenvy", + "event-transport", "presentation", "tokio", "tower-http", @@ -3559,13 +3562,16 @@ dependencies = [ name = "worker" version = "0.1.0" dependencies = [ + "adapters-nats", "adapters-postgres", "adapters-storage", "anyhow", "application", + "async-nats", "async-trait", "domain", "dotenvy", + "event-transport", "tokio", "tracing", "tracing-subscriber", diff --git a/crates/adapters/nats/src/lib.rs b/crates/adapters/nats/src/lib.rs index d02f642..9d8d70d 100644 --- a/crates/adapters/nats/src/lib.rs +++ b/crates/adapters/nats/src/lib.rs @@ -100,9 +100,7 @@ impl MessageSource for NatsMessageSource { if let Ok(info) = stream.consumer_info(CONSUMER_NAME).await && info.config.deliver_subject.is_some() { - tracing::info!( - "deleting old push consumer '{CONSUMER_NAME}', replacing with pull" - ); + tracing::info!("deleting old push consumer '{CONSUMER_NAME}', replacing with pull"); let _ = stream.delete_consumer(CONSUMER_NAME).await; } diff --git a/crates/bootstrap/Cargo.toml b/crates/bootstrap/Cargo.toml index e540252..fb63568 100644 --- a/crates/bootstrap/Cargo.toml +++ b/crates/bootstrap/Cargo.toml @@ -12,7 +12,10 @@ domain = { workspace = true } application = { workspace = true } adapters-auth = { workspace = true } -adapters-storage = { workspace = true, features = ["s3"] } +adapters-storage = { workspace = true, features = ["s3"] } +adapters-nats = { workspace = true } +event-transport = { workspace = true } +async-nats = { workspace = true } presentation = { workspace = true } diff --git a/crates/bootstrap/src/config.rs b/crates/bootstrap/src/config.rs index 6b8ee02..6db4732 100644 --- a/crates/bootstrap/src/config.rs +++ b/crates/bootstrap/src/config.rs @@ -3,6 +3,7 @@ pub struct Config { pub host: String, pub port: u16, pub database_url: String, + pub nats_url: String, pub jwt_secret: String, pub cors_allowed_origins: Vec, } @@ -17,6 +18,8 @@ impl Config { .and_then(|p| p.parse().ok()) .unwrap_or(3000), database_url: std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"), + nats_url: std::env::var("NATS_URL") + .unwrap_or_else(|_| "nats://localhost:4222".to_string()), jwt_secret: std::env::var("JWT_SECRET").expect("JWT_SECRET must be set"), cors_allowed_origins: std::env::var("CORS_ALLOWED_ORIGINS") .unwrap_or_else(|_| "http://localhost:3000".to_string()) diff --git a/crates/bootstrap/src/factory.rs b/crates/bootstrap/src/factory.rs index 443d34f..6b7bd77 100644 --- a/crates/bootstrap/src/factory.rs +++ b/crates/bootstrap/src/factory.rs @@ -55,13 +55,15 @@ use presentation::{ }; use crate::config::Config; -use crate::log_event_publisher::LogEventPublisher; use crate::log_sidecar_writer::LogSidecarWriter; pub async fn build_app(config: &Config) -> Result { let pool = connect(&config.database_url).await?; run_migrations(&pool).await?; + let nats_client = async_nats::connect(&config.nats_url).await?; + adapters_nats::ensure_stream(&nats_client).await?; + // Identity let user_repo = Arc::new(PostgresUserRepository::new(pool.clone())); let hasher = Arc::new(BcryptPasswordHasher); @@ -91,7 +93,9 @@ pub async fn build_app(config: &Config) -> Result { let batch_repo = Arc::new(PostgresJobBatchRepository::new(pool.clone())); let plugin_repo = Arc::new(PostgresPluginRepository::new(pool.clone())); let pipeline_repo = Arc::new(PostgresPipelineRepository::new(pool.clone())); - let event_publisher: Arc = Arc::new(LogEventPublisher); + let transport = adapters_nats::NatsTransport::new(nats_client); + let event_publisher: Arc = + Arc::new(event_transport::EventPublisherAdapter::new(transport)); let sidecar_writer: Arc = Arc::new(LogSidecarWriter); // File storage diff --git a/crates/bootstrap/src/lib.rs b/crates/bootstrap/src/lib.rs index b3a8391..d971a4f 100644 --- a/crates/bootstrap/src/lib.rs +++ b/crates/bootstrap/src/lib.rs @@ -1,4 +1,3 @@ pub mod config; pub mod factory; -pub mod log_event_publisher; pub mod log_sidecar_writer; diff --git a/crates/bootstrap/src/log_event_publisher.rs b/crates/bootstrap/src/log_event_publisher.rs deleted file mode 100644 index 058200f..0000000 --- a/crates/bootstrap/src/log_event_publisher.rs +++ /dev/null @@ -1,12 +0,0 @@ -use async_trait::async_trait; -use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher}; - -pub struct LogEventPublisher; - -#[async_trait] -impl EventPublisher for LogEventPublisher { - async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> { - tracing::info!(?event, "domain event published"); - Ok(()) - } -} diff --git a/crates/bootstrap/src/main.rs b/crates/bootstrap/src/main.rs index 61ac860..fe930a1 100644 --- a/crates/bootstrap/src/main.rs +++ b/crates/bootstrap/src/main.rs @@ -3,7 +3,6 @@ use tracing::info; mod config; mod factory; -mod log_event_publisher; mod log_sidecar_writer; #[tokio::main] diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 2159a9f..779e191 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -11,8 +11,11 @@ path = "src/main.rs" domain = { workspace = true } application = { workspace = true } -adapters-postgres = { path = "../adapters/postgres" } -adapters-storage = { workspace = true } +adapters-postgres = { path = "../adapters/postgres" } +adapters-storage = { workspace = true } +adapters-nats = { workspace = true } +event-transport = { workspace = true } +async-nats = { workspace = true } tokio = { workspace = true } anyhow = { workspace = true } diff --git a/crates/worker/src/config.rs b/crates/worker/src/config.rs index 34b3ea5..a8776a3 100644 --- a/crates/worker/src/config.rs +++ b/crates/worker/src/config.rs @@ -1,6 +1,7 @@ #[derive(Debug, Clone)] pub struct WorkerConfig { pub database_url: String, + pub nats_url: String, pub poll_interval_secs: u64, pub storage_path: String, } @@ -10,6 +11,7 @@ impl WorkerConfig { dotenvy::dotenv().ok(); Self { database_url: std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"), + nats_url: std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into()), poll_interval_secs: std::env::var("POLL_INTERVAL_SECS") .ok() .and_then(|v| v.parse().ok()) diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 221b4f5..a79a8fc 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -24,12 +24,19 @@ async fn main() -> anyhow::Result<()> { let pool = adapters_postgres::connect(&config.database_url).await?; adapters_postgres::run_migrations(&pool).await?; + let nats_client = async_nats::connect(&config.nats_url).await?; + adapters_nats::ensure_stream(&nats_client).await?; + info!(nats_url = %config.nats_url, "NATS connected"); + let repos = Repos::new(pool); let file_storage = Arc::new(adapters_storage::LocalFileStorage::new( &config.storage_path, )); let sidecar_writer: Arc = Arc::new(LogSidecarWriter); - let event_pub: Arc = Arc::new(LogEventPublisher); + + let transport = adapters_nats::NatsTransport::new(nats_client); + let event_pub: Arc = + Arc::new(event_transport::EventPublisherAdapter::new(transport)); let registry = Arc::new(build_plugin_registry(&repos, file_storage, sidecar_writer)); let process_next = build_process_next_handler(&repos, registry, event_pub); @@ -49,19 +56,6 @@ async fn main() -> anyhow::Result<()> { } } -struct LogEventPublisher; - -#[async_trait::async_trait] -impl domain::ports::EventPublisher for LogEventPublisher { - async fn publish( - &self, - event: &domain::events::DomainEvent, - ) -> Result<(), domain::errors::DomainError> { - info!(event = ?event, "domain event"); - Ok(()) - } -} - struct LogSidecarWriter; #[async_trait::async_trait]