feat(nats): NatsEventPublisher and NatsEventConsumer with payload conversion
This commit is contained in:
@@ -6,3 +6,5 @@ edition = "2021"
|
|||||||
[dependencies]
|
[dependencies]
|
||||||
serde = { workspace = true }
|
serde = { workspace = true }
|
||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
|
domain = { workspace = true }
|
||||||
|
uuid = { workspace = true }
|
||||||
|
|||||||
@@ -1,3 +1,8 @@
|
|||||||
|
use domain::{
|
||||||
|
errors::DomainError,
|
||||||
|
events::DomainEvent,
|
||||||
|
value_objects::{BoostId, LikeId, ThoughtId, UserId},
|
||||||
|
};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
/// Serializable mirror of domain::events::DomainEvent.
|
/// Serializable mirror of domain::events::DomainEvent.
|
||||||
@@ -78,6 +83,122 @@ impl EventPayload {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── DomainEvent → EventPayload ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
impl From<&DomainEvent> for EventPayload {
|
||||||
|
fn from(e: &DomainEvent) -> Self {
|
||||||
|
match e {
|
||||||
|
DomainEvent::ThoughtCreated { thought_id, user_id, in_reply_to_id } => Self::ThoughtCreated {
|
||||||
|
thought_id: thought_id.to_string(),
|
||||||
|
user_id: user_id.to_string(),
|
||||||
|
in_reply_to_id: in_reply_to_id.as_ref().map(|x| x.to_string()),
|
||||||
|
},
|
||||||
|
DomainEvent::ThoughtDeleted { thought_id, user_id } => Self::ThoughtDeleted {
|
||||||
|
thought_id: thought_id.to_string(), user_id: user_id.to_string(),
|
||||||
|
},
|
||||||
|
DomainEvent::ThoughtUpdated { thought_id, user_id } => Self::ThoughtUpdated {
|
||||||
|
thought_id: thought_id.to_string(), user_id: user_id.to_string(),
|
||||||
|
},
|
||||||
|
DomainEvent::LikeAdded { like_id, user_id, thought_id } => Self::LikeAdded {
|
||||||
|
like_id: like_id.to_string(), user_id: user_id.to_string(), thought_id: thought_id.to_string(),
|
||||||
|
},
|
||||||
|
DomainEvent::LikeRemoved { user_id, thought_id } => Self::LikeRemoved {
|
||||||
|
user_id: user_id.to_string(), thought_id: thought_id.to_string(),
|
||||||
|
},
|
||||||
|
DomainEvent::BoostAdded { boost_id, user_id, thought_id } => Self::BoostAdded {
|
||||||
|
boost_id: boost_id.to_string(), user_id: user_id.to_string(), thought_id: thought_id.to_string(),
|
||||||
|
},
|
||||||
|
DomainEvent::BoostRemoved { user_id, thought_id } => Self::BoostRemoved {
|
||||||
|
user_id: user_id.to_string(), thought_id: thought_id.to_string(),
|
||||||
|
},
|
||||||
|
DomainEvent::FollowRequested { follower_id, following_id } => Self::FollowRequested {
|
||||||
|
follower_id: follower_id.to_string(), following_id: following_id.to_string(),
|
||||||
|
},
|
||||||
|
DomainEvent::FollowAccepted { follower_id, following_id } => Self::FollowAccepted {
|
||||||
|
follower_id: follower_id.to_string(), following_id: following_id.to_string(),
|
||||||
|
},
|
||||||
|
DomainEvent::FollowRejected { follower_id, following_id } => Self::FollowRejected {
|
||||||
|
follower_id: follower_id.to_string(), following_id: following_id.to_string(),
|
||||||
|
},
|
||||||
|
DomainEvent::Unfollowed { follower_id, following_id } => Self::Unfollowed {
|
||||||
|
follower_id: follower_id.to_string(), following_id: following_id.to_string(),
|
||||||
|
},
|
||||||
|
DomainEvent::UserBlocked { blocker_id, blocked_id } => Self::UserBlocked {
|
||||||
|
blocker_id: blocker_id.to_string(), blocked_id: blocked_id.to_string(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── EventPayload → DomainEvent ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
fn parse_uuid(s: &str, field: &str) -> Result<uuid::Uuid, DomainError> {
|
||||||
|
uuid::Uuid::parse_str(s)
|
||||||
|
.map_err(|_| DomainError::Internal(format!("invalid uuid for {field}: {s}")))
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TryFrom<EventPayload> for DomainEvent {
|
||||||
|
type Error = DomainError;
|
||||||
|
|
||||||
|
fn try_from(p: EventPayload) -> Result<Self, DomainError> {
|
||||||
|
Ok(match p {
|
||||||
|
EventPayload::ThoughtCreated { thought_id, user_id, in_reply_to_id } => DomainEvent::ThoughtCreated {
|
||||||
|
thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
|
||||||
|
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||||
|
in_reply_to_id: in_reply_to_id
|
||||||
|
.map(|s| parse_uuid(&s, "in_reply_to_id").map(ThoughtId::from_uuid))
|
||||||
|
.transpose()?,
|
||||||
|
},
|
||||||
|
EventPayload::ThoughtDeleted { thought_id, user_id } => DomainEvent::ThoughtDeleted {
|
||||||
|
thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
|
||||||
|
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||||
|
},
|
||||||
|
EventPayload::ThoughtUpdated { thought_id, user_id } => DomainEvent::ThoughtUpdated {
|
||||||
|
thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
|
||||||
|
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||||
|
},
|
||||||
|
EventPayload::LikeAdded { like_id, user_id, thought_id } => DomainEvent::LikeAdded {
|
||||||
|
like_id: LikeId::from_uuid(parse_uuid(&like_id, "like_id")?),
|
||||||
|
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||||
|
thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
|
||||||
|
},
|
||||||
|
EventPayload::LikeRemoved { user_id, thought_id } => DomainEvent::LikeRemoved {
|
||||||
|
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||||
|
thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
|
||||||
|
},
|
||||||
|
EventPayload::BoostAdded { boost_id, user_id, thought_id } => DomainEvent::BoostAdded {
|
||||||
|
boost_id: BoostId::from_uuid(parse_uuid(&boost_id, "boost_id")?),
|
||||||
|
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||||
|
thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
|
||||||
|
},
|
||||||
|
EventPayload::BoostRemoved { user_id, thought_id } => DomainEvent::BoostRemoved {
|
||||||
|
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||||
|
thought_id: ThoughtId::from_uuid(parse_uuid(&thought_id, "thought_id")?),
|
||||||
|
},
|
||||||
|
EventPayload::FollowRequested { follower_id, following_id } => DomainEvent::FollowRequested {
|
||||||
|
follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?),
|
||||||
|
following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?),
|
||||||
|
},
|
||||||
|
EventPayload::FollowAccepted { follower_id, following_id } => DomainEvent::FollowAccepted {
|
||||||
|
follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?),
|
||||||
|
following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?),
|
||||||
|
},
|
||||||
|
EventPayload::FollowRejected { follower_id, following_id } => DomainEvent::FollowRejected {
|
||||||
|
follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?),
|
||||||
|
following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?),
|
||||||
|
},
|
||||||
|
EventPayload::Unfollowed { follower_id, following_id } => DomainEvent::Unfollowed {
|
||||||
|
follower_id: UserId::from_uuid(parse_uuid(&follower_id, "follower_id")?),
|
||||||
|
following_id: UserId::from_uuid(parse_uuid(&following_id, "following_id")?),
|
||||||
|
},
|
||||||
|
EventPayload::UserBlocked { blocker_id, blocked_id } => DomainEvent::UserBlocked {
|
||||||
|
blocker_id: UserId::from_uuid(parse_uuid(&blocker_id, "blocker_id")?),
|
||||||
|
blocked_id: UserId::from_uuid(parse_uuid(&blocked_id, "blocked_id")?),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|||||||
@@ -2,3 +2,15 @@
|
|||||||
name = "nats"
|
name = "nats"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
domain = { workspace = true }
|
||||||
|
event-payload = { workspace = true }
|
||||||
|
async-nats = { workspace = true }
|
||||||
|
async-stream = { workspace = true }
|
||||||
|
serde_json = { workspace = true }
|
||||||
|
futures = { workspace = true }
|
||||||
|
tokio = { workspace = true }
|
||||||
|
async-trait = { workspace = true }
|
||||||
|
tracing = { workspace = true }
|
||||||
|
uuid = { workspace = true }
|
||||||
|
|||||||
@@ -0,0 +1,114 @@
|
|||||||
|
use async_trait::async_trait;
|
||||||
|
use domain::{
|
||||||
|
errors::DomainError,
|
||||||
|
events::{DomainEvent, EventEnvelope},
|
||||||
|
ports::{EventConsumer, EventPublisher},
|
||||||
|
};
|
||||||
|
use event_payload::EventPayload;
|
||||||
|
use futures::stream::BoxStream;
|
||||||
|
|
||||||
|
// ── NatsEventPublisher ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
pub struct NatsEventPublisher {
|
||||||
|
client: async_nats::Client,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NatsEventPublisher {
|
||||||
|
pub fn new(client: async_nats::Client) -> Self { Self { client } }
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl EventPublisher for NatsEventPublisher {
|
||||||
|
async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||||
|
let payload = EventPayload::from(event);
|
||||||
|
let subject = payload.subject();
|
||||||
|
let bytes = serde_json::to_vec(&payload)
|
||||||
|
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||||
|
self.client
|
||||||
|
.publish(subject, bytes.into())
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::Internal(e.to_string()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── NatsEventConsumer ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
pub struct NatsEventConsumer {
|
||||||
|
client: async_nats::Client,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NatsEventConsumer {
|
||||||
|
pub fn new(client: async_nats::Client) -> Self { Self { client } }
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EventConsumer for NatsEventConsumer {
|
||||||
|
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
||||||
|
let client = self.client.clone();
|
||||||
|
Box::pin(async_stream::try_stream! {
|
||||||
|
let mut sub = client
|
||||||
|
.subscribe(">")
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||||
|
|
||||||
|
use futures::StreamExt;
|
||||||
|
while let Some(msg) = sub.next().await {
|
||||||
|
let payload = match serde_json::from_slice::<EventPayload>(&msg.payload) {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!("failed to deserialize event payload: {e}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let event = match DomainEvent::try_from(payload) {
|
||||||
|
Ok(e) => e,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!("failed to convert payload to domain event: {e}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// Basic NATS: no ack/nack (at-most-once delivery)
|
||||||
|
yield EventEnvelope {
|
||||||
|
event,
|
||||||
|
ack: Box::new(|| {}),
|
||||||
|
nack: Box::new(|| {}),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use domain::value_objects::{LikeId, ThoughtId, UserId};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn payload_from_domain_event_has_correct_subject() {
|
||||||
|
let event = DomainEvent::ThoughtCreated {
|
||||||
|
thought_id: ThoughtId::new(),
|
||||||
|
user_id: UserId::new(),
|
||||||
|
in_reply_to_id: None,
|
||||||
|
};
|
||||||
|
let payload = EventPayload::from(&event);
|
||||||
|
assert_eq!(payload.subject(), "thoughts.created");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn domain_event_roundtrip_via_payload() {
|
||||||
|
let uid = UserId::new();
|
||||||
|
let tid = ThoughtId::new();
|
||||||
|
let event = DomainEvent::LikeAdded {
|
||||||
|
like_id: LikeId::new(),
|
||||||
|
user_id: uid.clone(),
|
||||||
|
thought_id: tid.clone(),
|
||||||
|
};
|
||||||
|
let payload = EventPayload::from(&event);
|
||||||
|
let back = DomainEvent::try_from(payload).unwrap();
|
||||||
|
if let DomainEvent::LikeAdded { user_id, thought_id, .. } = back {
|
||||||
|
assert_eq!(user_id, uid);
|
||||||
|
assert_eq!(thought_id, tid);
|
||||||
|
} else {
|
||||||
|
panic!("wrong variant");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user