# event-publisher Transport Abstraction Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Fill `event-publisher` with a `Transport` trait and `EventPublisherAdapter<T: Transport>`, remove `NatsEventPublisher` from the `nats` crate and replace it with `NatsTransport` implementing `Transport`, then wire `EventPublisherAdapter::new(NatsTransport::new(client))` in bootstrap. After this, adding Kafka or Redis only requires a new transport crate and a one-line change in bootstrap.

**Architecture:** `event-publisher` defines the abstraction (`Transport` + `EventPublisherAdapter`). `nats` implements `Transport` for NATS as a pure byte publisher; the trait covers publishing only. `event-publisher` never imports `nats`. `bootstrap` wires them together. `NatsEventConsumer` stays in `nats`: subscription is transport-specific and is not part of the shared abstraction.

**Dependency chain after refactor:**
```
event-publisher → domain, event-payload, serde_json, async-trait, tracing
nats            → domain, event-payload, event-publisher, async-nats
bootstrap       → event-publisher, nats (+ all others)
```

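The layering above can be sketched with the standard library alone. This is a simplified, synchronous approximation, not the plan's code: the real trait is async (via `async-trait`), and `Payload`, its hand-written JSON, the `users.blocked` subject, and `RecordingTransport` are hypothetical stand-ins for `EventPayload`, `serde_json`, and a real broker client. It only illustrates the flow from payload to subject and bytes to transport, and that the adapter never names a concrete backend.

```rust
use std::sync::Mutex;

/// Synchronous stand-in for the plan's async `Transport` trait.
pub trait Transport: Send + Sync {
    fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), String>;
}

/// Hypothetical stand-in for `EventPayload`.
pub enum Payload {
    ThoughtCreated { thought_id: u64 },
    UserBlocked { blocker_id: u64, blocked_id: u64 },
}

impl Payload {
    pub fn subject(&self) -> &'static str {
        match self {
            Payload::ThoughtCreated { .. } => "thoughts.created",
            Payload::UserBlocked { .. } => "users.blocked", // assumed subject name
        }
    }

    /// Hand-written JSON so the sketch needs no serde dependency.
    pub fn to_bytes(&self) -> Vec<u8> {
        let json = match self {
            Payload::ThoughtCreated { thought_id } => format!(
                "{{\"type\":\"ThoughtCreated\",\"thought_id\":{}}}",
                thought_id
            ),
            Payload::UserBlocked { blocker_id, blocked_id } => format!(
                "{{\"type\":\"UserBlocked\",\"blocker_id\":{},\"blocked_id\":{}}}",
                blocker_id, blocked_id
            ),
        };
        json.into_bytes()
    }
}

/// In-memory transport that records every call instead of talking to a broker.
#[derive(Default)]
pub struct RecordingTransport {
    calls: Mutex<Vec<(String, Vec<u8>)>>,
}

impl Transport for RecordingTransport {
    fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), String> {
        self.calls.lock().unwrap().push((subject.to_string(), bytes.to_vec()));
        Ok(())
    }
}

/// Generic over the transport: it only sees the trait, never a backend type.
pub struct EventPublisherAdapter<T: Transport> {
    transport: T,
}

impl<T: Transport> EventPublisherAdapter<T> {
    pub fn new(transport: T) -> Self {
        Self { transport }
    }

    pub fn publish(&self, payload: &Payload) -> Result<(), String> {
        self.transport.publish_bytes(payload.subject(), &payload.to_bytes())
    }
}

/// Publishes one event and returns what the transport received, as (subject, JSON text).
pub fn publish_and_inspect() -> Vec<(String, String)> {
    let adapter = EventPublisherAdapter::new(RecordingTransport::default());
    adapter.publish(&Payload::ThoughtCreated { thought_id: 7 }).unwrap();
    let calls = adapter.transport.calls.lock().unwrap();
    let out = calls
        .iter()
        .map(|(subject, bytes)| (subject.clone(), String::from_utf8_lossy(bytes).into_owned()))
        .collect();
    out
}
```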

---

## File Map

```
Modify: crates/adapters/event-publisher/Cargo.toml   ← add deps
Modify: crates/adapters/event-publisher/src/lib.rs   ← Transport trait + EventPublisherAdapter<T>
Modify: crates/adapters/nats/Cargo.toml              ← add event-publisher dep
Modify: crates/adapters/nats/src/lib.rs              ← remove NatsEventPublisher, add NatsTransport
Modify: crates/bootstrap/src/factory.rs              ← use EventPublisherAdapter<NatsTransport>
Modify: crates/bootstrap/Cargo.toml                  ← add event-publisher dep (if missing)
```

---

### Task 1: Fill event-publisher with the Transport trait + EventPublisherAdapter

**Files:**
- Modify: `crates/adapters/event-publisher/Cargo.toml`
- Modify: `crates/adapters/event-publisher/src/lib.rs`

- [ ] **Write tests** at the bottom of `crates/adapters/event-publisher/src/lib.rs`:

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::{Arc, Mutex};
    use domain::value_objects::{ThoughtId, UserId};

    /// Test double that records every `(subject, bytes)` pair it is asked to publish.
    struct SpyTransport {
        calls: Arc<Mutex<Vec<(String, Vec<u8>)>>>,
    }

    impl SpyTransport {
        /// Returns the spy plus a shared handle to its call log.
        fn new() -> (Self, Arc<Mutex<Vec<(String, Vec<u8>)>>>) {
            let calls = Arc::new(Mutex::new(vec![]));
            (Self { calls: calls.clone() }, calls)
        }
    }

    #[async_trait]
    impl Transport for SpyTransport {
        async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), domain::errors::DomainError> {
            self.calls.lock().unwrap().push((subject.to_string(), bytes.to_vec()));
            Ok(())
        }
    }

    #[tokio::test]
    async fn thought_created_routes_to_correct_subject() {
        let (spy, calls) = SpyTransport::new();
        let publisher = EventPublisherAdapter::new(spy);
        publisher.publish(&domain::events::DomainEvent::ThoughtCreated {
            thought_id: ThoughtId::new(),
            user_id: UserId::new(),
            in_reply_to_id: None,
        }).await.unwrap();
        let calls = calls.lock().unwrap();
        assert_eq!(calls.len(), 1);
        assert_eq!(calls[0].0, "thoughts.created");
    }

    #[tokio::test]
    async fn serialized_payload_is_valid_json() {
        let (spy, calls) = SpyTransport::new();
        let publisher = EventPublisherAdapter::new(spy);
        publisher.publish(&domain::events::DomainEvent::UserBlocked {
            blocker_id: UserId::new(),
            blocked_id: UserId::new(),
        }).await.unwrap();
        // Clone the bytes out so the mutex guard is released before parsing.
        let bytes = calls.lock().unwrap()[0].1.clone();
        let json: serde_json::Value = serde_json::from_slice(&bytes).expect("valid JSON");
        assert_eq!(json["type"], "UserBlocked");
    }
}
```

- [ ] **Run:** `cargo test -p event-publisher`. Expected: FAIL with compile errors, because `Transport` and `EventPublisherAdapter` do not exist yet.

- [ ] **Write `crates/adapters/event-publisher/Cargo.toml`:**

```toml
[package]
name = "event-publisher"
version = "0.1.0"
edition = "2021"

[dependencies]
domain = { workspace = true }
event-payload = { workspace = true }
serde_json = { workspace = true }
async-trait = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
```

- [ ] **Write the implementation in `crates/adapters/event-publisher/src/lib.rs`**, above the `tests` module added in the first step:

```rust
use async_trait::async_trait;
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
use event_payload::EventPayload;

/// Abstraction over any pub/sub transport backend.
/// Implement this for NATS, Kafka, Redis Streams, etc.
/// The adapter calls `publish_bytes(subject, bytes)`; subjects come from `EventPayload::subject()`.
#[async_trait]
pub trait Transport: Send + Sync {
    async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError>;
}

/// Routes domain events to a transport backend.
///
/// Converts: `DomainEvent` → `EventPayload` (via `From`) → JSON bytes → `transport.publish_bytes(subject, bytes)`
///
/// To swap transports (e.g. NATS → Kafka), replace the `T` at the composition root.
/// This type never needs to change.
pub struct EventPublisherAdapter<T: Transport> {
    transport: T,
}

impl<T: Transport> EventPublisherAdapter<T> {
    pub fn new(transport: T) -> Self {
        Self { transport }
    }
}

#[async_trait]
impl<T: Transport> EventPublisher for EventPublisherAdapter<T> {
    async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> {
        let payload = EventPayload::from(event);
        let subject = payload.subject();
        let bytes = serde_json::to_vec(&payload)
            .map_err(|e| DomainError::Internal(e.to_string()))?;
        tracing::debug!(subject, "publishing event");
        // Transport errors are returned to the caller unchanged.
        self.transport.publish_bytes(subject, &bytes).await
    }
}
```

- [ ] **Run:** `cargo test -p event-publisher`. Expected: 2 tests pass.

- [ ] **Commit:**

```bash
git add crates/adapters/event-publisher/
git commit -m "feat(event-publisher): Transport trait + EventPublisherAdapter for transport-agnostic event routing"
```

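Both Task 1 tests cover the success path. Since the adapter's `publish` ends in a bare `publish_bytes(...).await`, a transport failure should reach the caller unchanged and without a retry. The block below is a hedged, synchronous sketch of that behavior using only the standard library: `DomainError` here is a local stand-in with a single `Internal` variant, `FailingTransport` is hypothetical, and `publish` takes pre-serialized bytes to keep the example short. An async version with `async-trait` and `tokio` could be added to the real test module along the same lines.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Local stand-in for `domain::errors::DomainError`.
#[derive(Debug, PartialEq)]
pub enum DomainError {
    Internal(String),
}

/// Synchronous stand-in for the plan's async `Transport` trait.
pub trait Transport: Send + Sync {
    fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError>;
}

pub struct EventPublisherAdapter<T: Transport> {
    transport: T,
}

impl<T: Transport> EventPublisherAdapter<T> {
    pub fn new(transport: T) -> Self {
        Self { transport }
    }

    /// Simplified: takes bytes directly instead of serializing a domain event.
    pub fn publish(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> {
        // No retry and no error mapping: the transport's result is returned as-is.
        self.transport.publish_bytes(subject, bytes)
    }
}

/// Hypothetical transport that always fails and counts how often it was called.
#[derive(Default)]
pub struct FailingTransport {
    attempts: AtomicUsize,
}

impl Transport for FailingTransport {
    fn publish_bytes(&self, _subject: &str, _bytes: &[u8]) -> Result<(), DomainError> {
        self.attempts.fetch_add(1, Ordering::SeqCst);
        Err(DomainError::Internal("broker unreachable".to_string()))
    }
}

/// Returns the adapter's result together with the number of transport calls made.
pub fn publish_against_failing_transport() -> (Result<(), DomainError>, usize) {
    let adapter = EventPublisherAdapter::new(FailingTransport::default());
    let result = adapter.publish("thoughts.created", b"{}");
    let attempts = adapter.transport.attempts.load(Ordering::SeqCst);
    (result, attempts)
}
```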

---

### Task 2: Refactor nats: remove NatsEventPublisher, add NatsTransport

**Files:**
- Modify: `crates/adapters/nats/Cargo.toml`
- Modify: `crates/adapters/nats/src/lib.rs`

- [ ] **Add `event-publisher` to `crates/adapters/nats/Cargo.toml`:**

```toml
event-publisher = { workspace = true }
```

- [ ] **Rewrite `crates/adapters/nats/src/lib.rs`:** remove `NatsEventPublisher`, add `NatsTransport`:

```rust
use async_trait::async_trait;
use domain::{
    errors::DomainError,
    events::{DomainEvent, EventEnvelope},
    ports::EventConsumer,
};
use event_payload::EventPayload;
use event_publisher::Transport;
use futures::stream::BoxStream;
use futures::StreamExt;

// ── NatsTransport: raw NATS publish backend ─────────────────────────────────

pub struct NatsTransport {
    client: async_nats::Client,
}

impl NatsTransport {
    pub fn new(client: async_nats::Client) -> Self {
        Self { client }
    }
}

#[async_trait]
impl Transport for NatsTransport {
    async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> {
        // Pass owned values: `subject` is not `'static`, and the client takes ownership.
        self.client
            .publish(subject.to_string(), bytes.to_vec().into())
            .await
            .map_err(|e| DomainError::Internal(e.to_string()))
    }
}

// ── NatsEventConsumer: subscribes and yields EventEnvelopes ─────────────────

pub struct NatsEventConsumer {
    client: async_nats::Client,
}

impl NatsEventConsumer {
    pub fn new(client: async_nats::Client) -> Self {
        Self { client }
    }
}

impl EventConsumer for NatsEventConsumer {
    fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
        let client = self.client.clone();
        Box::pin(async_stream::try_stream! {
            // ">" is the NATS full wildcard, so this receives every subject.
            let mut sub = client
                .subscribe(">")
                .await
                .map_err(|e| DomainError::Internal(e.to_string()))?;

            while let Some(msg) = sub.next().await {
                // Malformed messages are logged and skipped; they never end the stream.
                let payload = match serde_json::from_slice::<EventPayload>(&msg.payload) {
                    Ok(p) => p,
                    Err(e) => {
                        tracing::warn!("failed to deserialize event payload: {e}");
                        continue;
                    }
                };
                let event = match DomainEvent::try_from(payload) {
                    Ok(e) => e,
                    Err(e) => {
                        tracing::warn!("failed to convert payload to domain event: {e}");
                        continue;
                    }
                };
                // Core NATS has no per-message acknowledgement, so ack/nack are no-ops.
                yield EventEnvelope {
                    event,
                    ack: Box::new(|| {}),
                    nack: Box::new(|| {}),
                };
            }
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use domain::value_objects::{LikeId, ThoughtId, UserId};

    #[test]
    fn payload_from_domain_event_has_correct_subject() {
        let event = DomainEvent::ThoughtCreated {
            thought_id: ThoughtId::new(),
            user_id: UserId::new(),
            in_reply_to_id: None,
        };
        let payload = EventPayload::from(&event);
        assert_eq!(payload.subject(), "thoughts.created");
    }

    #[test]
    fn domain_event_roundtrip_via_payload() {
        let uid = UserId::new();
        let tid = ThoughtId::new();
        let event = DomainEvent::LikeAdded {
            like_id: LikeId::new(),
            user_id: uid.clone(),
            thought_id: tid.clone(),
        };
        let payload = EventPayload::from(&event);
        let back = DomainEvent::try_from(payload).unwrap();
        if let DomainEvent::LikeAdded { user_id, thought_id, .. } = back {
            assert_eq!(user_id, uid);
            assert_eq!(thought_id, tid);
        } else {
            panic!("wrong variant");
        }
    }
}
```

- [ ] **Run:** `cargo test -p nats`. Expected: 2 tests pass.

- [ ] **Run:** `cargo check --workspace`. Expected: one error in `bootstrap`, which still references the removed `NatsEventPublisher`. Task 3 fixes it.

- [ ] **Commit:**

```bash
git add crates/adapters/nats/
git commit -m "refactor(nats): strip NatsEventPublisher, add NatsTransport implementing Transport"
```

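The consumer kept in Task 2 decodes each message in two stages and skips any message that fails either stage instead of ending the stream. The sketch below isolates that control flow with the standard library only. The `kind:id` text format, the `Event` enum, and both helper functions are hypothetical stand-ins for JSON, `DomainEvent`, `serde_json::from_slice::<EventPayload>`, and `DomainEvent::try_from`; the real consumer also logs each failure via `tracing::warn!`, which is replaced here by a counter.

```rust
/// Hypothetical stand-in for `DomainEvent`.
#[derive(Debug, PartialEq)]
pub enum Event {
    ThoughtCreated(u64),
    LikeAdded(u64),
}

/// Stage 1: bytes to payload. Stand-in for `serde_json::from_slice::<EventPayload>`.
/// Uses an assumed `kind:id` text format instead of JSON.
fn decode_payload(raw: &[u8]) -> Result<(String, u64), String> {
    let text = std::str::from_utf8(raw).map_err(|e| e.to_string())?;
    let (kind, id) = text
        .split_once(':')
        .ok_or_else(|| format!("missing ':' in {:?}", text))?;
    let id = id.parse::<u64>().map_err(|e| e.to_string())?;
    Ok((kind.to_string(), id))
}

/// Stage 2: payload to event. Stand-in for `DomainEvent::try_from(payload)`.
fn to_event(kind: &str, id: u64) -> Result<Event, String> {
    match kind {
        "ThoughtCreated" => Ok(Event::ThoughtCreated(id)),
        "LikeAdded" => Ok(Event::LikeAdded(id)),
        other => Err(format!("unknown event kind {}", other)),
    }
}

/// Returns the decoded events plus the number of messages that were skipped.
pub fn consume(messages: &[&str]) -> (Vec<Event>, usize) {
    let mut events = Vec::new();
    let mut skipped = 0;
    for raw in messages {
        // A failure in stage 1 skips the message and moves on.
        let (kind, id) = match decode_payload(raw.as_bytes()) {
            Ok(payload) => payload,
            Err(_) => {
                skipped += 1;
                continue;
            }
        };
        // A failure in stage 2 is handled the same way.
        match to_event(&kind, id) {
            Ok(event) => events.push(event),
            Err(_) => skipped += 1,
        }
    }
    (events, skipped)
}
```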

---

### Task 3: Wire EventPublisherAdapter in bootstrap

**Files:**
- Modify: `crates/bootstrap/Cargo.toml`
- Modify: `crates/bootstrap/src/factory.rs`

- [ ] **Add `event-publisher` to `crates/bootstrap/Cargo.toml`** (if it is not already listed):

```toml
event-publisher = { workspace = true }
```

- [ ] **Update `crates/bootstrap/src/factory.rs`:** find the NATS event publisher section and replace it.

Find (in the `build` function):
```rust
Arc::new(nats::NatsEventPublisher::new(client))
```

Replace with:
```rust
Arc::new(event_publisher::EventPublisherAdapter::new(nats::NatsTransport::new(client)))
```

The replacement above uses the fully qualified path, so it compiles without a new import. To write the shorter `EventPublisherAdapter::new(...)` instead, add this `use` at the top of `factory.rs`:
```rust
use event_publisher::EventPublisherAdapter;
```

The `NoOpEventPublisher` struct and its `impl EventPublisher` stay in `factory.rs`. It is the fallback when NATS is unavailable, and the composition root is the right place for it.

- [ ] **Run:** `cargo check -p bootstrap`. Expected: no errors.

- [ ] **Run:** `cargo check --workspace`. Expected: no errors.

- [ ] **Run full test suite:**

```bash
DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test --workspace 2>&1 | tail -3
```

Expected: all tests pass (including new event-publisher tests).

- [ ] **Smoke test:**

```bash
DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres \
JWT_SECRET=dev BASE_URL=http://localhost:3000 \
RUST_LOG=info cargo run -p bootstrap &
sleep 3
curl -s http://localhost:3000/health | jq .
kill %1 2>/dev/null
```

Expected: `{"status":"ok","db":"connected"}`.

- [ ] **Commit:**

```bash
git add crates/bootstrap/
git commit -m "feat(bootstrap): wire EventPublisherAdapter<NatsTransport> for transport-agnostic event publishing"
```

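Task 3 keeps `NoOpEventPublisher` as the fallback when NATS is unavailable, but the plan does not show how `factory.rs` picks between the two. One plausible shape, sketched here with the standard library only, is to branch on whether a client could be obtained and return a trait object either way. Everything in this block is an assumption made for illustration: the `Option`-based signature, `FakeNatsClient`, `NatsBackedPublisher` (standing in for `EventPublisherAdapter<NatsTransport>`), the synchronous `publish`, and the `backend` helper, which exists only so the chosen branch is observable.

```rust
use std::sync::Arc;

/// Synchronous stand-in for the `domain::ports::EventPublisher` port.
pub trait EventPublisher: Send + Sync {
    fn publish(&self, subject: &str, bytes: &[u8]) -> Result<(), String>;
    /// Hypothetical helper, present only so the sketch can show which branch was taken.
    fn backend(&self) -> &'static str;
}

/// Fallback publisher: accepts every event and drops it.
pub struct NoOpEventPublisher;

impl EventPublisher for NoOpEventPublisher {
    fn publish(&self, _subject: &str, _bytes: &[u8]) -> Result<(), String> {
        Ok(())
    }

    fn backend(&self) -> &'static str {
        "noop"
    }
}

/// Hypothetical stand-in for `async_nats::Client`.
pub struct FakeNatsClient {
    pub url: String,
}

/// Stand-in for `EventPublisherAdapter<NatsTransport>`.
pub struct NatsBackedPublisher {
    client: FakeNatsClient,
}

impl EventPublisher for NatsBackedPublisher {
    fn publish(&self, subject: &str, bytes: &[u8]) -> Result<(), String> {
        // A real implementation would hand `bytes` to the broker at `self.client.url`.
        let _ = (&self.client.url, subject, bytes);
        Ok(())
    }

    fn backend(&self) -> &'static str {
        "nats"
    }
}

/// Composition-root choice: a real publisher when a client exists, the no-op otherwise.
pub fn build_publisher(client: Option<FakeNatsClient>) -> Arc<dyn EventPublisher> {
    match client {
        Some(client) => Arc::new(NatsBackedPublisher { client }),
        None => Arc::new(NoOpEventPublisher),
    }
}
```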

---

## Self-Review

**Spec coverage:**
- ✅ `Transport` trait in `event-publisher` with `publish_bytes(subject, bytes)` (Task 1)
- ✅ `EventPublisherAdapter<T: Transport>` implements `EventPublisher` (Task 1)
- ✅ 2 tests: correct subject routing, valid JSON serialization (Task 1)
- ✅ `NatsEventPublisher` removed from `nats` (Task 2)
- ✅ `NatsTransport` implements `Transport` for NATS (Task 2)
- ✅ `NatsEventConsumer` unchanged, stays in `nats` (Task 2)
- ✅ `bootstrap` wires `EventPublisherAdapter::new(NatsTransport::new(client))` (Task 3)
- ✅ `NoOpEventPublisher` stays in `factory.rs` as fallback (Task 3)

**Placeholder scan:** None.

**Type consistency:**
- `EventPublisherAdapter<NatsTransport>`: `NatsTransport` implements `Transport`, `EventPublisherAdapter<T: Transport>` implements `EventPublisher` ✓
- `event_publisher::Transport` imported in `nats/src/lib.rs`: `nats` depends on `event-publisher` ✓
- `factory.rs` uses `event_publisher::EventPublisherAdapter` and `nats::NatsTransport`: both in bootstrap deps ✓

**Adding Kafka later:**
```toml
# kafka/Cargo.toml
[dependencies]
event-publisher = { workspace = true }
rdkafka = "..."
```
```rust
// kafka/src/lib.rs
pub struct KafkaTransport { ... }

#[async_trait]
impl Transport for KafkaTransport { ... }
```
```rust
// bootstrap/src/factory.rs: only this line changes
Arc::new(EventPublisherAdapter::new(KafkaTransport::new(...)))
```
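
The swap described above can be tried without any broker. The following is a rough, synchronous sketch using only the standard library: `FakeNatsTransport` and `FakeKafkaTransport` are hypothetical in-memory backends that append to a shared log, and `publish` takes raw bytes rather than a domain event. It is meant to show that the adapter and the calling code stay identical while only the type passed at the composition root changes.

```rust
use std::sync::{Arc, Mutex};

/// Shared log the fake transports write to.
type Log = Arc<Mutex<Vec<String>>>;

/// Synchronous stand-in for the plan's async `Transport` trait.
pub trait Transport: Send + Sync {
    fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), String>;
}

/// Hypothetical in-memory backend standing in for `NatsTransport`.
pub struct FakeNatsTransport {
    log: Log,
}

/// Hypothetical in-memory backend standing in for a future `KafkaTransport`.
pub struct FakeKafkaTransport {
    log: Log,
}

impl Transport for FakeNatsTransport {
    fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), String> {
        self.log.lock().unwrap().push(format!("nats {} ({} bytes)", subject, bytes.len()));
        Ok(())
    }
}

impl Transport for FakeKafkaTransport {
    fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), String> {
        self.log.lock().unwrap().push(format!("kafka {} ({} bytes)", subject, bytes.len()));
        Ok(())
    }
}

/// Same adapter for every backend; it only knows the trait.
pub struct EventPublisherAdapter<T: Transport> {
    transport: T,
}

impl<T: Transport> EventPublisherAdapter<T> {
    pub fn new(transport: T) -> Self {
        Self { transport }
    }

    pub fn publish(&self, subject: &str, bytes: &[u8]) -> Result<(), String> {
        self.transport.publish_bytes(subject, bytes)
    }
}

/// Stand-in for the composition root: the transport type is the only variable.
pub fn run_with<T: Transport>(transport: T) -> Result<(), String> {
    EventPublisherAdapter::new(transport).publish("thoughts.created", b"{}")
}

/// Publishes the same event through both backends and returns the shared log.
pub fn publish_through_both() -> Vec<String> {
    let log = Log::default();
    run_with(FakeNatsTransport { log: log.clone() }).unwrap();
    run_with(FakeKafkaTransport { log: log.clone() }).unwrap();
    let entries = log.lock().unwrap().clone();
    entries
}
```

Because `run_with` is generic over `T: Transport`, the choice of backend is resolved at compile time, which matches the plan's `EventPublisherAdapter<T: Transport>` design rather than a `Box<dyn Transport>` indirection.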