diff --git a/crates/adapters/event-payload/Cargo.toml b/crates/adapters/event-payload/Cargo.toml index ae5332c..dbf32c5 100644 --- a/crates/adapters/event-payload/Cargo.toml +++ b/crates/adapters/event-payload/Cargo.toml @@ -6,3 +6,5 @@ edition = "2021" [dependencies] serde = { workspace = true } serde_json = { workspace = true } +domain = { workspace = true } +uuid = { workspace = true } diff --git a/crates/adapters/event-payload/src/lib.rs b/crates/adapters/event-payload/src/lib.rs index a2a4b5f..db4ecc6 100644 --- a/crates/adapters/event-payload/src/lib.rs +++ b/crates/adapters/event-payload/src/lib.rs @@ -1,3 +1,8 @@ +use domain::{ + errors::DomainError, + events::DomainEvent, + value_objects::{BoostId, LikeId, ThoughtId, UserId}, +}; use serde::{Deserialize, Serialize}; /// Serializable mirror of domain::events::DomainEvent. @@ -78,6 +83,122 @@ impl EventPayload { } } +// ── DomainEvent → EventPayload ───────────────────────────────────────────── + +impl From<&DomainEvent> for EventPayload { + fn from(e: &DomainEvent) -> Self { + match e { + DomainEvent::ThoughtCreated { thought_id, user_id, in_reply_to_id } => Self::ThoughtCreated { + thought_id: thought_id.to_string(), + user_id: user_id.to_string(), + in_reply_to_id: in_reply_to_id.as_ref().map(|x| x.to_string()), + }, + DomainEvent::ThoughtDeleted { thought_id, user_id } => Self::ThoughtDeleted { + thought_id: thought_id.to_string(), user_id: user_id.to_string(), + }, + DomainEvent::ThoughtUpdated { thought_id, user_id } => Self::ThoughtUpdated { + thought_id: thought_id.to_string(), user_id: user_id.to_string(), + }, + DomainEvent::LikeAdded { like_id, user_id, thought_id } => Self::LikeAdded { + like_id: like_id.to_string(), user_id: user_id.to_string(), thought_id: thought_id.to_string(), + }, + DomainEvent::LikeRemoved { user_id, thought_id } => Self::LikeRemoved { + user_id: user_id.to_string(), thought_id: thought_id.to_string(), + }, + DomainEvent::BoostAdded { boost_id, user_id, thought_id } => Self::BoostAdded { + boost_id: boost_id.to_string(), user_id: user_id.to_string(), thought_id: thought_id.to_string(), + }, + DomainEvent::BoostRemoved { user_id, thought_id } => Self::BoostRemoved { + user_id: user_id.to_string(), thought_id: thought_id.to_string(), + }, + DomainEvent::FollowRequested { follower_id, following_id } => Self::FollowRequested { + follower_id: follower_id.to_string(), following_id: following_id.to_string(), + }, + DomainEvent::FollowAccepted { follower_id, following_id } => Self::FollowAccepted { + follower_id: follower_id.to_string(), following_id: following_id.to_string(), + }, + DomainEvent::FollowRejected { follower_id, following_id } => Self::FollowRejected { + follower_id: follower_id.to_string(), following_id: following_id.to_string(), + }, + DomainEvent::Unfollowed { follower_id, following_id } => Self::Unfollowed { + follower_id: follower_id.to_string(), following_id: following_id.to_string(), + }, + DomainEvent::UserBlocked { blocker_id, blocked_id } => Self::UserBlocked { + blocker_id: blocker_id.to_string(), blocked_id: blocked_id.to_string(), + }, + } + } +} + +// ── EventPayload → DomainEvent ───────────────────────────────────────────── + +fn parse_uuid(s: &str, field: &str) -> Result { + uuid::Uuid::parse_str(s) + .map_err(|_| DomainError::Internal(format!("invalid uuid for {field}: {s}"))) +} + +impl TryFrom for DomainEvent { + type Error = DomainError; + + fn try_from(p: EventPayload) -> Result { + Ok(match p { + EventPayload::ThoughtCreated { thought_id, user_id, in_reply_to_id } => DomainEvent::ThoughtCreated { + thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?), + user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), + in_reply_to_id: in_reply_to_id + .map(|s| parse_uuid(&s, "in_reply_to_id").map(ThoughtId::from_uuid)) + .transpose()?, + }, + EventPayload::ThoughtDeleted { thought_id, user_id } => DomainEvent::ThoughtDeleted { + thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?), + user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), + }, + EventPayload::ThoughtUpdated { thought_id, user_id } => DomainEvent::ThoughtUpdated { + thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?), + user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), + }, + EventPayload::LikeAdded { like_id, user_id, thought_id } => DomainEvent::LikeAdded { + like_id: LikeId::from_uuid(parse_uuid(&like_id, "like_id")?), + user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), + thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?), + }, + EventPayload::LikeRemoved { user_id, thought_id } => DomainEvent::LikeRemoved { + user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), + thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?), + }, + EventPayload::BoostAdded { boost_id, user_id, thought_id } => DomainEvent::BoostAdded { + boost_id: BoostId::from_uuid(parse_uuid(&boost_id, "boost_id")?), + user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), + thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?), + }, + EventPayload::BoostRemoved { user_id, thought_id } => DomainEvent::BoostRemoved { + user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), + thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?), + }, + EventPayload::FollowRequested { follower_id, following_id } => DomainEvent::FollowRequested { + follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?), + following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?), + }, + EventPayload::FollowAccepted { follower_id, following_id } => DomainEvent::FollowAccepted { + follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?), + following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?), + }, + EventPayload::FollowRejected { follower_id, following_id } => DomainEvent::FollowRejected { + follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?), + following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?), + }, + EventPayload::Unfollowed { follower_id, following_id } => DomainEvent::Unfollowed { + follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?), + following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?), + }, + EventPayload::UserBlocked { blocker_id, blocked_id } => DomainEvent::UserBlocked { + blocker_id: UserId::from_uuid(parse_uuid(&blocker_id, "blocker_id")?), + blocked_id: UserId::from_uuid(parse_uuid(&blocked_id, "blocked_id")?), + }, + }) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/adapters/nats/Cargo.toml b/crates/adapters/nats/Cargo.toml index a0b1380..3eb4fcb 100644 --- a/crates/adapters/nats/Cargo.toml +++ b/crates/adapters/nats/Cargo.toml @@ -2,3 +2,15 @@ name = "nats" version = "0.1.0" edition = "2021" + +[dependencies] +domain = { workspace = true } +event-payload = { workspace = true } +async-nats = { workspace = true } +async-stream = { workspace = true } +serde_json = { workspace = true } +futures = { workspace = true } +tokio = { workspace = true } +async-trait = { workspace = true } +tracing = { workspace = true } +uuid = { workspace = true } diff --git a/crates/adapters/nats/src/lib.rs b/crates/adapters/nats/src/lib.rs index e69de29..f874ec8 100644 --- a/crates/adapters/nats/src/lib.rs +++ b/crates/adapters/nats/src/lib.rs @@ -0,0 +1,114 @@ +use async_trait::async_trait; +use domain::{ + errors::DomainError, + events::{DomainEvent, EventEnvelope}, + ports::{EventConsumer, EventPublisher}, +}; +use event_payload::EventPayload; +use futures::stream::BoxStream; + +// ── NatsEventPublisher ──────────────────────────────────────────────────── + +pub struct NatsEventPublisher { + client: async_nats::Client, +} + +impl NatsEventPublisher { + pub fn new(client: async_nats::Client) -> Self { Self { client } } +} + +#[async_trait] +impl EventPublisher for NatsEventPublisher { + async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> { + let payload = EventPayload::from(event); + let subject = payload.subject(); + let bytes = serde_json::to_vec(&payload) + .map_err(|e| DomainError::Internal(e.to_string()))?; + self.client + .publish(subject, bytes.into()) + .await + .map_err(|e| DomainError::Internal(e.to_string())) + } +} + +// ── NatsEventConsumer ───────────────────────────────────────────────────── + +pub struct NatsEventConsumer { + client: async_nats::Client, +} + +impl NatsEventConsumer { + pub fn new(client: async_nats::Client) -> Self { Self { client } } +} + +impl EventConsumer for NatsEventConsumer { + fn consume(&self) -> BoxStream<'_, Result> { + let client = self.client.clone(); + Box::pin(async_stream::try_stream! { + let mut sub = client + .subscribe(">") + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + use futures::StreamExt; + while let Some(msg) = sub.next().await { + let payload = match serde_json::from_slice::(&msg.payload) { + Ok(p) => p, + Err(e) => { + tracing::warn!("failed to deserialize event payload: {e}"); + continue; + } + }; + let event = match DomainEvent::try_from(payload) { + Ok(e) => e, + Err(e) => { + tracing::warn!("failed to convert payload to domain event: {e}"); + continue; + } + }; + // Basic NATS: no ack/nack (at-most-once delivery) + yield EventEnvelope { + event, + ack: Box::new(|| {}), + nack: Box::new(|| {}), + }; + } + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use domain::value_objects::{LikeId, ThoughtId, UserId}; + + #[test] + fn payload_from_domain_event_has_correct_subject() { + let event = DomainEvent::ThoughtCreated { + thought_id: ThoughtId::new(), + user_id: UserId::new(), + in_reply_to_id: None, + }; + let payload = EventPayload::from(&event); + assert_eq!(payload.subject(), "thoughts.created"); + } + + #[test] + fn domain_event_roundtrip_via_payload() { + let uid = UserId::new(); + let tid = ThoughtId::new(); + let event = DomainEvent::LikeAdded { + like_id: LikeId::new(), + user_id: uid.clone(), + thought_id: tid.clone(), + }; + let payload = EventPayload::from(&event); + let back = DomainEvent::try_from(payload).unwrap(); + if let DomainEvent::LikeAdded { user_id, thought_id, .. } = back { + assert_eq!(user_id, uid); + assert_eq!(thought_id, tid); + } else { + panic!("wrong variant"); + } + } +}