From 42d3dbd251b150e0d38cb0eeb9aa8a8a30bd7714 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 14 May 2026 12:18:41 +0200 Subject: [PATCH] docs: event-publisher transport abstraction plan --- .../2026-05-14-event-publisher-refactor.md | 408 ++++++++++++++++++ 1 file changed, 408 insertions(+) create mode 100644 docs/superpowers/plans/2026-05-14-event-publisher-refactor.md diff --git a/docs/superpowers/plans/2026-05-14-event-publisher-refactor.md b/docs/superpowers/plans/2026-05-14-event-publisher-refactor.md new file mode 100644 index 0000000..da8ec28 --- /dev/null +++ b/docs/superpowers/plans/2026-05-14-event-publisher-refactor.md @@ -0,0 +1,408 @@ +# event-publisher Transport Abstraction Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Fill `event-publisher` with a `Transport` trait + `EventPublisherAdapter`, strip `NatsEventPublisher` from the `nats` crate and replace it with `NatsTransport` implementing `Transport`, then wire `EventPublisherAdapter::new(NatsTransport::new(client))` in bootstrap — so adding Kafka/Redis later only requires a new transport crate. + +**Architecture:** `event-publisher` defines the abstraction (`Transport` + `EventPublisherAdapter`). `nats` implements `Transport` for NATS (pure bytes: publish/subscribe). `event-publisher` never imports `nats`. `bootstrap` wires them together. `NatsEventConsumer` stays in `nats` — it's transport-specific and will never be shared. + +**Dependency chain after refactor:** +``` +event-publisher → domain, event-payload, serde_json +nats → domain, event-payload, event-publisher, async-nats +bootstrap → event-publisher, nats (+ all others) +``` + +--- + +## File Map + +``` +Modify: crates/adapters/event-publisher/Cargo.toml ← add deps +Modify: crates/adapters/event-publisher/src/lib.rs ← Transport trait + EventPublisherAdapter +Modify: crates/adapters/nats/Cargo.toml ← add event-publisher dep +Modify: crates/adapters/nats/src/lib.rs ← remove NatsEventPublisher, add NatsTransport +Modify: crates/bootstrap/src/factory.rs ← use EventPublisherAdapter +Modify: crates/bootstrap/Cargo.toml ← add event-publisher dep (if missing) +``` + +--- + +### Task 1: Fill event-publisher — Transport trait + EventPublisherAdapter + +**Files:** +- Modify: `crates/adapters/event-publisher/Cargo.toml` +- Modify: `crates/adapters/event-publisher/src/lib.rs` + +- [ ] **Write tests** at the bottom of `crates/adapters/event-publisher/src/lib.rs`: + +```rust +#[cfg(test)] +mod tests { + use super::*; + use async_trait::async_trait; + use std::sync::{Arc, Mutex}; + use domain::value_objects::{ThoughtId, UserId}; + + struct SpyTransport { + calls: Arc)>>>, + } + impl SpyTransport { + fn new() -> (Self, Arc)>>>) { + let calls = Arc::new(Mutex::new(vec![])); + (Self { calls: calls.clone() }, calls) + } + } + #[async_trait] + impl Transport for SpyTransport { + async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), domain::errors::DomainError> { + self.calls.lock().unwrap().push((subject.to_string(), bytes.to_vec())); + Ok(()) + } + } + + #[tokio::test] + async fn thought_created_routes_to_correct_subject() { + let (spy, calls) = SpyTransport::new(); + let publisher = EventPublisherAdapter::new(spy); + publisher.publish(&domain::events::DomainEvent::ThoughtCreated { + thought_id: ThoughtId::new(), + user_id: UserId::new(), + in_reply_to_id: None, + }).await.unwrap(); + let calls = calls.lock().unwrap(); + assert_eq!(calls.len(), 1); + assert_eq!(calls[0].0, "thoughts.created"); + } + + #[tokio::test] + async fn serialized_payload_is_valid_json() { + let (spy, calls) = SpyTransport::new(); + let publisher = EventPublisherAdapter::new(spy); + publisher.publish(&domain::events::DomainEvent::UserBlocked { + blocker_id: UserId::new(), + blocked_id: UserId::new(), + }).await.unwrap(); + let bytes = &calls.lock().unwrap()[0].1.clone(); + let json: serde_json::Value = serde_json::from_slice(bytes).expect("valid JSON"); + assert_eq!(json["type"], "UserBlocked"); + } +} +``` + +- [ ] **Run:** `cargo test -p event-publisher` — Expected: FAIL (no implementation yet). + +- [ ] **Write `crates/adapters/event-publisher/Cargo.toml`:** + +```toml +[package] +name = "event-publisher" +version = "0.1.0" +edition = "2021" + +[dependencies] +domain = { workspace = true } +event-payload = { workspace = true } +serde_json = { workspace = true } +async-trait = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["full"] } +``` + +- [ ] **Write `crates/adapters/event-publisher/src/lib.rs`:** + +```rust +use async_trait::async_trait; +use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher}; +use event_payload::EventPayload; + +/// Abstraction over any pub/sub transport backend. +/// Implement this for NATS, Kafka, Redis Streams, etc. +/// The adapter calls `publish_bytes(subject, bytes)` — subjects come from `EventPayload::subject()`. +#[async_trait] +pub trait Transport: Send + Sync { + async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError>; +} + +/// Routes domain events to a transport backend. +/// +/// Converts: `DomainEvent` → `EventPayload` (via `From`) → JSON bytes → `transport.publish_bytes(subject, bytes)` +/// +/// To swap transports (e.g. NATS → Kafka), replace the `T` at the composition root. +/// This type never needs to change. +pub struct EventPublisherAdapter { + transport: T, +} + +impl EventPublisherAdapter { + pub fn new(transport: T) -> Self { + Self { transport } + } +} + +#[async_trait] +impl EventPublisher for EventPublisherAdapter { + async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> { + let payload = EventPayload::from(event); + let subject = payload.subject(); + let bytes = serde_json::to_vec(&payload) + .map_err(|e| DomainError::Internal(e.to_string()))?; + tracing::debug!(subject, "publishing event"); + self.transport.publish_bytes(subject, &bytes).await + } +} +``` + +- [ ] **Run:** `cargo test -p event-publisher` — Expected: 2 tests pass. + +- [ ] **Commit:** + +```bash +git add crates/adapters/event-publisher/ +git commit -m "feat(event-publisher): Transport trait + EventPublisherAdapter for transport-agnostic event routing" +``` + +--- + +### Task 2: Refactor nats — strip NatsEventPublisher, add NatsTransport + +**Files:** +- Modify: `crates/adapters/nats/Cargo.toml` +- Modify: `crates/adapters/nats/src/lib.rs` + +- [ ] **Add `event-publisher` to `crates/adapters/nats/Cargo.toml`:** + +```toml +event-publisher = { workspace = true } +``` + +- [ ] **Rewrite `crates/adapters/nats/src/lib.rs`** — remove `NatsEventPublisher`, add `NatsTransport`: + +```rust +use async_trait::async_trait; +use domain::{ + errors::DomainError, + events::{DomainEvent, EventEnvelope}, + ports::EventConsumer, +}; +use event_payload::EventPayload; +use event_publisher::Transport; +use futures::stream::BoxStream; + +// ── NatsTransport — raw NATS publish backend ──────────────────────────────── + +pub struct NatsTransport { + client: async_nats::Client, +} + +impl NatsTransport { + pub fn new(client: async_nats::Client) -> Self { Self { client } } +} + +#[async_trait] +impl Transport for NatsTransport { + async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> { + self.client + .publish(subject, bytes.to_vec().into()) + .await + .map_err(|e| DomainError::Internal(e.to_string())) + } +} + +// ── NatsEventConsumer — subscribes and yields EventEnvelopes ──────────────── + +pub struct NatsEventConsumer { + client: async_nats::Client, +} + +impl NatsEventConsumer { + pub fn new(client: async_nats::Client) -> Self { Self { client } } +} + +impl EventConsumer for NatsEventConsumer { + fn consume(&self) -> BoxStream<'_, Result> { + let client = self.client.clone(); + Box::pin(async_stream::try_stream! { + let mut sub = client + .subscribe(">") + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + use futures::StreamExt; + while let Some(msg) = sub.next().await { + let payload = match serde_json::from_slice::(&msg.payload) { + Ok(p) => p, + Err(e) => { + tracing::warn!("failed to deserialize event payload: {e}"); + continue; + } + }; + let event = match DomainEvent::try_from(payload) { + Ok(e) => e, + Err(e) => { + tracing::warn!("failed to convert payload to domain event: {e}"); + continue; + } + }; + yield EventEnvelope { + event, + ack: Box::new(|| {}), + nack: Box::new(|| {}), + }; + } + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use domain::value_objects::{LikeId, ThoughtId, UserId}; + + #[test] + fn payload_from_domain_event_has_correct_subject() { + let event = DomainEvent::ThoughtCreated { + thought_id: ThoughtId::new(), + user_id: UserId::new(), + in_reply_to_id: None, + }; + let payload = EventPayload::from(&event); + assert_eq!(payload.subject(), "thoughts.created"); + } + + #[test] + fn domain_event_roundtrip_via_payload() { + let uid = UserId::new(); + let tid = ThoughtId::new(); + let event = DomainEvent::LikeAdded { + like_id: LikeId::new(), + user_id: uid.clone(), + thought_id: tid.clone(), + }; + let payload = EventPayload::from(&event); + let back = DomainEvent::try_from(payload).unwrap(); + if let DomainEvent::LikeAdded { user_id, thought_id, .. } = back { + assert_eq!(user_id, uid); + assert_eq!(thought_id, tid); + } else { + panic!("wrong variant"); + } + } +} +``` + +- [ ] **Run:** `cargo test -p nats` — Expected: 2 tests pass. + +- [ ] **Run:** `cargo check --workspace` — Expected: one error in `bootstrap` (uses removed `NatsEventPublisher`) — this is expected and fixed in Task 3. + +- [ ] **Commit:** + +```bash +git add crates/adapters/nats/ +git commit -m "refactor(nats): strip NatsEventPublisher, add NatsTransport implementing Transport" +``` + +--- + +### Task 3: Wire EventPublisherAdapter in bootstrap + +**Files:** +- Modify: `crates/bootstrap/Cargo.toml` +- Modify: `crates/bootstrap/src/factory.rs` + +- [ ] **Add `event-publisher` to `crates/bootstrap/Cargo.toml`:** + +```toml +event-publisher = { workspace = true } +``` + +- [ ] **Update `crates/bootstrap/src/factory.rs`** — find the NATS event publisher section and replace: + +Find (in the `build` function): +```rust +Arc::new(nats::NatsEventPublisher::new(client)) +``` + +Replace with: +```rust +Arc::new(event_publisher::EventPublisherAdapter::new(nats::NatsTransport::new(client))) +``` + +The `use` imports at the top of `factory.rs` need `event_publisher` in scope. Add: +```rust +use event_publisher::EventPublisherAdapter; +``` + +The `NoOpEventPublisher` struct and its `impl EventPublisher` stays in `factory.rs` — it's the fallback when NATS is unavailable and lives correctly in the composition root. + +- [ ] **Run:** `cargo check -p bootstrap` — Expected: no errors. + +- [ ] **Run:** `cargo check --workspace` — Expected: no errors. + +- [ ] **Run full test suite:** + +```bash +DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test --workspace 2>&1 | tail -3 +``` + +Expected: all tests pass (including new event-publisher tests). + +- [ ] **Smoke test:** + +```bash +DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres \ +JWT_SECRET=dev BASE_URL=http://localhost:3000 \ +RUST_LOG=info cargo run -p bootstrap & +sleep 3 +curl -s http://localhost:3000/health | jq . +kill %1 2>/dev/null +``` + +Expected: `{"status":"ok","db":"connected"}`. + +- [ ] **Commit:** + +```bash +git add crates/bootstrap/ +git commit -m "feat(bootstrap): wire EventPublisherAdapter — transport-agnostic event publishing" +``` + +--- + +## Self-Review + +**Spec coverage:** +- ✅ `Transport` trait in `event-publisher` with `publish_bytes(subject, bytes)` (Task 1) +- ✅ `EventPublisherAdapter` implements `EventPublisher` (Task 1) +- ✅ 2 tests: correct subject routing, valid JSON serialization (Task 1) +- ✅ `NatsEventPublisher` removed from `nats` (Task 2) +- ✅ `NatsTransport` implements `Transport` for NATS (Task 2) +- ✅ `NatsEventConsumer` unchanged — stays in `nats` (Task 2) +- ✅ `bootstrap` wires `EventPublisherAdapter::new(NatsTransport::new(client))` (Task 3) +- ✅ `NoOpEventPublisher` stays in `factory.rs` as fallback (Task 3) + +**Placeholder scan:** None. + +**Type consistency:** +- `EventPublisherAdapter` — `NatsTransport` implements `Transport`, `EventPublisherAdapter` implements `EventPublisher` ✓ +- `event_publisher::Transport` imported in `nats/src/lib.rs` — `nats` depends on `event-publisher` ✓ +- `factory.rs` uses `event_publisher::EventPublisherAdapter` and `nats::NatsTransport` — both in bootstrap deps ✓ + +**Adding Kafka later:** +```toml +# kafka/Cargo.toml +[dependencies] +event-publisher = { workspace = true } +rdkafka = "..." +``` +```rust +// kafka/src/lib.rs +pub struct KafkaTransport { ... } +#[async_trait] impl Transport for KafkaTransport { ... } +``` +```rust +// bootstrap/src/factory.rs — only this line changes: +Arc::new(EventPublisherAdapter::new(KafkaTransport::new(...))) +```