# event-publisher Transport Abstraction Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Fill `event-publisher` with a `Transport` trait and an `EventPublisherAdapter`, replace `NatsEventPublisher` in the `nats` crate with a `NatsTransport` that implements `Transport`, then wire `EventPublisherAdapter::new(NatsTransport::new(client))` in bootstrap, so that adding Kafka or Redis later only requires a new transport crate.

**Architecture:** `event-publisher` defines the abstraction (`Transport` + `EventPublisherAdapter`). `nats` implements `Transport` for NATS as a raw byte publisher. `event-publisher` never imports `nats`. `bootstrap` wires them together. `NatsEventConsumer` stays in `nats`: subscribing is transport-specific and is not part of the shared abstraction.

**Dependency chain after refactor:**

```
event-publisher → domain, event-payload, serde_json
nats            → domain, event-payload, event-publisher, async-nats
bootstrap       → event-publisher, nats (+ all others)
```

---

## File Map

```
Modify: crates/adapters/event-publisher/Cargo.toml   ← add deps
Modify: crates/adapters/event-publisher/src/lib.rs   ← Transport trait + EventPublisherAdapter
Modify: crates/adapters/nats/Cargo.toml              ← add event-publisher dep
Modify: crates/adapters/nats/src/lib.rs              ← remove NatsEventPublisher, add NatsTransport
Modify: crates/bootstrap/src/factory.rs              ← use EventPublisherAdapter
Modify: crates/bootstrap/Cargo.toml                  ← add event-publisher dep (if missing)
```

---

### Task 1: Fill event-publisher with the Transport trait and EventPublisherAdapter

**Files:**
- Modify: `crates/adapters/event-publisher/Cargo.toml`
- Modify: `crates/adapters/event-publisher/src/lib.rs`

- [ ] **Write tests** at the bottom of `crates/adapters/event-publisher/src/lib.rs`:

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use domain::value_objects::{ThoughtId, UserId};
    use std::sync::{Arc, Mutex};

    /// Test double that records every (subject, payload) pair it is asked to publish.
    struct SpyTransport {
        calls: Arc<Mutex<Vec<(String, Vec<u8>)>>>,
    }

    impl SpyTransport {
        /// Returns the spy plus a shared handle for inspecting the recorded calls.
        fn new() -> (Self, Arc<Mutex<Vec<(String, Vec<u8>)>>>) {
            let calls = Arc::new(Mutex::new(vec![]));
            (Self { calls: calls.clone() }, calls)
        }
    }

    #[async_trait]
    impl Transport for SpyTransport {
        async fn publish_bytes(
            &self,
            subject: &str,
            bytes: &[u8],
        ) -> Result<(), domain::errors::DomainError> {
            self.calls
                .lock()
                .unwrap()
                .push((subject.to_string(), bytes.to_vec()));
            Ok(())
        }
    }

    #[tokio::test]
    async fn thought_created_routes_to_correct_subject() {
        let (spy, calls) = SpyTransport::new();
        let publisher = EventPublisherAdapter::new(spy);

        publisher
            .publish(&domain::events::DomainEvent::ThoughtCreated {
                thought_id: ThoughtId::new(),
                user_id: UserId::new(),
                in_reply_to_id: None,
            })
            .await
            .unwrap();

        let calls = calls.lock().unwrap();
        assert_eq!(calls.len(), 1);
        assert_eq!(calls[0].0, "thoughts.created");
    }

    #[tokio::test]
    async fn serialized_payload_is_valid_json() {
        let (spy, calls) = SpyTransport::new();
        let publisher = EventPublisherAdapter::new(spy);

        publisher
            .publish(&domain::events::DomainEvent::UserBlocked {
                blocker_id: UserId::new(),
                blocked_id: UserId::new(),
            })
            .await
            .unwrap();

        // Copy the bytes out so the mutex guard is released before parsing.
        let bytes = calls.lock().unwrap()[0].1.clone();
        let json: serde_json::Value = serde_json::from_slice(&bytes).expect("valid JSON");
        assert_eq!(json["type"], "UserBlocked");
    }
}
```

- [ ] **Run:** `cargo test -p event-publisher`. Expected: FAIL (the crate does not compile yet because `Transport` and `EventPublisherAdapter` are not defined).
- [ ] **Write `crates/adapters/event-publisher/Cargo.toml`:**

```toml
[package]
name = "event-publisher"
version = "0.1.0"
edition = "2021"

[dependencies]
domain = { workspace = true }
event-payload = { workspace = true }
serde_json = { workspace = true }
async-trait = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
```

- [ ] **Write the implementation in `crates/adapters/event-publisher/src/lib.rs`**, above the `tests` module (keep the tests in place):

```rust
use async_trait::async_trait;
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
use event_payload::EventPayload;

/// Abstraction over any pub/sub transport backend.
/// Implement this for NATS, Kafka, Redis Streams, etc.
/// The adapter calls `publish_bytes(subject, bytes)`; subjects come from `EventPayload::subject()`.
#[async_trait]
pub trait Transport: Send + Sync {
    async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError>;
}

/// Routes domain events to a transport backend.
///
/// Converts: `DomainEvent` → `EventPayload` (via `From<&DomainEvent>`) → JSON bytes
/// → `transport.publish_bytes(subject, bytes)`
///
/// To swap transports (e.g. NATS → Kafka), replace the `T` at the composition root.
/// This type never needs to change.
pub struct EventPublisherAdapter<T: Transport> {
    transport: T,
}

impl<T: Transport> EventPublisherAdapter<T> {
    pub fn new(transport: T) -> Self {
        Self { transport }
    }
}

#[async_trait]
impl<T: Transport> EventPublisher for EventPublisherAdapter<T> {
    async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> {
        let payload = EventPayload::from(event);
        let subject = payload.subject();
        // Serialization failures are surfaced as internal domain errors.
        let bytes = serde_json::to_vec(&payload)
            .map_err(|e| DomainError::Internal(e.to_string()))?;
        tracing::debug!(subject, "publishing event");
        self.transport.publish_bytes(subject, &bytes).await
    }
}
```

- [ ] **Run:** `cargo test -p event-publisher`. Expected: 2 tests pass.
- [ ] **Commit:**

```bash
git add crates/adapters/event-publisher/
git commit -m "feat(event-publisher): Transport trait + EventPublisherAdapter for transport-agnostic event routing"
```

---

### Task 2: Refactor nats by removing NatsEventPublisher and adding NatsTransport

**Files:**
- Modify: `crates/adapters/nats/Cargo.toml`
- Modify: `crates/adapters/nats/src/lib.rs`

- [ ] **Add `event-publisher` to `crates/adapters/nats/Cargo.toml`:**

```toml
event-publisher = { workspace = true }
```

- [ ] **Rewrite `crates/adapters/nats/src/lib.rs`:** remove `NatsEventPublisher`, add `NatsTransport`:

```rust
use async_trait::async_trait;
use domain::{
    errors::DomainError,
    events::{DomainEvent, EventEnvelope},
    ports::EventConsumer,
};
use event_payload::EventPayload;
use event_publisher::Transport;
use futures::stream::BoxStream;

// ── NatsTransport: raw NATS publish backend ─────────────────────────────────

pub struct NatsTransport {
    client: async_nats::Client,
}

impl NatsTransport {
    pub fn new(client: async_nats::Client) -> Self {
        Self { client }
    }
}

#[async_trait]
impl Transport for NatsTransport {
    async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> {
        self.client
            .publish(subject, bytes.to_vec().into())
            .await
            .map_err(|e| DomainError::Internal(e.to_string()))
    }
}

// ── NatsEventConsumer: subscribes and yields EventEnvelopes ─────────────────

pub struct NatsEventConsumer {
    client: async_nats::Client,
}

impl NatsEventConsumer {
    pub fn new(client: async_nats::Client) -> Self {
        Self { client }
    }
}

impl EventConsumer for NatsEventConsumer {
    fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
        let client = self.client.clone();
        Box::pin(async_stream::try_stream! {
            // ">" is the NATS wildcard that matches every subject.
            let mut sub = client
                .subscribe(">")
                .await
                .map_err(|e| DomainError::Internal(e.to_string()))?;

            use futures::StreamExt;
            while let Some(msg) = sub.next().await {
                // Skip messages that are not valid event payloads instead of ending the stream.
                let payload = match serde_json::from_slice::<EventPayload>(&msg.payload) {
                    Ok(p) => p,
                    Err(e) => {
                        tracing::warn!("failed to deserialize event payload: {e}");
                        continue;
                    }
                };
                let event = match DomainEvent::try_from(payload) {
                    Ok(e) => e,
                    Err(e) => {
                        tracing::warn!("failed to convert payload to domain event: {e}");
                        continue;
                    }
                };
                yield EventEnvelope {
                    event,
                    ack: Box::new(|| {}),
                    nack: Box::new(|| {}),
                };
            }
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use domain::value_objects::{LikeId, ThoughtId, UserId};

    #[test]
    fn payload_from_domain_event_has_correct_subject() {
        let event = DomainEvent::ThoughtCreated {
            thought_id: ThoughtId::new(),
            user_id: UserId::new(),
            in_reply_to_id: None,
        };
        let payload = EventPayload::from(&event);
        assert_eq!(payload.subject(), "thoughts.created");
    }

    #[test]
    fn domain_event_roundtrip_via_payload() {
        let uid = UserId::new();
        let tid = ThoughtId::new();
        let event = DomainEvent::LikeAdded {
            like_id: LikeId::new(),
            user_id: uid.clone(),
            thought_id: tid.clone(),
        };
        let payload = EventPayload::from(&event);
        let back = DomainEvent::try_from(payload).unwrap();
        if let DomainEvent::LikeAdded { user_id, thought_id, .. } = back {
            assert_eq!(user_id, uid);
            assert_eq!(thought_id, tid);
        } else {
            panic!("wrong variant");
        }
    }
}
```

- [ ] **Run:** `cargo test -p nats`. Expected: 2 tests pass.

- [ ] **Run:** `cargo check --workspace`. Expected: one error in `bootstrap`, which still references the removed `NatsEventPublisher`. This is fixed in Task 3.
- [ ] **Commit:**

```bash
git add crates/adapters/nats/
git commit -m "refactor(nats): strip NatsEventPublisher, add NatsTransport implementing Transport"
```

---

### Task 3: Wire EventPublisherAdapter in bootstrap

**Files:**
- Modify: `crates/bootstrap/Cargo.toml`
- Modify: `crates/bootstrap/src/factory.rs`

- [ ] **Add `event-publisher` to `crates/bootstrap/Cargo.toml`** (if missing):

```toml
event-publisher = { workspace = true }
```

- [ ] **Update `crates/bootstrap/src/factory.rs`:** find the NATS event publisher section and replace it.

Find (in the `build` function):

```rust
Arc::new(nats::NatsEventPublisher::new(client))
```

Replace with:

```rust
Arc::new(event_publisher::EventPublisherAdapter::new(nats::NatsTransport::new(client)))
```

The replacement uses the fully qualified path, so no extra `use` is required. If you prefer the short name `EventPublisherAdapter::new(...)`, add this import at the top of `factory.rs` and shorten the call accordingly; adding the import without using the short name produces an unused-import warning:

```rust
use event_publisher::EventPublisherAdapter;
```

The `NoOpEventPublisher` struct and its `impl EventPublisher` stay in `factory.rs`. It is the fallback when NATS is unavailable, and the composition root is the right place for it.

- [ ] **Run:** `cargo check -p bootstrap`. Expected: no errors.

- [ ] **Run:** `cargo check --workspace`. Expected: no errors.

- [ ] **Run full test suite:**

```bash
DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test --workspace 2>&1 | tail -3
```

Expected: all tests pass (including the new event-publisher tests).

- [ ] **Smoke test:**

```bash
DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres \
JWT_SECRET=dev BASE_URL=http://localhost:3000 \
RUST_LOG=info cargo run -p bootstrap &
sleep 3
curl -s http://localhost:3000/health | jq .
kill %1 2>/dev/null
```

Expected: `{"status":"ok","db":"connected"}`.
- [ ] **Commit:**

```bash
git add crates/bootstrap/
git commit -m "feat(bootstrap): wire EventPublisherAdapter for transport-agnostic event publishing"
```

---

## Self-Review

**Spec coverage:**
- ✅ `Transport` trait in `event-publisher` with `publish_bytes(subject, bytes)` (Task 1)
- ✅ `EventPublisherAdapter` implements `EventPublisher` (Task 1)
- ✅ 2 tests: correct subject routing, valid JSON serialization (Task 1)
- ✅ `NatsEventPublisher` removed from `nats` (Task 2)
- ✅ `NatsTransport` implements `Transport` for NATS (Task 2)
- ✅ `NatsEventConsumer` unchanged, stays in `nats` (Task 2)
- ✅ `bootstrap` wires `EventPublisherAdapter::new(NatsTransport::new(client))` (Task 3)
- ✅ `NoOpEventPublisher` stays in `factory.rs` as fallback (Task 3)

**Placeholder scan:** None.

**Type consistency:**
- `EventPublisherAdapter<NatsTransport>`: `NatsTransport` implements `Transport`, and `EventPublisherAdapter<T>` implements `EventPublisher` for any `T: Transport` ✓
- `event_publisher::Transport` is imported in `nats/src/lib.rs`, and `nats` depends on `event-publisher` ✓
- `factory.rs` uses `event_publisher::EventPublisherAdapter` and `nats::NatsTransport`, both in bootstrap deps ✓

**Adding Kafka later:**

```toml
# kafka/Cargo.toml
[dependencies]
event-publisher = { workspace = true }
rdkafka = "..."
```

```rust
// kafka/src/lib.rs
pub struct KafkaTransport { ... }

#[async_trait]
impl Transport for KafkaTransport { ... }
```

```rust
// bootstrap/src/factory.rs: only this line changes (plus the new crate in bootstrap's Cargo.toml)
Arc::new(EventPublisherAdapter::new(KafkaTransport::new(...)))
```
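
The swap described above can be sketched with the standard library alone. This is an illustration of the pattern, not the plan's code, and it makes several simplifying assumptions: the trait is synchronous rather than `#[async_trait]`, errors are plain `String` instead of `DomainError`, the hypothetical `ThoughtCreated` struct stands in for `EventPayload` with hand-written JSON instead of `serde_json`, and `InMemoryTransport` and `publish_demo` are invented names for a second backend and a stand-in composition root.

```rust
use std::sync::{Arc, Mutex};

/// Simplified, synchronous stand-in for the plan's async `Transport` trait.
pub trait Transport: Send + Sync {
    fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), String>;
}

/// Hypothetical backend that records messages in memory instead of sending them.
#[derive(Clone, Default)]
pub struct InMemoryTransport {
    sent: Arc<Mutex<Vec<(String, Vec<u8>)>>>,
}

impl InMemoryTransport {
    /// Snapshot of everything published so far, as (subject, payload) pairs.
    pub fn sent(&self) -> Vec<(String, Vec<u8>)> {
        self.sent.lock().unwrap().clone()
    }
}

impl Transport for InMemoryTransport {
    fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), String> {
        self.sent
            .lock()
            .map_err(|e| e.to_string())?
            .push((subject.to_string(), bytes.to_vec()));
        Ok(())
    }
}

/// Hypothetical stand-in for `EventPayload`: knows its subject and its JSON form.
pub struct ThoughtCreated {
    pub thought_id: u64,
}

impl ThoughtCreated {
    pub fn subject(&self) -> &'static str {
        "thoughts.created"
    }

    /// Hand-written JSON; the real plan serializes with `serde_json`.
    pub fn to_json(&self) -> String {
        format!(r#"{{"type":"ThoughtCreated","thought_id":{}}}"#, self.thought_id)
    }
}

/// Generic over the transport, so a new backend never touches this type.
pub struct EventPublisherAdapter<T: Transport> {
    transport: T,
}

impl<T: Transport> EventPublisherAdapter<T> {
    pub fn new(transport: T) -> Self {
        Self { transport }
    }

    pub fn publish(&self, event: &ThoughtCreated) -> Result<(), String> {
        self.transport
            .publish_bytes(event.subject(), event.to_json().as_bytes())
    }
}

/// Stand-in composition root: the transport is chosen here and nowhere else.
pub fn publish_demo() -> Vec<(String, Vec<u8>)> {
    let transport = InMemoryTransport::default();
    let publisher = EventPublisherAdapter::new(transport.clone());
    publisher
        .publish(&ThoughtCreated { thought_id: 7 })
        .expect("publish failed");
    transport.sent()
}
```

Calling `publish_demo()` returns the single recorded message, with the subject chosen by the event and the bytes produced by the adapter. Replacing `InMemoryTransport::default()` with another `Transport` implementation is the only change needed to switch backends in this sketch, which mirrors the one-line change the plan expects in `factory.rs`.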