From 79632781890c14d74b59e69d4729a61b80b67efe Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 14 May 2026 12:29:28 +0200 Subject: [PATCH] docs: event-transport rename + consumer abstraction plan --- .../2026-05-14-event-transport-rename.md | 483 ++++++++++++++++++ 1 file changed, 483 insertions(+) create mode 100644 docs/superpowers/plans/2026-05-14-event-transport-rename.md diff --git a/docs/superpowers/plans/2026-05-14-event-transport-rename.md b/docs/superpowers/plans/2026-05-14-event-transport-rename.md new file mode 100644 index 0000000..620bb79 --- /dev/null +++ b/docs/superpowers/plans/2026-05-14-event-transport-rename.md @@ -0,0 +1,483 @@ +# event-transport Rename + Consumer Abstraction Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Rename `event-publisher` → `event-transport` and add the symmetric consumer abstraction (`MessageSource` trait + `EventConsumerAdapter`) so both publish and subscribe are transport-agnostic. + +**Architecture after this plan:** +``` +event-transport/ ← Transport + EventPublisherAdapter (existing) + ← MessageSource + EventConsumerAdapter (new) + ← RawMessage { subject, payload, ack, nack } (new) + +nats/ ← NatsTransport (existing, implements Transport) + ← NatsMessageSource (new, implements MessageSource) + ← NatsEventConsumer removed + +worker/ ← EventConsumerAdapter::new(NatsMessageSource::new(client)) +``` + +**Dependency chain:** +``` +event-transport → domain, event-payload, serde_json, async-trait +nats → domain, event-payload, event-transport, async-nats +worker → domain, nats, event-transport, postgres +``` + +--- + +## File Map + +``` +Rename: crates/adapters/event-publisher/ → crates/adapters/event-transport/ +Modify: Cargo.toml (root) ← update member path + workspace dep name +Modify: crates/adapters/event-transport/Cargo.toml ← name = "event-transport" +Modify: crates/adapters/nats/Cargo.toml ← event-publisher → event-transport +Modify: crates/adapters/nats/src/lib.rs ← use event_transport; add NatsMessageSource; remove NatsEventConsumer +Modify: crates/bootstrap/Cargo.toml ← event-publisher → event-transport +Modify: crates/bootstrap/src/factory.rs ← use event_transport; update EventConsumerAdapter wiring +Modify: crates/worker/Cargo.toml ← add event-transport dep +Modify: crates/worker/src/main.rs ← EventConsumerAdapter +Modify: crates/adapters/event-transport/src/lib.rs ← add RawMessage + MessageSource + EventConsumerAdapter +``` + +--- + +### Task 1: Rename crate + update all references + +**Files:** root `Cargo.toml`, `event-publisher/Cargo.toml` (renamed), `nats/Cargo.toml`, `bootstrap/Cargo.toml`, `nats/src/lib.rs`, `bootstrap/src/factory.rs` + +- [ ] **Rename the directory:** + +```bash +git mv crates/adapters/event-publisher crates/adapters/event-transport +``` + +- [ ] **Update `crates/adapters/event-transport/Cargo.toml`** — change the package name: + +```toml +[package] +name = "event-transport" +version = "0.1.0" +edition = "2021" + +[dependencies] +domain = { workspace = true } +event-payload = { workspace = true } +serde_json = { workspace = true } +async-trait = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["full"] } +``` + +- [ ] **Update root `Cargo.toml`:** + +In `[workspace] members`, change: +```toml +"crates/adapters/event-publisher", +``` +to: +```toml +"crates/adapters/event-transport", +``` + +In `[workspace.dependencies]`, change: +```toml +event-publisher = { path = "crates/adapters/event-publisher" } +``` +to: +```toml +event-transport = { path = "crates/adapters/event-transport" } +``` + +- [ ] **Update `crates/adapters/nats/Cargo.toml`:** + +Change `event-publisher = { workspace = true }` to `event-transport = { workspace = true }`. + +- [ ] **Update `crates/adapters/nats/src/lib.rs`:** + +Change `use event_publisher::Transport;` to `use event_transport::Transport;`. + +- [ ] **Update `crates/bootstrap/Cargo.toml`:** + +Change `event-publisher = { workspace = true }` to `event-transport = { workspace = true }`. + +- [ ] **Update `crates/bootstrap/src/factory.rs`:** + +Change `use event_publisher::EventPublisherAdapter;` to `use event_transport::EventPublisherAdapter;`. + +- [ ] **Run:** `cargo check --workspace` — Expected: no errors. + +- [ ] **Run:** `cargo test -p event-transport` — Expected: 2 tests pass (same tests as before, just crate renamed). + +- [ ] **Commit:** + +```bash +git add Cargo.toml \ + crates/adapters/event-transport/ \ + crates/adapters/nats/Cargo.toml \ + crates/adapters/nats/src/lib.rs \ + crates/bootstrap/Cargo.toml \ + crates/bootstrap/src/factory.rs +git commit -m "refactor: rename event-publisher → event-transport" +``` + +--- + +### Task 2: Add MessageSource + EventConsumerAdapter to event-transport + +**Files:** +- Modify: `crates/adapters/event-transport/src/lib.rs` + +- [ ] **Write failing tests** — append to the test module in `src/lib.rs`: + +```rust + #[tokio::test] + async fn consumer_adapter_deserializes_and_yields_event() { + use domain::value_objects::ThoughtId; + use futures::StreamExt; + + // Produce a serialized EventPayload for ThoughtCreated + let event = DomainEvent::ThoughtCreated { + thought_id: ThoughtId::new(), + user_id: UserId::new(), + in_reply_to_id: None, + }; + let payload = EventPayload::from(&event); + let bytes = serde_json::to_vec(&payload).unwrap(); + + // A MessageSource that yields one message then ends + struct OneMessageSource { bytes: Vec } + #[async_trait] + impl MessageSource for OneMessageSource { + fn messages(&self) -> futures::stream::BoxStream<'_, Result> { + let msg = RawMessage { + subject: "thoughts.created".to_string(), + payload: self.bytes.clone(), + ack: Box::new(|| {}), + nack: Box::new(|| {}), + }; + Box::pin(futures::stream::once(async { Ok(msg) })) + } + } + + let adapter = EventConsumerAdapter::new(OneMessageSource { bytes }); + let mut stream = adapter.consume(); + let envelope = stream.next().await.unwrap().unwrap(); + assert!(matches!(envelope.event, DomainEvent::ThoughtCreated { .. })); + } + + #[tokio::test] + async fn consumer_adapter_skips_invalid_payloads() { + use futures::StreamExt; + + struct BadMessageSource; + #[async_trait] + impl MessageSource for BadMessageSource { + fn messages(&self) -> futures::stream::BoxStream<'_, Result> { + let msg = RawMessage { + subject: "bad".to_string(), + payload: b"not valid json".to_vec(), + ack: Box::new(|| {}), + nack: Box::new(|| {}), + }; + Box::pin(futures::stream::once(async { Ok(msg) })) + } + } + + let adapter = EventConsumerAdapter::new(BadMessageSource); + let mut stream = adapter.consume(); + // Invalid JSON should be skipped — stream ends with no items + assert!(stream.next().await.is_none()); + } +``` + +- [ ] **Run:** `cargo test -p event-transport` — Expected: FAIL (MessageSource, RawMessage, EventConsumerAdapter not defined). + +- [ ] **Add to `crates/adapters/event-transport/src/lib.rs`** — append after the existing `EventPublisherAdapter` impl and before `#[cfg(test)]`: + +```rust +use domain::{events::EventEnvelope, ports::EventConsumer}; +use futures::stream::BoxStream; + +/// A raw inbound message from a transport backend. +/// `ack` and `nack` are transport-level acknowledgements (e.g. Kafka offset commit). +/// For at-most-once transports (basic NATS), both are no-ops. +pub struct RawMessage { + pub subject: String, + pub payload: Vec, + pub ack: Box, + pub nack: Box, +} + +/// Abstraction over any subscribe/consume backend. +/// Implement this for NATS, Kafka, Redis Streams, etc. +pub trait MessageSource: Send + Sync { + fn messages(&self) -> BoxStream<'_, Result>; +} + +/// Deserializes raw transport messages into domain `EventEnvelope`s. +/// +/// Converts: `RawMessage.payload` → `EventPayload` → `DomainEvent` → `EventEnvelope` +/// +/// Invalid or unknown messages are skipped with a warning — the stream continues. +pub struct EventConsumerAdapter { + source: S, +} + +impl EventConsumerAdapter { + pub fn new(source: S) -> Self { Self { source } } +} + +impl EventConsumer for EventConsumerAdapter { + fn consume(&self) -> BoxStream<'_, Result> { + use futures::StreamExt; + let stream = self.source.messages(); + Box::pin(stream.filter_map(|result| async move { + match result { + Err(e) => { + tracing::warn!("transport error: {e}"); + None + } + Ok(msg) => { + let payload = match serde_json::from_slice::(&msg.payload) { + Ok(p) => p, + Err(e) => { + tracing::warn!("failed to deserialize event payload: {e}"); + return None; + } + }; + let event = match DomainEvent::try_from(payload) { + Ok(e) => e, + Err(e) => { + tracing::warn!("unknown event type: {e}"); + return None; + } + }; + Some(Ok(EventEnvelope { + event, + ack: msg.ack, + nack: msg.nack, + })) + } + } + })) + } +} +``` + +Note: the existing imports at the top of `lib.rs` already have `use domain::...` — add `EventEnvelope` and `EventConsumer` to those imports. Also add `futures::stream::BoxStream` if not already present. + +Also add `futures = { workspace = true }` to `event-transport/Cargo.toml` dependencies (needed for `BoxStream` and `StreamExt`). + +- [ ] **Run:** `cargo test -p event-transport` — Expected: 4 tests pass (2 existing + 2 new). + +- [ ] **Commit:** + +```bash +git add crates/adapters/event-transport/ +git commit -m "feat(event-transport): MessageSource trait + EventConsumerAdapter for transport-agnostic consuming" +``` + +--- + +### Task 3: nats — add NatsMessageSource, remove NatsEventConsumer + +**Files:** +- Modify: `crates/adapters/nats/src/lib.rs` + +- [ ] **Rewrite `crates/adapters/nats/src/lib.rs`** — remove `NatsEventConsumer`, add `NatsMessageSource`: + +```rust +use async_trait::async_trait; +use domain::errors::DomainError; +use event_payload::EventPayload; +use event_transport::{MessageSource, RawMessage, Transport}; +use futures::stream::BoxStream; + +// ── NatsTransport — raw NATS publish backend ──────────────────────────────── + +pub struct NatsTransport { + client: async_nats::Client, +} + +impl NatsTransport { + pub fn new(client: async_nats::Client) -> Self { Self { client } } +} + +#[async_trait] +impl Transport for NatsTransport { + async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> { + self.client + .publish(subject, bytes.to_vec().into()) + .await + .map_err(|e| DomainError::Internal(e.to_string())) + } +} + +// ── NatsMessageSource — raw NATS subscribe backend ────────────────────────── + +pub struct NatsMessageSource { + client: async_nats::Client, +} + +impl NatsMessageSource { + pub fn new(client: async_nats::Client) -> Self { Self { client } } +} + +impl MessageSource for NatsMessageSource { + fn messages(&self) -> BoxStream<'_, Result> { + let client = self.client.clone(); + Box::pin(async_stream::try_stream! { + let mut sub = client + .subscribe(">") + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + use futures::StreamExt; + while let Some(msg) = sub.next().await { + let subject = msg.subject.to_string(); + let payload = msg.payload.to_vec(); + // Basic NATS: at-most-once delivery — ack/nack are no-ops. + // Replace with JetStream for at-least-once delivery. + yield RawMessage { + subject, + payload, + ack: Box::new(|| {}), + nack: Box::new(|| {}), + }; + } + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use domain::{events::DomainEvent, value_objects::{LikeId, ThoughtId, UserId}}; + + #[test] + fn payload_from_domain_event_has_correct_subject() { + let event = DomainEvent::ThoughtCreated { + thought_id: ThoughtId::new(), + user_id: UserId::new(), + in_reply_to_id: None, + }; + let payload = EventPayload::from(&event); + assert_eq!(payload.subject(), "thoughts.created"); + } + + #[test] + fn domain_event_roundtrip_via_payload() { + let uid = UserId::new(); + let tid = ThoughtId::new(); + let event = DomainEvent::LikeAdded { + like_id: LikeId::new(), + user_id: uid.clone(), + thought_id: tid.clone(), + }; + let payload = EventPayload::from(&event); + let back = DomainEvent::try_from(payload).unwrap(); + if let DomainEvent::LikeAdded { user_id, thought_id, .. } = back { + assert_eq!(user_id, uid); + assert_eq!(thought_id, tid); + } else { + panic!("wrong variant"); + } + } +} +``` + +- [ ] **Run:** `cargo test -p nats` — Expected: 2 tests pass. + +- [ ] **Run:** `cargo check --workspace` — Expected: one error in `worker` (uses removed `NatsEventConsumer`). That's expected — fixed in Task 4. + +- [ ] **Commit:** + +```bash +git add crates/adapters/nats/src/lib.rs +git commit -m "refactor(nats): replace NatsEventConsumer with NatsMessageSource implementing MessageSource" +``` + +--- + +### Task 4: Update worker + full verification + +**Files:** +- Modify: `crates/worker/Cargo.toml` +- Modify: `crates/worker/src/main.rs` + +- [ ] **Add `event-transport = { workspace = true }` to `crates/worker/Cargo.toml`.** + +- [ ] **Update `crates/worker/src/main.rs`** — find and update the consumer creation. + +Current code in `main.rs`: +```rust +let consumer = nats::NatsEventConsumer::new(nats_client); +``` + +Replace with: +```rust +use event_transport::EventConsumerAdapter; +use nats::NatsMessageSource; +let consumer = EventConsumerAdapter::new(NatsMessageSource::new(nats_client)); +``` + +Also add the `use` statements at the top of `main.rs` alongside existing imports. + +- [ ] **Run:** `cargo check --workspace` — Expected: no errors. + +- [ ] **Run full test suite:** + +```bash +DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test --workspace 2>&1 | tail -3 +``` + +Expected: all tests pass (79 existing + 2 new event-transport consumer tests = 81+). + +- [ ] **Commit:** + +```bash +git add crates/worker/ +git commit -m "feat(worker): use EventConsumerAdapter — transport-agnostic consuming" +``` + +--- + +## Self-Review + +**Spec coverage:** +- ✅ `event-publisher` renamed to `event-transport` everywhere (Task 1) +- ✅ `RawMessage { subject, payload, ack, nack }` in `event-transport` (Task 2) +- ✅ `MessageSource` trait with `messages() -> BoxStream` (Task 2) +- ✅ `EventConsumerAdapter` implementing `EventConsumer` (Task 2) +- ✅ Invalid messages skipped with warning, stream continues (Task 2) +- ✅ 2 new tests: valid deserialization + invalid JSON skip (Task 2) +- ✅ `NatsEventConsumer` removed from nats (Task 3) +- ✅ `NatsMessageSource` implementing `MessageSource` added to nats (Task 3) +- ✅ Worker uses `EventConsumerAdapter::new(NatsMessageSource::new(client))` (Task 4) + +**Adding Kafka later:** +```toml +# kafka/Cargo.toml: event-transport = { workspace = true } +``` +```rust +// kafka/src/lib.rs +pub struct KafkaMessageSource { ... } +impl MessageSource for KafkaMessageSource { ... } // yields RawMessage + real ack/nack + +pub struct KafkaTransport { ... } +impl Transport for KafkaTransport { ... } +``` +```rust +// bootstrap/src/factory.rs — two lines change: +EventPublisherAdapter::new(KafkaTransport::new(...)) +EventConsumerAdapter::new(KafkaMessageSource::new(...)) +``` + +**Type consistency:** +- `EventConsumerAdapter` — `NatsMessageSource` implements `MessageSource`, adapter implements `EventConsumer` ✓ +- `RawMessage.ack` / `.nack` transferred to `EventEnvelope.ack` / `.nack` in consumer adapter ✓ +- `event_transport::` (underscore) is the Rust module name for `event-transport` (dash) crate ✓