diff --git a/crates/adapters/nats/src/lib.rs b/crates/adapters/nats/src/lib.rs index 2b65d18..cc3b749 100644 --- a/crates/adapters/nats/src/lib.rs +++ b/crates/adapters/nats/src/lib.rs @@ -139,19 +139,14 @@ impl MessageSource for NatsMessageSource { tracing::info!("NATS pull consumer ready"); loop { - // fetch().expires() keeps the request open server-side until messages arrive - // (up to 30 s), then the batch ends and we loop. Without expires, the fetch - // returns immediately with zero messages when the queue is empty. - let mut messages = match consumer - .fetch() - .max_messages(100) - .expires(std::time::Duration::from_secs(30)) - .messages() - .await - { + // consumer.messages() uses long-poll (no no_wait flag) — NATS holds the + // request open and delivers messages as they arrive. + // fetch() in async-nats 0.48 defaults to no_wait:true which returns + // immediately when the queue is empty, so we avoid it here. + let mut messages = match consumer.messages().await { Ok(m) => m, Err(e) => { - tracing::error!("NATS fetch failed: {e}"); + tracing::error!("NATS messages() failed: {e}"); let _ = tx.send(Err(DomainError::Internal(e.to_string()))).await; return; }