feat(event-publisher): Transport trait + EventPublisherAdapter for transport-agnostic event routing
This commit is contained in:
@@ -2,3 +2,13 @@
|
|||||||
name = "event-publisher"
|
name = "event-publisher"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
domain = { workspace = true }
|
||||||
|
event-payload = { workspace = true }
|
||||||
|
serde_json = { workspace = true }
|
||||||
|
async-trait = { workspace = true }
|
||||||
|
tracing = { workspace = true }
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
tokio = { workspace = true, features = ["full"] }
|
||||||
|
|||||||
@@ -0,0 +1,90 @@
|
|||||||
|
use async_trait::async_trait;
|
||||||
|
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
|
||||||
|
use event_payload::EventPayload;
|
||||||
|
|
||||||
|
/// Abstraction over any pub/sub transport backend.
|
||||||
|
/// Implement this for NATS, Kafka, Redis Streams, etc.
|
||||||
|
/// The adapter calls `publish_bytes(subject, bytes)` — subjects come from `EventPayload::subject()`.
|
||||||
|
#[async_trait]
|
||||||
|
pub trait Transport: Send + Sync {
|
||||||
|
async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Routes domain events to a transport backend.
|
||||||
|
///
|
||||||
|
/// Converts: `DomainEvent` → `EventPayload` → JSON bytes → `transport.publish_bytes(subject, bytes)`
|
||||||
|
///
|
||||||
|
/// To swap transports (e.g. NATS → Kafka), replace the `T` at the composition root.
|
||||||
|
pub struct EventPublisherAdapter<T: Transport> {
|
||||||
|
transport: T,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Transport> EventPublisherAdapter<T> {
|
||||||
|
pub fn new(transport: T) -> Self {
|
||||||
|
Self { transport }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<T: Transport> EventPublisher for EventPublisherAdapter<T> {
|
||||||
|
async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||||
|
let payload = EventPayload::from(event);
|
||||||
|
let subject = payload.subject();
|
||||||
|
let bytes = serde_json::to_vec(&payload)
|
||||||
|
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||||
|
tracing::debug!(subject, "publishing event");
|
||||||
|
self.transport.publish_bytes(subject, &bytes).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use domain::value_objects::{ThoughtId, UserId};
|
||||||
|
|
||||||
|
struct SpyTransport {
|
||||||
|
calls: Arc<Mutex<Vec<(String, Vec<u8>)>>>,
|
||||||
|
}
|
||||||
|
impl SpyTransport {
|
||||||
|
fn new() -> (Self, Arc<Mutex<Vec<(String, Vec<u8>)>>>) {
|
||||||
|
let calls = Arc::new(Mutex::new(vec![]));
|
||||||
|
(Self { calls: calls.clone() }, calls)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[async_trait]
|
||||||
|
impl Transport for SpyTransport {
|
||||||
|
async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> {
|
||||||
|
self.calls.lock().unwrap().push((subject.to_string(), bytes.to_vec()));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn thought_created_routes_to_correct_subject() {
|
||||||
|
let (spy, calls) = SpyTransport::new();
|
||||||
|
let publisher = EventPublisherAdapter::new(spy);
|
||||||
|
publisher.publish(&DomainEvent::ThoughtCreated {
|
||||||
|
thought_id: ThoughtId::new(),
|
||||||
|
user_id: UserId::new(),
|
||||||
|
in_reply_to_id: None,
|
||||||
|
}).await.unwrap();
|
||||||
|
let calls = calls.lock().unwrap();
|
||||||
|
assert_eq!(calls.len(), 1);
|
||||||
|
assert_eq!(calls[0].0, "thoughts.created");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn serialized_payload_is_valid_json() {
|
||||||
|
let (spy, calls) = SpyTransport::new();
|
||||||
|
let publisher = EventPublisherAdapter::new(spy);
|
||||||
|
publisher.publish(&DomainEvent::UserBlocked {
|
||||||
|
blocker_id: UserId::new(),
|
||||||
|
blocked_id: UserId::new(),
|
||||||
|
}).await.unwrap();
|
||||||
|
let bytes = calls.lock().unwrap()[0].1.clone();
|
||||||
|
let json: serde_json::Value = serde_json::from_slice(&bytes).expect("valid JSON");
|
||||||
|
assert_eq!(json["type"], "UserBlocked");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user