feat: wire NATS event publisher into bootstrap + worker

- Both binaries connect to NATS on startup, ensure JetStream stream
- EventPublisherAdapter<NatsTransport> replaces LogEventPublisher
- nats_url config with default nats://localhost:4222
- Deleted bootstrap's LogEventPublisher (no longer needed)
This commit is contained in:
2026-05-31 11:53:51 +02:00
parent 0e9911ebfc
commit 838ed9a3f8
11 changed files with 35 additions and 36 deletions

6
Cargo.lock generated
View File

@@ -309,14 +309,17 @@ name = "bootstrap"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"adapters-auth", "adapters-auth",
"adapters-nats",
"adapters-postgres", "adapters-postgres",
"adapters-storage", "adapters-storage",
"anyhow", "anyhow",
"application", "application",
"async-nats",
"async-trait", "async-trait",
"axum", "axum",
"domain", "domain",
"dotenvy", "dotenvy",
"event-transport",
"presentation", "presentation",
"tokio", "tokio",
"tower-http", "tower-http",
@@ -3559,13 +3562,16 @@ dependencies = [
name = "worker" name = "worker"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"adapters-nats",
"adapters-postgres", "adapters-postgres",
"adapters-storage", "adapters-storage",
"anyhow", "anyhow",
"application", "application",
"async-nats",
"async-trait", "async-trait",
"domain", "domain",
"dotenvy", "dotenvy",
"event-transport",
"tokio", "tokio",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",

View File

@@ -100,9 +100,7 @@ impl MessageSource for NatsMessageSource {
if let Ok(info) = stream.consumer_info(CONSUMER_NAME).await if let Ok(info) = stream.consumer_info(CONSUMER_NAME).await
&& info.config.deliver_subject.is_some() && info.config.deliver_subject.is_some()
{ {
tracing::info!( tracing::info!("deleting old push consumer '{CONSUMER_NAME}', replacing with pull");
"deleting old push consumer '{CONSUMER_NAME}', replacing with pull"
);
let _ = stream.delete_consumer(CONSUMER_NAME).await; let _ = stream.delete_consumer(CONSUMER_NAME).await;
} }

View File

@@ -12,7 +12,10 @@ domain = { workspace = true }
application = { workspace = true } application = { workspace = true }
adapters-auth = { workspace = true } adapters-auth = { workspace = true }
adapters-storage = { workspace = true, features = ["s3"] } adapters-storage = { workspace = true, features = ["s3"] }
adapters-nats = { workspace = true }
event-transport = { workspace = true }
async-nats = { workspace = true }
presentation = { workspace = true } presentation = { workspace = true }

View File

@@ -3,6 +3,7 @@ pub struct Config {
pub host: String, pub host: String,
pub port: u16, pub port: u16,
pub database_url: String, pub database_url: String,
pub nats_url: String,
pub jwt_secret: String, pub jwt_secret: String,
pub cors_allowed_origins: Vec<String>, pub cors_allowed_origins: Vec<String>,
} }
@@ -17,6 +18,8 @@ impl Config {
.and_then(|p| p.parse().ok()) .and_then(|p| p.parse().ok())
.unwrap_or(3000), .unwrap_or(3000),
database_url: std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"), database_url: std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"),
nats_url: std::env::var("NATS_URL")
.unwrap_or_else(|_| "nats://localhost:4222".to_string()),
jwt_secret: std::env::var("JWT_SECRET").expect("JWT_SECRET must be set"), jwt_secret: std::env::var("JWT_SECRET").expect("JWT_SECRET must be set"),
cors_allowed_origins: std::env::var("CORS_ALLOWED_ORIGINS") cors_allowed_origins: std::env::var("CORS_ALLOWED_ORIGINS")
.unwrap_or_else(|_| "http://localhost:3000".to_string()) .unwrap_or_else(|_| "http://localhost:3000".to_string())

View File

@@ -55,13 +55,15 @@ use presentation::{
}; };
use crate::config::Config; use crate::config::Config;
use crate::log_event_publisher::LogEventPublisher;
use crate::log_sidecar_writer::LogSidecarWriter; use crate::log_sidecar_writer::LogSidecarWriter;
pub async fn build_app(config: &Config) -> Result<Router> { pub async fn build_app(config: &Config) -> Result<Router> {
let pool = connect(&config.database_url).await?; let pool = connect(&config.database_url).await?;
run_migrations(&pool).await?; run_migrations(&pool).await?;
let nats_client = async_nats::connect(&config.nats_url).await?;
adapters_nats::ensure_stream(&nats_client).await?;
// Identity // Identity
let user_repo = Arc::new(PostgresUserRepository::new(pool.clone())); let user_repo = Arc::new(PostgresUserRepository::new(pool.clone()));
let hasher = Arc::new(BcryptPasswordHasher); let hasher = Arc::new(BcryptPasswordHasher);
@@ -91,7 +93,9 @@ pub async fn build_app(config: &Config) -> Result<Router> {
let batch_repo = Arc::new(PostgresJobBatchRepository::new(pool.clone())); let batch_repo = Arc::new(PostgresJobBatchRepository::new(pool.clone()));
let plugin_repo = Arc::new(PostgresPluginRepository::new(pool.clone())); let plugin_repo = Arc::new(PostgresPluginRepository::new(pool.clone()));
let pipeline_repo = Arc::new(PostgresPipelineRepository::new(pool.clone())); let pipeline_repo = Arc::new(PostgresPipelineRepository::new(pool.clone()));
let event_publisher: Arc<LogEventPublisher> = Arc::new(LogEventPublisher); let transport = adapters_nats::NatsTransport::new(nats_client);
let event_publisher: Arc<dyn domain::ports::EventPublisher> =
Arc::new(event_transport::EventPublisherAdapter::new(transport));
let sidecar_writer: Arc<LogSidecarWriter> = Arc::new(LogSidecarWriter); let sidecar_writer: Arc<LogSidecarWriter> = Arc::new(LogSidecarWriter);
// File storage // File storage

View File

@@ -1,4 +1,3 @@
pub mod config; pub mod config;
pub mod factory; pub mod factory;
pub mod log_event_publisher;
pub mod log_sidecar_writer; pub mod log_sidecar_writer;

View File

@@ -1,12 +0,0 @@
use async_trait::async_trait;
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
pub struct LogEventPublisher;
#[async_trait]
impl EventPublisher for LogEventPublisher {
async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> {
tracing::info!(?event, "domain event published");
Ok(())
}
}

View File

@@ -3,7 +3,6 @@ use tracing::info;
mod config; mod config;
mod factory; mod factory;
mod log_event_publisher;
mod log_sidecar_writer; mod log_sidecar_writer;
#[tokio::main] #[tokio::main]

View File

@@ -11,8 +11,11 @@ path = "src/main.rs"
domain = { workspace = true } domain = { workspace = true }
application = { workspace = true } application = { workspace = true }
adapters-postgres = { path = "../adapters/postgres" } adapters-postgres = { path = "../adapters/postgres" }
adapters-storage = { workspace = true } adapters-storage = { workspace = true }
adapters-nats = { workspace = true }
event-transport = { workspace = true }
async-nats = { workspace = true }
tokio = { workspace = true } tokio = { workspace = true }
anyhow = { workspace = true } anyhow = { workspace = true }

View File

@@ -1,6 +1,7 @@
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct WorkerConfig { pub struct WorkerConfig {
pub database_url: String, pub database_url: String,
pub nats_url: String,
pub poll_interval_secs: u64, pub poll_interval_secs: u64,
pub storage_path: String, pub storage_path: String,
} }
@@ -10,6 +11,7 @@ impl WorkerConfig {
dotenvy::dotenv().ok(); dotenvy::dotenv().ok();
Self { Self {
database_url: std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"), database_url: std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"),
nats_url: std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into()),
poll_interval_secs: std::env::var("POLL_INTERVAL_SECS") poll_interval_secs: std::env::var("POLL_INTERVAL_SECS")
.ok() .ok()
.and_then(|v| v.parse().ok()) .and_then(|v| v.parse().ok())

View File

@@ -24,12 +24,19 @@ async fn main() -> anyhow::Result<()> {
let pool = adapters_postgres::connect(&config.database_url).await?; let pool = adapters_postgres::connect(&config.database_url).await?;
adapters_postgres::run_migrations(&pool).await?; adapters_postgres::run_migrations(&pool).await?;
let nats_client = async_nats::connect(&config.nats_url).await?;
adapters_nats::ensure_stream(&nats_client).await?;
info!(nats_url = %config.nats_url, "NATS connected");
let repos = Repos::new(pool); let repos = Repos::new(pool);
let file_storage = Arc::new(adapters_storage::LocalFileStorage::new( let file_storage = Arc::new(adapters_storage::LocalFileStorage::new(
&config.storage_path, &config.storage_path,
)); ));
let sidecar_writer: Arc<dyn domain::ports::SidecarWriterPort> = Arc::new(LogSidecarWriter); let sidecar_writer: Arc<dyn domain::ports::SidecarWriterPort> = Arc::new(LogSidecarWriter);
let event_pub: Arc<dyn domain::ports::EventPublisher> = Arc::new(LogEventPublisher);
let transport = adapters_nats::NatsTransport::new(nats_client);
let event_pub: Arc<dyn domain::ports::EventPublisher> =
Arc::new(event_transport::EventPublisherAdapter::new(transport));
let registry = Arc::new(build_plugin_registry(&repos, file_storage, sidecar_writer)); let registry = Arc::new(build_plugin_registry(&repos, file_storage, sidecar_writer));
let process_next = build_process_next_handler(&repos, registry, event_pub); let process_next = build_process_next_handler(&repos, registry, event_pub);
@@ -49,19 +56,6 @@ async fn main() -> anyhow::Result<()> {
} }
} }
struct LogEventPublisher;
#[async_trait::async_trait]
impl domain::ports::EventPublisher for LogEventPublisher {
async fn publish(
&self,
event: &domain::events::DomainEvent,
) -> Result<(), domain::errors::DomainError> {
info!(event = ?event, "domain event");
Ok(())
}
}
struct LogSidecarWriter; struct LogSidecarWriter;
#[async_trait::async_trait] #[async_trait::async_trait]