From d022cb9068ee7abb41987b3285a8a2fc54a6436e Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Sun, 31 May 2026 18:31:53 +0200 Subject: [PATCH] feat: event-driven job dispatch via NATS subscription with 60s fallback sweep --- crates/worker/Cargo.toml | 1 + crates/worker/src/config.rs | 6 +-- crates/worker/src/main.rs | 85 +++++++++++++++++++++++++++++++------ 3 files changed, 76 insertions(+), 16 deletions(-) diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 779e191..ad85ae4 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -17,6 +17,7 @@ adapters-nats = { workspace = true } event-transport = { workspace = true } async-nats = { workspace = true } +futures = { workspace = true } tokio = { workspace = true } anyhow = { workspace = true } tracing = { workspace = true } diff --git a/crates/worker/src/config.rs b/crates/worker/src/config.rs index a8776a3..b45468d 100644 --- a/crates/worker/src/config.rs +++ b/crates/worker/src/config.rs @@ -2,7 +2,7 @@ pub struct WorkerConfig { pub database_url: String, pub nats_url: String, - pub poll_interval_secs: u64, + pub fallback_sweep_secs: u64, pub storage_path: String, } @@ -12,10 +12,10 @@ impl WorkerConfig { Self { database_url: std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"), nats_url: std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into()), - poll_interval_secs: std::env::var("POLL_INTERVAL_SECS") + fallback_sweep_secs: std::env::var("FALLBACK_SWEEP_SECS") .ok() .and_then(|v| v.parse().ok()) - .unwrap_or(5), + .unwrap_or(60), storage_path: std::env::var("STORAGE_PATH").unwrap_or_else(|_| "./storage".into()), } } diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index a79a8fc..106bc1d 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -1,13 +1,18 @@ use std::sync::Arc; use std::time::Duration; -use tracing::{error, info}; + +use futures::StreamExt; +use tracing::{error, info, warn}; + +use application::processing::ProcessNextJobCommand; +use domain::events::DomainEvent; +use domain::ports::EventConsumer; mod config; mod factories; mod plugin_registry; mod plugins; -use application::processing::ProcessNextJobCommand; use factories::{Repos, build_plugin_registry, build_process_next_handler}; #[tokio::main] @@ -34,26 +39,80 @@ async fn main() -> anyhow::Result<()> { )); let sidecar_writer: Arc = Arc::new(LogSidecarWriter); - let transport = adapters_nats::NatsTransport::new(nats_client); + // Publisher transport consumes a client clone; the consumer gets another. + let pub_transport = adapters_nats::NatsTransport::new(nats_client.clone()); let event_pub: Arc = - Arc::new(event_transport::EventPublisherAdapter::new(transport)); + Arc::new(event_transport::EventPublisherAdapter::new(pub_transport)); let registry = Arc::new(build_plugin_registry(&repos, file_storage, sidecar_writer)); - let process_next = build_process_next_handler(&repos, registry, event_pub); + let process_next = Arc::new(build_process_next_handler(&repos, registry, event_pub)); - let poll_interval = Duration::from_secs(config.poll_interval_secs); - info!(poll_secs = config.poll_interval_secs, "Worker running"); + // ── Fallback sweep task ──────────────────────────────────────────── + let sweep_interval = Duration::from_secs(config.fallback_sweep_secs); + let sweep_handler = Arc::clone(&process_next); + tokio::spawn(async move { + info!(every_secs = config.fallback_sweep_secs, "fallback sweep task started"); + loop { + tokio::time::sleep(sweep_interval).await; + info!("fallback sweep: draining queued jobs"); + loop { + match sweep_handler.execute(ProcessNextJobCommand).await { + Ok(Some(job)) => { + info!(job_id = %job.job_id, status = ?job.status, "sweep: processed job"); + } + Ok(None) => break, + Err(e) => { + error!(error = %e, "sweep: error processing job"); + break; + } + } + } + } + }); - loop { - match process_next.execute(ProcessNextJobCommand).await { - Ok(Some(job)) => info!(job_id = %job.job_id, status = ?job.status, "processed job"), - Ok(None) => tokio::time::sleep(poll_interval).await, + // ── Event-driven loop via NATS ───────────────────────────────────── + let consumer_source = adapters_nats::NatsMessageSource::new(nats_client); + let event_consumer = event_transport::EventConsumerAdapter::new(consumer_source); + + info!("event loop: listening for NATS events"); + let mut stream = event_consumer.consume(); + + while let Some(result) = stream.next().await { + let envelope = match result { + Ok(env) => env, Err(e) => { - error!(error = %e, "worker error"); - tokio::time::sleep(poll_interval).await; + error!(error = %e, "event loop: consumer error"); + continue; + } + }; + + match &envelope.event { + DomainEvent::JobEnqueued { + job_id, job_type, .. + } => { + info!(job_id = %job_id, job_type = %job_type, "event loop: JobEnqueued received"); + (envelope.ack)(); + match process_next.execute(ProcessNextJobCommand).await { + Ok(Some(job)) => { + info!(job_id = %job.job_id, status = ?job.status, "event loop: processed job"); + } + Ok(None) => { + warn!("event loop: JobEnqueued event but no queued job found"); + } + Err(e) => { + error!(error = %e, "event loop: error processing job"); + } + } + } + other => { + info!(event = ?other, "event loop: non-job event, acking"); + (envelope.ack)(); } } } + + error!("event loop: NATS stream ended unexpectedly"); + Ok(()) } struct LogSidecarWriter;