feat(nats): migrate to JetStream — at-least-once delivery with durable consumer
Some checks failed
lint / lint (push) Has been cancelled
test / unit (push) Has been cancelled
test / integration (push) Has been cancelled
lint / lint (pull_request) Failing after 9m25s
test / unit (pull_request) Successful in 15m53s
test / integration (pull_request) Failing after 16m42s

This commit is contained in:
2026-05-14 16:41:17 +02:00
parent 550865bad4
commit 458feebcdd
4 changed files with 104 additions and 16 deletions

View File

@@ -20,7 +20,7 @@ services:
ports: ports:
- "4222:4222" - "4222:4222"
- "8222:8222" # monitoring endpoint - "8222:8222" # monitoring endpoint
command: ["--http_port", "8222"] command: ["--jetstream", "--http_port", "8222"]
volumes: volumes:
postgres_data: postgres_data:

View File

@@ -1,61 +1,143 @@
use async_nats::jetstream::{self, stream::Config as StreamConfig, AckKind};
use async_trait::async_trait; use async_trait::async_trait;
use domain::errors::DomainError; use domain::errors::DomainError;
use event_transport::{MessageSource, RawMessage, Transport}; use event_transport::{MessageSource, RawMessage, Transport};
use futures::stream::BoxStream; use futures::stream::BoxStream;
use std::sync::Arc;
// ── NatsTransport — raw NATS publish backend ──────────────────────────────── // Stream name and subjects used by both publisher and consumer.
const STREAM_NAME: &str = "THOUGHTS_EVENTS";
const STREAM_SUBJECTS: &[&str] = &[">"];
const CONSUMER_NAME: &str = "worker";
// Redelivery timeout: if a message is not acked within this time, NATS redelivers it.
const ACK_WAIT_SECS: u64 = 30;
// Maximum delivery attempts before the message goes to a dead-letter stream (if configured).
const MAX_DELIVER: i64 = 5;
fn stream_config() -> StreamConfig {
StreamConfig {
name: STREAM_NAME.to_string(),
subjects: STREAM_SUBJECTS.iter().map(|s| s.to_string()).collect(),
retention: jetstream::stream::RetentionPolicy::WorkQueue,
..Default::default()
}
}
/// Ensure the JetStream stream exists. Call once at startup before publishing or consuming.
/// Idempotent — safe to call from both bootstrap and worker factories.
pub async fn ensure_stream(client: &async_nats::Client) -> Result<(), DomainError> {
let js = jetstream::new(client.clone());
js.get_or_create_stream(stream_config())
.await
.map_err(|e| DomainError::Internal(format!("JetStream stream setup failed: {e}")))?;
Ok(())
}
// ── NatsTransport — JetStream publish ──────────────────────────────────────
pub struct NatsTransport { pub struct NatsTransport {
client: async_nats::Client, jetstream: jetstream::Context,
} }
impl NatsTransport { impl NatsTransport {
pub fn new(client: async_nats::Client) -> Self { pub fn new(client: async_nats::Client) -> Self {
Self { client } Self {
jetstream: jetstream::new(client),
}
} }
} }
#[async_trait] #[async_trait]
impl Transport for NatsTransport { impl Transport for NatsTransport {
async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> { async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> {
self.client self.jetstream
.publish(subject.to_string(), bytes.to_vec().into()) .publish(subject.to_string(), bytes.to_vec().into())
.await .await
.map_err(|e| DomainError::Internal(e.to_string())) .map_err(|e| DomainError::Internal(e.to_string()))?
.await // wait for server ack — confirms message is durably stored
.map_err(|e| DomainError::Internal(e.to_string()))?;
Ok(())
} }
} }
// ── NatsMessageSource — raw NATS subscribe backend ────────────────────────── // ── NatsMessageSource — JetStream durable push consumer ────────────────────
pub struct NatsMessageSource { pub struct NatsMessageSource {
client: async_nats::Client, jetstream: jetstream::Context,
} }
impl NatsMessageSource { impl NatsMessageSource {
pub fn new(client: async_nats::Client) -> Self { pub fn new(client: async_nats::Client) -> Self {
Self { client } Self {
jetstream: jetstream::new(client),
}
} }
} }
impl MessageSource for NatsMessageSource { impl MessageSource for NatsMessageSource {
fn messages(&self) -> BoxStream<'_, Result<RawMessage, DomainError>> { fn messages(&self) -> BoxStream<'_, Result<RawMessage, DomainError>> {
let client = self.client.clone(); let js = self.jetstream.clone();
Box::pin(async_stream::try_stream! { Box::pin(async_stream::try_stream! {
let mut sub = client // Ensure stream exists (idempotent).
.subscribe(">") js.get_or_create_stream(stream_config())
.await
.map_err(|e| DomainError::Internal(e.to_string()))?;
let stream = js
.get_stream(STREAM_NAME)
.await
.map_err(|e| DomainError::Internal(e.to_string()))?;
// Durable push consumer — survives worker restarts.
let consumer = stream
.get_or_create_consumer(
CONSUMER_NAME,
jetstream::consumer::push::Config {
durable_name: Some(CONSUMER_NAME.to_string()),
deliver_subject: CONSUMER_NAME.to_string() + ".deliver",
ack_policy: jetstream::consumer::AckPolicy::Explicit,
ack_wait: std::time::Duration::from_secs(ACK_WAIT_SECS),
max_deliver: MAX_DELIVER,
..Default::default()
},
)
.await
.map_err(|e| DomainError::Internal(e.to_string()))?;
let mut messages = consumer
.messages()
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_err(|e| DomainError::Internal(e.to_string()))?;
use futures::StreamExt; use futures::StreamExt;
while let Some(msg) = sub.next().await { while let Some(result) = messages.next().await {
let msg = result.map_err(|e| DomainError::Internal(e.to_string()))?;
let subject = msg.subject.to_string(); let subject = msg.subject.to_string();
let payload = msg.payload.to_vec(); let payload = msg.payload.to_vec();
// Basic NATS: at-most-once — ack/nack are no-ops.
// Wrap in Arc so both closures can hold a reference.
let msg = Arc::new(msg);
let msg_nack = Arc::clone(&msg);
yield RawMessage { yield RawMessage {
subject, subject,
payload, payload,
ack: Box::new(|| {}), ack: Box::new(move || {
nack: Box::new(|| {}), let m = Arc::clone(&msg);
tokio::spawn(async move {
if let Err(e) = m.ack().await {
tracing::warn!("NATS ack failed: {e}");
}
});
}),
nack: Box::new(move || {
let m = Arc::clone(&msg_nack);
tokio::spawn(async move {
if let Err(e) = m.ack_with(AckKind::Nak(None)).await {
tracing::warn!("NATS nak failed: {e}");
}
});
}),
}; };
} }
}) })

View File

@@ -44,6 +44,9 @@ pub async fn build(cfg: &Config) -> Infrastructure {
Some(url) => match async_nats::connect(url).await { Some(url) => match async_nats::connect(url).await {
Ok(client) => { Ok(client) => {
tracing::info!("Connected to NATS at {url}"); tracing::info!("Connected to NATS at {url}");
if let Err(e) = nats::ensure_stream(&client).await {
tracing::warn!("JetStream stream setup failed: {e} — events may be lost");
}
Arc::new(EventPublisherAdapter::new(NatsTransport::new(client))) Arc::new(EventPublisherAdapter::new(NatsTransport::new(client)))
} }
Err(e) => { Err(e) => {

View File

@@ -81,6 +81,9 @@ pub async fn build(
let nats_client = async_nats::connect(nats_url) let nats_client = async_nats::connect(nats_url)
.await .await
.expect("NATS connect failed"); .expect("NATS connect failed");
nats::ensure_stream(&nats_client)
.await
.expect("JetStream stream setup failed");
let consumer = let consumer =
event_transport::EventConsumerAdapter::new(nats::NatsMessageSource::new(nats_client)); event_transport::EventConsumerAdapter::new(nats::NatsMessageSource::new(nats_client));