diff --git a/crates/adapters/nats/src/lib.rs b/crates/adapters/nats/src/lib.rs index 3e44dae..eb24139 100644 --- a/crates/adapters/nats/src/lib.rs +++ b/crates/adapters/nats/src/lib.rs @@ -96,25 +96,28 @@ impl NatsMessageSource { impl MessageSource for NatsMessageSource { fn messages(&self) -> BoxStream<'_, Result> { + use futures::stream; + use tokio::sync::{mpsc, Mutex as TokioMutex}; + let js = self.jetstream.clone(); - Box::pin(async_stream::try_stream! { - // Ensure stream exists (idempotent). - js.get_or_create_stream(stream_config()) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + let (tx, rx) = mpsc::channel::>(128); - let stream = js - .get_stream(STREAM_NAME) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + // Spawn the consumer loop in the background. + // Pull consumer: worker explicitly fetches from NATS rather than NATS pushing. + tokio::spawn(async move { + let stream = match js.get_stream(STREAM_NAME).await { + Ok(s) => s, + Err(e) => { + let _ = tx.send(Err(DomainError::Internal(e.to_string()))).await; + return; + } + }; - // Durable push consumer — survives worker restarts. - let consumer = stream + let consumer = match stream .get_or_create_consumer( CONSUMER_NAME, - jetstream::consumer::push::Config { + jetstream::consumer::pull::Config { durable_name: Some(CONSUMER_NAME.to_string()), - deliver_subject: CONSUMER_NAME.to_string() + ".deliver", ack_policy: jetstream::consumer::AckPolicy::Explicit, ack_wait: std::time::Duration::from_secs(ACK_WAIT_SECS), max_deliver: MAX_DELIVER, @@ -122,45 +125,76 @@ impl MessageSource for NatsMessageSource { }, ) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + { + Ok(c) => c, + Err(e) => { + let _ = tx.send(Err(DomainError::Internal(e.to_string()))).await; + return; + } + }; - let mut messages = consumer - .messages() - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + tracing::info!("NATS pull consumer ready"); - use futures::StreamExt; - while let Some(result) = messages.next().await { - let msg = result.map_err(|e| DomainError::Internal(e.to_string()))?; - let subject = msg.subject.to_string(); - let payload = msg.payload.to_vec(); - - // Wrap in Arc so both closures can hold a reference. - let msg = Arc::new(msg); - let msg_nack = Arc::clone(&msg); - - yield RawMessage { - subject, - payload, - ack: Box::new(move || { - let m = Arc::clone(&msg); - tokio::spawn(async move { - if let Err(e) = m.ack().await { - tracing::warn!("NATS ack failed: {e}"); - } - }); - }), - nack: Box::new(move || { - let m = Arc::clone(&msg_nack); - tokio::spawn(async move { - if let Err(e) = m.ack_with(AckKind::Nak(None)).await { - tracing::warn!("NATS nak failed: {e}"); - } - }); - }), + loop { + let mut messages = match consumer.messages().await { + Ok(m) => m, + Err(e) => { + tracing::error!("NATS consumer.messages() failed: {e}"); + let _ = tx.send(Err(DomainError::Internal(e.to_string()))).await; + return; + } }; + + use futures::StreamExt; + while let Some(result) = messages.next().await { + let msg = match result { + Ok(m) => m, + Err(e) => { + tracing::warn!("NATS message error: {e}"); + continue; + } + }; + + let subject = msg.subject.to_string(); + let payload = msg.payload.to_vec(); + let msg = Arc::new(msg); + let msg_nack = Arc::clone(&msg); + + let raw = RawMessage { + subject, + payload, + ack: Box::new(move || { + let m = Arc::clone(&msg); + tokio::spawn(async move { + if let Err(e) = m.ack().await { + tracing::warn!("NATS ack failed: {e}"); + } + }); + }), + nack: Box::new(move || { + let m = Arc::clone(&msg_nack); + tokio::spawn(async move { + if let Err(e) = m.ack_with(AckKind::Nak(None)).await { + tracing::warn!("NATS nak failed: {e}"); + } + }); + }), + }; + + if tx.send(Ok(raw)).await.is_err() { + return; // receiver dropped — worker shutting down + } + } + // messages() stream ended (e.g. fetch timeout) — loop and restart } - }) + }); + + // Bridge the channel receiver into a BoxStream. + let rx = Arc::new(TokioMutex::new(rx)); + Box::pin(stream::unfold(rx, |rx| async move { + let item = rx.lock().await.recv().await?; + Some((item, rx)) + })) } }