docs: event-publisher transport abstraction plan

This commit is contained in:
2026-05-14 12:18:41 +02:00
parent c072ee95cd
commit 42d3dbd251

View File

@@ -0,0 +1,408 @@
# event-publisher Transport Abstraction Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Fill `event-publisher` with a `Transport` trait + `EventPublisherAdapter<T: Transport>`, strip `NatsEventPublisher` from the `nats` crate and replace it with `NatsTransport` implementing `Transport`, then wire `EventPublisherAdapter::new(NatsTransport::new(client))` in bootstrap — so adding Kafka/Redis later only requires a new transport crate.
**Architecture:** `event-publisher` defines the abstraction (`Transport` + `EventPublisherAdapter`). `nats` implements `Transport` for NATS (pure bytes: publish/subscribe). `event-publisher` never imports `nats`. `bootstrap` wires them together. `NatsEventConsumer` stays in `nats` — it's transport-specific and will never be shared.
**Dependency chain after refactor:**
```
event-publisher → domain, event-payload, serde_json
nats → domain, event-payload, event-publisher, async-nats
bootstrap → event-publisher, nats (+ all others)
```
---
## File Map
```
Modify: crates/adapters/event-publisher/Cargo.toml ← add deps
Modify: crates/adapters/event-publisher/src/lib.rs ← Transport trait + EventPublisherAdapter<T>
Modify: crates/adapters/nats/Cargo.toml ← add event-publisher dep
Modify: crates/adapters/nats/src/lib.rs ← remove NatsEventPublisher, add NatsTransport
Modify: crates/bootstrap/src/factory.rs ← use EventPublisherAdapter<NatsTransport>
Modify: crates/bootstrap/Cargo.toml ← add event-publisher dep (if missing)
```
---
### Task 1: Fill event-publisher — Transport trait + EventPublisherAdapter
**Files:**
- Modify: `crates/adapters/event-publisher/Cargo.toml`
- Modify: `crates/adapters/event-publisher/src/lib.rs`
- [ ] **Write tests** at the bottom of `crates/adapters/event-publisher/src/lib.rs`:
```rust
#[cfg(test)]
mod tests {
use super::*;
use async_trait::async_trait;
use std::sync::{Arc, Mutex};
use domain::value_objects::{ThoughtId, UserId};
struct SpyTransport {
calls: Arc<Mutex<Vec<(String, Vec<u8>)>>>,
}
impl SpyTransport {
fn new() -> (Self, Arc<Mutex<Vec<(String, Vec<u8>)>>>) {
let calls = Arc::new(Mutex::new(vec![]));
(Self { calls: calls.clone() }, calls)
}
}
#[async_trait]
impl Transport for SpyTransport {
async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), domain::errors::DomainError> {
self.calls.lock().unwrap().push((subject.to_string(), bytes.to_vec()));
Ok(())
}
}
#[tokio::test]
async fn thought_created_routes_to_correct_subject() {
let (spy, calls) = SpyTransport::new();
let publisher = EventPublisherAdapter::new(spy);
publisher.publish(&domain::events::DomainEvent::ThoughtCreated {
thought_id: ThoughtId::new(),
user_id: UserId::new(),
in_reply_to_id: None,
}).await.unwrap();
let calls = calls.lock().unwrap();
assert_eq!(calls.len(), 1);
assert_eq!(calls[0].0, "thoughts.created");
}
#[tokio::test]
async fn serialized_payload_is_valid_json() {
let (spy, calls) = SpyTransport::new();
let publisher = EventPublisherAdapter::new(spy);
publisher.publish(&domain::events::DomainEvent::UserBlocked {
blocker_id: UserId::new(),
blocked_id: UserId::new(),
}).await.unwrap();
let bytes = &calls.lock().unwrap()[0].1.clone();
let json: serde_json::Value = serde_json::from_slice(bytes).expect("valid JSON");
assert_eq!(json["type"], "UserBlocked");
}
}
```
- [ ] **Run:** `cargo test -p event-publisher` — Expected: FAIL (no implementation yet).
- [ ] **Write `crates/adapters/event-publisher/Cargo.toml`:**
```toml
[package]
name = "event-publisher"
version = "0.1.0"
edition = "2021"
[dependencies]
domain = { workspace = true }
event-payload = { workspace = true }
serde_json = { workspace = true }
async-trait = { workspace = true }
tracing = { workspace = true }
[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
```
- [ ] **Write `crates/adapters/event-publisher/src/lib.rs`:**
```rust
use async_trait::async_trait;
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
use event_payload::EventPayload;
/// Abstraction over any pub/sub transport backend.
/// Implement this for NATS, Kafka, Redis Streams, etc.
/// The adapter calls `publish_bytes(subject, bytes)` — subjects come from `EventPayload::subject()`.
#[async_trait]
pub trait Transport: Send + Sync {
async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError>;
}
/// Routes domain events to a transport backend.
///
/// Converts: `DomainEvent` → `EventPayload` (via `From`) → JSON bytes → `transport.publish_bytes(subject, bytes)`
///
/// To swap transports (e.g. NATS → Kafka), replace the `T` at the composition root.
/// This type never needs to change.
pub struct EventPublisherAdapter<T: Transport> {
transport: T,
}
impl<T: Transport> EventPublisherAdapter<T> {
pub fn new(transport: T) -> Self {
Self { transport }
}
}
#[async_trait]
impl<T: Transport> EventPublisher for EventPublisherAdapter<T> {
async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> {
let payload = EventPayload::from(event);
let subject = payload.subject();
let bytes = serde_json::to_vec(&payload)
.map_err(|e| DomainError::Internal(e.to_string()))?;
tracing::debug!(subject, "publishing event");
self.transport.publish_bytes(subject, &bytes).await
}
}
```
- [ ] **Run:** `cargo test -p event-publisher` — Expected: 2 tests pass.
- [ ] **Commit:**
```bash
git add crates/adapters/event-publisher/
git commit -m "feat(event-publisher): Transport trait + EventPublisherAdapter for transport-agnostic event routing"
```
---
### Task 2: Refactor nats — strip NatsEventPublisher, add NatsTransport
**Files:**
- Modify: `crates/adapters/nats/Cargo.toml`
- Modify: `crates/adapters/nats/src/lib.rs`
- [ ] **Add `event-publisher` to `crates/adapters/nats/Cargo.toml`:**
```toml
event-publisher = { workspace = true }
```
- [ ] **Rewrite `crates/adapters/nats/src/lib.rs`** — remove `NatsEventPublisher`, add `NatsTransport`:
```rust
use async_trait::async_trait;
use domain::{
errors::DomainError,
events::{DomainEvent, EventEnvelope},
ports::EventConsumer,
};
use event_payload::EventPayload;
use event_publisher::Transport;
use futures::stream::BoxStream;
// ── NatsTransport — raw NATS publish backend ────────────────────────────────
pub struct NatsTransport {
client: async_nats::Client,
}
impl NatsTransport {
pub fn new(client: async_nats::Client) -> Self { Self { client } }
}
#[async_trait]
impl Transport for NatsTransport {
async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> {
self.client
.publish(subject, bytes.to_vec().into())
.await
.map_err(|e| DomainError::Internal(e.to_string()))
}
}
// ── NatsEventConsumer — subscribes and yields EventEnvelopes ────────────────
pub struct NatsEventConsumer {
client: async_nats::Client,
}
impl NatsEventConsumer {
pub fn new(client: async_nats::Client) -> Self { Self { client } }
}
impl EventConsumer for NatsEventConsumer {
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
let client = self.client.clone();
Box::pin(async_stream::try_stream! {
let mut sub = client
.subscribe(">")
.await
.map_err(|e| DomainError::Internal(e.to_string()))?;
use futures::StreamExt;
while let Some(msg) = sub.next().await {
let payload = match serde_json::from_slice::<EventPayload>(&msg.payload) {
Ok(p) => p,
Err(e) => {
tracing::warn!("failed to deserialize event payload: {e}");
continue;
}
};
let event = match DomainEvent::try_from(payload) {
Ok(e) => e,
Err(e) => {
tracing::warn!("failed to convert payload to domain event: {e}");
continue;
}
};
yield EventEnvelope {
event,
ack: Box::new(|| {}),
nack: Box::new(|| {}),
};
}
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use domain::value_objects::{LikeId, ThoughtId, UserId};
#[test]
fn payload_from_domain_event_has_correct_subject() {
let event = DomainEvent::ThoughtCreated {
thought_id: ThoughtId::new(),
user_id: UserId::new(),
in_reply_to_id: None,
};
let payload = EventPayload::from(&event);
assert_eq!(payload.subject(), "thoughts.created");
}
#[test]
fn domain_event_roundtrip_via_payload() {
let uid = UserId::new();
let tid = ThoughtId::new();
let event = DomainEvent::LikeAdded {
like_id: LikeId::new(),
user_id: uid.clone(),
thought_id: tid.clone(),
};
let payload = EventPayload::from(&event);
let back = DomainEvent::try_from(payload).unwrap();
if let DomainEvent::LikeAdded { user_id, thought_id, .. } = back {
assert_eq!(user_id, uid);
assert_eq!(thought_id, tid);
} else {
panic!("wrong variant");
}
}
}
```
- [ ] **Run:** `cargo test -p nats` — Expected: 2 tests pass.
- [ ] **Run:** `cargo check --workspace` — Expected: one error in `bootstrap` (uses removed `NatsEventPublisher`) — this is expected and fixed in Task 3.
- [ ] **Commit:**
```bash
git add crates/adapters/nats/
git commit -m "refactor(nats): strip NatsEventPublisher, add NatsTransport implementing Transport"
```
---
### Task 3: Wire EventPublisherAdapter in bootstrap
**Files:**
- Modify: `crates/bootstrap/Cargo.toml`
- Modify: `crates/bootstrap/src/factory.rs`
- [ ] **Add `event-publisher` to `crates/bootstrap/Cargo.toml`:**
```toml
event-publisher = { workspace = true }
```
- [ ] **Update `crates/bootstrap/src/factory.rs`** — find the NATS event publisher section and replace:
Find (in the `build` function):
```rust
Arc::new(nats::NatsEventPublisher::new(client))
```
Replace with:
```rust
Arc::new(event_publisher::EventPublisherAdapter::new(nats::NatsTransport::new(client)))
```
The `use` imports at the top of `factory.rs` need `event_publisher` in scope. Add:
```rust
use event_publisher::EventPublisherAdapter;
```
The `NoOpEventPublisher` struct and its `impl EventPublisher` stays in `factory.rs` — it's the fallback when NATS is unavailable and lives correctly in the composition root.
- [ ] **Run:** `cargo check -p bootstrap` — Expected: no errors.
- [ ] **Run:** `cargo check --workspace` — Expected: no errors.
- [ ] **Run full test suite:**
```bash
DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test --workspace 2>&1 | tail -3
```
Expected: all tests pass (including new event-publisher tests).
- [ ] **Smoke test:**
```bash
DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres \
JWT_SECRET=dev BASE_URL=http://localhost:3000 \
RUST_LOG=info cargo run -p bootstrap &
sleep 3
curl -s http://localhost:3000/health | jq .
kill %1 2>/dev/null
```
Expected: `{"status":"ok","db":"connected"}`.
- [ ] **Commit:**
```bash
git add crates/bootstrap/
git commit -m "feat(bootstrap): wire EventPublisherAdapter<NatsTransport> — transport-agnostic event publishing"
```
---
## Self-Review
**Spec coverage:**
-`Transport` trait in `event-publisher` with `publish_bytes(subject, bytes)` (Task 1)
-`EventPublisherAdapter<T: Transport>` implements `EventPublisher` (Task 1)
- ✅ 2 tests: correct subject routing, valid JSON serialization (Task 1)
-`NatsEventPublisher` removed from `nats` (Task 2)
-`NatsTransport` implements `Transport` for NATS (Task 2)
-`NatsEventConsumer` unchanged — stays in `nats` (Task 2)
-`bootstrap` wires `EventPublisherAdapter::new(NatsTransport::new(client))` (Task 3)
-`NoOpEventPublisher` stays in `factory.rs` as fallback (Task 3)
**Placeholder scan:** None.
**Type consistency:**
- `EventPublisherAdapter<NatsTransport>``NatsTransport` implements `Transport`, `EventPublisherAdapter<T: Transport>` implements `EventPublisher`
- `event_publisher::Transport` imported in `nats/src/lib.rs``nats` depends on `event-publisher`
- `factory.rs` uses `event_publisher::EventPublisherAdapter` and `nats::NatsTransport` — both in bootstrap deps ✓
**Adding Kafka later:**
```toml
# kafka/Cargo.toml
[dependencies]
event-publisher = { workspace = true }
rdkafka = "..."
```
```rust
// kafka/src/lib.rs
pub struct KafkaTransport { ... }
#[async_trait] impl Transport for KafkaTransport { ... }
```
```rust
// bootstrap/src/factory.rs — only this line changes:
Arc::new(EventPublisherAdapter::new(KafkaTransport::new(...)))
```