# event-transport Rename + Consumer Abstraction Plan > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** Rename `event-publisher` → `event-transport` and add the symmetric consumer abstraction (`MessageSource` trait + `EventConsumerAdapter`) so both publish and subscribe are transport-agnostic. **Architecture after this plan:** ``` event-transport/ ← Transport + EventPublisherAdapter (existing) ← MessageSource + EventConsumerAdapter (new) ← RawMessage { subject, payload, ack, nack } (new) nats/ ← NatsTransport (existing, implements Transport) ← NatsMessageSource (new, implements MessageSource) ← NatsEventConsumer removed worker/ ← EventConsumerAdapter::new(NatsMessageSource::new(client)) ``` **Dependency chain:** ``` event-transport → domain, event-payload, serde_json, async-trait nats → domain, event-payload, event-transport, async-nats worker → domain, nats, event-transport, postgres ``` --- ## File Map ``` Rename: crates/adapters/event-publisher/ → crates/adapters/event-transport/ Modify: Cargo.toml (root) ← update member path + workspace dep name Modify: crates/adapters/event-transport/Cargo.toml ← name = "event-transport" Modify: crates/adapters/nats/Cargo.toml ← event-publisher → event-transport Modify: crates/adapters/nats/src/lib.rs ← use event_transport; add NatsMessageSource; remove NatsEventConsumer Modify: crates/bootstrap/Cargo.toml ← event-publisher → event-transport Modify: crates/bootstrap/src/factory.rs ← use event_transport; update EventConsumerAdapter wiring Modify: crates/worker/Cargo.toml ← add event-transport dep Modify: crates/worker/src/main.rs ← EventConsumerAdapter Modify: crates/adapters/event-transport/src/lib.rs ← add RawMessage + MessageSource + EventConsumerAdapter ``` --- ### Task 1: Rename crate + update all references **Files:** root `Cargo.toml`, `event-publisher/Cargo.toml` (renamed), `nats/Cargo.toml`, `bootstrap/Cargo.toml`, `nats/src/lib.rs`, `bootstrap/src/factory.rs` - [ ] **Rename the directory:** ```bash git mv crates/adapters/event-publisher crates/adapters/event-transport ``` - [ ] **Update `crates/adapters/event-transport/Cargo.toml`** — change the package name: ```toml [package] name = "event-transport" version = "0.1.0" edition = "2021" [dependencies] domain = { workspace = true } event-payload = { workspace = true } serde_json = { workspace = true } async-trait = { workspace = true } tracing = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["full"] } ``` - [ ] **Update root `Cargo.toml`:** In `[workspace] members`, change: ```toml "crates/adapters/event-publisher", ``` to: ```toml "crates/adapters/event-transport", ``` In `[workspace.dependencies]`, change: ```toml event-publisher = { path = "crates/adapters/event-publisher" } ``` to: ```toml event-transport = { path = "crates/adapters/event-transport" } ``` - [ ] **Update `crates/adapters/nats/Cargo.toml`:** Change `event-publisher = { workspace = true }` to `event-transport = { workspace = true }`. - [ ] **Update `crates/adapters/nats/src/lib.rs`:** Change `use event_publisher::Transport;` to `use event_transport::Transport;`. - [ ] **Update `crates/bootstrap/Cargo.toml`:** Change `event-publisher = { workspace = true }` to `event-transport = { workspace = true }`. - [ ] **Update `crates/bootstrap/src/factory.rs`:** Change `use event_publisher::EventPublisherAdapter;` to `use event_transport::EventPublisherAdapter;`. - [ ] **Run:** `cargo check --workspace` — Expected: no errors. - [ ] **Run:** `cargo test -p event-transport` — Expected: 2 tests pass (same tests as before, just crate renamed). - [ ] **Commit:** ```bash git add Cargo.toml \ crates/adapters/event-transport/ \ crates/adapters/nats/Cargo.toml \ crates/adapters/nats/src/lib.rs \ crates/bootstrap/Cargo.toml \ crates/bootstrap/src/factory.rs git commit -m "refactor: rename event-publisher → event-transport" ``` --- ### Task 2: Add MessageSource + EventConsumerAdapter to event-transport **Files:** - Modify: `crates/adapters/event-transport/src/lib.rs` - [ ] **Write failing tests** — append to the test module in `src/lib.rs`: ```rust #[tokio::test] async fn consumer_adapter_deserializes_and_yields_event() { use domain::value_objects::ThoughtId; use futures::StreamExt; // Produce a serialized EventPayload for ThoughtCreated let event = DomainEvent::ThoughtCreated { thought_id: ThoughtId::new(), user_id: UserId::new(), in_reply_to_id: None, }; let payload = EventPayload::from(&event); let bytes = serde_json::to_vec(&payload).unwrap(); // A MessageSource that yields one message then ends struct OneMessageSource { bytes: Vec } #[async_trait] impl MessageSource for OneMessageSource { fn messages(&self) -> futures::stream::BoxStream<'_, Result> { let msg = RawMessage { subject: "thoughts.created".to_string(), payload: self.bytes.clone(), ack: Box::new(|| {}), nack: Box::new(|| {}), }; Box::pin(futures::stream::once(async { Ok(msg) })) } } let adapter = EventConsumerAdapter::new(OneMessageSource { bytes }); let mut stream = adapter.consume(); let envelope = stream.next().await.unwrap().unwrap(); assert!(matches!(envelope.event, DomainEvent::ThoughtCreated { .. })); } #[tokio::test] async fn consumer_adapter_skips_invalid_payloads() { use futures::StreamExt; struct BadMessageSource; #[async_trait] impl MessageSource for BadMessageSource { fn messages(&self) -> futures::stream::BoxStream<'_, Result> { let msg = RawMessage { subject: "bad".to_string(), payload: b"not valid json".to_vec(), ack: Box::new(|| {}), nack: Box::new(|| {}), }; Box::pin(futures::stream::once(async { Ok(msg) })) } } let adapter = EventConsumerAdapter::new(BadMessageSource); let mut stream = adapter.consume(); // Invalid JSON should be skipped — stream ends with no items assert!(stream.next().await.is_none()); } ``` - [ ] **Run:** `cargo test -p event-transport` — Expected: FAIL (MessageSource, RawMessage, EventConsumerAdapter not defined). - [ ] **Add to `crates/adapters/event-transport/src/lib.rs`** — append after the existing `EventPublisherAdapter` impl and before `#[cfg(test)]`: ```rust use domain::{events::EventEnvelope, ports::EventConsumer}; use futures::stream::BoxStream; /// A raw inbound message from a transport backend. /// `ack` and `nack` are transport-level acknowledgements (e.g. Kafka offset commit). /// For at-most-once transports (basic NATS), both are no-ops. pub struct RawMessage { pub subject: String, pub payload: Vec, pub ack: Box, pub nack: Box, } /// Abstraction over any subscribe/consume backend. /// Implement this for NATS, Kafka, Redis Streams, etc. pub trait MessageSource: Send + Sync { fn messages(&self) -> BoxStream<'_, Result>; } /// Deserializes raw transport messages into domain `EventEnvelope`s. /// /// Converts: `RawMessage.payload` → `EventPayload` → `DomainEvent` → `EventEnvelope` /// /// Invalid or unknown messages are skipped with a warning — the stream continues. pub struct EventConsumerAdapter { source: S, } impl EventConsumerAdapter { pub fn new(source: S) -> Self { Self { source } } } impl EventConsumer for EventConsumerAdapter { fn consume(&self) -> BoxStream<'_, Result> { use futures::StreamExt; let stream = self.source.messages(); Box::pin(stream.filter_map(|result| async move { match result { Err(e) => { tracing::warn!("transport error: {e}"); None } Ok(msg) => { let payload = match serde_json::from_slice::(&msg.payload) { Ok(p) => p, Err(e) => { tracing::warn!("failed to deserialize event payload: {e}"); return None; } }; let event = match DomainEvent::try_from(payload) { Ok(e) => e, Err(e) => { tracing::warn!("unknown event type: {e}"); return None; } }; Some(Ok(EventEnvelope { event, ack: msg.ack, nack: msg.nack, })) } } })) } } ``` Note: the existing imports at the top of `lib.rs` already have `use domain::...` — add `EventEnvelope` and `EventConsumer` to those imports. Also add `futures::stream::BoxStream` if not already present. Also add `futures = { workspace = true }` to `event-transport/Cargo.toml` dependencies (needed for `BoxStream` and `StreamExt`). - [ ] **Run:** `cargo test -p event-transport` — Expected: 4 tests pass (2 existing + 2 new). - [ ] **Commit:** ```bash git add crates/adapters/event-transport/ git commit -m "feat(event-transport): MessageSource trait + EventConsumerAdapter for transport-agnostic consuming" ``` --- ### Task 3: nats — add NatsMessageSource, remove NatsEventConsumer **Files:** - Modify: `crates/adapters/nats/src/lib.rs` - [ ] **Rewrite `crates/adapters/nats/src/lib.rs`** — remove `NatsEventConsumer`, add `NatsMessageSource`: ```rust use async_trait::async_trait; use domain::errors::DomainError; use event_payload::EventPayload; use event_transport::{MessageSource, RawMessage, Transport}; use futures::stream::BoxStream; // ── NatsTransport — raw NATS publish backend ──────────────────────────────── pub struct NatsTransport { client: async_nats::Client, } impl NatsTransport { pub fn new(client: async_nats::Client) -> Self { Self { client } } } #[async_trait] impl Transport for NatsTransport { async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> { self.client .publish(subject, bytes.to_vec().into()) .await .map_err(|e| DomainError::Internal(e.to_string())) } } // ── NatsMessageSource — raw NATS subscribe backend ────────────────────────── pub struct NatsMessageSource { client: async_nats::Client, } impl NatsMessageSource { pub fn new(client: async_nats::Client) -> Self { Self { client } } } impl MessageSource for NatsMessageSource { fn messages(&self) -> BoxStream<'_, Result> { let client = self.client.clone(); Box::pin(async_stream::try_stream! { let mut sub = client .subscribe(">") .await .map_err(|e| DomainError::Internal(e.to_string()))?; use futures::StreamExt; while let Some(msg) = sub.next().await { let subject = msg.subject.to_string(); let payload = msg.payload.to_vec(); // Basic NATS: at-most-once delivery — ack/nack are no-ops. // Replace with JetStream for at-least-once delivery. yield RawMessage { subject, payload, ack: Box::new(|| {}), nack: Box::new(|| {}), }; } }) } } #[cfg(test)] mod tests { use super::*; use domain::{events::DomainEvent, value_objects::{LikeId, ThoughtId, UserId}}; #[test] fn payload_from_domain_event_has_correct_subject() { let event = DomainEvent::ThoughtCreated { thought_id: ThoughtId::new(), user_id: UserId::new(), in_reply_to_id: None, }; let payload = EventPayload::from(&event); assert_eq!(payload.subject(), "thoughts.created"); } #[test] fn domain_event_roundtrip_via_payload() { let uid = UserId::new(); let tid = ThoughtId::new(); let event = DomainEvent::LikeAdded { like_id: LikeId::new(), user_id: uid.clone(), thought_id: tid.clone(), }; let payload = EventPayload::from(&event); let back = DomainEvent::try_from(payload).unwrap(); if let DomainEvent::LikeAdded { user_id, thought_id, .. } = back { assert_eq!(user_id, uid); assert_eq!(thought_id, tid); } else { panic!("wrong variant"); } } } ``` - [ ] **Run:** `cargo test -p nats` — Expected: 2 tests pass. - [ ] **Run:** `cargo check --workspace` — Expected: one error in `worker` (uses removed `NatsEventConsumer`). That's expected — fixed in Task 4. - [ ] **Commit:** ```bash git add crates/adapters/nats/src/lib.rs git commit -m "refactor(nats): replace NatsEventConsumer with NatsMessageSource implementing MessageSource" ``` --- ### Task 4: Update worker + full verification **Files:** - Modify: `crates/worker/Cargo.toml` - Modify: `crates/worker/src/main.rs` - [ ] **Add `event-transport = { workspace = true }` to `crates/worker/Cargo.toml`.** - [ ] **Update `crates/worker/src/main.rs`** — find and update the consumer creation. Current code in `main.rs`: ```rust let consumer = nats::NatsEventConsumer::new(nats_client); ``` Replace with: ```rust use event_transport::EventConsumerAdapter; use nats::NatsMessageSource; let consumer = EventConsumerAdapter::new(NatsMessageSource::new(nats_client)); ``` Also add the `use` statements at the top of `main.rs` alongside existing imports. - [ ] **Run:** `cargo check --workspace` — Expected: no errors. - [ ] **Run full test suite:** ```bash DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test --workspace 2>&1 | tail -3 ``` Expected: all tests pass (79 existing + 2 new event-transport consumer tests = 81+). - [ ] **Commit:** ```bash git add crates/worker/ git commit -m "feat(worker): use EventConsumerAdapter — transport-agnostic consuming" ``` --- ## Self-Review **Spec coverage:** - ✅ `event-publisher` renamed to `event-transport` everywhere (Task 1) - ✅ `RawMessage { subject, payload, ack, nack }` in `event-transport` (Task 2) - ✅ `MessageSource` trait with `messages() -> BoxStream` (Task 2) - ✅ `EventConsumerAdapter` implementing `EventConsumer` (Task 2) - ✅ Invalid messages skipped with warning, stream continues (Task 2) - ✅ 2 new tests: valid deserialization + invalid JSON skip (Task 2) - ✅ `NatsEventConsumer` removed from nats (Task 3) - ✅ `NatsMessageSource` implementing `MessageSource` added to nats (Task 3) - ✅ Worker uses `EventConsumerAdapter::new(NatsMessageSource::new(client))` (Task 4) **Adding Kafka later:** ```toml # kafka/Cargo.toml: event-transport = { workspace = true } ``` ```rust // kafka/src/lib.rs pub struct KafkaMessageSource { ... } impl MessageSource for KafkaMessageSource { ... } // yields RawMessage + real ack/nack pub struct KafkaTransport { ... } impl Transport for KafkaTransport { ... } ``` ```rust // bootstrap/src/factory.rs — two lines change: EventPublisherAdapter::new(KafkaTransport::new(...)) EventConsumerAdapter::new(KafkaMessageSource::new(...)) ``` **Type consistency:** - `EventConsumerAdapter` — `NatsMessageSource` implements `MessageSource`, adapter implements `EventConsumer` ✓ - `RawMessage.ack` / `.nack` transferred to `EventEnvelope.ack` / `.nack` in consumer adapter ✓ - `event_transport::` (underscore) is the Rust module name for `event-transport` (dash) crate ✓