use async_trait::async_trait; use domain::{ errors::DomainError, events::{DomainEvent, EventEnvelope}, ports::{EventConsumer, EventPublisher}, }; use event_payload::EventPayload; use futures::stream::BoxStream; // ── NatsEventPublisher ──────────────────────────────────────────────────── pub struct NatsEventPublisher { client: async_nats::Client, } impl NatsEventPublisher { pub fn new(client: async_nats::Client) -> Self { Self { client } } } #[async_trait] impl EventPublisher for NatsEventPublisher { async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> { let payload = EventPayload::from(event); let subject = payload.subject(); let bytes = serde_json::to_vec(&payload) .map_err(|e| DomainError::Internal(e.to_string()))?; self.client .publish(subject, bytes.into()) .await .map_err(|e| DomainError::Internal(e.to_string())) } } // ── NatsEventConsumer ───────────────────────────────────────────────────── pub struct NatsEventConsumer { client: async_nats::Client, } impl NatsEventConsumer { pub fn new(client: async_nats::Client) -> Self { Self { client } } } impl EventConsumer for NatsEventConsumer { fn consume(&self) -> BoxStream<'_, Result> { let client = self.client.clone(); Box::pin(async_stream::try_stream! { let mut sub = client .subscribe(">") .await .map_err(|e| DomainError::Internal(e.to_string()))?; use futures::StreamExt; while let Some(msg) = sub.next().await { let payload = match serde_json::from_slice::(&msg.payload) { Ok(p) => p, Err(e) => { tracing::warn!("failed to deserialize event payload: {e}"); continue; } }; let event = match DomainEvent::try_from(payload) { Ok(e) => e, Err(e) => { tracing::warn!("failed to convert payload to domain event: {e}"); continue; } }; // Basic NATS: no ack/nack (at-most-once delivery) yield EventEnvelope { event, ack: Box::new(|| {}), nack: Box::new(|| {}), }; } }) } } #[cfg(test)] mod tests { use super::*; use domain::value_objects::{LikeId, ThoughtId, UserId}; #[test] fn payload_from_domain_event_has_correct_subject() { let event = DomainEvent::ThoughtCreated { thought_id: ThoughtId::new(), user_id: UserId::new(), in_reply_to_id: None, }; let payload = EventPayload::from(&event); assert_eq!(payload.subject(), "thoughts.created"); } #[test] fn domain_event_roundtrip_via_payload() { let uid = UserId::new(); let tid = ThoughtId::new(); let event = DomainEvent::LikeAdded { like_id: LikeId::new(), user_id: uid.clone(), thought_id: tid.clone(), }; let payload = EventPayload::from(&event); let back = DomainEvent::try_from(payload).unwrap(); if let DomainEvent::LikeAdded { user_id, thought_id, .. } = back { assert_eq!(user_id, uid); assert_eq!(thought_id, tid); } else { panic!("wrong variant"); } } }