use async_nats::jetstream::{self, stream::Config as StreamConfig, AckKind}; use async_trait::async_trait; use domain::errors::DomainError; use event_transport::{MessageSource, RawMessage, Transport}; use futures::stream::BoxStream; use std::sync::Arc; // Stream name and subjects used by both publisher and consumer. const STREAM_NAME: &str = "THOUGHTS_EVENTS"; // Explicit prefixes instead of ">" — NATS WorkQueue retention disallows // the catch-all ">" wildcard without also setting no_ack = true. const STREAM_SUBJECTS: &[&str] = &[ "thoughts.>", "likes.>", "boosts.>", "follows.>", "users.>", "federation.>", ]; const CONSUMER_NAME: &str = "worker"; // Redelivery timeout: if a message is not acked within this time, NATS redelivers it. const ACK_WAIT_SECS: u64 = 30; // Maximum delivery attempts before the message goes to a dead-letter stream (if configured). const MAX_DELIVER: i64 = 5; fn stream_config() -> StreamConfig { StreamConfig { name: STREAM_NAME.to_string(), subjects: STREAM_SUBJECTS.iter().map(|s| s.to_string()).collect(), retention: jetstream::stream::RetentionPolicy::WorkQueue, ..Default::default() } } /// Ensure the JetStream stream exists and has the current subject list. /// Idempotent — creates if absent, updates subjects if already present. pub async fn ensure_stream(client: &async_nats::Client) -> Result<(), DomainError> { let js = jetstream::new(client.clone()); // Try to update first (covers the case where stream exists with stale subjects). // Falls back to create if the stream doesn't exist yet. match js.update_stream(stream_config()).await { Ok(_) => { tracing::info!(subjects = ?STREAM_SUBJECTS, "JetStream stream updated"); Ok(()) } Err(e) => { tracing::warn!("JetStream stream update failed ({e}), falling back to get_or_create"); js.get_or_create_stream(stream_config()) .await .map(|_| ()) .map_err(|e| DomainError::Internal(format!("JetStream stream setup failed: {e}"))) } } } // ── NatsTransport — JetStream publish ────────────────────────────────────── pub struct NatsTransport { jetstream: jetstream::Context, } impl NatsTransport { pub fn new(client: async_nats::Client) -> Self { Self { jetstream: jetstream::new(client), } } } #[async_trait] impl Transport for NatsTransport { async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> { self.jetstream .publish(subject.to_string(), bytes.to_vec().into()) .await .map_err(|e| DomainError::Internal(e.to_string()))? .await // wait for server ack — confirms message is durably stored .map_err(|e| DomainError::Internal(e.to_string()))?; Ok(()) } } // ── NatsMessageSource — JetStream durable push consumer ──────────────────── pub struct NatsMessageSource { jetstream: jetstream::Context, } impl NatsMessageSource { pub fn new(client: async_nats::Client) -> Self { Self { jetstream: jetstream::new(client), } } } impl MessageSource for NatsMessageSource { fn messages(&self) -> BoxStream<'_, Result> { use futures::stream; use tokio::sync::{mpsc, Mutex as TokioMutex}; let js = self.jetstream.clone(); let (tx, rx) = mpsc::channel::>(128); // Spawn the consumer loop in the background. // Pull consumer: worker explicitly fetches from NATS rather than NATS pushing. tokio::spawn(async move { let stream = match js.get_stream(STREAM_NAME).await { Ok(s) => s, Err(e) => { let _ = tx.send(Err(DomainError::Internal(e.to_string()))).await; return; } }; let consumer = match stream .get_or_create_consumer( CONSUMER_NAME, jetstream::consumer::pull::Config { durable_name: Some(CONSUMER_NAME.to_string()), ack_policy: jetstream::consumer::AckPolicy::Explicit, ack_wait: std::time::Duration::from_secs(ACK_WAIT_SECS), max_deliver: MAX_DELIVER, ..Default::default() }, ) .await { Ok(c) => c, Err(e) => { let _ = tx.send(Err(DomainError::Internal(e.to_string()))).await; return; } }; tracing::info!("NATS pull consumer ready"); loop { let mut messages = match consumer.messages().await { Ok(m) => m, Err(e) => { tracing::error!("NATS consumer.messages() failed: {e}"); let _ = tx.send(Err(DomainError::Internal(e.to_string()))).await; return; } }; use futures::StreamExt; while let Some(result) = messages.next().await { let msg = match result { Ok(m) => m, Err(e) => { tracing::warn!("NATS message error: {e}"); continue; } }; let subject = msg.subject.to_string(); let payload = msg.payload.to_vec(); let msg = Arc::new(msg); let msg_nack = Arc::clone(&msg); let raw = RawMessage { subject, payload, ack: Box::new(move || { let m = Arc::clone(&msg); tokio::spawn(async move { if let Err(e) = m.ack().await { tracing::warn!("NATS ack failed: {e}"); } }); }), nack: Box::new(move || { let m = Arc::clone(&msg_nack); tokio::spawn(async move { if let Err(e) = m.ack_with(AckKind::Nak(None)).await { tracing::warn!("NATS nak failed: {e}"); } }); }), }; if tx.send(Ok(raw)).await.is_err() { return; // receiver dropped — worker shutting down } } // messages() stream ended (e.g. fetch timeout) — loop and restart } }); // Bridge the channel receiver into a BoxStream. let rx = Arc::new(TokioMutex::new(rx)); Box::pin(stream::unfold(rx, |rx| async move { let item = rx.lock().await.recv().await?; Some((item, rx)) })) } } #[cfg(test)] mod tests { use super::*; use domain::{ events::DomainEvent, value_objects::{LikeId, ThoughtId, UserId}, }; use event_payload::EventPayload; #[test] fn payload_from_domain_event_has_correct_subject() { let event = DomainEvent::ThoughtCreated { thought_id: ThoughtId::new(), user_id: UserId::new(), in_reply_to_id: None, }; let payload = EventPayload::from(&event); assert_eq!(payload.subject(), "thoughts.created"); } #[test] fn domain_event_roundtrip_via_payload() { let uid = UserId::new(); let tid = ThoughtId::new(); let event = DomainEvent::LikeAdded { like_id: LikeId::new(), user_id: uid.clone(), thought_id: tid.clone(), }; let payload = EventPayload::from(&event); let back = DomainEvent::try_from(payload).unwrap(); if let DomainEvent::LikeAdded { user_id, thought_id, .. } = back { assert_eq!(user_id, uid); assert_eq!(thought_id, tid); } else { panic!("wrong variant"); } } }