feat: event-driven job dispatch via NATS subscription with 60s fallback sweep
This commit is contained in:
@@ -17,6 +17,7 @@ adapters-nats = { workspace = true }
|
||||
event-transport = { workspace = true }
|
||||
async-nats = { workspace = true }
|
||||
|
||||
futures = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
pub struct WorkerConfig {
|
||||
pub database_url: String,
|
||||
pub nats_url: String,
|
||||
pub poll_interval_secs: u64,
|
||||
pub fallback_sweep_secs: u64,
|
||||
pub storage_path: String,
|
||||
}
|
||||
|
||||
@@ -12,10 +12,10 @@ impl WorkerConfig {
|
||||
Self {
|
||||
database_url: std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"),
|
||||
nats_url: std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into()),
|
||||
poll_interval_secs: std::env::var("POLL_INTERVAL_SECS")
|
||||
fallback_sweep_secs: std::env::var("FALLBACK_SWEEP_SECS")
|
||||
.ok()
|
||||
.and_then(|v| v.parse().ok())
|
||||
.unwrap_or(5),
|
||||
.unwrap_or(60),
|
||||
storage_path: std::env::var("STORAGE_PATH").unwrap_or_else(|_| "./storage".into()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,13 +1,18 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tracing::{error, info};
|
||||
|
||||
use futures::StreamExt;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use application::processing::ProcessNextJobCommand;
|
||||
use domain::events::DomainEvent;
|
||||
use domain::ports::EventConsumer;
|
||||
|
||||
mod config;
|
||||
mod factories;
|
||||
mod plugin_registry;
|
||||
mod plugins;
|
||||
|
||||
use application::processing::ProcessNextJobCommand;
|
||||
use factories::{Repos, build_plugin_registry, build_process_next_handler};
|
||||
|
||||
#[tokio::main]
|
||||
@@ -34,26 +39,80 @@ async fn main() -> anyhow::Result<()> {
|
||||
));
|
||||
let sidecar_writer: Arc<dyn domain::ports::SidecarWriterPort> = Arc::new(LogSidecarWriter);
|
||||
|
||||
let transport = adapters_nats::NatsTransport::new(nats_client);
|
||||
// Publisher transport consumes a client clone; the consumer gets another.
|
||||
let pub_transport = adapters_nats::NatsTransport::new(nats_client.clone());
|
||||
let event_pub: Arc<dyn domain::ports::EventPublisher> =
|
||||
Arc::new(event_transport::EventPublisherAdapter::new(transport));
|
||||
Arc::new(event_transport::EventPublisherAdapter::new(pub_transport));
|
||||
|
||||
let registry = Arc::new(build_plugin_registry(&repos, file_storage, sidecar_writer));
|
||||
let process_next = build_process_next_handler(&repos, registry, event_pub);
|
||||
|
||||
let poll_interval = Duration::from_secs(config.poll_interval_secs);
|
||||
info!(poll_secs = config.poll_interval_secs, "Worker running");
|
||||
let process_next = Arc::new(build_process_next_handler(&repos, registry, event_pub));
|
||||
|
||||
// ── Fallback sweep task ────────────────────────────────────────────
|
||||
let sweep_interval = Duration::from_secs(config.fallback_sweep_secs);
|
||||
let sweep_handler = Arc::clone(&process_next);
|
||||
tokio::spawn(async move {
|
||||
info!(every_secs = config.fallback_sweep_secs, "fallback sweep task started");
|
||||
loop {
|
||||
match process_next.execute(ProcessNextJobCommand).await {
|
||||
Ok(Some(job)) => info!(job_id = %job.job_id, status = ?job.status, "processed job"),
|
||||
Ok(None) => tokio::time::sleep(poll_interval).await,
|
||||
tokio::time::sleep(sweep_interval).await;
|
||||
info!("fallback sweep: draining queued jobs");
|
||||
loop {
|
||||
match sweep_handler.execute(ProcessNextJobCommand).await {
|
||||
Ok(Some(job)) => {
|
||||
info!(job_id = %job.job_id, status = ?job.status, "sweep: processed job");
|
||||
}
|
||||
Ok(None) => break,
|
||||
Err(e) => {
|
||||
error!(error = %e, "worker error");
|
||||
tokio::time::sleep(poll_interval).await;
|
||||
error!(error = %e, "sweep: error processing job");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// ── Event-driven loop via NATS ─────────────────────────────────────
|
||||
let consumer_source = adapters_nats::NatsMessageSource::new(nats_client);
|
||||
let event_consumer = event_transport::EventConsumerAdapter::new(consumer_source);
|
||||
|
||||
info!("event loop: listening for NATS events");
|
||||
let mut stream = event_consumer.consume();
|
||||
|
||||
while let Some(result) = stream.next().await {
|
||||
let envelope = match result {
|
||||
Ok(env) => env,
|
||||
Err(e) => {
|
||||
error!(error = %e, "event loop: consumer error");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match &envelope.event {
|
||||
DomainEvent::JobEnqueued {
|
||||
job_id, job_type, ..
|
||||
} => {
|
||||
info!(job_id = %job_id, job_type = %job_type, "event loop: JobEnqueued received");
|
||||
(envelope.ack)();
|
||||
match process_next.execute(ProcessNextJobCommand).await {
|
||||
Ok(Some(job)) => {
|
||||
info!(job_id = %job.job_id, status = ?job.status, "event loop: processed job");
|
||||
}
|
||||
Ok(None) => {
|
||||
warn!("event loop: JobEnqueued event but no queued job found");
|
||||
}
|
||||
Err(e) => {
|
||||
error!(error = %e, "event loop: error processing job");
|
||||
}
|
||||
}
|
||||
}
|
||||
other => {
|
||||
info!(event = ?other, "event loop: non-job event, acking");
|
||||
(envelope.ack)();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
error!("event loop: NATS stream ended unexpectedly");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct LogSidecarWriter;
|
||||
|
||||
Reference in New Issue
Block a user