From 16892007a3ba0a1cfb1f7b1d7435b2a234b58635 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 14 May 2026 23:25:01 +0200 Subject: [PATCH] =?UTF-8?q?fix(nats):=20use=20fetch().expires(30s)=20inste?= =?UTF-8?q?ad=20of=20messages()=20=E2=80=94=20without=20expires=20NATS=20r?= =?UTF-8?q?eturns=20empty=20immediately?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/adapters/nats/src/lib.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/crates/adapters/nats/src/lib.rs b/crates/adapters/nats/src/lib.rs index 268d9ba..bf253a2 100644 --- a/crates/adapters/nats/src/lib.rs +++ b/crates/adapters/nats/src/lib.rs @@ -137,10 +137,19 @@ impl MessageSource for NatsMessageSource { tracing::info!("NATS pull consumer ready"); loop { - let mut messages = match consumer.messages().await { + // fetch().expires() keeps the request open server-side until messages arrive + // (up to 30 s), then the batch ends and we loop. Without expires, the fetch + // returns immediately with zero messages when the queue is empty. + let mut messages = match consumer + .fetch() + .max_messages(100) + .expires(std::time::Duration::from_secs(30)) + .messages() + .await + { Ok(m) => m, Err(e) => { - tracing::error!("NATS consumer.messages() failed: {e}"); + tracing::error!("NATS fetch failed: {e}"); let _ = tx.send(Err(DomainError::Internal(e.to_string()))).await; return; }