From cfc8c19175a48baa129db5c61b4a885798d564f4 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 14 May 2026 12:22:11 +0200 Subject: [PATCH] refactor(nats): strip NatsEventPublisher, add NatsTransport implementing Transport --- crates/adapters/nats/Cargo.toml | 21 +++++++++++---------- crates/adapters/nats/src/lib.rs | 22 +++++++++------------- 2 files changed, 20 insertions(+), 23 deletions(-) diff --git a/crates/adapters/nats/Cargo.toml b/crates/adapters/nats/Cargo.toml index 3eb4fcb..051a88b 100644 --- a/crates/adapters/nats/Cargo.toml +++ b/crates/adapters/nats/Cargo.toml @@ -4,13 +4,14 @@ version = "0.1.0" edition = "2021" [dependencies] -domain = { workspace = true } -event-payload = { workspace = true } -async-nats = { workspace = true } -async-stream = { workspace = true } -serde_json = { workspace = true } -futures = { workspace = true } -tokio = { workspace = true } -async-trait = { workspace = true } -tracing = { workspace = true } -uuid = { workspace = true } +domain = { workspace = true } +event-payload = { workspace = true } +event-publisher = { workspace = true } +async-nats = { workspace = true } +async-stream = { workspace = true } +serde_json = { workspace = true } +futures = { workspace = true } +tokio = { workspace = true } +async-trait = { workspace = true } +tracing = { workspace = true } +uuid = { workspace = true } diff --git a/crates/adapters/nats/src/lib.rs b/crates/adapters/nats/src/lib.rs index f874ec8..72457c0 100644 --- a/crates/adapters/nats/src/lib.rs +++ b/crates/adapters/nats/src/lib.rs @@ -2,36 +2,33 @@ use async_trait::async_trait; use domain::{ errors::DomainError, events::{DomainEvent, EventEnvelope}, - ports::{EventConsumer, EventPublisher}, + ports::EventConsumer, }; use event_payload::EventPayload; +use event_publisher::Transport; use futures::stream::BoxStream; -// ── NatsEventPublisher ──────────────────────────────────────────────────── +// ── NatsTransport — raw NATS publish backend ──────────────────────────────── -pub struct NatsEventPublisher { +pub struct NatsTransport { client: async_nats::Client, } -impl NatsEventPublisher { +impl NatsTransport { pub fn new(client: async_nats::Client) -> Self { Self { client } } } #[async_trait] -impl EventPublisher for NatsEventPublisher { - async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> { - let payload = EventPayload::from(event); - let subject = payload.subject(); - let bytes = serde_json::to_vec(&payload) - .map_err(|e| DomainError::Internal(e.to_string()))?; +impl Transport for NatsTransport { + async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> { self.client - .publish(subject, bytes.into()) + .publish(subject.to_string(), bytes.to_vec().into()) .await .map_err(|e| DomainError::Internal(e.to_string())) } } -// ── NatsEventConsumer ───────────────────────────────────────────────────── +// ── NatsEventConsumer — subscribes and yields EventEnvelopes ──────────────── pub struct NatsEventConsumer { client: async_nats::Client, @@ -66,7 +63,6 @@ impl EventConsumer for NatsEventConsumer { continue; } }; - // Basic NATS: no ack/nack (at-most-once delivery) yield EventEnvelope { event, ack: Box::new(|| {}),