diff --git a/crates/adapters/event-transport/Cargo.toml b/crates/adapters/event-transport/Cargo.toml index fa67b60..e7e7c38 100644 --- a/crates/adapters/event-transport/Cargo.toml +++ b/crates/adapters/event-transport/Cargo.toml @@ -9,6 +9,7 @@ event-payload = { workspace = true } serde_json = { workspace = true } async-trait = { workspace = true } tracing = { workspace = true } +futures = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["full"] } diff --git a/crates/adapters/event-transport/src/lib.rs b/crates/adapters/event-transport/src/lib.rs index 8ef1c79..bc483d6 100644 --- a/crates/adapters/event-transport/src/lib.rs +++ b/crates/adapters/event-transport/src/lib.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; -use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher}; +use domain::{errors::DomainError, events::{DomainEvent, EventEnvelope}, ports::{EventConsumer, EventPublisher}}; use event_payload::EventPayload; +use futures::stream::BoxStream; /// Abstraction over any pub/sub transport backend. /// Implement this for NATS, Kafka, Redis Streams, etc. @@ -37,6 +38,67 @@ impl EventPublisher for EventPublisherAdapter { } } +/// A raw inbound message from a transport backend. +/// `ack` and `nack` are transport-level acknowledgements (e.g. Kafka offset commit). +/// For at-most-once transports (basic NATS), both are no-ops. +pub struct RawMessage { + pub subject: String, + pub payload: Vec, + pub ack: Box, + pub nack: Box, +} + +/// Abstraction over any subscribe/consume backend. +pub trait MessageSource: Send + Sync { + fn messages(&self) -> BoxStream<'_, Result>; +} + +/// Deserializes raw transport messages into domain `EventEnvelope`s. +/// Invalid or unknown messages are skipped with a warning — stream continues. +pub struct EventConsumerAdapter { + source: S, +} + +impl EventConsumerAdapter { + pub fn new(source: S) -> Self { Self { source } } +} + +impl EventConsumer for EventConsumerAdapter { + fn consume(&self) -> BoxStream<'_, Result> { + use futures::StreamExt; + let stream = self.source.messages(); + Box::pin(stream.filter_map(|result| async move { + match result { + Err(e) => { + tracing::warn!("transport error: {e}"); + None + } + Ok(msg) => { + let payload = match serde_json::from_slice::(&msg.payload) { + Ok(p) => p, + Err(e) => { + tracing::warn!("failed to deserialize event payload: {e}"); + return None; + } + }; + let event = match DomainEvent::try_from(payload) { + Ok(e) => e, + Err(e) => { + tracing::warn!("unknown event type: {e}"); + return None; + } + }; + Some(Ok(EventEnvelope { + event, + ack: msg.ack, + nack: msg.nack, + })) + } + } + })) + } +} + #[cfg(test)] mod tests { use super::*; @@ -87,4 +149,60 @@ mod tests { let json: serde_json::Value = serde_json::from_slice(&bytes).expect("valid JSON"); assert_eq!(json["type"], "UserBlocked"); } + + #[tokio::test] + async fn consumer_adapter_deserializes_and_yields_event() { + use domain::value_objects::ThoughtId; + use futures::StreamExt; + + let event = DomainEvent::ThoughtCreated { + thought_id: ThoughtId::new(), + user_id: UserId::new(), + in_reply_to_id: None, + }; + let payload = EventPayload::from(&event); + let bytes = serde_json::to_vec(&payload).unwrap(); + + struct OneMessageSource { bytes: Vec } + #[async_trait::async_trait] + impl MessageSource for OneMessageSource { + fn messages(&self) -> futures::stream::BoxStream<'_, Result> { + let msg = RawMessage { + subject: "thoughts.created".to_string(), + payload: self.bytes.clone(), + ack: Box::new(|| {}), + nack: Box::new(|| {}), + }; + Box::pin(futures::stream::once(async { Ok(msg) })) + } + } + + let adapter = EventConsumerAdapter::new(OneMessageSource { bytes }); + let mut stream = adapter.consume(); + let envelope = stream.next().await.unwrap().unwrap(); + assert!(matches!(envelope.event, DomainEvent::ThoughtCreated { .. })); + } + + #[tokio::test] + async fn consumer_adapter_skips_invalid_payloads() { + use futures::StreamExt; + + struct BadMessageSource; + #[async_trait::async_trait] + impl MessageSource for BadMessageSource { + fn messages(&self) -> futures::stream::BoxStream<'_, Result> { + let msg = RawMessage { + subject: "bad".to_string(), + payload: b"not valid json".to_vec(), + ack: Box::new(|| {}), + nack: Box::new(|| {}), + }; + Box::pin(futures::stream::once(async { Ok(msg) })) + } + } + + let adapter = EventConsumerAdapter::new(BadMessageSource); + let mut stream = adapter.consume(); + assert!(stream.next().await.is_none()); + } }