event-publisher Transport Abstraction Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Fill event-publisher with a Transport trait + EventPublisherAdapter<T: Transport>, strip NatsEventPublisher from the nats crate and replace it with NatsTransport implementing Transport, then wire EventPublisherAdapter::new(NatsTransport::new(client)) in bootstrap — so adding Kafka/Redis later only requires a new transport crate.
Architecture: event-publisher defines the abstraction (Transport + EventPublisherAdapter). nats implements Transport for NATS as a pure byte-level publish backend. event-publisher never imports nats. bootstrap wires them together. NatsEventConsumer stays in nats: subscribing is transport-specific and is not part of the shared abstraction.
Dependency chain after refactor:
event-publisher → domain, event-payload, serde_json
nats → domain, event-payload, event-publisher, async-nats
bootstrap → event-publisher, nats (+ all others)
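To make the layering concrete, here is a minimal, std-only sketch of the same shape. It is synchronous and hand-rolls the JSON, whereas the plan uses async-trait and serde_json. InMemoryTransport, the u64 IDs, the transport() accessor, and the users.blocked subject are assumptions made for illustration, not names taken from the codebase.

```rust
use std::sync::Mutex;

// Simplified stand-in for domain::events::DomainEvent (real IDs are typed value objects).
pub enum DomainEvent {
    ThoughtCreated { thought_id: u64 },
    UserBlocked { blocker_id: u64, blocked_id: u64 },
}

/// Publish-only boundary. Synchronous here; the plan's trait is async.
pub trait Transport {
    fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), String>;
}

/// Turns events into (subject, bytes) and knows nothing about the backend.
pub struct EventPublisherAdapter<T: Transport> {
    transport: T,
}

impl<T: Transport> EventPublisherAdapter<T> {
    pub fn new(transport: T) -> Self {
        Self { transport }
    }

    /// Illustrative accessor so callers can inspect the backend.
    pub fn transport(&self) -> &T {
        &self.transport
    }

    pub fn publish(&self, event: &DomainEvent) -> Result<(), String> {
        // Hand-rolled JSON keeps the sketch dependency-free.
        let (subject, body) = match event {
            DomainEvent::ThoughtCreated { thought_id } => (
                "thoughts.created",
                format!("{{\"type\":\"ThoughtCreated\",\"thought_id\":{}}}", thought_id),
            ),
            // "users.blocked" is an assumed subject name.
            DomainEvent::UserBlocked { blocker_id, blocked_id } => (
                "users.blocked",
                format!(
                    "{{\"type\":\"UserBlocked\",\"blocker_id\":{},\"blocked_id\":{}}}",
                    blocker_id, blocked_id
                ),
            ),
        };
        self.transport.publish_bytes(subject, body.as_bytes())
    }
}

/// Hypothetical backend that records every call instead of talking to a broker.
#[derive(Default)]
pub struct InMemoryTransport {
    pub sent: Mutex<Vec<(String, Vec<u8>)>>,
}

impl Transport for InMemoryTransport {
    fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), String> {
        self.sent
            .lock()
            .unwrap()
            .push((subject.to_string(), bytes.to_vec()));
        Ok(())
    }
}
```

Only the type passed to EventPublisherAdapter::new knows about the backend, which is the property the dependency chain above is meant to guarantee.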
File Map
Modify: crates/adapters/event-publisher/Cargo.toml ← add deps
Modify: crates/adapters/event-publisher/src/lib.rs ← Transport trait + EventPublisherAdapter<T>
Modify: crates/adapters/nats/Cargo.toml ← add event-publisher dep
Modify: crates/adapters/nats/src/lib.rs ← remove NatsEventPublisher, add NatsTransport
Modify: crates/bootstrap/src/factory.rs ← use EventPublisherAdapter<NatsTransport>
Modify: crates/bootstrap/Cargo.toml ← add event-publisher dep (if missing)
Task 1: Fill event-publisher — Transport trait + EventPublisherAdapter
Files:
- Modify: crates/adapters/event-publisher/Cargo.toml
- Modify: crates/adapters/event-publisher/src/lib.rs
- Write tests at the bottom of crates/adapters/event-publisher/src/lib.rs:
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use domain::value_objects::{ThoughtId, UserId};
    use std::sync::{Arc, Mutex};

    /// Records every (subject, bytes) pair handed to the transport.
    struct SpyTransport {
        calls: Arc<Mutex<Vec<(String, Vec<u8>)>>>,
    }

    impl SpyTransport {
        fn new() -> (Self, Arc<Mutex<Vec<(String, Vec<u8>)>>>) {
            let calls = Arc::new(Mutex::new(vec![]));
            (Self { calls: calls.clone() }, calls)
        }
    }

    #[async_trait]
    impl Transport for SpyTransport {
        async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), domain::errors::DomainError> {
            self.calls.lock().unwrap().push((subject.to_string(), bytes.to_vec()));
            Ok(())
        }
    }

    #[tokio::test]
    async fn thought_created_routes_to_correct_subject() {
        let (spy, calls) = SpyTransport::new();
        let publisher = EventPublisherAdapter::new(spy);
        publisher.publish(&domain::events::DomainEvent::ThoughtCreated {
            thought_id: ThoughtId::new(),
            user_id: UserId::new(),
            in_reply_to_id: None,
        }).await.unwrap();
        let calls = calls.lock().unwrap();
        assert_eq!(calls.len(), 1);
        assert_eq!(calls[0].0, "thoughts.created");
    }

    #[tokio::test]
    async fn serialized_payload_is_valid_json() {
        let (spy, calls) = SpyTransport::new();
        let publisher = EventPublisherAdapter::new(spy);
        publisher.publish(&domain::events::DomainEvent::UserBlocked {
            blocker_id: UserId::new(),
            blocked_id: UserId::new(),
        }).await.unwrap();
        // Clone the bytes so the mutex guard is released before parsing.
        let bytes = calls.lock().unwrap()[0].1.clone();
        let json: serde_json::Value = serde_json::from_slice(&bytes).expect("valid JSON");
        assert_eq!(json["type"], "UserBlocked");
    }
}
- Run: cargo test -p event-publisher (expected: FAIL at compile time, no implementation yet).
- Write crates/adapters/event-publisher/Cargo.toml:
[package]
name = "event-publisher"
version = "0.1.0"
edition = "2021"
[dependencies]
domain = { workspace = true }
event-payload = { workspace = true }
serde_json = { workspace = true }
async-trait = { workspace = true }
tracing = { workspace = true }
[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
- Write the implementation in crates/adapters/event-publisher/src/lib.rs, above the tests module:
use async_trait::async_trait;
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
use event_payload::EventPayload;

/// Abstraction over any pub/sub transport backend.
/// Implement this for NATS, Kafka, Redis Streams, etc.
/// The adapter calls `publish_bytes(subject, bytes)`; subjects come from `EventPayload::subject()`.
#[async_trait]
pub trait Transport: Send + Sync {
    async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError>;
}

/// Routes domain events to a transport backend.
///
/// Converts: `DomainEvent` → `EventPayload` (via `From`) → JSON bytes → `transport.publish_bytes(subject, bytes)`
///
/// To swap transports (e.g. NATS → Kafka), replace the `T` at the composition root.
/// This type never needs to change.
pub struct EventPublisherAdapter<T: Transport> {
    transport: T,
}

impl<T: Transport> EventPublisherAdapter<T> {
    pub fn new(transport: T) -> Self {
        Self { transport }
    }
}

#[async_trait]
impl<T: Transport> EventPublisher for EventPublisherAdapter<T> {
    async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> {
        let payload = EventPayload::from(event);
        let subject = payload.subject();
        let bytes = serde_json::to_vec(&payload)
            .map_err(|e| DomainError::Internal(e.to_string()))?;
        tracing::debug!(subject, "publishing event");
        self.transport.publish_bytes(subject, &bytes).await
    }
}
- Run: cargo test -p event-publisher (expected: 2 tests pass).
- Commit:
git add crates/adapters/event-publisher/
git commit -m "feat(event-publisher): Transport trait + EventPublisherAdapter for transport-agnostic event routing"
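The two tests in Task 1 exercise only the success path. As a possible complement, the std-only sketch below shows how both failure modes of the publish path would be expected to surface as DomainError::Internal. FailingTransport and publish_raw are hypothetical names, the trait is synchronous for brevity, and UTF-8 validation stands in for the serde_json serialization step.

```rust
// Simplified stand-in for domain::errors::DomainError.
#[derive(Debug, PartialEq)]
pub enum DomainError {
    Internal(String),
}

/// Synchronous stand-in for the plan's async Transport trait.
pub trait Transport {
    fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError>;
}

/// Hypothetical test double that always fails, e.g. a dropped broker connection.
pub struct FailingTransport;

impl Transport for FailingTransport {
    fn publish_bytes(&self, _subject: &str, _bytes: &[u8]) -> Result<(), DomainError> {
        Err(DomainError::Internal("connection closed".to_string()))
    }
}

/// Stand-in for the adapter's publish path: validate/encode, then hand off.
pub fn publish_raw<T: Transport>(
    transport: &T,
    subject: &str,
    raw: &[u8],
) -> Result<(), DomainError> {
    // Encoding failures are mapped into DomainError::Internal,
    // mirroring the map_err on serde_json::to_vec in the plan.
    let text = std::str::from_utf8(raw).map_err(|e| DomainError::Internal(e.to_string()))?;
    // The transport's own error is returned unchanged.
    transport.publish_bytes(subject, text.as_bytes())
}
```

In the real crate the equivalent test would be a third #[tokio::test] next to the spy-based ones, using an async failing transport.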
Task 2: Refactor nats — strip NatsEventPublisher, add NatsTransport
Files:
- Modify: crates/adapters/nats/Cargo.toml
- Modify: crates/adapters/nats/src/lib.rs
- Add event-publisher to crates/adapters/nats/Cargo.toml:
event-publisher = { workspace = true }
- Rewrite crates/adapters/nats/src/lib.rs (remove NatsEventPublisher, add NatsTransport):
use async_trait::async_trait;
use domain::{
    errors::DomainError,
    events::{DomainEvent, EventEnvelope},
    ports::EventConsumer,
};
use event_payload::EventPayload;
use event_publisher::Transport;
use futures::stream::BoxStream;

// ── NatsTransport: raw NATS publish backend ─────────────────────────────────
pub struct NatsTransport {
    client: async_nats::Client,
}

impl NatsTransport {
    pub fn new(client: async_nats::Client) -> Self {
        Self { client }
    }
}

#[async_trait]
impl Transport for NatsTransport {
    async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> {
        // Pass an owned subject: async-nats may not accept a borrowed,
        // non-'static &str here, while a String works either way.
        self.client
            .publish(subject.to_string(), bytes.to_vec().into())
            .await
            .map_err(|e| DomainError::Internal(e.to_string()))
    }
}

// ── NatsEventConsumer: subscribes and yields EventEnvelopes ─────────────────
pub struct NatsEventConsumer {
    client: async_nats::Client,
}

impl NatsEventConsumer {
    pub fn new(client: async_nats::Client) -> Self {
        Self { client }
    }
}

impl EventConsumer for NatsEventConsumer {
    fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
        let client = self.client.clone();
        Box::pin(async_stream::try_stream! {
            // ">" is the NATS wildcard that matches every subject.
            let mut sub = client
                .subscribe(">")
                .await
                .map_err(|e| DomainError::Internal(e.to_string()))?;
            use futures::StreamExt;
            while let Some(msg) = sub.next().await {
                // Skip messages that are not valid event payloads.
                let payload = match serde_json::from_slice::<EventPayload>(&msg.payload) {
                    Ok(p) => p,
                    Err(e) => {
                        tracing::warn!("failed to deserialize event payload: {e}");
                        continue;
                    }
                };
                let event = match DomainEvent::try_from(payload) {
                    Ok(e) => e,
                    Err(e) => {
                        tracing::warn!("failed to convert payload to domain event: {e}");
                        continue;
                    }
                };
                // Core NATS has no acknowledgements, so ack/nack are no-ops.
                yield EventEnvelope {
                    event,
                    ack: Box::new(|| {}),
                    nack: Box::new(|| {}),
                };
            }
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use domain::value_objects::{LikeId, ThoughtId, UserId};

    #[test]
    fn payload_from_domain_event_has_correct_subject() {
        let event = DomainEvent::ThoughtCreated {
            thought_id: ThoughtId::new(),
            user_id: UserId::new(),
            in_reply_to_id: None,
        };
        let payload = EventPayload::from(&event);
        assert_eq!(payload.subject(), "thoughts.created");
    }

    #[test]
    fn domain_event_roundtrip_via_payload() {
        let uid = UserId::new();
        let tid = ThoughtId::new();
        let event = DomainEvent::LikeAdded {
            like_id: LikeId::new(),
            user_id: uid.clone(),
            thought_id: tid.clone(),
        };
        let payload = EventPayload::from(&event);
        let back = DomainEvent::try_from(payload).unwrap();
        if let DomainEvent::LikeAdded { user_id, thought_id, .. } = back {
            assert_eq!(user_id, uid);
            assert_eq!(thought_id, tid);
        } else {
            panic!("wrong variant");
        }
    }
}
- Run: cargo test -p nats (expected: 2 tests pass).
- Run: cargo check --workspace (expected: one error in bootstrap, which still uses the removed NatsEventPublisher; Task 3 fixes it).
- Commit:
git add crates/adapters/nats/
git commit -m "refactor(nats): strip NatsEventPublisher, add NatsTransport implementing Transport"
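NatsEventConsumer in Task 2 subscribes to ">", the NATS wildcard for every subject, and relies on deserialization to skip anything that is not an event. For readers unfamiliar with NATS wildcards, the std-only sketch below models the matching rules: "*" matches exactly one token and ">" matches one or more trailing tokens. subject_matches is a hypothetical helper written for illustration; it is not part of async-nats, and the server performs this matching itself.

```rust
/// Returns true when `subject` matches a NATS-style subscription `pattern`.
/// `*` matches exactly one token; `>` matches one or more trailing tokens.
pub fn subject_matches(pattern: &str, subject: &str) -> bool {
    let mut pat = pattern.split('.');
    let mut sub = subject.split('.');
    loop {
        match (pat.next(), sub.next()) {
            // `>` must be the last pattern token and needs at least one subject token.
            (Some(">"), Some(tok)) => return !tok.is_empty() && pat.next().is_none(),
            // `*` consumes exactly one non-empty token.
            (Some("*"), Some(tok)) => {
                if tok.is_empty() {
                    return false;
                }
            }
            // Literal tokens must be identical.
            (Some(p), Some(s)) => {
                if p != s {
                    return false;
                }
            }
            // Both sides exhausted at the same time: full match.
            (None, None) => return true,
            // One side ran out before the other.
            _ => return false,
        }
    }
}
```

If the consumer should later see only a subset of events, a narrower pattern such as thoughts.* could replace ">" in the subscribe call, at the cost of maintaining the pattern list.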
Task 3: Wire EventPublisherAdapter in bootstrap
Files:
- Modify: crates/bootstrap/Cargo.toml
- Modify: crates/bootstrap/src/factory.rs
- Add event-publisher to crates/bootstrap/Cargo.toml (if missing):
event-publisher = { workspace = true }
- Update crates/bootstrap/src/factory.rs. Find the NATS event publisher section and replace it:
Find (in the build function):
Arc::new(nats::NatsEventPublisher::new(client))
Replace with:
Arc::new(event_publisher::EventPublisherAdapter::new(nats::NatsTransport::new(client)))
The replacement above uses the fully qualified path, so no extra use is required. If you prefer the short form, add this import at the top of factory.rs and write EventPublisherAdapter::new(...) instead:
use event_publisher::EventPublisherAdapter;
The NoOpEventPublisher struct and its impl EventPublisher stay in factory.rs. It is the fallback when NATS is unavailable and belongs in the composition root.
- Run: cargo check -p bootstrap (expected: no errors).
- Run: cargo check --workspace (expected: no errors).
- Run full test suite:
DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test --workspace 2>&1 | tail -3
Expected: all tests pass (including new event-publisher tests).
- Smoke test:
DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres \
JWT_SECRET=dev BASE_URL=http://localhost:3000 \
RUST_LOG=info cargo run -p bootstrap &
sleep 3
curl -s http://localhost:3000/health | jq .
kill %1 2>/dev/null
Expected: {"status":"ok","db":"connected"}.
- Commit:
git add crates/bootstrap/
git commit -m "feat(bootstrap): wire EventPublisherAdapter<NatsTransport> — transport-agnostic event publishing"
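Task 3 keeps NoOpEventPublisher in factory.rs as the fallback when NATS is unavailable. The std-only sketch below shows one way such a selection could look at the composition root. Everything in it is an assumption for illustration: the trait is synchronous and takes a string, BrokerPublisher stands in for EventPublisherAdapter<NatsTransport>, and both build_event_publisher and describe() are hypothetical names that do not appear in the codebase.

```rust
use std::sync::Arc;

/// Simplified stand-in for domain::ports::EventPublisher.
pub trait EventPublisher {
    fn publish(&self, event: &str) -> Result<(), String>;
    /// Illustration-only hook so the chosen implementation can be observed.
    fn describe(&self) -> &'static str;
}

/// Fallback that accepts every event and drops it.
pub struct NoOpEventPublisher;

impl EventPublisher for NoOpEventPublisher {
    fn publish(&self, _event: &str) -> Result<(), String> {
        Ok(())
    }
    fn describe(&self) -> &'static str {
        "noop"
    }
}

/// Placeholder for EventPublisherAdapter<NatsTransport>.
pub struct BrokerPublisher {
    pub url: String,
}

impl EventPublisher for BrokerPublisher {
    fn publish(&self, _event: &str) -> Result<(), String> {
        // A real implementation would serialize and send to the broker at `url`.
        Ok(())
    }
    fn describe(&self) -> &'static str {
        "broker"
    }
}

/// Hypothetical factory helper: `nats_url` is None when no broker is configured.
pub fn build_event_publisher(nats_url: Option<&str>) -> Arc<dyn EventPublisher> {
    match nats_url {
        Some(url) => Arc::new(BrokerPublisher { url: url.to_string() }),
        None => Arc::new(NoOpEventPublisher),
    }
}
```

Because callers hold only the trait object, the rest of the application is unaffected by which branch was taken.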
Self-Review
Spec coverage:
- ✅ Transport trait in event-publisher with publish_bytes(subject, bytes) (Task 1)
- ✅ EventPublisherAdapter<T: Transport> implements EventPublisher (Task 1)
- ✅ 2 tests: correct subject routing, valid JSON serialization (Task 1)
- ✅ NatsEventPublisher removed from nats (Task 2)
- ✅ NatsTransport implements Transport for NATS (Task 2)
- ✅ NatsEventConsumer unchanged, stays in nats (Task 2)
- ✅ bootstrap wires EventPublisherAdapter::new(NatsTransport::new(client)) (Task 3)
- ✅ NoOpEventPublisher stays in factory.rs as fallback (Task 3)
Placeholder scan: None.
Type consistency:
- EventPublisherAdapter<NatsTransport>: NatsTransport implements Transport, and EventPublisherAdapter<T: Transport> implements EventPublisher ✓
- event_publisher::Transport is imported in nats/src/lib.rs: nats depends on event-publisher ✓
- factory.rs uses event_publisher::EventPublisherAdapter and nats::NatsTransport: both are in bootstrap deps ✓
Adding Kafka later:
# kafka/Cargo.toml
[dependencies]
event-publisher = { workspace = true }
rdkafka = "..."
// kafka/src/lib.rs
pub struct KafkaTransport { ... }
#[async_trait] impl Transport for KafkaTransport { ... }
// bootstrap/src/factory.rs — only this line changes:
Arc::new(EventPublisherAdapter::new(KafkaTransport::new(...)))