Compare commits
5 Commits
ff82764eb0
...
69b55058ce
| Author | SHA1 | Date | |
|---|---|---|---|
| 69b55058ce | |||
| e995b29be1 | |||
| c202eded05 | |||
| f9ca5836fb | |||
| 7963278189 |
@@ -14,7 +14,7 @@ members = [
|
|||||||
"crates/adapters/auth",
|
"crates/adapters/auth",
|
||||||
"crates/adapters/nats",
|
"crates/adapters/nats",
|
||||||
"crates/adapters/event-payload",
|
"crates/adapters/event-payload",
|
||||||
"crates/adapters/event-publisher",
|
"crates/adapters/event-transport",
|
||||||
]
|
]
|
||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|
||||||
@@ -51,4 +51,4 @@ activitypub = { path = "crates/adapters/activitypub" }
|
|||||||
auth = { path = "crates/adapters/auth" }
|
auth = { path = "crates/adapters/auth" }
|
||||||
nats = { path = "crates/adapters/nats" }
|
nats = { path = "crates/adapters/nats" }
|
||||||
event-payload = { path = "crates/adapters/event-payload" }
|
event-payload = { path = "crates/adapters/event-payload" }
|
||||||
event-publisher = { path = "crates/adapters/event-publisher" }
|
event-transport = { path = "crates/adapters/event-transport" }
|
||||||
|
|||||||
@@ -1,90 +0,0 @@
|
|||||||
use async_trait::async_trait;
|
|
||||||
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
|
|
||||||
use event_payload::EventPayload;
|
|
||||||
|
|
||||||
/// Abstraction over any pub/sub transport backend.
|
|
||||||
/// Implement this for NATS, Kafka, Redis Streams, etc.
|
|
||||||
/// The adapter calls `publish_bytes(subject, bytes)` — subjects come from `EventPayload::subject()`.
|
|
||||||
#[async_trait]
|
|
||||||
pub trait Transport: Send + Sync {
|
|
||||||
async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError>;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Routes domain events to a transport backend.
|
|
||||||
///
|
|
||||||
/// Converts: `DomainEvent` → `EventPayload` → JSON bytes → `transport.publish_bytes(subject, bytes)`
|
|
||||||
///
|
|
||||||
/// To swap transports (e.g. NATS → Kafka), replace the `T` at the composition root.
|
|
||||||
pub struct EventPublisherAdapter<T: Transport> {
|
|
||||||
transport: T,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: Transport> EventPublisherAdapter<T> {
|
|
||||||
pub fn new(transport: T) -> Self {
|
|
||||||
Self { transport }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl<T: Transport> EventPublisher for EventPublisherAdapter<T> {
|
|
||||||
async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
|
||||||
let payload = EventPayload::from(event);
|
|
||||||
let subject = payload.subject();
|
|
||||||
let bytes = serde_json::to_vec(&payload)
|
|
||||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
|
||||||
tracing::debug!(subject, "publishing event");
|
|
||||||
self.transport.publish_bytes(subject, &bytes).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
use domain::value_objects::{ThoughtId, UserId};
|
|
||||||
|
|
||||||
struct SpyTransport {
|
|
||||||
calls: Arc<Mutex<Vec<(String, Vec<u8>)>>>,
|
|
||||||
}
|
|
||||||
impl SpyTransport {
|
|
||||||
fn new() -> (Self, Arc<Mutex<Vec<(String, Vec<u8>)>>>) {
|
|
||||||
let calls = Arc::new(Mutex::new(vec![]));
|
|
||||||
(Self { calls: calls.clone() }, calls)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#[async_trait]
|
|
||||||
impl Transport for SpyTransport {
|
|
||||||
async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> {
|
|
||||||
self.calls.lock().unwrap().push((subject.to_string(), bytes.to_vec()));
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn thought_created_routes_to_correct_subject() {
|
|
||||||
let (spy, calls) = SpyTransport::new();
|
|
||||||
let publisher = EventPublisherAdapter::new(spy);
|
|
||||||
publisher.publish(&DomainEvent::ThoughtCreated {
|
|
||||||
thought_id: ThoughtId::new(),
|
|
||||||
user_id: UserId::new(),
|
|
||||||
in_reply_to_id: None,
|
|
||||||
}).await.unwrap();
|
|
||||||
let calls = calls.lock().unwrap();
|
|
||||||
assert_eq!(calls.len(), 1);
|
|
||||||
assert_eq!(calls[0].0, "thoughts.created");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn serialized_payload_is_valid_json() {
|
|
||||||
let (spy, calls) = SpyTransport::new();
|
|
||||||
let publisher = EventPublisherAdapter::new(spy);
|
|
||||||
publisher.publish(&DomainEvent::UserBlocked {
|
|
||||||
blocker_id: UserId::new(),
|
|
||||||
blocked_id: UserId::new(),
|
|
||||||
}).await.unwrap();
|
|
||||||
let bytes = calls.lock().unwrap()[0].1.clone();
|
|
||||||
let json: serde_json::Value = serde_json::from_slice(&bytes).expect("valid JSON");
|
|
||||||
assert_eq!(json["type"], "UserBlocked");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,5 +1,5 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "event-publisher"
|
name = "event-transport"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
@@ -9,6 +9,7 @@ event-payload = { workspace = true }
|
|||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
async-trait = { workspace = true }
|
async-trait = { workspace = true }
|
||||||
tracing = { workspace = true }
|
tracing = { workspace = true }
|
||||||
|
futures = { workspace = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio = { workspace = true, features = ["full"] }
|
tokio = { workspace = true, features = ["full"] }
|
||||||
208
crates/adapters/event-transport/src/lib.rs
Normal file
208
crates/adapters/event-transport/src/lib.rs
Normal file
@@ -0,0 +1,208 @@
|
|||||||
|
use async_trait::async_trait;
|
||||||
|
use domain::{errors::DomainError, events::{DomainEvent, EventEnvelope}, ports::{EventConsumer, EventPublisher}};
|
||||||
|
use event_payload::EventPayload;
|
||||||
|
use futures::stream::BoxStream;
|
||||||
|
|
||||||
|
/// Abstraction over any pub/sub transport backend.
|
||||||
|
/// Implement this for NATS, Kafka, Redis Streams, etc.
|
||||||
|
/// The adapter calls `publish_bytes(subject, bytes)` — subjects come from `EventPayload::subject()`.
|
||||||
|
#[async_trait]
|
||||||
|
pub trait Transport: Send + Sync {
|
||||||
|
async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Routes domain events to a transport backend.
|
||||||
|
///
|
||||||
|
/// Converts: `DomainEvent` → `EventPayload` → JSON bytes → `transport.publish_bytes(subject, bytes)`
|
||||||
|
///
|
||||||
|
/// To swap transports (e.g. NATS → Kafka), replace the `T` at the composition root.
|
||||||
|
pub struct EventPublisherAdapter<T: Transport> {
|
||||||
|
transport: T,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Transport> EventPublisherAdapter<T> {
|
||||||
|
pub fn new(transport: T) -> Self {
|
||||||
|
Self { transport }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<T: Transport> EventPublisher for EventPublisherAdapter<T> {
|
||||||
|
async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||||
|
let payload = EventPayload::from(event);
|
||||||
|
let subject = payload.subject();
|
||||||
|
let bytes = serde_json::to_vec(&payload)
|
||||||
|
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||||
|
tracing::debug!(subject, "publishing event");
|
||||||
|
self.transport.publish_bytes(subject, &bytes).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A raw inbound message from a transport backend.
|
||||||
|
/// `ack` and `nack` are transport-level acknowledgements (e.g. Kafka offset commit).
|
||||||
|
/// For at-most-once transports (basic NATS), both are no-ops.
|
||||||
|
pub struct RawMessage {
|
||||||
|
pub subject: String,
|
||||||
|
pub payload: Vec<u8>,
|
||||||
|
pub ack: Box<dyn Fn() + Send + Sync>,
|
||||||
|
pub nack: Box<dyn Fn() + Send + Sync>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Abstraction over any subscribe/consume backend.
|
||||||
|
pub trait MessageSource: Send + Sync {
|
||||||
|
fn messages(&self) -> BoxStream<'_, Result<RawMessage, DomainError>>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Deserializes raw transport messages into domain `EventEnvelope`s.
|
||||||
|
/// Invalid or unknown messages are skipped with a warning — stream continues.
|
||||||
|
pub struct EventConsumerAdapter<S: MessageSource> {
|
||||||
|
source: S,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: MessageSource> EventConsumerAdapter<S> {
|
||||||
|
pub fn new(source: S) -> Self { Self { source } }
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: MessageSource> EventConsumer for EventConsumerAdapter<S> {
|
||||||
|
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
||||||
|
use futures::StreamExt;
|
||||||
|
let stream = self.source.messages();
|
||||||
|
Box::pin(stream.filter_map(|result| async move {
|
||||||
|
match result {
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!("transport error: {e}");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
Ok(msg) => {
|
||||||
|
let payload = match serde_json::from_slice::<EventPayload>(&msg.payload) {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!("failed to deserialize event payload: {e}");
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let event = match DomainEvent::try_from(payload) {
|
||||||
|
Ok(e) => e,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!("unknown event type: {e}");
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Some(Ok(EventEnvelope {
|
||||||
|
event,
|
||||||
|
ack: msg.ack,
|
||||||
|
nack: msg.nack,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use domain::value_objects::{ThoughtId, UserId};
|
||||||
|
|
||||||
|
struct SpyTransport {
|
||||||
|
calls: Arc<Mutex<Vec<(String, Vec<u8>)>>>,
|
||||||
|
}
|
||||||
|
impl SpyTransport {
|
||||||
|
fn new() -> (Self, Arc<Mutex<Vec<(String, Vec<u8>)>>>) {
|
||||||
|
let calls = Arc::new(Mutex::new(vec![]));
|
||||||
|
(Self { calls: calls.clone() }, calls)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[async_trait]
|
||||||
|
impl Transport for SpyTransport {
|
||||||
|
async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> {
|
||||||
|
self.calls.lock().unwrap().push((subject.to_string(), bytes.to_vec()));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn thought_created_routes_to_correct_subject() {
|
||||||
|
let (spy, calls) = SpyTransport::new();
|
||||||
|
let publisher = EventPublisherAdapter::new(spy);
|
||||||
|
publisher.publish(&DomainEvent::ThoughtCreated {
|
||||||
|
thought_id: ThoughtId::new(),
|
||||||
|
user_id: UserId::new(),
|
||||||
|
in_reply_to_id: None,
|
||||||
|
}).await.unwrap();
|
||||||
|
let calls = calls.lock().unwrap();
|
||||||
|
assert_eq!(calls.len(), 1);
|
||||||
|
assert_eq!(calls[0].0, "thoughts.created");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn serialized_payload_is_valid_json() {
|
||||||
|
let (spy, calls) = SpyTransport::new();
|
||||||
|
let publisher = EventPublisherAdapter::new(spy);
|
||||||
|
publisher.publish(&DomainEvent::UserBlocked {
|
||||||
|
blocker_id: UserId::new(),
|
||||||
|
blocked_id: UserId::new(),
|
||||||
|
}).await.unwrap();
|
||||||
|
let bytes = calls.lock().unwrap()[0].1.clone();
|
||||||
|
let json: serde_json::Value = serde_json::from_slice(&bytes).expect("valid JSON");
|
||||||
|
assert_eq!(json["type"], "UserBlocked");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn consumer_adapter_deserializes_and_yields_event() {
|
||||||
|
use domain::value_objects::ThoughtId;
|
||||||
|
use futures::StreamExt;
|
||||||
|
|
||||||
|
let event = DomainEvent::ThoughtCreated {
|
||||||
|
thought_id: ThoughtId::new(),
|
||||||
|
user_id: UserId::new(),
|
||||||
|
in_reply_to_id: None,
|
||||||
|
};
|
||||||
|
let payload = EventPayload::from(&event);
|
||||||
|
let bytes = serde_json::to_vec(&payload).unwrap();
|
||||||
|
|
||||||
|
struct OneMessageSource { bytes: Vec<u8> }
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl MessageSource for OneMessageSource {
|
||||||
|
fn messages(&self) -> futures::stream::BoxStream<'_, Result<RawMessage, DomainError>> {
|
||||||
|
let msg = RawMessage {
|
||||||
|
subject: "thoughts.created".to_string(),
|
||||||
|
payload: self.bytes.clone(),
|
||||||
|
ack: Box::new(|| {}),
|
||||||
|
nack: Box::new(|| {}),
|
||||||
|
};
|
||||||
|
Box::pin(futures::stream::once(async { Ok(msg) }))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let adapter = EventConsumerAdapter::new(OneMessageSource { bytes });
|
||||||
|
let mut stream = adapter.consume();
|
||||||
|
let envelope = stream.next().await.unwrap().unwrap();
|
||||||
|
assert!(matches!(envelope.event, DomainEvent::ThoughtCreated { .. }));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn consumer_adapter_skips_invalid_payloads() {
|
||||||
|
use futures::StreamExt;
|
||||||
|
|
||||||
|
struct BadMessageSource;
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl MessageSource for BadMessageSource {
|
||||||
|
fn messages(&self) -> futures::stream::BoxStream<'_, Result<RawMessage, DomainError>> {
|
||||||
|
let msg = RawMessage {
|
||||||
|
subject: "bad".to_string(),
|
||||||
|
payload: b"not valid json".to_vec(),
|
||||||
|
ack: Box::new(|| {}),
|
||||||
|
nack: Box::new(|| {}),
|
||||||
|
};
|
||||||
|
Box::pin(futures::stream::once(async { Ok(msg) }))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let adapter = EventConsumerAdapter::new(BadMessageSource);
|
||||||
|
let mut stream = adapter.consume();
|
||||||
|
assert!(stream.next().await.is_none());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -6,7 +6,7 @@ edition = "2021"
|
|||||||
[dependencies]
|
[dependencies]
|
||||||
domain = { workspace = true }
|
domain = { workspace = true }
|
||||||
event-payload = { workspace = true }
|
event-payload = { workspace = true }
|
||||||
event-publisher = { workspace = true }
|
event-transport = { workspace = true }
|
||||||
async-nats = { workspace = true }
|
async-nats = { workspace = true }
|
||||||
async-stream = { workspace = true }
|
async-stream = { workspace = true }
|
||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
|
|||||||
@@ -1,11 +1,6 @@
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use domain::{
|
use domain::errors::DomainError;
|
||||||
errors::DomainError,
|
use event_transport::{MessageSource, RawMessage, Transport};
|
||||||
events::{DomainEvent, EventEnvelope},
|
|
||||||
ports::EventConsumer,
|
|
||||||
};
|
|
||||||
use event_payload::EventPayload;
|
|
||||||
use event_publisher::Transport;
|
|
||||||
use futures::stream::BoxStream;
|
use futures::stream::BoxStream;
|
||||||
|
|
||||||
// ── NatsTransport — raw NATS publish backend ────────────────────────────────
|
// ── NatsTransport — raw NATS publish backend ────────────────────────────────
|
||||||
@@ -28,18 +23,18 @@ impl Transport for NatsTransport {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── NatsEventConsumer — subscribes and yields EventEnvelopes ────────────────
|
// ── NatsMessageSource — raw NATS subscribe backend ──────────────────────────
|
||||||
|
|
||||||
pub struct NatsEventConsumer {
|
pub struct NatsMessageSource {
|
||||||
client: async_nats::Client,
|
client: async_nats::Client,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NatsEventConsumer {
|
impl NatsMessageSource {
|
||||||
pub fn new(client: async_nats::Client) -> Self { Self { client } }
|
pub fn new(client: async_nats::Client) -> Self { Self { client } }
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EventConsumer for NatsEventConsumer {
|
impl MessageSource for NatsMessageSource {
|
||||||
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
fn messages(&self) -> BoxStream<'_, Result<RawMessage, DomainError>> {
|
||||||
let client = self.client.clone();
|
let client = self.client.clone();
|
||||||
Box::pin(async_stream::try_stream! {
|
Box::pin(async_stream::try_stream! {
|
||||||
let mut sub = client
|
let mut sub = client
|
||||||
@@ -49,22 +44,12 @@ impl EventConsumer for NatsEventConsumer {
|
|||||||
|
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
while let Some(msg) = sub.next().await {
|
while let Some(msg) = sub.next().await {
|
||||||
let payload = match serde_json::from_slice::<EventPayload>(&msg.payload) {
|
let subject = msg.subject.to_string();
|
||||||
Ok(p) => p,
|
let payload = msg.payload.to_vec();
|
||||||
Err(e) => {
|
// Basic NATS: at-most-once — ack/nack are no-ops.
|
||||||
tracing::warn!("failed to deserialize event payload: {e}");
|
yield RawMessage {
|
||||||
continue;
|
subject,
|
||||||
}
|
payload,
|
||||||
};
|
|
||||||
let event = match DomainEvent::try_from(payload) {
|
|
||||||
Ok(e) => e,
|
|
||||||
Err(e) => {
|
|
||||||
tracing::warn!("failed to convert payload to domain event: {e}");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
yield EventEnvelope {
|
|
||||||
event,
|
|
||||||
ack: Box::new(|| {}),
|
ack: Box::new(|| {}),
|
||||||
nack: Box::new(|| {}),
|
nack: Box::new(|| {}),
|
||||||
};
|
};
|
||||||
@@ -76,7 +61,8 @@ impl EventConsumer for NatsEventConsumer {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use domain::value_objects::{LikeId, ThoughtId, UserId};
|
use domain::{events::DomainEvent, value_objects::{LikeId, ThoughtId, UserId}};
|
||||||
|
use event_payload::EventPayload;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn payload_from_domain_event_has_correct_subject() {
|
fn payload_from_domain_event_has_correct_subject() {
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ postgres-federation = { workspace = true }
|
|||||||
activitypub = { workspace = true }
|
activitypub = { workspace = true }
|
||||||
activitypub-base = { workspace = true }
|
activitypub-base = { workspace = true }
|
||||||
nats = { workspace = true }
|
nats = { workspace = true }
|
||||||
event-publisher = { workspace = true }
|
event-transport = { workspace = true }
|
||||||
auth = { workspace = true }
|
auth = { workspace = true }
|
||||||
sqlx = { workspace = true }
|
sqlx = { workspace = true }
|
||||||
async-nats = { workspace = true }
|
async-nats = { workspace = true }
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ use sqlx::PgPool;
|
|||||||
use activitypub::ThoughtsObjectHandler;
|
use activitypub::ThoughtsObjectHandler;
|
||||||
use activitypub_base::{ApFederationConfig, FederationData};
|
use activitypub_base::{ApFederationConfig, FederationData};
|
||||||
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
|
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
|
||||||
use event_publisher::EventPublisherAdapter;
|
use event_transport::EventPublisherAdapter;
|
||||||
use nats::NatsTransport;
|
use nats::NatsTransport;
|
||||||
use postgres::activitypub::PgActivityPubRepository;
|
use postgres::activitypub::PgActivityPubRepository;
|
||||||
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository};
|
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository};
|
||||||
|
|||||||
@@ -8,9 +8,10 @@ name = "thoughts-worker"
|
|||||||
path = "src/main.rs"
|
path = "src/main.rs"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
domain = { workspace = true }
|
domain = { workspace = true }
|
||||||
nats = { workspace = true }
|
nats = { workspace = true }
|
||||||
event-payload = { workspace = true }
|
event-payload = { workspace = true }
|
||||||
|
event-transport = { workspace = true }
|
||||||
postgres = { workspace = true }
|
postgres = { workspace = true }
|
||||||
async-nats = { workspace = true }
|
async-nats = { workspace = true }
|
||||||
tokio = { workspace = true, features = ["full"] }
|
tokio = { workspace = true, features = ["full"] }
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ async fn main() {
|
|||||||
|
|
||||||
tracing::info!("Connecting to NATS at {nats_url}...");
|
tracing::info!("Connecting to NATS at {nats_url}...");
|
||||||
let nats_client = async_nats::connect(&nats_url).await.expect("NATS connect failed");
|
let nats_client = async_nats::connect(&nats_url).await.expect("NATS connect failed");
|
||||||
let consumer = nats::NatsEventConsumer::new(nats_client);
|
let consumer = event_transport::EventConsumerAdapter::new(nats::NatsMessageSource::new(nats_client));
|
||||||
|
|
||||||
let notification_handler = handlers::NotificationHandler {
|
let notification_handler = handlers::NotificationHandler {
|
||||||
thoughts: Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone())),
|
thoughts: Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone())),
|
||||||
|
|||||||
483
docs/superpowers/plans/2026-05-14-event-transport-rename.md
Normal file
483
docs/superpowers/plans/2026-05-14-event-transport-rename.md
Normal file
@@ -0,0 +1,483 @@
|
|||||||
|
# event-transport Rename + Consumer Abstraction Plan
|
||||||
|
|
||||||
|
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
||||||
|
|
||||||
|
**Goal:** Rename `event-publisher` → `event-transport` and add the symmetric consumer abstraction (`MessageSource` trait + `EventConsumerAdapter<S>`) so both publish and subscribe are transport-agnostic.
|
||||||
|
|
||||||
|
**Architecture after this plan:**
|
||||||
|
```
|
||||||
|
event-transport/ ← Transport + EventPublisherAdapter<T> (existing)
|
||||||
|
← MessageSource + EventConsumerAdapter<S> (new)
|
||||||
|
← RawMessage { subject, payload, ack, nack } (new)
|
||||||
|
|
||||||
|
nats/ ← NatsTransport (existing, implements Transport)
|
||||||
|
← NatsMessageSource (new, implements MessageSource)
|
||||||
|
← NatsEventConsumer removed
|
||||||
|
|
||||||
|
worker/ ← EventConsumerAdapter::new(NatsMessageSource::new(client))
|
||||||
|
```
|
||||||
|
|
||||||
|
**Dependency chain:**
|
||||||
|
```
|
||||||
|
event-transport → domain, event-payload, serde_json, async-trait
|
||||||
|
nats → domain, event-payload, event-transport, async-nats
|
||||||
|
worker → domain, nats, event-transport, postgres
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## File Map
|
||||||
|
|
||||||
|
```
|
||||||
|
Rename: crates/adapters/event-publisher/ → crates/adapters/event-transport/
|
||||||
|
Modify: Cargo.toml (root) ← update member path + workspace dep name
|
||||||
|
Modify: crates/adapters/event-transport/Cargo.toml ← name = "event-transport"
|
||||||
|
Modify: crates/adapters/nats/Cargo.toml ← event-publisher → event-transport
|
||||||
|
Modify: crates/adapters/nats/src/lib.rs ← use event_transport; add NatsMessageSource; remove NatsEventConsumer
|
||||||
|
Modify: crates/bootstrap/Cargo.toml ← event-publisher → event-transport
|
||||||
|
Modify: crates/bootstrap/src/factory.rs ← use event_transport; update EventConsumerAdapter wiring
|
||||||
|
Modify: crates/worker/Cargo.toml ← add event-transport dep
|
||||||
|
Modify: crates/worker/src/main.rs ← EventConsumerAdapter<NatsMessageSource>
|
||||||
|
Modify: crates/adapters/event-transport/src/lib.rs ← add RawMessage + MessageSource + EventConsumerAdapter
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 1: Rename crate + update all references
|
||||||
|
|
||||||
|
**Files:** root `Cargo.toml`, `event-publisher/Cargo.toml` (renamed), `nats/Cargo.toml`, `bootstrap/Cargo.toml`, `nats/src/lib.rs`, `bootstrap/src/factory.rs`
|
||||||
|
|
||||||
|
- [ ] **Rename the directory:**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git mv crates/adapters/event-publisher crates/adapters/event-transport
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Update `crates/adapters/event-transport/Cargo.toml`** — change the package name:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[package]
|
||||||
|
name = "event-transport"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
domain = { workspace = true }
|
||||||
|
event-payload = { workspace = true }
|
||||||
|
serde_json = { workspace = true }
|
||||||
|
async-trait = { workspace = true }
|
||||||
|
tracing = { workspace = true }
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
tokio = { workspace = true, features = ["full"] }
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Update root `Cargo.toml`:**
|
||||||
|
|
||||||
|
In `[workspace] members`, change:
|
||||||
|
```toml
|
||||||
|
"crates/adapters/event-publisher",
|
||||||
|
```
|
||||||
|
to:
|
||||||
|
```toml
|
||||||
|
"crates/adapters/event-transport",
|
||||||
|
```
|
||||||
|
|
||||||
|
In `[workspace.dependencies]`, change:
|
||||||
|
```toml
|
||||||
|
event-publisher = { path = "crates/adapters/event-publisher" }
|
||||||
|
```
|
||||||
|
to:
|
||||||
|
```toml
|
||||||
|
event-transport = { path = "crates/adapters/event-transport" }
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Update `crates/adapters/nats/Cargo.toml`:**
|
||||||
|
|
||||||
|
Change `event-publisher = { workspace = true }` to `event-transport = { workspace = true }`.
|
||||||
|
|
||||||
|
- [ ] **Update `crates/adapters/nats/src/lib.rs`:**
|
||||||
|
|
||||||
|
Change `use event_publisher::Transport;` to `use event_transport::Transport;`.
|
||||||
|
|
||||||
|
- [ ] **Update `crates/bootstrap/Cargo.toml`:**
|
||||||
|
|
||||||
|
Change `event-publisher = { workspace = true }` to `event-transport = { workspace = true }`.
|
||||||
|
|
||||||
|
- [ ] **Update `crates/bootstrap/src/factory.rs`:**
|
||||||
|
|
||||||
|
Change `use event_publisher::EventPublisherAdapter;` to `use event_transport::EventPublisherAdapter;`.
|
||||||
|
|
||||||
|
- [ ] **Run:** `cargo check --workspace` — Expected: no errors.
|
||||||
|
|
||||||
|
- [ ] **Run:** `cargo test -p event-transport` — Expected: 2 tests pass (same tests as before, just crate renamed).
|
||||||
|
|
||||||
|
- [ ] **Commit:**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add Cargo.toml \
|
||||||
|
crates/adapters/event-transport/ \
|
||||||
|
crates/adapters/nats/Cargo.toml \
|
||||||
|
crates/adapters/nats/src/lib.rs \
|
||||||
|
crates/bootstrap/Cargo.toml \
|
||||||
|
crates/bootstrap/src/factory.rs
|
||||||
|
git commit -m "refactor: rename event-publisher → event-transport"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 2: Add MessageSource + EventConsumerAdapter to event-transport
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `crates/adapters/event-transport/src/lib.rs`
|
||||||
|
|
||||||
|
- [ ] **Write failing tests** — append to the test module in `src/lib.rs`:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
#[tokio::test]
|
||||||
|
async fn consumer_adapter_deserializes_and_yields_event() {
|
||||||
|
use domain::value_objects::ThoughtId;
|
||||||
|
use futures::StreamExt;
|
||||||
|
|
||||||
|
// Produce a serialized EventPayload for ThoughtCreated
|
||||||
|
let event = DomainEvent::ThoughtCreated {
|
||||||
|
thought_id: ThoughtId::new(),
|
||||||
|
user_id: UserId::new(),
|
||||||
|
in_reply_to_id: None,
|
||||||
|
};
|
||||||
|
let payload = EventPayload::from(&event);
|
||||||
|
let bytes = serde_json::to_vec(&payload).unwrap();
|
||||||
|
|
||||||
|
// A MessageSource that yields one message then ends
|
||||||
|
struct OneMessageSource { bytes: Vec<u8> }
|
||||||
|
#[async_trait]
|
||||||
|
impl MessageSource for OneMessageSource {
|
||||||
|
fn messages(&self) -> futures::stream::BoxStream<'_, Result<RawMessage, DomainError>> {
|
||||||
|
let msg = RawMessage {
|
||||||
|
subject: "thoughts.created".to_string(),
|
||||||
|
payload: self.bytes.clone(),
|
||||||
|
ack: Box::new(|| {}),
|
||||||
|
nack: Box::new(|| {}),
|
||||||
|
};
|
||||||
|
Box::pin(futures::stream::once(async { Ok(msg) }))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let adapter = EventConsumerAdapter::new(OneMessageSource { bytes });
|
||||||
|
let mut stream = adapter.consume();
|
||||||
|
let envelope = stream.next().await.unwrap().unwrap();
|
||||||
|
assert!(matches!(envelope.event, DomainEvent::ThoughtCreated { .. }));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn consumer_adapter_skips_invalid_payloads() {
|
||||||
|
use futures::StreamExt;
|
||||||
|
|
||||||
|
struct BadMessageSource;
|
||||||
|
#[async_trait]
|
||||||
|
impl MessageSource for BadMessageSource {
|
||||||
|
fn messages(&self) -> futures::stream::BoxStream<'_, Result<RawMessage, DomainError>> {
|
||||||
|
let msg = RawMessage {
|
||||||
|
subject: "bad".to_string(),
|
||||||
|
payload: b"not valid json".to_vec(),
|
||||||
|
ack: Box::new(|| {}),
|
||||||
|
nack: Box::new(|| {}),
|
||||||
|
};
|
||||||
|
Box::pin(futures::stream::once(async { Ok(msg) }))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let adapter = EventConsumerAdapter::new(BadMessageSource);
|
||||||
|
let mut stream = adapter.consume();
|
||||||
|
// Invalid JSON should be skipped — stream ends with no items
|
||||||
|
assert!(stream.next().await.is_none());
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Run:** `cargo test -p event-transport` — Expected: FAIL (MessageSource, RawMessage, EventConsumerAdapter not defined).
|
||||||
|
|
||||||
|
- [ ] **Add to `crates/adapters/event-transport/src/lib.rs`** — append after the existing `EventPublisherAdapter` impl and before `#[cfg(test)]`:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
use domain::{events::EventEnvelope, ports::EventConsumer};
|
||||||
|
use futures::stream::BoxStream;
|
||||||
|
|
||||||
|
/// A raw inbound message from a transport backend.
|
||||||
|
/// `ack` and `nack` are transport-level acknowledgements (e.g. Kafka offset commit).
|
||||||
|
/// For at-most-once transports (basic NATS), both are no-ops.
|
||||||
|
pub struct RawMessage {
|
||||||
|
pub subject: String,
|
||||||
|
pub payload: Vec<u8>,
|
||||||
|
pub ack: Box<dyn Fn() + Send + Sync>,
|
||||||
|
pub nack: Box<dyn Fn() + Send + Sync>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Abstraction over any subscribe/consume backend.
|
||||||
|
/// Implement this for NATS, Kafka, Redis Streams, etc.
|
||||||
|
pub trait MessageSource: Send + Sync {
|
||||||
|
fn messages(&self) -> BoxStream<'_, Result<RawMessage, DomainError>>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Deserializes raw transport messages into domain `EventEnvelope`s.
|
||||||
|
///
|
||||||
|
/// Converts: `RawMessage.payload` → `EventPayload` → `DomainEvent` → `EventEnvelope`
|
||||||
|
///
|
||||||
|
/// Invalid or unknown messages are skipped with a warning — the stream continues.
|
||||||
|
pub struct EventConsumerAdapter<S: MessageSource> {
|
||||||
|
source: S,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: MessageSource> EventConsumerAdapter<S> {
|
||||||
|
pub fn new(source: S) -> Self { Self { source } }
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: MessageSource> EventConsumer for EventConsumerAdapter<S> {
|
||||||
|
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
||||||
|
use futures::StreamExt;
|
||||||
|
let stream = self.source.messages();
|
||||||
|
Box::pin(stream.filter_map(|result| async move {
|
||||||
|
match result {
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!("transport error: {e}");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
Ok(msg) => {
|
||||||
|
let payload = match serde_json::from_slice::<EventPayload>(&msg.payload) {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!("failed to deserialize event payload: {e}");
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let event = match DomainEvent::try_from(payload) {
|
||||||
|
Ok(e) => e,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!("unknown event type: {e}");
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Some(Ok(EventEnvelope {
|
||||||
|
event,
|
||||||
|
ack: msg.ack,
|
||||||
|
nack: msg.nack,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Note: the existing imports at the top of `lib.rs` already have `use domain::...` — add `EventEnvelope` and `EventConsumer` to those imports. Also add `futures::stream::BoxStream` if not already present.
|
||||||
|
|
||||||
|
Also add `futures = { workspace = true }` to `event-transport/Cargo.toml` dependencies (needed for `BoxStream` and `StreamExt`).
|
||||||
|
|
||||||
|
- [ ] **Run:** `cargo test -p event-transport` — Expected: 4 tests pass (2 existing + 2 new).
|
||||||
|
|
||||||
|
- [ ] **Commit:**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add crates/adapters/event-transport/
|
||||||
|
git commit -m "feat(event-transport): MessageSource trait + EventConsumerAdapter for transport-agnostic consuming"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 3: nats — add NatsMessageSource, remove NatsEventConsumer
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `crates/adapters/nats/src/lib.rs`
|
||||||
|
|
||||||
|
- [ ] **Rewrite `crates/adapters/nats/src/lib.rs`** — remove `NatsEventConsumer`, add `NatsMessageSource`:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use domain::errors::DomainError;
|
||||||
|
use event_payload::EventPayload;
|
||||||
|
use event_transport::{MessageSource, RawMessage, Transport};
|
||||||
|
use futures::stream::BoxStream;
|
||||||
|
|
||||||
|
// ── NatsTransport — raw NATS publish backend ────────────────────────────────
|
||||||
|
|
||||||
|
pub struct NatsTransport {
|
||||||
|
client: async_nats::Client,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NatsTransport {
|
||||||
|
pub fn new(client: async_nats::Client) -> Self { Self { client } }
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl Transport for NatsTransport {
|
||||||
|
async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> {
|
||||||
|
self.client
|
||||||
|
.publish(subject, bytes.to_vec().into())
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::Internal(e.to_string()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── NatsMessageSource — raw NATS subscribe backend ──────────────────────────
|
||||||
|
|
||||||
|
pub struct NatsMessageSource {
|
||||||
|
client: async_nats::Client,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NatsMessageSource {
|
||||||
|
pub fn new(client: async_nats::Client) -> Self { Self { client } }
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MessageSource for NatsMessageSource {
|
||||||
|
fn messages(&self) -> BoxStream<'_, Result<RawMessage, DomainError>> {
|
||||||
|
let client = self.client.clone();
|
||||||
|
Box::pin(async_stream::try_stream! {
|
||||||
|
let mut sub = client
|
||||||
|
.subscribe(">")
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||||
|
|
||||||
|
use futures::StreamExt;
|
||||||
|
while let Some(msg) = sub.next().await {
|
||||||
|
let subject = msg.subject.to_string();
|
||||||
|
let payload = msg.payload.to_vec();
|
||||||
|
// Basic NATS: at-most-once delivery — ack/nack are no-ops.
|
||||||
|
// Replace with JetStream for at-least-once delivery.
|
||||||
|
yield RawMessage {
|
||||||
|
subject,
|
||||||
|
payload,
|
||||||
|
ack: Box::new(|| {}),
|
||||||
|
nack: Box::new(|| {}),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use domain::{events::DomainEvent, value_objects::{LikeId, ThoughtId, UserId}};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn payload_from_domain_event_has_correct_subject() {
|
||||||
|
let event = DomainEvent::ThoughtCreated {
|
||||||
|
thought_id: ThoughtId::new(),
|
||||||
|
user_id: UserId::new(),
|
||||||
|
in_reply_to_id: None,
|
||||||
|
};
|
||||||
|
let payload = EventPayload::from(&event);
|
||||||
|
assert_eq!(payload.subject(), "thoughts.created");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn domain_event_roundtrip_via_payload() {
|
||||||
|
let uid = UserId::new();
|
||||||
|
let tid = ThoughtId::new();
|
||||||
|
let event = DomainEvent::LikeAdded {
|
||||||
|
like_id: LikeId::new(),
|
||||||
|
user_id: uid.clone(),
|
||||||
|
thought_id: tid.clone(),
|
||||||
|
};
|
||||||
|
let payload = EventPayload::from(&event);
|
||||||
|
let back = DomainEvent::try_from(payload).unwrap();
|
||||||
|
if let DomainEvent::LikeAdded { user_id, thought_id, .. } = back {
|
||||||
|
assert_eq!(user_id, uid);
|
||||||
|
assert_eq!(thought_id, tid);
|
||||||
|
} else {
|
||||||
|
panic!("wrong variant");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Run:** `cargo test -p nats` — Expected: 2 tests pass.
|
||||||
|
|
||||||
|
- [ ] **Run:** `cargo check --workspace` — Expected: one error in `worker` (uses removed `NatsEventConsumer`). That's expected — fixed in Task 4.
|
||||||
|
|
||||||
|
- [ ] **Commit:**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add crates/adapters/nats/src/lib.rs
|
||||||
|
git commit -m "refactor(nats): replace NatsEventConsumer with NatsMessageSource implementing MessageSource"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 4: Update worker + full verification
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `crates/worker/Cargo.toml`
|
||||||
|
- Modify: `crates/worker/src/main.rs`
|
||||||
|
|
||||||
|
- [ ] **Add `event-transport = { workspace = true }` to `crates/worker/Cargo.toml`.**
|
||||||
|
|
||||||
|
- [ ] **Update `crates/worker/src/main.rs`** — find and update the consumer creation.
|
||||||
|
|
||||||
|
Current code in `main.rs`:
|
||||||
|
```rust
|
||||||
|
let consumer = nats::NatsEventConsumer::new(nats_client);
|
||||||
|
```
|
||||||
|
|
||||||
|
Replace with:
|
||||||
|
```rust
|
||||||
|
use event_transport::EventConsumerAdapter;
|
||||||
|
use nats::NatsMessageSource;
|
||||||
|
let consumer = EventConsumerAdapter::new(NatsMessageSource::new(nats_client));
|
||||||
|
```
|
||||||
|
|
||||||
|
Also add the `use` statements at the top of `main.rs` alongside existing imports.
|
||||||
|
|
||||||
|
- [ ] **Run:** `cargo check --workspace` — Expected: no errors.
|
||||||
|
|
||||||
|
- [ ] **Run full test suite:**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test --workspace 2>&1 | tail -3
|
||||||
|
```
|
||||||
|
|
||||||
|
Expected: all tests pass (79 existing + 2 new event-transport consumer tests = 81+).
|
||||||
|
|
||||||
|
- [ ] **Commit:**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add crates/worker/
|
||||||
|
git commit -m "feat(worker): use EventConsumerAdapter<NatsMessageSource> — transport-agnostic consuming"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Self-Review
|
||||||
|
|
||||||
|
**Spec coverage:**
|
||||||
|
- ✅ `event-publisher` renamed to `event-transport` everywhere (Task 1)
|
||||||
|
- ✅ `RawMessage { subject, payload, ack, nack }` in `event-transport` (Task 2)
|
||||||
|
- ✅ `MessageSource` trait with `messages() -> BoxStream<RawMessage>` (Task 2)
|
||||||
|
- ✅ `EventConsumerAdapter<S: MessageSource>` implementing `EventConsumer` (Task 2)
|
||||||
|
- ✅ Invalid messages skipped with warning, stream continues (Task 2)
|
||||||
|
- ✅ 2 new tests: valid deserialization + invalid JSON skip (Task 2)
|
||||||
|
- ✅ `NatsEventConsumer` removed from nats (Task 3)
|
||||||
|
- ✅ `NatsMessageSource` implementing `MessageSource` added to nats (Task 3)
|
||||||
|
- ✅ Worker uses `EventConsumerAdapter::new(NatsMessageSource::new(client))` (Task 4)
|
||||||
|
|
||||||
|
**Adding Kafka later:**
|
||||||
|
```toml
|
||||||
|
# kafka/Cargo.toml: event-transport = { workspace = true }
|
||||||
|
```
|
||||||
|
```rust
|
||||||
|
// kafka/src/lib.rs
|
||||||
|
pub struct KafkaMessageSource { ... }
|
||||||
|
impl MessageSource for KafkaMessageSource { ... } // yields RawMessage + real ack/nack
|
||||||
|
|
||||||
|
pub struct KafkaTransport { ... }
|
||||||
|
impl Transport for KafkaTransport { ... }
|
||||||
|
```
|
||||||
|
```rust
|
||||||
|
// bootstrap/src/factory.rs — two lines change:
|
||||||
|
EventPublisherAdapter::new(KafkaTransport::new(...))
|
||||||
|
EventConsumerAdapter::new(KafkaMessageSource::new(...))
|
||||||
|
```
|
||||||
|
|
||||||
|
**Type consistency:**
|
||||||
|
- `EventConsumerAdapter<NatsMessageSource>` — `NatsMessageSource` implements `MessageSource`, adapter implements `EventConsumer` ✓
|
||||||
|
- `RawMessage.ack` / `.nack` transferred to `EventEnvelope.ack` / `.nack` in consumer adapter ✓
|
||||||
|
- `event_transport::` (underscore) is the Rust module name for `event-transport` (dash) crate ✓
|
||||||
Reference in New Issue
Block a user