use async_nats::{ Client, jetstream::{self, consumer::pull, message::AckKind, stream::Config as StreamConfig}, }; use async_trait::async_trait; use domain::{ errors::DomainError, events::{AckHandle, DomainEvent, EventEnvelope}, ports::EventConsumer, }; use futures::{ StreamExt, stream::{self, BoxStream}, }; use std::sync::Arc; use tokio::sync::{Mutex, mpsc}; use crate::{config::NatsConfig, payload::NatsEventPayload, subject::consumer_subject_filter}; // ── JetStream ack handle ───────────────────────────────────────────────────── struct NatsJetStreamAckHandle { message: async_nats::jetstream::Message, } #[async_trait] impl AckHandle for NatsJetStreamAckHandle { async fn ack(&self) -> Result<(), DomainError> { tracing::debug!( "acknowledging message with sequence {}", self.message.info().unwrap().stream_sequence ); self.message .ack() .await .map_err(|e| DomainError::InfrastructureError(e.to_string())) } async fn nack(&self) -> Result<(), DomainError> { tracing::debug!( "negatively acknowledging message with sequence {}", self.message.info().unwrap().stream_sequence ); self.message .ack_with(AckKind::Nak(None)) .await .map_err(|e| DomainError::InfrastructureError(e.to_string())) } } // ── Core NATS ack handle (no-op) ───────────────────────────────────────────── struct NoopAck; #[async_trait] impl AckHandle for NoopAck { async fn ack(&self) -> Result<(), DomainError> { Ok(()) } async fn nack(&self) -> Result<(), DomainError> { Ok(()) } } // ── Envelope construction helpers ──────────────────────────────────────────── fn decode_js(msg: async_nats::jetstream::Message) -> Result { let payload: NatsEventPayload = serde_json::from_slice(&msg.payload) .map_err(|e| DomainError::InfrastructureError(format!("deserialize: {e}")))?; let event = DomainEvent::try_from(payload)?; Ok(EventEnvelope::new( event, Box::new(NatsJetStreamAckHandle { message: msg }), )) } fn decode_core(msg: async_nats::Message) -> Result { let payload: NatsEventPayload = serde_json::from_slice(&msg.payload) .map_err(|e| DomainError::InfrastructureError(format!("deserialize: {e}")))?; let event = DomainEvent::try_from(payload)?; Ok(EventEnvelope::new(event, Box::new(NoopAck))) } // ── Channel-bridge shared by both consumers ────────────────────────────────── type EnvelopeRx = Arc>>>; fn consume_from_rx(rx: EnvelopeRx) -> BoxStream<'static, Result> { Box::pin(stream::unfold(rx, |rx| async move { let item = rx.lock().await.recv().await?; Some((item, rx)) })) } // ── JetStream consumer ──────────────────────────────────────────────────────── pub struct NatsJetStreamConsumer { rx: EnvelopeRx, } impl NatsJetStreamConsumer { pub async fn create(cfg: &NatsConfig, client: Client) -> anyhow::Result { let js = jetstream::new(client); let stream = js .get_or_create_stream(StreamConfig { name: cfg.stream_name.clone(), subjects: vec![consumer_subject_filter(&cfg.subject_prefix)], max_messages: 100_000, ..Default::default() }) .await?; let subject_filter = consumer_subject_filter(&cfg.subject_prefix); let consumer = stream .get_or_create_consumer( cfg.consumer_name.as_str(), pull::Config { durable_name: Some(cfg.consumer_name.clone()), filter_subject: subject_filter, ack_wait: std::time::Duration::from_secs(600), ..Default::default() }, ) .await?; let (tx, rx) = mpsc::channel(128); tokio::spawn(async move { loop { let mut messages = match consumer.messages().await { Err(e) => { tracing::error!("failed to fetch messages: {}", e); let _ = tx .send(Err(DomainError::InfrastructureError(e.to_string()))) .await; return; } Ok(m) => m, }; while let Some(result) = messages.next().await { let envelope = result .map_err(|e| DomainError::InfrastructureError(e.to_string())) .and_then(decode_js); if tx.send(envelope).await.is_err() { tracing::info!("consumer channel closed, stopping message processing"); return; } tracing::debug!("message sent to consumer channel"); } // messages() stream ended (fetch expired in strict mode) — restart } }); Ok(Self { rx: Arc::new(Mutex::new(rx)), }) } } impl EventConsumer for NatsJetStreamConsumer { fn consume(&self) -> BoxStream<'_, Result> { consume_from_rx(Arc::clone(&self.rx)) } } // ── Core NATS consumer ──────────────────────────────────────────────────────── pub struct NatsCoreConsumer { rx: EnvelopeRx, } impl NatsCoreConsumer { pub async fn create(cfg: &NatsConfig, client: Client) -> anyhow::Result { let subject = consumer_subject_filter(&cfg.subject_prefix); let mut subscriber = client.subscribe(subject).await?; let (tx, rx) = mpsc::channel(128); tokio::spawn(async move { while let Some(msg) = subscriber.next().await { let envelope = decode_core(msg); tracing::debug!("message received and decoded, sending to consumer channel"); if tx.send(envelope).await.is_err() { tracing::info!("consumer channel closed, stopping message processing"); break; } } }); Ok(Self { rx: Arc::new(Mutex::new(rx)), }) } } impl EventConsumer for NatsCoreConsumer { fn consume(&self) -> BoxStream<'_, Result> { consume_from_rx(Arc::clone(&self.rx)) } } fn _assert_send_sync() { fn check() {} check::(); check::(); }