diff --git a/crates/adapters/nats/src/lib.rs b/crates/adapters/nats/src/lib.rs index 268d9ba..bf253a2 100644 --- a/crates/adapters/nats/src/lib.rs +++ b/crates/adapters/nats/src/lib.rs @@ -137,10 +137,19 @@ impl MessageSource for NatsMessageSource { tracing::info!("NATS pull consumer ready"); loop { - let mut messages = match consumer.messages().await { + // fetch().expires() keeps the request open server-side until messages arrive + // (up to 30 s), then the batch ends and we loop. Without expires, the fetch + // returns immediately with zero messages when the queue is empty. + let mut messages = match consumer + .fetch() + .max_messages(100) + .expires(std::time::Duration::from_secs(30)) + .messages() + .await + { Ok(m) => m, Err(e) => { - tracing::error!("NATS consumer.messages() failed: {e}"); + tracing::error!("NATS fetch failed: {e}"); let _ = tx.send(Err(DomainError::Internal(e.to_string()))).await; return; }