From 458feebcdda9200990684b750c34179867baa026 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 14 May 2026 16:41:17 +0200 Subject: [PATCH] =?UTF-8?q?feat(nats):=20migrate=20to=20JetStream=20?= =?UTF-8?q?=E2=80=94=20at-least-once=20delivery=20with=20durable=20consume?= =?UTF-8?q?r?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- compose.yml | 2 +- crates/adapters/nats/src/lib.rs | 112 +++++++++++++++++++++++++++----- crates/bootstrap/src/factory.rs | 3 + crates/worker/src/factory.rs | 3 + 4 files changed, 104 insertions(+), 16 deletions(-) diff --git a/compose.yml b/compose.yml index 2ac351d..6a2b94e 100644 --- a/compose.yml +++ b/compose.yml @@ -20,7 +20,7 @@ services: ports: - "4222:4222" - "8222:8222" # monitoring endpoint - command: ["--http_port", "8222"] + command: ["--jetstream", "--http_port", "8222"] volumes: postgres_data: diff --git a/crates/adapters/nats/src/lib.rs b/crates/adapters/nats/src/lib.rs index 677ea89..f240d87 100644 --- a/crates/adapters/nats/src/lib.rs +++ b/crates/adapters/nats/src/lib.rs @@ -1,61 +1,143 @@ +use async_nats::jetstream::{self, stream::Config as StreamConfig, AckKind}; use async_trait::async_trait; use domain::errors::DomainError; use event_transport::{MessageSource, RawMessage, Transport}; use futures::stream::BoxStream; +use std::sync::Arc; -// ── NatsTransport — raw NATS publish backend ──────────────────────────────── +// Stream name and subjects used by both publisher and consumer. +const STREAM_NAME: &str = "THOUGHTS_EVENTS"; +const STREAM_SUBJECTS: &[&str] = &[">"]; +const CONSUMER_NAME: &str = "worker"; +// Redelivery timeout: if a message is not acked within this time, NATS redelivers it. +const ACK_WAIT_SECS: u64 = 30; +// Maximum delivery attempts before the message goes to a dead-letter stream (if configured). +const MAX_DELIVER: i64 = 5; + +fn stream_config() -> StreamConfig { + StreamConfig { + name: STREAM_NAME.to_string(), + subjects: STREAM_SUBJECTS.iter().map(|s| s.to_string()).collect(), + retention: jetstream::stream::RetentionPolicy::WorkQueue, + ..Default::default() + } +} + +/// Ensure the JetStream stream exists. Call once at startup before publishing or consuming. +/// Idempotent — safe to call from both bootstrap and worker factories. +pub async fn ensure_stream(client: &async_nats::Client) -> Result<(), DomainError> { + let js = jetstream::new(client.clone()); + js.get_or_create_stream(stream_config()) + .await + .map_err(|e| DomainError::Internal(format!("JetStream stream setup failed: {e}")))?; + Ok(()) +} + +// ── NatsTransport — JetStream publish ────────────────────────────────────── pub struct NatsTransport { - client: async_nats::Client, + jetstream: jetstream::Context, } impl NatsTransport { pub fn new(client: async_nats::Client) -> Self { - Self { client } + Self { + jetstream: jetstream::new(client), + } } } #[async_trait] impl Transport for NatsTransport { async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> { - self.client + self.jetstream .publish(subject.to_string(), bytes.to_vec().into()) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .map_err(|e| DomainError::Internal(e.to_string()))? + .await // wait for server ack — confirms message is durably stored + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) } } -// ── NatsMessageSource — raw NATS subscribe backend ────────────────────────── +// ── NatsMessageSource — JetStream durable push consumer ──────────────────── pub struct NatsMessageSource { - client: async_nats::Client, + jetstream: jetstream::Context, } impl NatsMessageSource { pub fn new(client: async_nats::Client) -> Self { - Self { client } + Self { + jetstream: jetstream::new(client), + } } } impl MessageSource for NatsMessageSource { fn messages(&self) -> BoxStream<'_, Result> { - let client = self.client.clone(); + let js = self.jetstream.clone(); Box::pin(async_stream::try_stream! { - let mut sub = client - .subscribe(">") + // Ensure stream exists (idempotent). + js.get_or_create_stream(stream_config()) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + let stream = js + .get_stream(STREAM_NAME) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + // Durable push consumer — survives worker restarts. + let consumer = stream + .get_or_create_consumer( + CONSUMER_NAME, + jetstream::consumer::push::Config { + durable_name: Some(CONSUMER_NAME.to_string()), + deliver_subject: CONSUMER_NAME.to_string() + ".deliver", + ack_policy: jetstream::consumer::AckPolicy::Explicit, + ack_wait: std::time::Duration::from_secs(ACK_WAIT_SECS), + max_deliver: MAX_DELIVER, + ..Default::default() + }, + ) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + let mut messages = consumer + .messages() .await .map_err(|e| DomainError::Internal(e.to_string()))?; use futures::StreamExt; - while let Some(msg) = sub.next().await { + while let Some(result) = messages.next().await { + let msg = result.map_err(|e| DomainError::Internal(e.to_string()))?; let subject = msg.subject.to_string(); let payload = msg.payload.to_vec(); - // Basic NATS: at-most-once — ack/nack are no-ops. + + // Wrap in Arc so both closures can hold a reference. + let msg = Arc::new(msg); + let msg_nack = Arc::clone(&msg); + yield RawMessage { subject, payload, - ack: Box::new(|| {}), - nack: Box::new(|| {}), + ack: Box::new(move || { + let m = Arc::clone(&msg); + tokio::spawn(async move { + if let Err(e) = m.ack().await { + tracing::warn!("NATS ack failed: {e}"); + } + }); + }), + nack: Box::new(move || { + let m = Arc::clone(&msg_nack); + tokio::spawn(async move { + if let Err(e) = m.ack_with(AckKind::Nak(None)).await { + tracing::warn!("NATS nak failed: {e}"); + } + }); + }), }; } }) diff --git a/crates/bootstrap/src/factory.rs b/crates/bootstrap/src/factory.rs index 211a8e3..dd85510 100644 --- a/crates/bootstrap/src/factory.rs +++ b/crates/bootstrap/src/factory.rs @@ -44,6 +44,9 @@ pub async fn build(cfg: &Config) -> Infrastructure { Some(url) => match async_nats::connect(url).await { Ok(client) => { tracing::info!("Connected to NATS at {url}"); + if let Err(e) = nats::ensure_stream(&client).await { + tracing::warn!("JetStream stream setup failed: {e} — events may be lost"); + } Arc::new(EventPublisherAdapter::new(NatsTransport::new(client))) } Err(e) => { diff --git a/crates/worker/src/factory.rs b/crates/worker/src/factory.rs index cc7e52c..da47251 100644 --- a/crates/worker/src/factory.rs +++ b/crates/worker/src/factory.rs @@ -81,6 +81,9 @@ pub async fn build( let nats_client = async_nats::connect(nats_url) .await .expect("NATS connect failed"); + nats::ensure_stream(&nats_client) + .await + .expect("JetStream stream setup failed"); let consumer = event_transport::EventConsumerAdapter::new(nats::NatsMessageSource::new(nats_client));