diff --git a/crates/adapters/event-publisher/Cargo.toml b/crates/adapters/event-publisher/Cargo.toml index 0d7d213..70f7160 100644 --- a/crates/adapters/event-publisher/Cargo.toml +++ b/crates/adapters/event-publisher/Cargo.toml @@ -2,3 +2,13 @@ name = "event-publisher" version = "0.1.0" edition = "2021" + +[dependencies] +domain = { workspace = true } +event-payload = { workspace = true } +serde_json = { workspace = true } +async-trait = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["full"] } diff --git a/crates/adapters/event-publisher/src/lib.rs b/crates/adapters/event-publisher/src/lib.rs index e69de29..8ef1c79 100644 --- a/crates/adapters/event-publisher/src/lib.rs +++ b/crates/adapters/event-publisher/src/lib.rs @@ -0,0 +1,90 @@ +use async_trait::async_trait; +use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher}; +use event_payload::EventPayload; + +/// Abstraction over any pub/sub transport backend. +/// Implement this for NATS, Kafka, Redis Streams, etc. +/// The adapter calls `publish_bytes(subject, bytes)` — subjects come from `EventPayload::subject()`. +#[async_trait] +pub trait Transport: Send + Sync { + async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError>; +} + +/// Routes domain events to a transport backend. +/// +/// Converts: `DomainEvent` → `EventPayload` → JSON bytes → `transport.publish_bytes(subject, bytes)` +/// +/// To swap transports (e.g. NATS → Kafka), replace the `T` at the composition root. +pub struct EventPublisherAdapter { + transport: T, +} + +impl EventPublisherAdapter { + pub fn new(transport: T) -> Self { + Self { transport } + } +} + +#[async_trait] +impl EventPublisher for EventPublisherAdapter { + async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> { + let payload = EventPayload::from(event); + let subject = payload.subject(); + let bytes = serde_json::to_vec(&payload) + .map_err(|e| DomainError::Internal(e.to_string()))?; + tracing::debug!(subject, "publishing event"); + self.transport.publish_bytes(subject, &bytes).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use async_trait::async_trait; + use std::sync::{Arc, Mutex}; + use domain::value_objects::{ThoughtId, UserId}; + + struct SpyTransport { + calls: Arc)>>>, + } + impl SpyTransport { + fn new() -> (Self, Arc)>>>) { + let calls = Arc::new(Mutex::new(vec![])); + (Self { calls: calls.clone() }, calls) + } + } + #[async_trait] + impl Transport for SpyTransport { + async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> { + self.calls.lock().unwrap().push((subject.to_string(), bytes.to_vec())); + Ok(()) + } + } + + #[tokio::test] + async fn thought_created_routes_to_correct_subject() { + let (spy, calls) = SpyTransport::new(); + let publisher = EventPublisherAdapter::new(spy); + publisher.publish(&DomainEvent::ThoughtCreated { + thought_id: ThoughtId::new(), + user_id: UserId::new(), + in_reply_to_id: None, + }).await.unwrap(); + let calls = calls.lock().unwrap(); + assert_eq!(calls.len(), 1); + assert_eq!(calls[0].0, "thoughts.created"); + } + + #[tokio::test] + async fn serialized_payload_is_valid_json() { + let (spy, calls) = SpyTransport::new(); + let publisher = EventPublisherAdapter::new(spy); + publisher.publish(&DomainEvent::UserBlocked { + blocker_id: UserId::new(), + blocked_id: UserId::new(), + }).await.unwrap(); + let bytes = calls.lock().unwrap()[0].1.clone(); + let json: serde_json::Value = serde_json::from_slice(&bytes).expect("valid JSON"); + assert_eq!(json["type"], "UserBlocked"); + } +}