refactor(nats): replace NatsEventConsumer with NatsMessageSource implementing MessageSource
This commit is contained in:
@@ -1,11 +1,6 @@
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use domain::{
|
use domain::errors::DomainError;
|
||||||
errors::DomainError,
|
use event_transport::{MessageSource, RawMessage, Transport};
|
||||||
events::{DomainEvent, EventEnvelope},
|
|
||||||
ports::EventConsumer,
|
|
||||||
};
|
|
||||||
use event_payload::EventPayload;
|
|
||||||
use event_transport::Transport;
|
|
||||||
use futures::stream::BoxStream;
|
use futures::stream::BoxStream;
|
||||||
|
|
||||||
// ── NatsTransport — raw NATS publish backend ────────────────────────────────
|
// ── NatsTransport — raw NATS publish backend ────────────────────────────────
|
||||||
@@ -28,18 +23,18 @@ impl Transport for NatsTransport {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── NatsEventConsumer — subscribes and yields EventEnvelopes ────────────────
|
// ── NatsMessageSource — raw NATS subscribe backend ──────────────────────────
|
||||||
|
|
||||||
pub struct NatsEventConsumer {
|
pub struct NatsMessageSource {
|
||||||
client: async_nats::Client,
|
client: async_nats::Client,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NatsEventConsumer {
|
impl NatsMessageSource {
|
||||||
pub fn new(client: async_nats::Client) -> Self { Self { client } }
|
pub fn new(client: async_nats::Client) -> Self { Self { client } }
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EventConsumer for NatsEventConsumer {
|
impl MessageSource for NatsMessageSource {
|
||||||
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
fn messages(&self) -> BoxStream<'_, Result<RawMessage, DomainError>> {
|
||||||
let client = self.client.clone();
|
let client = self.client.clone();
|
||||||
Box::pin(async_stream::try_stream! {
|
Box::pin(async_stream::try_stream! {
|
||||||
let mut sub = client
|
let mut sub = client
|
||||||
@@ -49,22 +44,12 @@ impl EventConsumer for NatsEventConsumer {
|
|||||||
|
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
while let Some(msg) = sub.next().await {
|
while let Some(msg) = sub.next().await {
|
||||||
let payload = match serde_json::from_slice::<EventPayload>(&msg.payload) {
|
let subject = msg.subject.to_string();
|
||||||
Ok(p) => p,
|
let payload = msg.payload.to_vec();
|
||||||
Err(e) => {
|
// Basic NATS: at-most-once — ack/nack are no-ops.
|
||||||
tracing::warn!("failed to deserialize event payload: {e}");
|
yield RawMessage {
|
||||||
continue;
|
subject,
|
||||||
}
|
payload,
|
||||||
};
|
|
||||||
let event = match DomainEvent::try_from(payload) {
|
|
||||||
Ok(e) => e,
|
|
||||||
Err(e) => {
|
|
||||||
tracing::warn!("failed to convert payload to domain event: {e}");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
yield EventEnvelope {
|
|
||||||
event,
|
|
||||||
ack: Box::new(|| {}),
|
ack: Box::new(|| {}),
|
||||||
nack: Box::new(|| {}),
|
nack: Box::new(|| {}),
|
||||||
};
|
};
|
||||||
@@ -76,7 +61,8 @@ impl EventConsumer for NatsEventConsumer {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use domain::value_objects::{LikeId, ThoughtId, UserId};
|
use domain::{events::DomainEvent, value_objects::{LikeId, ThoughtId, UserId}};
|
||||||
|
use event_payload::EventPayload;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn payload_from_domain_event_has_correct_subject() {
|
fn payload_from_domain_event_has_correct_subject() {
|
||||||
|
|||||||
Reference in New Issue
Block a user