Files
thoughts/docs/superpowers/plans/2026-05-14-event-transport-rename.md

16 KiB

event-transport Rename + Consumer Abstraction Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Rename event-publisherevent-transport and add the symmetric consumer abstraction (MessageSource trait + EventConsumerAdapter<S>) so both publish and subscribe are transport-agnostic.

Architecture after this plan:

event-transport/  ← Transport + EventPublisherAdapter<T>  (existing)
                  ← MessageSource + EventConsumerAdapter<S> (new)
                  ← RawMessage { subject, payload, ack, nack } (new)

nats/             ← NatsTransport (existing, implements Transport)
                  ← NatsMessageSource (new, implements MessageSource)
                  ← NatsEventConsumer removed

worker/           ← EventConsumerAdapter::new(NatsMessageSource::new(client))

Dependency chain:

event-transport → domain, event-payload, serde_json, async-trait
nats            → domain, event-payload, event-transport, async-nats
worker          → domain, nats, event-transport, postgres

File Map

Rename: crates/adapters/event-publisher/  →  crates/adapters/event-transport/
Modify: Cargo.toml (root)                 ← update member path + workspace dep name
Modify: crates/adapters/event-transport/Cargo.toml  ← name = "event-transport"
Modify: crates/adapters/nats/Cargo.toml   ← event-publisher → event-transport
Modify: crates/adapters/nats/src/lib.rs   ← use event_transport; add NatsMessageSource; remove NatsEventConsumer
Modify: crates/bootstrap/Cargo.toml       ← event-publisher → event-transport
Modify: crates/bootstrap/src/factory.rs   ← use event_transport; update EventConsumerAdapter wiring
Modify: crates/worker/Cargo.toml          ← add event-transport dep
Modify: crates/worker/src/main.rs         ← EventConsumerAdapter<NatsMessageSource>
Modify: crates/adapters/event-transport/src/lib.rs  ← add RawMessage + MessageSource + EventConsumerAdapter

Task 1: Rename crate + update all references

Files: root Cargo.toml, event-publisher/Cargo.toml (renamed), nats/Cargo.toml, bootstrap/Cargo.toml, nats/src/lib.rs, bootstrap/src/factory.rs

  • Rename the directory:
git mv crates/adapters/event-publisher crates/adapters/event-transport
  • Update crates/adapters/event-transport/Cargo.toml — change the package name:
[package]
name = "event-transport"
version = "0.1.0"
edition = "2021"

[dependencies]
domain        = { workspace = true }
event-payload = { workspace = true }
serde_json    = { workspace = true }
async-trait   = { workspace = true }
tracing       = { workspace = true }

[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
  • Update root Cargo.toml:

In [workspace] members, change:

"crates/adapters/event-publisher",

to:

"crates/adapters/event-transport",

In [workspace.dependencies], change:

event-publisher  = { path = "crates/adapters/event-publisher" }

to:

event-transport  = { path = "crates/adapters/event-transport" }
  • Update crates/adapters/nats/Cargo.toml:

Change event-publisher = { workspace = true } to event-transport = { workspace = true }.

  • Update crates/adapters/nats/src/lib.rs:

Change use event_publisher::Transport; to use event_transport::Transport;.

  • Update crates/bootstrap/Cargo.toml:

Change event-publisher = { workspace = true } to event-transport = { workspace = true }.

  • Update crates/bootstrap/src/factory.rs:

Change use event_publisher::EventPublisherAdapter; to use event_transport::EventPublisherAdapter;.

  • Run: cargo check --workspace — Expected: no errors.

  • Run: cargo test -p event-transport — Expected: 2 tests pass (same tests as before, just crate renamed).

  • Commit:

git add Cargo.toml \
        crates/adapters/event-transport/ \
        crates/adapters/nats/Cargo.toml \
        crates/adapters/nats/src/lib.rs \
        crates/bootstrap/Cargo.toml \
        crates/bootstrap/src/factory.rs
git commit -m "refactor: rename event-publisher → event-transport"

Task 2: Add MessageSource + EventConsumerAdapter to event-transport

Files:

  • Modify: crates/adapters/event-transport/src/lib.rs

  • Write failing tests — append to the test module in src/lib.rs:

    #[tokio::test]
    async fn consumer_adapter_deserializes_and_yields_event() {
        use domain::value_objects::ThoughtId;
        use futures::StreamExt;

        // Produce a serialized EventPayload for ThoughtCreated
        let event = DomainEvent::ThoughtCreated {
            thought_id: ThoughtId::new(),
            user_id: UserId::new(),
            in_reply_to_id: None,
        };
        let payload = EventPayload::from(&event);
        let bytes = serde_json::to_vec(&payload).unwrap();

        // A MessageSource that yields one message then ends
        struct OneMessageSource { bytes: Vec<u8> }
        #[async_trait]
        impl MessageSource for OneMessageSource {
            fn messages(&self) -> futures::stream::BoxStream<'_, Result<RawMessage, DomainError>> {
                let msg = RawMessage {
                    subject: "thoughts.created".to_string(),
                    payload: self.bytes.clone(),
                    ack:  Box::new(|| {}),
                    nack: Box::new(|| {}),
                };
                Box::pin(futures::stream::once(async { Ok(msg) }))
            }
        }

        let adapter = EventConsumerAdapter::new(OneMessageSource { bytes });
        let mut stream = adapter.consume();
        let envelope = stream.next().await.unwrap().unwrap();
        assert!(matches!(envelope.event, DomainEvent::ThoughtCreated { .. }));
    }

    #[tokio::test]
    async fn consumer_adapter_skips_invalid_payloads() {
        use futures::StreamExt;

        struct BadMessageSource;
        #[async_trait]
        impl MessageSource for BadMessageSource {
            fn messages(&self) -> futures::stream::BoxStream<'_, Result<RawMessage, DomainError>> {
                let msg = RawMessage {
                    subject: "bad".to_string(),
                    payload: b"not valid json".to_vec(),
                    ack:  Box::new(|| {}),
                    nack: Box::new(|| {}),
                };
                Box::pin(futures::stream::once(async { Ok(msg) }))
            }
        }

        let adapter = EventConsumerAdapter::new(BadMessageSource);
        let mut stream = adapter.consume();
        // Invalid JSON should be skipped — stream ends with no items
        assert!(stream.next().await.is_none());
    }
  • Run: cargo test -p event-transport — Expected: FAIL (MessageSource, RawMessage, EventConsumerAdapter not defined).

  • Add to crates/adapters/event-transport/src/lib.rs — append after the existing EventPublisherAdapter impl and before #[cfg(test)]:

use domain::{events::EventEnvelope, ports::EventConsumer};
use futures::stream::BoxStream;

/// A raw inbound message from a transport backend.
/// `ack` and `nack` are transport-level acknowledgements (e.g. Kafka offset commit).
/// For at-most-once transports (basic NATS), both are no-ops.
pub struct RawMessage {
    pub subject: String,
    pub payload: Vec<u8>,
    pub ack:  Box<dyn Fn() + Send + Sync>,
    pub nack: Box<dyn Fn() + Send + Sync>,
}

/// Abstraction over any subscribe/consume backend.
/// Implement this for NATS, Kafka, Redis Streams, etc.
pub trait MessageSource: Send + Sync {
    fn messages(&self) -> BoxStream<'_, Result<RawMessage, DomainError>>;
}

/// Deserializes raw transport messages into domain `EventEnvelope`s.
///
/// Converts: `RawMessage.payload` → `EventPayload` → `DomainEvent` → `EventEnvelope`
///
/// Invalid or unknown messages are skipped with a warning — the stream continues.
pub struct EventConsumerAdapter<S: MessageSource> {
    source: S,
}

impl<S: MessageSource> EventConsumerAdapter<S> {
    pub fn new(source: S) -> Self { Self { source } }
}

impl<S: MessageSource> EventConsumer for EventConsumerAdapter<S> {
    fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
        use futures::StreamExt;
        let stream = self.source.messages();
        Box::pin(stream.filter_map(|result| async move {
            match result {
                Err(e) => {
                    tracing::warn!("transport error: {e}");
                    None
                }
                Ok(msg) => {
                    let payload = match serde_json::from_slice::<EventPayload>(&msg.payload) {
                        Ok(p) => p,
                        Err(e) => {
                            tracing::warn!("failed to deserialize event payload: {e}");
                            return None;
                        }
                    };
                    let event = match DomainEvent::try_from(payload) {
                        Ok(e) => e,
                        Err(e) => {
                            tracing::warn!("unknown event type: {e}");
                            return None;
                        }
                    };
                    Some(Ok(EventEnvelope {
                        event,
                        ack:  msg.ack,
                        nack: msg.nack,
                    }))
                }
            }
        }))
    }
}

Note: the existing imports at the top of lib.rs already have use domain::... — add EventEnvelope and EventConsumer to those imports. Also add futures::stream::BoxStream if not already present.

Also add futures = { workspace = true } to event-transport/Cargo.toml dependencies (needed for BoxStream and StreamExt).

  • Run: cargo test -p event-transport — Expected: 4 tests pass (2 existing + 2 new).

  • Commit:

git add crates/adapters/event-transport/
git commit -m "feat(event-transport): MessageSource trait + EventConsumerAdapter for transport-agnostic consuming"

Task 3: nats — add NatsMessageSource, remove NatsEventConsumer

Files:

  • Modify: crates/adapters/nats/src/lib.rs

  • Rewrite crates/adapters/nats/src/lib.rs — remove NatsEventConsumer, add NatsMessageSource:

use async_trait::async_trait;
use domain::errors::DomainError;
use event_payload::EventPayload;
use event_transport::{MessageSource, RawMessage, Transport};
use futures::stream::BoxStream;

// ── NatsTransport — raw NATS publish backend ────────────────────────────────

pub struct NatsTransport {
    client: async_nats::Client,
}

impl NatsTransport {
    pub fn new(client: async_nats::Client) -> Self { Self { client } }
}

#[async_trait]
impl Transport for NatsTransport {
    async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> {
        self.client
            .publish(subject, bytes.to_vec().into())
            .await
            .map_err(|e| DomainError::Internal(e.to_string()))
    }
}

// ── NatsMessageSource — raw NATS subscribe backend ──────────────────────────

pub struct NatsMessageSource {
    client: async_nats::Client,
}

impl NatsMessageSource {
    pub fn new(client: async_nats::Client) -> Self { Self { client } }
}

impl MessageSource for NatsMessageSource {
    fn messages(&self) -> BoxStream<'_, Result<RawMessage, DomainError>> {
        let client = self.client.clone();
        Box::pin(async_stream::try_stream! {
            let mut sub = client
                .subscribe(">")
                .await
                .map_err(|e| DomainError::Internal(e.to_string()))?;

            use futures::StreamExt;
            while let Some(msg) = sub.next().await {
                let subject = msg.subject.to_string();
                let payload = msg.payload.to_vec();
                // Basic NATS: at-most-once delivery — ack/nack are no-ops.
                // Replace with JetStream for at-least-once delivery.
                yield RawMessage {
                    subject,
                    payload,
                    ack:  Box::new(|| {}),
                    nack: Box::new(|| {}),
                };
            }
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use domain::{events::DomainEvent, value_objects::{LikeId, ThoughtId, UserId}};

    #[test]
    fn payload_from_domain_event_has_correct_subject() {
        let event = DomainEvent::ThoughtCreated {
            thought_id: ThoughtId::new(),
            user_id: UserId::new(),
            in_reply_to_id: None,
        };
        let payload = EventPayload::from(&event);
        assert_eq!(payload.subject(), "thoughts.created");
    }

    #[test]
    fn domain_event_roundtrip_via_payload() {
        let uid = UserId::new();
        let tid = ThoughtId::new();
        let event = DomainEvent::LikeAdded {
            like_id: LikeId::new(),
            user_id: uid.clone(),
            thought_id: tid.clone(),
        };
        let payload = EventPayload::from(&event);
        let back = DomainEvent::try_from(payload).unwrap();
        if let DomainEvent::LikeAdded { user_id, thought_id, .. } = back {
            assert_eq!(user_id, uid);
            assert_eq!(thought_id, tid);
        } else {
            panic!("wrong variant");
        }
    }
}
  • Run: cargo test -p nats — Expected: 2 tests pass.

  • Run: cargo check --workspace — Expected: one error in worker (uses removed NatsEventConsumer). That's expected — fixed in Task 4.

  • Commit:

git add crates/adapters/nats/src/lib.rs
git commit -m "refactor(nats): replace NatsEventConsumer with NatsMessageSource implementing MessageSource"

Task 4: Update worker + full verification

Files:

  • Modify: crates/worker/Cargo.toml

  • Modify: crates/worker/src/main.rs

  • Add event-transport = { workspace = true } to crates/worker/Cargo.toml.

  • Update crates/worker/src/main.rs — find and update the consumer creation.

Current code in main.rs:

let consumer = nats::NatsEventConsumer::new(nats_client);

Replace with:

use event_transport::EventConsumerAdapter;
use nats::NatsMessageSource;
let consumer = EventConsumerAdapter::new(NatsMessageSource::new(nats_client));

Also add the use statements at the top of main.rs alongside existing imports.

  • Run: cargo check --workspace — Expected: no errors.

  • Run full test suite:

DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test --workspace 2>&1 | tail -3

Expected: all tests pass (79 existing + 2 new event-transport consumer tests = 81+).

  • Commit:
git add crates/worker/ 
git commit -m "feat(worker): use EventConsumerAdapter<NatsMessageSource> — transport-agnostic consuming"

Self-Review

Spec coverage:

  • event-publisher renamed to event-transport everywhere (Task 1)
  • RawMessage { subject, payload, ack, nack } in event-transport (Task 2)
  • MessageSource trait with messages() -> BoxStream<RawMessage> (Task 2)
  • EventConsumerAdapter<S: MessageSource> implementing EventConsumer (Task 2)
  • Invalid messages skipped with warning, stream continues (Task 2)
  • 2 new tests: valid deserialization + invalid JSON skip (Task 2)
  • NatsEventConsumer removed from nats (Task 3)
  • NatsMessageSource implementing MessageSource added to nats (Task 3)
  • Worker uses EventConsumerAdapter::new(NatsMessageSource::new(client)) (Task 4)

Adding Kafka later:

# kafka/Cargo.toml: event-transport = { workspace = true }
// kafka/src/lib.rs
pub struct KafkaMessageSource { ... }
impl MessageSource for KafkaMessageSource { ... }  // yields RawMessage + real ack/nack

pub struct KafkaTransport { ... }
impl Transport for KafkaTransport { ... }
// bootstrap/src/factory.rs — two lines change:
EventPublisherAdapter::new(KafkaTransport::new(...))
EventConsumerAdapter::new(KafkaMessageSource::new(...))

Type consistency:

  • EventConsumerAdapter<NatsMessageSource>NatsMessageSource implements MessageSource, adapter implements EventConsumer
  • RawMessage.ack / .nack transferred to EventEnvelope.ack / .nack in consumer adapter ✓
  • event_transport:: (underscore) is the Rust module name for event-transport (dash) crate ✓