thoughts/docs/superpowers/plans/2026-05-14-event-publisher-refactor.md


event-publisher Transport Abstraction Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Fill event-publisher with a Transport trait + EventPublisherAdapter<T: Transport>, strip NatsEventPublisher from the nats crate and replace it with NatsTransport implementing Transport, then wire EventPublisherAdapter::new(NatsTransport::new(client)) in bootstrap — so adding Kafka/Redis later only requires a new transport crate.

Architecture: event-publisher defines the abstraction (Transport + EventPublisherAdapter). nats implements Transport for NATS as a raw byte publisher (publish_bytes only; subscribing is not part of the trait). event-publisher never imports nats; bootstrap wires the two together. NatsEventConsumer stays in nats because it is transport-specific and will not be shared.

Dependency chain after refactor:

event-publisher → domain, event-payload, serde_json, async-trait, tracing
nats            → domain, event-payload, event-publisher, async-nats
bootstrap       → event-publisher, nats (+ all others)
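
The direction of these dependencies can be modelled in a single file, with each `mod` standing in for a crate. This is only a sketch under simplifying assumptions: the trait is synchronous, errors are plain `String`s instead of `DomainError`, and `nats_like::InMemoryTransport` is a hypothetical stand-in for `NatsTransport`.

```rust
// Each `mod` models one crate from the dependency chain above.
// Sync and std-only for brevity; the real trait is async.

mod event_publisher {
    /// Stand-in for the plan's `Transport` trait.
    pub trait Transport {
        fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), String>;
    }

    /// Stand-in for `EventPublisherAdapter<T>`; it knows only the trait.
    pub struct EventPublisherAdapter<T: Transport> {
        transport: T,
    }

    impl<T: Transport> EventPublisherAdapter<T> {
        pub fn new(transport: T) -> Self {
            Self { transport }
        }

        /// Simplified publish: the caller supplies the subject and body directly.
        pub fn publish(&self, subject: &str, body: &str) -> Result<(), String> {
            self.transport.publish_bytes(subject, body.as_bytes())
        }
    }
}

mod nats_like {
    // The transport crate imports the abstraction, never the other way round.
    use super::event_publisher::Transport;
    use std::sync::{Arc, Mutex};

    /// Shared record of everything that was "sent".
    pub type Sent = Arc<Mutex<Vec<(String, Vec<u8>)>>>;

    /// Hypothetical in-memory transport standing in for `NatsTransport`.
    pub struct InMemoryTransport {
        sent: Sent,
    }

    impl InMemoryTransport {
        pub fn new() -> (Self, Sent) {
            let sent: Sent = Arc::new(Mutex::new(Vec::new()));
            (Self { sent: Arc::clone(&sent) }, sent)
        }
    }

    impl Transport for InMemoryTransport {
        fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), String> {
            self.sent.lock().unwrap().push((subject.to_string(), bytes.to_vec()));
            Ok(())
        }
    }
}

mod bootstrap {
    // The composition root is the only place that names both sides.
    use super::event_publisher::EventPublisherAdapter;
    use super::nats_like::{InMemoryTransport, Sent};

    pub fn build() -> (EventPublisherAdapter<InMemoryTransport>, Sent) {
        let (transport, sent) = InMemoryTransport::new();
        (EventPublisherAdapter::new(transport), sent)
    }
}
```

Note that `event_publisher` contains no reference to `nats_like`; only `bootstrap` imports both, which mirrors the rule that event-publisher never imports nats.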

File Map

Modify: crates/adapters/event-publisher/Cargo.toml  ← add deps
Modify: crates/adapters/event-publisher/src/lib.rs  ← Transport trait + EventPublisherAdapter<T>
Modify: crates/adapters/nats/Cargo.toml             ← add event-publisher dep
Modify: crates/adapters/nats/src/lib.rs             ← remove NatsEventPublisher, add NatsTransport
Modify: crates/bootstrap/src/factory.rs             ← use EventPublisherAdapter<NatsTransport>
Modify: crates/bootstrap/Cargo.toml                 ← add event-publisher dep (if missing)

Task 1: Fill event-publisher — Transport trait + EventPublisherAdapter

Files:

  • Modify: crates/adapters/event-publisher/Cargo.toml

  • Modify: crates/adapters/event-publisher/src/lib.rs

  • Write tests at the bottom of crates/adapters/event-publisher/src/lib.rs:

#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::{Arc, Mutex};
    use domain::value_objects::{ThoughtId, UserId};

    struct SpyTransport {
        calls: Arc<Mutex<Vec<(String, Vec<u8>)>>>,
    }
    impl SpyTransport {
        fn new() -> (Self, Arc<Mutex<Vec<(String, Vec<u8>)>>>) {
            let calls = Arc::new(Mutex::new(vec![]));
            (Self { calls: calls.clone() }, calls)
        }
    }
    #[async_trait]
    impl Transport for SpyTransport {
        async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), domain::errors::DomainError> {
            self.calls.lock().unwrap().push((subject.to_string(), bytes.to_vec()));
            Ok(())
        }
    }

    #[tokio::test]
    async fn thought_created_routes_to_correct_subject() {
        let (spy, calls) = SpyTransport::new();
        let publisher = EventPublisherAdapter::new(spy);
        publisher.publish(&domain::events::DomainEvent::ThoughtCreated {
            thought_id: ThoughtId::new(),
            user_id: UserId::new(),
            in_reply_to_id: None,
        }).await.unwrap();
        let calls = calls.lock().unwrap();
        assert_eq!(calls.len(), 1);
        assert_eq!(calls[0].0, "thoughts.created");
    }

    #[tokio::test]
    async fn serialized_payload_is_valid_json() {
        let (spy, calls) = SpyTransport::new();
        let publisher = EventPublisherAdapter::new(spy);
        publisher.publish(&domain::events::DomainEvent::UserBlocked {
            blocker_id: UserId::new(),
            blocked_id: UserId::new(),
        }).await.unwrap();
        // Clone the recorded bytes so the mutex guard is released before parsing.
        let bytes = calls.lock().unwrap()[0].1.clone();
        let json: serde_json::Value = serde_json::from_slice(&bytes).expect("valid JSON");
        assert_eq!(json["type"], "UserBlocked");
    }
}
  • Run: cargo test -p event-publisher — Expected: FAIL with compile errors (Transport and EventPublisherAdapter do not exist yet).

  • Write crates/adapters/event-publisher/Cargo.toml:

[package]
name = "event-publisher"
version = "0.1.0"
edition = "2021"

[dependencies]
domain        = { workspace = true }
event-payload = { workspace = true }
serde_json    = { workspace = true }
async-trait   = { workspace = true }
tracing       = { workspace = true }

[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
  • Write the implementation at the top of crates/adapters/event-publisher/src/lib.rs, above the tests module added earlier:
use async_trait::async_trait;
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
use event_payload::EventPayload;

/// Abstraction over any pub/sub transport backend.
/// Implement this for NATS, Kafka, Redis Streams, etc.
/// The adapter calls `publish_bytes(subject, bytes)` — subjects come from `EventPayload::subject()`.
#[async_trait]
pub trait Transport: Send + Sync {
    async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError>;
}

/// Routes domain events to a transport backend.
///
/// Converts: `DomainEvent` → `EventPayload` (via `From`) → JSON bytes → `transport.publish_bytes(subject, bytes)`
///
/// To swap transports (e.g. NATS → Kafka), replace the `T` at the composition root.
/// This type never needs to change.
pub struct EventPublisherAdapter<T: Transport> {
    transport: T,
}

impl<T: Transport> EventPublisherAdapter<T> {
    pub fn new(transport: T) -> Self {
        Self { transport }
    }
}

#[async_trait]
impl<T: Transport> EventPublisher for EventPublisherAdapter<T> {
    async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> {
        let payload = EventPayload::from(event);
        let subject = payload.subject();
        let bytes = serde_json::to_vec(&payload)
            .map_err(|e| DomainError::Internal(e.to_string()))?;
        tracing::debug!(subject, "publishing event");
        self.transport.publish_bytes(subject, &bytes).await
    }
}
  • Run: cargo test -p event-publisher — Expected: 2 tests pass.

  • Commit:

git add crates/adapters/event-publisher/
git commit -m "feat(event-publisher): Transport trait + EventPublisherAdapter for transport-agnostic event routing"
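
The conversion that EventPublisherAdapter::publish performs in Task 1 (DomainEvent to EventPayload via From, then a subject plus JSON bytes) can be sketched without any external crates. Everything below is a simplified, hypothetical stand-in: the real types live in domain and event-payload, IDs are modelled as `u64`, the real code serializes with serde_json rather than `format!`, and the `users.blocked` subject is an assumed name (only `thoughts.created` appears in this plan).

```rust
// Std-only sketch of the DomainEvent -> EventPayload -> (subject, bytes) pipeline.

#[derive(Debug, Clone, PartialEq)]
pub enum DomainEvent {
    ThoughtCreated { thought_id: u64, user_id: u64 },
    UserBlocked { blocker_id: u64, blocked_id: u64 },
}

/// Wire-level mirror of `DomainEvent`.
#[derive(Debug, Clone, PartialEq)]
pub enum EventPayload {
    ThoughtCreated { thought_id: u64, user_id: u64 },
    UserBlocked { blocker_id: u64, blocked_id: u64 },
}

impl From<&DomainEvent> for EventPayload {
    fn from(event: &DomainEvent) -> Self {
        match *event {
            DomainEvent::ThoughtCreated { thought_id, user_id } => {
                EventPayload::ThoughtCreated { thought_id, user_id }
            }
            DomainEvent::UserBlocked { blocker_id, blocked_id } => {
                EventPayload::UserBlocked { blocker_id, blocked_id }
            }
        }
    }
}

impl EventPayload {
    /// Subject the payload is routed to.
    pub fn subject(&self) -> &'static str {
        match self {
            EventPayload::ThoughtCreated { .. } => "thoughts.created",
            EventPayload::UserBlocked { .. } => "users.blocked", // assumed name
        }
    }

    /// Hand-rolled JSON with a `type` tag, mirroring what the tests assert on.
    pub fn to_json_bytes(&self) -> Vec<u8> {
        let json = match self {
            EventPayload::ThoughtCreated { thought_id, user_id } => format!(
                r#"{{"type":"ThoughtCreated","thought_id":{thought_id},"user_id":{user_id}}}"#
            ),
            EventPayload::UserBlocked { blocker_id, blocked_id } => format!(
                r#"{{"type":"UserBlocked","blocker_id":{blocker_id},"blocked_id":{blocked_id}}}"#
            ),
        };
        json.into_bytes()
    }
}

/// The two values the adapter hands to `Transport::publish_bytes`.
pub fn route(event: &DomainEvent) -> (&'static str, Vec<u8>) {
    let payload = EventPayload::from(event);
    (payload.subject(), payload.to_json_bytes())
}
```

Because the subject comes from the payload, a transport only ever sees a string and a byte slice, which is what keeps it free of domain types.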

Task 2: Refactor nats — strip NatsEventPublisher, add NatsTransport

Files:

  • Modify: crates/adapters/nats/Cargo.toml

  • Modify: crates/adapters/nats/src/lib.rs

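  • Add event-publisher to [dependencies] in crates/adapters/nats/Cargo.toml (this assumes event-publisher is already listed under [workspace.dependencies] in the root Cargo.toml):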

event-publisher = { workspace = true }
  • Rewrite crates/adapters/nats/src/lib.rs — remove NatsEventPublisher, add NatsTransport:
use async_trait::async_trait;
use domain::{
    errors::DomainError,
    events::{DomainEvent, EventEnvelope},
    ports::EventConsumer,
};
use event_payload::EventPayload;
use event_publisher::Transport;
use futures::stream::BoxStream;

// ── NatsTransport — raw NATS publish backend ────────────────────────────────

pub struct NatsTransport {
    client: async_nats::Client,
}

impl NatsTransport {
    pub fn new(client: async_nats::Client) -> Self { Self { client } }
}

#[async_trait]
impl Transport for NatsTransport {
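    async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> {
        // Pass an owned subject: async-nats accepts a `String` here, while a borrowed
        // `&str` that is not `'static` may be rejected depending on the crate version.
        self.client
            .publish(subject.to_string(), bytes.to_vec().into())
            .await
            .map_err(|e| DomainError::Internal(e.to_string()))
    }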
}

// ── NatsEventConsumer — subscribes and yields EventEnvelopes ────────────────

pub struct NatsEventConsumer {
    client: async_nats::Client,
}

impl NatsEventConsumer {
    pub fn new(client: async_nats::Client) -> Self { Self { client } }
}

impl EventConsumer for NatsEventConsumer {
    fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
        let client = self.client.clone();
        Box::pin(async_stream::try_stream! {
            let mut sub = client
                .subscribe(">")
                .await
                .map_err(|e| DomainError::Internal(e.to_string()))?;

            use futures::StreamExt;
            while let Some(msg) = sub.next().await {
                let payload = match serde_json::from_slice::<EventPayload>(&msg.payload) {
                    Ok(p) => p,
                    Err(e) => {
                        tracing::warn!("failed to deserialize event payload: {e}");
                        continue;
                    }
                };
                let event = match DomainEvent::try_from(payload) {
                    Ok(e) => e,
                    Err(e) => {
                        tracing::warn!("failed to convert payload to domain event: {e}");
                        continue;
                    }
                };
                // Core NATS subscriptions have no acknowledgements, so ack/nack are no-ops.
                yield EventEnvelope {
                    event,
                    ack:  Box::new(|| {}),
                    nack: Box::new(|| {}),
                };
            }
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use domain::value_objects::{LikeId, ThoughtId, UserId};

    #[test]
    fn payload_from_domain_event_has_correct_subject() {
        let event = DomainEvent::ThoughtCreated {
            thought_id: ThoughtId::new(),
            user_id: UserId::new(),
            in_reply_to_id: None,
        };
        let payload = EventPayload::from(&event);
        assert_eq!(payload.subject(), "thoughts.created");
    }

    #[test]
    fn domain_event_roundtrip_via_payload() {
        let uid = UserId::new();
        let tid = ThoughtId::new();
        let event = DomainEvent::LikeAdded {
            like_id: LikeId::new(),
            user_id: uid.clone(),
            thought_id: tid.clone(),
        };
        let payload = EventPayload::from(&event);
        let back = DomainEvent::try_from(payload).unwrap();
        if let DomainEvent::LikeAdded { user_id, thought_id, .. } = back {
            assert_eq!(user_id, uid);
            assert_eq!(thought_id, tid);
        } else {
            panic!("wrong variant");
        }
    }
}
  • Run: cargo test -p nats — Expected: 2 tests pass.

  • Run: cargo check --workspace — Expected: one error in bootstrap, which still references the removed NatsEventPublisher. Task 3 fixes it.

  • Commit:

git add crates/adapters/nats/
git commit -m "refactor(nats): strip NatsEventPublisher, add NatsTransport implementing Transport"

Task 3: Wire EventPublisherAdapter in bootstrap

Files:

  • Modify: crates/bootstrap/Cargo.toml

  • Modify: crates/bootstrap/src/factory.rs

  • Add event-publisher to crates/bootstrap/Cargo.toml:

event-publisher = { workspace = true }
  • Update crates/bootstrap/src/factory.rs — find the NATS event publisher section and replace:

Find (in the build function):

Arc::new(nats::NatsEventPublisher::new(client))

Replace with:

Arc::new(event_publisher::EventPublisherAdapter::new(nats::NatsTransport::new(client)))

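The replacement above uses the fully qualified path, so no import is required. If you prefer the shorter EventPublisherAdapter::new(...) form, add this import at the top of factory.rs; otherwise skip it to avoid an unused-import warning: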

use event_publisher::EventPublisherAdapter;

The NoOpEventPublisher struct and its impl EventPublisher stay in factory.rs. It is the fallback when NATS is unavailable and belongs in the composition root.

  • Run: cargo check -p bootstrap — Expected: no errors.

  • Run: cargo check --workspace — Expected: no errors.

  • Run full test suite:

DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test --workspace 2>&1 | tail -3

Expected: all tests pass (including new event-publisher tests).

  • Smoke test:
DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres \
JWT_SECRET=dev BASE_URL=http://localhost:3000 \
RUST_LOG=info cargo run -p bootstrap &
sleep 3
curl -s http://localhost:3000/health | jq .
kill %1 2>/dev/null

Expected: {"status":"ok","db":"connected"}.

  • Commit:
git add crates/bootstrap/
git commit -m "feat(bootstrap): wire EventPublisherAdapter<NatsTransport> — transport-agnostic event publishing"
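
The NoOpEventPublisher fallback kept in factory.rs in Task 3 can be pictured as follows. This is a sketch under stated assumptions rather than the actual factory code: the `EventPublisher` trait is reduced to a synchronous method taking a `&str`, `CountingPublisher` is a hypothetical stand-in for `EventPublisherAdapter<NatsTransport>`, and the `Option` argument models whether a NATS client could be obtained.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Simplified, synchronous stand-in for the domain `EventPublisher` port.
pub trait EventPublisher: Send + Sync {
    fn publish(&self, event: &str) -> Result<(), String>;
}

/// Fallback used when no broker is available: accepts every event and drops it.
pub struct NoOpEventPublisher;

impl EventPublisher for NoOpEventPublisher {
    fn publish(&self, _event: &str) -> Result<(), String> {
        Ok(())
    }
}

/// Hypothetical stand-in for `EventPublisherAdapter<NatsTransport>`;
/// it only counts how many events were delivered.
pub struct CountingPublisher {
    pub delivered: Arc<AtomicUsize>,
}

impl EventPublisher for CountingPublisher {
    fn publish(&self, _event: &str) -> Result<(), String> {
        self.delivered.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }
}

/// `client` models the outcome of connecting to NATS:
/// `Some(..)` when connected, `None` when the broker is unavailable.
pub fn build_publisher(client: Option<Arc<AtomicUsize>>) -> Arc<dyn EventPublisher> {
    match client {
        Some(delivered) => Arc::new(CountingPublisher { delivered }),
        None => Arc::new(NoOpEventPublisher),
    }
}
```

Callers hold an `Arc<dyn EventPublisher>` either way, so the rest of the application does not need to know which branch was taken.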

Self-Review

Spec coverage:

  • Transport trait in event-publisher with publish_bytes(subject, bytes) (Task 1)
  • EventPublisherAdapter<T: Transport> implements EventPublisher (Task 1)
  • 2 tests: correct subject routing, valid JSON serialization (Task 1)
  • NatsEventPublisher removed from nats (Task 2)
  • NatsTransport implements Transport for NATS (Task 2)
  • NatsEventConsumer unchanged — stays in nats (Task 2)
  • bootstrap wires EventPublisherAdapter::new(NatsTransport::new(client)) (Task 3)
  • NoOpEventPublisher stays in factory.rs as fallback (Task 3)

Placeholder scan: None.

Type consistency:

  • EventPublisherAdapter<NatsTransport>: NatsTransport implements Transport, and EventPublisherAdapter<T: Transport> implements EventPublisher ✓
  • event_publisher::Transport is imported in nats/src/lib.rs: nats depends on event-publisher ✓
  • factory.rs uses event_publisher::EventPublisherAdapter and nats::NatsTransport — both in bootstrap deps ✓

Adding Kafka later:

# kafka/Cargo.toml
[dependencies]
event-publisher = { workspace = true }
rdkafka = "..."

// kafka/src/lib.rs
pub struct KafkaTransport { ... }
#[async_trait] impl Transport for KafkaTransport { ... }

// bootstrap/src/factory.rs (only this line changes):
Arc::new(EventPublisherAdapter::new(KafkaTransport::new(...)))
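
To make the swap concrete without pulling in rdkafka, here is a minimal std-only sketch in which two transports share one unchanged adapter. It rests on several assumptions: the trait is synchronous, errors are `String`s, and `FakeNats` and `FakeKafka` are hypothetical in-memory stand-ins that merely record which backend received a message.

```rust
use std::sync::{Arc, Mutex};

/// Sync stand-in for the plan's async `Transport` trait.
pub trait Transport: Send + Sync {
    fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), String>;
}

/// Shared log so the example can observe which backend was used.
pub type Log = Arc<Mutex<Vec<String>>>;

/// Hypothetical stand-in for `NatsTransport`.
pub struct FakeNats(pub Log);

/// Hypothetical stand-in for a future `KafkaTransport`.
pub struct FakeKafka(pub Log);

impl Transport for FakeNats {
    fn publish_bytes(&self, subject: &str, _bytes: &[u8]) -> Result<(), String> {
        self.0.lock().unwrap().push(format!("nats:{subject}"));
        Ok(())
    }
}

impl Transport for FakeKafka {
    fn publish_bytes(&self, subject: &str, _bytes: &[u8]) -> Result<(), String> {
        self.0.lock().unwrap().push(format!("kafka:{subject}"));
        Ok(())
    }
}

/// The adapter is written once against the trait and is not touched
/// when a new transport is added.
pub struct EventPublisherAdapter<T: Transport> {
    transport: T,
}

impl<T: Transport> EventPublisherAdapter<T> {
    pub fn new(transport: T) -> Self {
        Self { transport }
    }

    /// Simplified publish: the caller supplies subject and body directly.
    pub fn publish(&self, subject: &str, body: &[u8]) -> Result<(), String> {
        self.transport.publish_bytes(subject, body)
    }
}
```

Swapping `EventPublisherAdapter::new(FakeNats(log))` for `EventPublisherAdapter::new(FakeKafka(log))` is the only change at the call site, which is the property the plan is aiming for.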