use async_trait::async_trait; use domain::{ errors::DomainError, events::{DomainEvent, EventEnvelope}, ports::EventConsumer, }; use event_payload::EventPayload; use event_publisher::Transport; use futures::stream::BoxStream; // ── NatsTransport — raw NATS publish backend ──────────────────────────────── pub struct NatsTransport { client: async_nats::Client, } impl NatsTransport { pub fn new(client: async_nats::Client) -> Self { Self { client } } } #[async_trait] impl Transport for NatsTransport { async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> { self.client .publish(subject.to_string(), bytes.to_vec().into()) .await .map_err(|e| DomainError::Internal(e.to_string())) } } // ── NatsEventConsumer — subscribes and yields EventEnvelopes ──────────────── pub struct NatsEventConsumer { client: async_nats::Client, } impl NatsEventConsumer { pub fn new(client: async_nats::Client) -> Self { Self { client } } } impl EventConsumer for NatsEventConsumer { fn consume(&self) -> BoxStream<'_, Result> { let client = self.client.clone(); Box::pin(async_stream::try_stream! { let mut sub = client .subscribe(">") .await .map_err(|e| DomainError::Internal(e.to_string()))?; use futures::StreamExt; while let Some(msg) = sub.next().await { let payload = match serde_json::from_slice::(&msg.payload) { Ok(p) => p, Err(e) => { tracing::warn!("failed to deserialize event payload: {e}"); continue; } }; let event = match DomainEvent::try_from(payload) { Ok(e) => e, Err(e) => { tracing::warn!("failed to convert payload to domain event: {e}"); continue; } }; yield EventEnvelope { event, ack: Box::new(|| {}), nack: Box::new(|| {}), }; } }) } } #[cfg(test)] mod tests { use super::*; use domain::value_objects::{LikeId, ThoughtId, UserId}; #[test] fn payload_from_domain_event_has_correct_subject() { let event = DomainEvent::ThoughtCreated { thought_id: ThoughtId::new(), user_id: UserId::new(), in_reply_to_id: None, }; let payload = EventPayload::from(&event); assert_eq!(payload.subject(), "thoughts.created"); } #[test] fn domain_event_roundtrip_via_payload() { let uid = UserId::new(); let tid = ThoughtId::new(); let event = DomainEvent::LikeAdded { like_id: LikeId::new(), user_id: uid.clone(), thought_id: tid.clone(), }; let payload = EventPayload::from(&event); let back = DomainEvent::try_from(payload).unwrap(); if let DomainEvent::LikeAdded { user_id, thought_id, .. } = back { assert_eq!(user_id, uid); assert_eq!(thought_id, tid); } else { panic!("wrong variant"); } } }