From e995b29be19b6d82189128dcc86cf18d3df39870 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 14 May 2026 12:36:45 +0200 Subject: [PATCH] refactor(nats): replace NatsEventConsumer with NatsMessageSource implementing MessageSource --- crates/adapters/nats/src/lib.rs | 44 +++++++++++---------------------- 1 file changed, 15 insertions(+), 29 deletions(-) diff --git a/crates/adapters/nats/src/lib.rs b/crates/adapters/nats/src/lib.rs index 5732dea..0a14fd5 100644 --- a/crates/adapters/nats/src/lib.rs +++ b/crates/adapters/nats/src/lib.rs @@ -1,11 +1,6 @@ use async_trait::async_trait; -use domain::{ - errors::DomainError, - events::{DomainEvent, EventEnvelope}, - ports::EventConsumer, -}; -use event_payload::EventPayload; -use event_transport::Transport; +use domain::errors::DomainError; +use event_transport::{MessageSource, RawMessage, Transport}; use futures::stream::BoxStream; // ── NatsTransport — raw NATS publish backend ──────────────────────────────── @@ -28,18 +23,18 @@ impl Transport for NatsTransport { } } -// ── NatsEventConsumer — subscribes and yields EventEnvelopes ──────────────── +// ── NatsMessageSource — raw NATS subscribe backend ────────────────────────── -pub struct NatsEventConsumer { +pub struct NatsMessageSource { client: async_nats::Client, } -impl NatsEventConsumer { +impl NatsMessageSource { pub fn new(client: async_nats::Client) -> Self { Self { client } } } -impl EventConsumer for NatsEventConsumer { - fn consume(&self) -> BoxStream<'_, Result> { +impl MessageSource for NatsMessageSource { + fn messages(&self) -> BoxStream<'_, Result> { let client = self.client.clone(); Box::pin(async_stream::try_stream! { let mut sub = client @@ -49,22 +44,12 @@ impl EventConsumer for NatsEventConsumer { use futures::StreamExt; while let Some(msg) = sub.next().await { - let payload = match serde_json::from_slice::(&msg.payload) { - Ok(p) => p, - Err(e) => { - tracing::warn!("failed to deserialize event payload: {e}"); - continue; - } - }; - let event = match DomainEvent::try_from(payload) { - Ok(e) => e, - Err(e) => { - tracing::warn!("failed to convert payload to domain event: {e}"); - continue; - } - }; - yield EventEnvelope { - event, + let subject = msg.subject.to_string(); + let payload = msg.payload.to_vec(); + // Basic NATS: at-most-once — ack/nack are no-ops. + yield RawMessage { + subject, + payload, ack: Box::new(|| {}), nack: Box::new(|| {}), }; @@ -76,7 +61,8 @@ impl EventConsumer for NatsEventConsumer { #[cfg(test)] mod tests { use super::*; - use domain::value_objects::{LikeId, ThoughtId, UserId}; + use domain::{events::DomainEvent, value_objects::{LikeId, ThoughtId, UserId}}; + use event_payload::EventPayload; #[test] fn payload_from_domain_event_has_correct_subject() {