From 4d2d56c8ae26ff7f8595c5808eccc55039ad3dba Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 14 May 2026 23:34:51 +0200 Subject: [PATCH] =?UTF-8?q?fix(nats):=20revert=20to=20consumer.messages()?= =?UTF-8?q?=20=E2=80=94=20fetch()=20defaults=20no=5Fwait:true=20which=20sk?= =?UTF-8?q?ips=20empty=20queues?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/adapters/nats/src/lib.rs | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/crates/adapters/nats/src/lib.rs b/crates/adapters/nats/src/lib.rs index 2b65d18..cc3b749 100644 --- a/crates/adapters/nats/src/lib.rs +++ b/crates/adapters/nats/src/lib.rs @@ -139,19 +139,14 @@ impl MessageSource for NatsMessageSource { tracing::info!("NATS pull consumer ready"); loop { - // fetch().expires() keeps the request open server-side until messages arrive - // (up to 30 s), then the batch ends and we loop. Without expires, the fetch - // returns immediately with zero messages when the queue is empty. - let mut messages = match consumer - .fetch() - .max_messages(100) - .expires(std::time::Duration::from_secs(30)) - .messages() - .await - { + // consumer.messages() uses long-poll (no no_wait flag) — NATS holds the + // request open and delivers messages as they arrive. + // fetch() in async-nats 0.48 defaults to no_wait:true which returns + // immediately when the queue is empty, so we avoid it here. + let mut messages = match consumer.messages().await { Ok(m) => m, Err(e) => { - tracing::error!("NATS fetch failed: {e}"); + tracing::error!("NATS messages() failed: {e}"); let _ = tx.send(Err(DomainError::Internal(e.to_string()))).await; return; }