event-transport Rename + Consumer Abstraction Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Rename event-publisher → event-transport and add the symmetric consumer abstraction (MessageSource trait + EventConsumerAdapter<S>) so both publish and subscribe are transport-agnostic.
Architecture after this plan:
event-transport/   ← Transport + EventPublisherAdapter<T> (existing)
                   ← MessageSource + EventConsumerAdapter<S> (new)
                   ← RawMessage { subject, payload, ack, nack } (new)
nats/              ← NatsTransport (existing, implements Transport)
                   ← NatsMessageSource (new, implements MessageSource)
                   ← NatsEventConsumer removed
worker/            ← EventConsumerAdapter::new(NatsMessageSource::new(client))
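To make the publish/consume symmetry concrete, here is a minimal, dependency-free sketch. It is not the plan's actual code: the real traits are async and stream-based, while this version is synchronous, drops ack/nack, and uses String as the error type. InMemoryBus is a hypothetical backend invented for illustration.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;

/// Simplified stand-in for the raw inbound message (ack/nack omitted here).
#[derive(Debug, Clone, PartialEq)]
pub struct RawMessage {
    pub subject: String,
    pub payload: Vec<u8>,
}

/// Publish side: push raw bytes to a subject.
pub trait Transport {
    fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), String>;
}

/// Consume side: pull raw messages. The plan's trait returns an async
/// stream; a boxed iterator keeps this sketch free of external crates.
pub trait MessageSource {
    fn messages(&self) -> Box<dyn Iterator<Item = Result<RawMessage, String>> + '_>;
}

/// Hypothetical single-threaded in-memory backend implementing both halves.
#[derive(Default)]
pub struct InMemoryBus {
    queue: RefCell<VecDeque<RawMessage>>,
}

impl Transport for InMemoryBus {
    fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), String> {
        self.queue.borrow_mut().push_back(RawMessage {
            subject: subject.to_string(),
            payload: bytes.to_vec(),
        });
        Ok(())
    }
}

impl MessageSource for InMemoryBus {
    fn messages(&self) -> Box<dyn Iterator<Item = Result<RawMessage, String>> + '_> {
        // Drain whatever has been published so far, in FIFO order.
        let drained: Vec<RawMessage> = self.queue.borrow_mut().drain(..).collect();
        Box::new(drained.into_iter().map(Ok::<RawMessage, String>))
    }
}
```

Code on either side only sees bytes and subjects, which is the property that lets the adapters above the transport stay backend-agnostic.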
Dependency chain:
event-transport → domain, event-payload, serde_json, async-trait
nats → domain, event-payload, event-transport, async-nats
worker → domain, nats, event-transport, postgres
File Map
Rename: crates/adapters/event-publisher/ → crates/adapters/event-transport/
Modify: Cargo.toml (root) ← update member path + workspace dep name
Modify: crates/adapters/event-transport/Cargo.toml ← name = "event-transport"
Modify: crates/adapters/nats/Cargo.toml ← event-publisher → event-transport
Modify: crates/adapters/nats/src/lib.rs ← use event_transport; add NatsMessageSource; remove NatsEventConsumer
Modify: crates/bootstrap/Cargo.toml ← event-publisher → event-transport
Modify: crates/bootstrap/src/factory.rs ← use event_transport; update EventConsumerAdapter wiring
Modify: crates/worker/Cargo.toml ← add event-transport dep
Modify: crates/worker/src/main.rs ← EventConsumerAdapter<NatsMessageSource>
Modify: crates/adapters/event-transport/src/lib.rs ← add RawMessage + MessageSource + EventConsumerAdapter
Task 1: Rename crate + update all references
Files: root Cargo.toml, event-publisher/Cargo.toml (renamed), nats/Cargo.toml, bootstrap/Cargo.toml, nats/src/lib.rs, bootstrap/src/factory.rs
- Rename the directory:
git mv crates/adapters/event-publisher crates/adapters/event-transport
- Update crates/adapters/event-transport/Cargo.toml, changing the package name:
[package]
name = "event-transport"
version = "0.1.0"
edition = "2021"
[dependencies]
domain = { workspace = true }
event-payload = { workspace = true }
serde_json = { workspace = true }
async-trait = { workspace = true }
tracing = { workspace = true }
[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
- Update root Cargo.toml:
In [workspace] members, change:
"crates/adapters/event-publisher",
to:
"crates/adapters/event-transport",
In [workspace.dependencies], change:
event-publisher = { path = "crates/adapters/event-publisher" }
to:
event-transport = { path = "crates/adapters/event-transport" }
- Update crates/adapters/nats/Cargo.toml:
Change event-publisher = { workspace = true } to event-transport = { workspace = true }.
- Update crates/adapters/nats/src/lib.rs:
Change use event_publisher::Transport; to use event_transport::Transport;.
- Update crates/bootstrap/Cargo.toml:
Change event-publisher = { workspace = true } to event-transport = { workspace = true }.
- Update crates/bootstrap/src/factory.rs:
Change use event_publisher::EventPublisherAdapter; to use event_transport::EventPublisherAdapter;.
- Run: cargo check --workspace. Expected: no errors.
- Run: cargo test -p event-transport. Expected: 2 tests pass (same tests as before, just crate renamed).
- Commit:
git add Cargo.toml \
crates/adapters/event-transport/ \
crates/adapters/nats/Cargo.toml \
crates/adapters/nats/src/lib.rs \
crates/bootstrap/Cargo.toml \
crates/bootstrap/src/factory.rs
git commit -m "refactor: rename event-publisher → event-transport"
Task 2: Add MessageSource + EventConsumerAdapter to event-transport
Files:
- Modify: crates/adapters/event-transport/src/lib.rs
- Write failing tests by appending to the test module in src/lib.rs:
#[tokio::test]
async fn consumer_adapter_deserializes_and_yields_event() {
    use domain::value_objects::ThoughtId;
    use futures::StreamExt;

    // Produce a serialized EventPayload for ThoughtCreated
    let event = DomainEvent::ThoughtCreated {
        thought_id: ThoughtId::new(),
        user_id: UserId::new(),
        in_reply_to_id: None,
    };
    let payload = EventPayload::from(&event);
    let bytes = serde_json::to_vec(&payload).unwrap();

    // A MessageSource that yields one message then ends.
    // MessageSource has no async methods, so the impl needs no #[async_trait].
    struct OneMessageSource { bytes: Vec<u8> }

    impl MessageSource for OneMessageSource {
        fn messages(&self) -> futures::stream::BoxStream<'_, Result<RawMessage, DomainError>> {
            let msg = RawMessage {
                subject: "thoughts.created".to_string(),
                payload: self.bytes.clone(),
                ack: Box::new(|| {}),
                nack: Box::new(|| {}),
            };
            Box::pin(futures::stream::once(async { Ok(msg) }))
        }
    }

    let adapter = EventConsumerAdapter::new(OneMessageSource { bytes });
    let mut stream = adapter.consume();
    let envelope = stream.next().await.unwrap().unwrap();
    assert!(matches!(envelope.event, DomainEvent::ThoughtCreated { .. }));
}

#[tokio::test]
async fn consumer_adapter_skips_invalid_payloads() {
    use futures::StreamExt;

    // A MessageSource that yields a single undecodable message.
    struct BadMessageSource;

    impl MessageSource for BadMessageSource {
        fn messages(&self) -> futures::stream::BoxStream<'_, Result<RawMessage, DomainError>> {
            let msg = RawMessage {
                subject: "bad".to_string(),
                payload: b"not valid json".to_vec(),
                ack: Box::new(|| {}),
                nack: Box::new(|| {}),
            };
            Box::pin(futures::stream::once(async { Ok(msg) }))
        }
    }

    let adapter = EventConsumerAdapter::new(BadMessageSource);
    let mut stream = adapter.consume();
    // Invalid JSON should be skipped, so the stream ends with no items
    assert!(stream.next().await.is_none());
}
- Run: cargo test -p event-transport. Expected: FAIL (MessageSource, RawMessage, EventConsumerAdapter not defined).
- Add to crates/adapters/event-transport/src/lib.rs, after the existing EventPublisherAdapter impl and before #[cfg(test)]:
use domain::{events::EventEnvelope, ports::EventConsumer};
use futures::stream::BoxStream;

/// A raw inbound message from a transport backend.
/// `ack` and `nack` are transport-level acknowledgements (e.g. Kafka offset commit).
/// For at-most-once transports (basic NATS), both are no-ops.
pub struct RawMessage {
    pub subject: String,
    pub payload: Vec<u8>,
    pub ack: Box<dyn Fn() + Send + Sync>,
    pub nack: Box<dyn Fn() + Send + Sync>,
}

/// Abstraction over any subscribe/consume backend.
/// Implement this for NATS, Kafka, Redis Streams, etc.
pub trait MessageSource: Send + Sync {
    fn messages(&self) -> BoxStream<'_, Result<RawMessage, DomainError>>;
}

/// Deserializes raw transport messages into domain `EventEnvelope`s.
///
/// Converts: `RawMessage.payload` → `EventPayload` → `DomainEvent` → `EventEnvelope`
///
/// Invalid or unknown messages are skipped with a warning; the stream continues.
pub struct EventConsumerAdapter<S: MessageSource> {
    source: S,
}

impl<S: MessageSource> EventConsumerAdapter<S> {
    pub fn new(source: S) -> Self { Self { source } }
}

impl<S: MessageSource> EventConsumer for EventConsumerAdapter<S> {
    fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
        use futures::StreamExt;

        let stream = self.source.messages();
        Box::pin(stream.filter_map(|result| async move {
            match result {
                // Transport-level failure: log it and keep consuming.
                Err(e) => {
                    tracing::warn!("transport error: {e}");
                    None
                }
                Ok(msg) => {
                    let payload = match serde_json::from_slice::<EventPayload>(&msg.payload) {
                        Ok(p) => p,
                        Err(e) => {
                            tracing::warn!("failed to deserialize event payload: {e}");
                            return None;
                        }
                    };
                    let event = match DomainEvent::try_from(payload) {
                        Ok(e) => e,
                        Err(e) => {
                            tracing::warn!("unknown event type: {e}");
                            return None;
                        }
                    };
                    // Hand the transport's ack/nack through to the domain envelope.
                    Some(Ok(EventEnvelope {
                        event,
                        ack: msg.ack,
                        nack: msg.nack,
                    }))
                }
            }
        }))
    }
}
Note: the existing imports at the top of lib.rs already have use domain::... — add EventEnvelope and EventConsumer to those imports. Also add futures::stream::BoxStream if not already present.
Also add futures = { workspace = true } to event-transport/Cargo.toml dependencies (needed for BoxStream and StreamExt).
- Run: cargo test -p event-transport. Expected: 4 tests pass (2 existing + 2 new).
- Commit:
git add crates/adapters/event-transport/
git commit -m "feat(event-transport): MessageSource trait + EventConsumerAdapter for transport-agnostic consuming"
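The skip-and-continue behaviour of the adapter can be illustrated without any external crates. The sketch below is an assumption-laden stand-in, not the adapter itself: decode plays the role of serde_json plus DomainEvent::try_from, the "thought_created:<u64>" wire format and the Event enum are invented for the example, and a plain iterator replaces the async stream.

```rust
#[derive(Debug, PartialEq)]
pub enum Event {
    ThoughtCreated { id: u64 },
}

/// Toy decoder standing in for serde_json + DomainEvent::try_from.
/// Wire format (an assumption for this sketch): "thought_created:<u64>".
pub fn decode(payload: &[u8]) -> Result<Event, String> {
    let text = std::str::from_utf8(payload).map_err(|e| e.to_string())?;
    let (kind, id) = text
        .split_once(':')
        .ok_or_else(|| format!("malformed payload: {text:?}"))?;
    match kind {
        "thought_created" => {
            let id = id.parse::<u64>().map_err(|e| e.to_string())?;
            Ok(Event::ThoughtCreated { id })
        }
        other => Err(format!("unknown event type: {other}")),
    }
}

/// Skip anything that fails to decode, log it, and keep going.
pub fn consume<'a>(
    raw: impl Iterator<Item = Vec<u8>> + 'a,
) -> impl Iterator<Item = Event> + 'a {
    raw.filter_map(|payload| match decode(&payload) {
        Ok(event) => Some(event),
        Err(e) => {
            eprintln!("skipping message: {e}");
            None
        }
    })
}
```

Because failures map to None inside filter_map, one bad message never terminates the iteration; later valid messages are still yielded.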
Task 3: nats — add NatsMessageSource, remove NatsEventConsumer
Files:
- Modify: crates/adapters/nats/src/lib.rs
- Rewrite crates/adapters/nats/src/lib.rs, removing NatsEventConsumer and adding NatsMessageSource:
use async_trait::async_trait;
use domain::errors::DomainError;
use event_payload::EventPayload;
use event_transport::{MessageSource, RawMessage, Transport};
use futures::stream::BoxStream;

// ── NatsTransport: raw NATS publish backend ─────────────────────────────────
pub struct NatsTransport {
    client: async_nats::Client,
}

impl NatsTransport {
    pub fn new(client: async_nats::Client) -> Self { Self { client } }
}

#[async_trait]
impl Transport for NatsTransport {
    async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> {
        self.client
            .publish(subject, bytes.to_vec().into())
            .await
            .map_err(|e| DomainError::Internal(e.to_string()))
    }
}

// ── NatsMessageSource: raw NATS subscribe backend ───────────────────────────
pub struct NatsMessageSource {
    client: async_nats::Client,
}

impl NatsMessageSource {
    pub fn new(client: async_nats::Client) -> Self { Self { client } }
}

impl MessageSource for NatsMessageSource {
    fn messages(&self) -> BoxStream<'_, Result<RawMessage, DomainError>> {
        let client = self.client.clone();
        Box::pin(async_stream::try_stream! {
            // ">" is the NATS wildcard matching every subject.
            let mut sub = client
                .subscribe(">")
                .await
                .map_err(|e| DomainError::Internal(e.to_string()))?;
            use futures::StreamExt;
            while let Some(msg) = sub.next().await {
                let subject = msg.subject.to_string();
                let payload = msg.payload.to_vec();
                // Basic NATS is at-most-once delivery, so ack/nack are no-ops.
                // Replace with JetStream for at-least-once delivery.
                yield RawMessage {
                    subject,
                    payload,
                    ack: Box::new(|| {}),
                    nack: Box::new(|| {}),
                };
            }
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use domain::{events::DomainEvent, value_objects::{LikeId, ThoughtId, UserId}};

    #[test]
    fn payload_from_domain_event_has_correct_subject() {
        let event = DomainEvent::ThoughtCreated {
            thought_id: ThoughtId::new(),
            user_id: UserId::new(),
            in_reply_to_id: None,
        };
        let payload = EventPayload::from(&event);
        assert_eq!(payload.subject(), "thoughts.created");
    }

    #[test]
    fn domain_event_roundtrip_via_payload() {
        let uid = UserId::new();
        let tid = ThoughtId::new();
        let event = DomainEvent::LikeAdded {
            like_id: LikeId::new(),
            user_id: uid.clone(),
            thought_id: tid.clone(),
        };
        let payload = EventPayload::from(&event);
        let back = DomainEvent::try_from(payload).unwrap();
        if let DomainEvent::LikeAdded { user_id, thought_id, .. } = back {
            assert_eq!(user_id, uid);
            assert_eq!(thought_id, tid);
        } else {
            panic!("wrong variant");
        }
    }
}
- Run: cargo test -p nats. Expected: 2 tests pass.
- Run: cargo check --workspace. Expected: one error in worker (uses removed NatsEventConsumer). That's expected and is fixed in Task 4.
- Commit:
git add crates/adapters/nats/src/lib.rs
git commit -m "refactor(nats): replace NatsEventConsumer with NatsMessageSource implementing MessageSource"
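NatsMessageSource fills ack and nack with no-op closures. For a backend with real acknowledgements, the same fields can carry closures that capture shared state. The following is only a sketch of that shape: DeliveryStats and tracked_message are hypothetical names, and the counters stand in for whatever a broker client would actually do on ack or nack.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Same field layout as the plan's RawMessage.
pub struct RawMessage {
    pub subject: String,
    pub payload: Vec<u8>,
    pub ack: Box<dyn Fn() + Send + Sync>,
    pub nack: Box<dyn Fn() + Send + Sync>,
}

/// Hypothetical delivery counters standing in for a broker's ack protocol.
#[derive(Default)]
pub struct DeliveryStats {
    pub acked: AtomicUsize,
    pub nacked: AtomicUsize,
}

/// Build a message whose ack/nack closures record the outcome.
pub fn tracked_message(subject: &str, payload: &[u8], stats: Arc<DeliveryStats>) -> RawMessage {
    // Each closure owns its own handle to the shared counters.
    let on_ack = Arc::clone(&stats);
    let on_nack = stats;
    RawMessage {
        subject: subject.to_string(),
        payload: payload.to_vec(),
        ack: Box::new(move || {
            on_ack.acked.fetch_add(1, Ordering::SeqCst);
        }),
        nack: Box::new(move || {
            on_nack.nacked.fetch_add(1, Ordering::SeqCst);
        }),
    }
}
```

Since the fields are typed as Fn rather than FnOnce, a caller can invoke ack more than once, so a real backend would likely want its acknowledgement to be idempotent.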
Task 4: Update worker + full verification
Files:
- Modify: crates/worker/Cargo.toml
- Modify: crates/worker/src/main.rs
- Add event-transport = { workspace = true } to crates/worker/Cargo.toml.
- Update crates/worker/src/main.rs: find and update the consumer creation.
Current code in main.rs:
let consumer = nats::NatsEventConsumer::new(nats_client);
Replace with:
use event_transport::EventConsumerAdapter;
use nats::NatsMessageSource;
let consumer = EventConsumerAdapter::new(NatsMessageSource::new(nats_client));
Also add the use statements at the top of main.rs alongside existing imports.
- Run: cargo check --workspace. Expected: no errors.
- Run full test suite:
DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test --workspace 2>&1 | tail -3
Expected: all tests pass (79 existing + 2 new event-transport consumer tests = 81+).
- Commit:
git add crates/worker/
git commit -m "feat(worker): use EventConsumerAdapter<NatsMessageSource> — transport-agnostic consuming"
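The plan does not show how the worker drives the consumer, so the loop below is a hypothetical sketch rather than the worker's real code. It assumes an envelope with event, ack and nack fields (as constructed by the adapter in Task 2), makes the event type generic, and uses a synchronous iterator in place of the stream. run_worker and its handler parameter are illustrative names.

```rust
/// Simplified envelope: same three fields the adapter fills in.
pub struct EventEnvelope<E> {
    pub event: E,
    pub ack: Box<dyn Fn() + Send + Sync>,
    pub nack: Box<dyn Fn() + Send + Sync>,
}

/// Run `handler` on each envelope; ack on success, nack on failure.
/// Returns (handled, failed) counts.
pub fn run_worker<E>(
    envelopes: impl Iterator<Item = Result<EventEnvelope<E>, String>>,
    mut handler: impl FnMut(&E) -> Result<(), String>,
) -> (usize, usize) {
    let (mut handled, mut failed) = (0, 0);
    for item in envelopes {
        // A consumer-level error has no envelope to nack; count it and move on.
        let envelope = match item {
            Ok(envelope) => envelope,
            Err(e) => {
                eprintln!("consumer error: {e}");
                failed += 1;
                continue;
            }
        };
        match handler(&envelope.event) {
            Ok(()) => {
                (envelope.ack)();
                handled += 1;
            }
            Err(e) => {
                eprintln!("handler failed: {e}");
                (envelope.nack)();
                failed += 1;
            }
        }
    }
    (handled, failed)
}
```

With the basic NATS source both closures do nothing, but writing the loop this way means it would not need to change if the source were later swapped for one with real acknowledgements.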
Self-Review
Spec coverage:
- ✅ event-publisher renamed to event-transport everywhere (Task 1)
- ✅ RawMessage { subject, payload, ack, nack } in event-transport (Task 2)
- ✅ MessageSource trait with messages() -> BoxStream<'_, Result<RawMessage, DomainError>> (Task 2)
- ✅ EventConsumerAdapter<S: MessageSource> implementing EventConsumer (Task 2)
- ✅ Invalid messages skipped with warning, stream continues (Task 2)
- ✅ 2 new tests: valid deserialization + invalid JSON skip (Task 2)
- ✅ NatsEventConsumer removed from nats (Task 3)
- ✅ NatsMessageSource implementing MessageSource added to nats (Task 3)
- ✅ Worker uses EventConsumerAdapter::new(NatsMessageSource::new(client)) (Task 4)
Adding Kafka later:
# kafka/Cargo.toml: event-transport = { workspace = true }
// kafka/src/lib.rs
pub struct KafkaMessageSource { ... }
impl MessageSource for KafkaMessageSource { ... } // yields RawMessage + real ack/nack
pub struct KafkaTransport { ... }
impl Transport for KafkaTransport { ... }
// bootstrap/src/factory.rs — two lines change:
EventPublisherAdapter::new(KafkaTransport::new(...))
EventConsumerAdapter::new(KafkaMessageSource::new(...))
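Why only the wiring changes can be seen in a reduced, dependency-free model of the generic adapter. Everything here is a simplification under stated assumptions: the trait yields raw byte vectors from an iterator instead of a stream of RawMessage, decoding is plain UTF-8, and FakeNatsSource and FakeKafkaSource are hypothetical stand-ins, not real client code.

```rust
pub trait MessageSource {
    fn messages(&self) -> Box<dyn Iterator<Item = Vec<u8>> + '_>;
}

/// Generic over the backend, like EventConsumerAdapter<S> in the plan.
pub struct ConsumerAdapter<S: MessageSource> {
    source: S,
}

impl<S: MessageSource> ConsumerAdapter<S> {
    pub fn new(source: S) -> Self {
        Self { source }
    }

    /// Decode each payload as UTF-8 text, skipping anything invalid.
    pub fn consume(&self) -> Vec<String> {
        self.source
            .messages()
            .filter_map(|bytes| String::from_utf8(bytes).ok())
            .collect()
    }
}

/// Hypothetical stand-ins for two different backends.
pub struct FakeNatsSource {
    pub inbox: Vec<Vec<u8>>,
}
pub struct FakeKafkaSource {
    pub partition: Vec<Vec<u8>>,
}

impl MessageSource for FakeNatsSource {
    fn messages(&self) -> Box<dyn Iterator<Item = Vec<u8>> + '_> {
        Box::new(self.inbox.iter().cloned())
    }
}

impl MessageSource for FakeKafkaSource {
    fn messages(&self) -> Box<dyn Iterator<Item = Vec<u8>> + '_> {
        Box::new(self.partition.iter().cloned())
    }
}

/// Code written against the adapter never names the backend.
pub fn decoded_count<S: MessageSource>(adapter: &ConsumerAdapter<S>) -> usize {
    adapter.consume().len()
}
```

Swapping FakeNatsSource for FakeKafkaSource at the construction site leaves ConsumerAdapter and decoded_count untouched, which mirrors the two-line change described for factory.rs.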
Type consistency:
- EventConsumerAdapter<NatsMessageSource>: NatsMessageSource implements MessageSource, adapter implements EventConsumer ✓
- RawMessage.ack/.nack transferred to EventEnvelope.ack/.nack in consumer adapter ✓
- event_transport:: (underscore) is how Rust code refers to the event-transport (dash) crate, since Cargo maps hyphens in package names to underscores ✓