feat: event infrastructure — payload, transport, NATS adapter
- EventPublisher now takes &DomainEvent (11 call sites + 3 impls updated) - EventEnvelope + EventConsumer port in domain - event-payload: serializable DomainEvent mirror with subject routing - event-transport: generic Transport/MessageSource traits, publisher/consumer adapters - adapters-nats: JetStream publish + durable pull consumer
This commit is contained in:
214
crates/adapters/nats/src/lib.rs
Normal file
214
crates/adapters/nats/src/lib.rs
Normal file
@@ -0,0 +1,214 @@
|
||||
use async_nats::jetstream::{self, AckKind, stream::Config as StreamConfig};
|
||||
use async_trait::async_trait;
|
||||
use domain::errors::DomainError;
|
||||
use event_transport::{MessageSource, RawMessage, Transport};
|
||||
use futures::stream::BoxStream;
|
||||
use std::sync::Arc;
|
||||
|
||||
const STREAM_NAME: &str = "KPHOTOS_EVENTS";
|
||||
const STREAM_SUBJECT: &str = "kphotos-events.>";
|
||||
const CONSUMER_NAME: &str = "worker";
|
||||
const MAX_MESSAGES: i64 = 100_000;
|
||||
|
||||
pub const CONSUMER_MAX_DELIVER: i64 = 5;
|
||||
const CONSUMER_ACK_WAIT_SECS: u64 = 30;
|
||||
const ACK_TASK_TIMEOUT_SECS: u64 = 5;
|
||||
|
||||
fn stream_config() -> StreamConfig {
|
||||
StreamConfig {
|
||||
name: STREAM_NAME.to_string(),
|
||||
subjects: vec![STREAM_SUBJECT.to_string()],
|
||||
max_messages: MAX_MESSAGES,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn ensure_stream(client: &async_nats::Client) -> Result<(), DomainError> {
|
||||
let js = jetstream::new(client.clone());
|
||||
|
||||
if js.update_stream(stream_config()).await.is_ok() {
|
||||
tracing::info!(subject = STREAM_SUBJECT, "JetStream stream updated");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
tracing::warn!(
|
||||
"JetStream stream update failed (incompatible config), deleting '{STREAM_NAME}' and recreating"
|
||||
);
|
||||
let _ = js.delete_stream(STREAM_NAME).await;
|
||||
|
||||
js.create_stream(stream_config())
|
||||
.await
|
||||
.map(|_| ())
|
||||
.map_err(|e| DomainError::Internal(format!("JetStream stream create failed: {e}")))
|
||||
}
|
||||
|
||||
pub struct NatsTransport {
|
||||
jetstream: jetstream::Context,
|
||||
}
|
||||
|
||||
impl NatsTransport {
|
||||
pub fn new(client: async_nats::Client) -> Self {
|
||||
Self {
|
||||
jetstream: jetstream::new(client),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Transport for NatsTransport {
|
||||
async fn publish_bytes(&self, subject: &str, bytes: &[u8]) -> Result<(), DomainError> {
|
||||
let full_subject = format!("kphotos-events.{subject}");
|
||||
self.jetstream
|
||||
.publish(full_subject, bytes.to_vec().into())
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct NatsMessageSource {
|
||||
jetstream: jetstream::Context,
|
||||
}
|
||||
|
||||
impl NatsMessageSource {
|
||||
pub fn new(client: async_nats::Client) -> Self {
|
||||
Self {
|
||||
jetstream: jetstream::new(client),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MessageSource for NatsMessageSource {
|
||||
fn messages(&self) -> BoxStream<'_, Result<RawMessage, DomainError>> {
|
||||
use futures::stream;
|
||||
use tokio::sync::Mutex as TokioMutex;
|
||||
|
||||
let js = self.jetstream.clone();
|
||||
let (tx, rx) = tokio::sync::mpsc::channel::<Result<RawMessage, DomainError>>(128);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let stream = match js.get_stream(STREAM_NAME).await {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
let _ = tx.send(Err(DomainError::Internal(e.to_string()))).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if let Ok(info) = stream.consumer_info(CONSUMER_NAME).await
|
||||
&& info.config.deliver_subject.is_some()
|
||||
{
|
||||
tracing::info!(
|
||||
"deleting old push consumer '{CONSUMER_NAME}', replacing with pull"
|
||||
);
|
||||
let _ = stream.delete_consumer(CONSUMER_NAME).await;
|
||||
}
|
||||
|
||||
let consumer = match stream
|
||||
.get_or_create_consumer(
|
||||
CONSUMER_NAME,
|
||||
jetstream::consumer::pull::Config {
|
||||
durable_name: Some(CONSUMER_NAME.to_string()),
|
||||
deliver_policy: jetstream::consumer::DeliverPolicy::New,
|
||||
ack_policy: jetstream::consumer::AckPolicy::Explicit,
|
||||
ack_wait: std::time::Duration::from_secs(CONSUMER_ACK_WAIT_SECS),
|
||||
max_deliver: CONSUMER_MAX_DELIVER,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
let _ = tx.send(Err(DomainError::Internal(e.to_string()))).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
tracing::info!("NATS pull consumer ready");
|
||||
|
||||
loop {
|
||||
let mut messages = match consumer.messages().await {
|
||||
Ok(m) => m,
|
||||
Err(e) => {
|
||||
tracing::error!("NATS messages() failed: {e}");
|
||||
let _ = tx.send(Err(DomainError::Internal(e.to_string()))).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
use futures::StreamExt;
|
||||
while let Some(result) = messages.next().await {
|
||||
let msg = match result {
|
||||
Ok(m) => m,
|
||||
Err(e) => {
|
||||
tracing::warn!("NATS message error: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let subject = msg.subject.to_string();
|
||||
let payload = msg.payload.to_vec();
|
||||
let delivery_count = msg
|
||||
.info()
|
||||
.map(|info| info.delivered.max(0) as u64)
|
||||
.unwrap_or(1);
|
||||
let msg = Arc::new(msg);
|
||||
let msg_nack = Arc::clone(&msg);
|
||||
|
||||
let raw = RawMessage {
|
||||
subject,
|
||||
payload,
|
||||
delivery_count,
|
||||
ack: Box::new(move || {
|
||||
let m = Arc::clone(&msg);
|
||||
tokio::spawn(async move {
|
||||
let result = tokio::time::timeout(
|
||||
std::time::Duration::from_secs(ACK_TASK_TIMEOUT_SECS),
|
||||
m.ack(),
|
||||
)
|
||||
.await;
|
||||
match result {
|
||||
Ok(Ok(())) => {}
|
||||
Ok(Err(e)) => tracing::warn!("NATS ack failed: {e}"),
|
||||
Err(_) => tracing::warn!(
|
||||
"NATS ack timed out after {ACK_TASK_TIMEOUT_SECS}s"
|
||||
),
|
||||
}
|
||||
});
|
||||
}),
|
||||
nack: Box::new(move || {
|
||||
let m = Arc::clone(&msg_nack);
|
||||
tokio::spawn(async move {
|
||||
let result = tokio::time::timeout(
|
||||
std::time::Duration::from_secs(ACK_TASK_TIMEOUT_SECS),
|
||||
m.ack_with(AckKind::Nak(None)),
|
||||
)
|
||||
.await;
|
||||
match result {
|
||||
Ok(Ok(())) => {}
|
||||
Ok(Err(e)) => tracing::warn!("NATS nack failed: {e}"),
|
||||
Err(_) => tracing::warn!(
|
||||
"NATS nack timed out after {ACK_TASK_TIMEOUT_SECS}s"
|
||||
),
|
||||
}
|
||||
});
|
||||
}),
|
||||
};
|
||||
|
||||
if tx.send(Ok(raw)).await.is_err() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let rx = Arc::new(TokioMutex::new(rx));
|
||||
Box::pin(stream::unfold(rx, |rx| async move {
|
||||
let item = rx.lock().await.recv().await?;
|
||||
Some((item, rx))
|
||||
}))
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user