use async_nats::jetstream::{self, stream::Config as StreamConfig, AckKind}; use async_trait::async_trait; use domain::errors::DomainError; use event_transport::{MessageSource, RawMessage, Transport}; use futures::stream::BoxStream; use std::sync::Arc; // Stream name and subjects used by both publisher and consumer. const STREAM_NAME: &str = "THOUGHTS_EVENTS"; // Explicit prefixes instead of ">" — NATS WorkQueue retention disallows // the catch-all ">" wildcard without also setting no_ack = true. const STREAM_SUBJECTS: &[&str] = &["thoughts.>", "likes.>", "boosts.>", "follows.>", "users.>"]; const CONSUMER_NAME: &str = "worker"; // Redelivery timeout: if a message is not acked within this time, NATS redelivers it. const ACK_WAIT_SECS: u64 = 30; // Maximum delivery attempts before the message goes to a dead-letter stream (if configured). const MAX_DELIVER: i64 = 5; fn stream_config() -> StreamConfig { StreamConfig { name: STREAM_NAME.to_string(), subjects: STREAM_SUBJECTS.iter().map(|s| s.to_string()).collect(), retention: jetstream::stream::RetentionPolicy::WorkQueue, ..Default::default() } } /// Ensure the JetStream stream exists. Call once at startup before publishing or consuming. /// Idempotent — safe to call from both bootstrap and worker factories. pub async fn ensure_stream(client: &async_nats::Client) -> Result<(), DomainError> { let js = jetstream::new(client.clone()); js.get_or_create_stream(stream_config()) .await .map_err(|e| DomainError::Internal(format!("JetStream stream setup failed: {e}")))?; Ok(()) } // ── NatsTransport — JetStream publish ────────────────────────────────────── pub struct NatsTransport { jetstream: jetstream::Context, } impl NatsTransport { pub fn new(client: async_nats::Client) -> Self { Self { jetstream: jetstream::new(client), } } } #[async_trait] impl Transport for NatsTransport { async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> { self.jetstream .publish(subject.to_string(), bytes.to_vec().into()) .await .map_err(|e| DomainError::Internal(e.to_string()))? .await // wait for server ack — confirms message is durably stored .map_err(|e| DomainError::Internal(e.to_string()))?; Ok(()) } } // ── NatsMessageSource — JetStream durable push consumer ──────────────────── pub struct NatsMessageSource { jetstream: jetstream::Context, } impl NatsMessageSource { pub fn new(client: async_nats::Client) -> Self { Self { jetstream: jetstream::new(client), } } } impl MessageSource for NatsMessageSource { fn messages(&self) -> BoxStream<'_, Result> { let js = self.jetstream.clone(); Box::pin(async_stream::try_stream! { // Ensure stream exists (idempotent). js.get_or_create_stream(stream_config()) .await .map_err(|e| DomainError::Internal(e.to_string()))?; let stream = js .get_stream(STREAM_NAME) .await .map_err(|e| DomainError::Internal(e.to_string()))?; // Durable push consumer — survives worker restarts. let consumer = stream .get_or_create_consumer( CONSUMER_NAME, jetstream::consumer::push::Config { durable_name: Some(CONSUMER_NAME.to_string()), deliver_subject: CONSUMER_NAME.to_string() + ".deliver", ack_policy: jetstream::consumer::AckPolicy::Explicit, ack_wait: std::time::Duration::from_secs(ACK_WAIT_SECS), max_deliver: MAX_DELIVER, ..Default::default() }, ) .await .map_err(|e| DomainError::Internal(e.to_string()))?; let mut messages = consumer .messages() .await .map_err(|e| DomainError::Internal(e.to_string()))?; use futures::StreamExt; while let Some(result) = messages.next().await { let msg = result.map_err(|e| DomainError::Internal(e.to_string()))?; let subject = msg.subject.to_string(); let payload = msg.payload.to_vec(); // Wrap in Arc so both closures can hold a reference. let msg = Arc::new(msg); let msg_nack = Arc::clone(&msg); yield RawMessage { subject, payload, ack: Box::new(move || { let m = Arc::clone(&msg); tokio::spawn(async move { if let Err(e) = m.ack().await { tracing::warn!("NATS ack failed: {e}"); } }); }), nack: Box::new(move || { let m = Arc::clone(&msg_nack); tokio::spawn(async move { if let Err(e) = m.ack_with(AckKind::Nak(None)).await { tracing::warn!("NATS nak failed: {e}"); } }); }), }; } }) } } #[cfg(test)] mod tests { use super::*; use domain::{ events::DomainEvent, value_objects::{LikeId, ThoughtId, UserId}, }; use event_payload::EventPayload; #[test] fn payload_from_domain_event_has_correct_subject() { let event = DomainEvent::ThoughtCreated { thought_id: ThoughtId::new(), user_id: UserId::new(), in_reply_to_id: None, }; let payload = EventPayload::from(&event); assert_eq!(payload.subject(), "thoughts.created"); } #[test] fn domain_event_roundtrip_via_payload() { let uid = UserId::new(); let tid = ThoughtId::new(); let event = DomainEvent::LikeAdded { like_id: LikeId::new(), user_id: uid.clone(), thought_id: tid.clone(), }; let payload = EventPayload::from(&event); let back = DomainEvent::try_from(payload).unwrap(); if let DomainEvent::LikeAdded { user_id, thought_id, .. } = back { assert_eq!(user_id, uid); assert_eq!(thought_id, tid); } else { panic!("wrong variant"); } } }