diff --git a/crates/adapters/event-transport/src/lib.rs b/crates/adapters/event-transport/src/lib.rs index f0c63cc..901b810 100644 --- a/crates/adapters/event-transport/src/lib.rs +++ b/crates/adapters/event-transport/src/lib.rs @@ -48,6 +48,7 @@ impl EventPublisher for EventPublisherAdapter { pub struct RawMessage { pub subject: String, pub payload: Vec, + pub delivery_count: u64, pub ack: Box, pub nack: Box, } @@ -83,19 +84,22 @@ impl EventConsumer for EventConsumerAdapter { let payload = match serde_json::from_slice::(&msg.payload) { Ok(p) => p, Err(e) => { - tracing::warn!("failed to deserialize event payload: {e}"); + tracing::warn!("failed to deserialize event payload — acking to prevent orphan: {e}"); + (msg.ack)(); return None; } }; let event = match DomainEvent::try_from(payload) { Ok(e) => e, Err(e) => { - tracing::warn!("unknown event type: {e}"); + tracing::warn!("unknown or malformed event type — acking to prevent orphan: {e}"); + (msg.ack)(); return None; } }; Some(Ok(EventEnvelope { event, + delivery_count: msg.delivery_count, ack: msg.ack, nack: msg.nack, })) @@ -192,6 +196,7 @@ mod tests { let msg = RawMessage { subject: "thoughts.created".to_string(), payload: self.bytes.clone(), + delivery_count: 1, ack: Box::new(|| {}), nack: Box::new(|| {}), }; @@ -216,6 +221,7 @@ mod tests { let msg = RawMessage { subject: "bad".to_string(), payload: b"not valid json".to_vec(), + delivery_count: 1, ack: Box::new(|| {}), nack: Box::new(|| {}), }; diff --git a/crates/adapters/nats/src/lib.rs b/crates/adapters/nats/src/lib.rs index cc3b749..2d047c3 100644 --- a/crates/adapters/nats/src/lib.rs +++ b/crates/adapters/nats/src/lib.rs @@ -10,6 +10,13 @@ const STREAM_SUBJECT: &str = "thoughts-events.>"; const CONSUMER_NAME: &str = "worker"; const MAX_MESSAGES: i64 = 100_000; +/// Maximum NATS delivery attempts before a message is considered exhausted. +pub const CONSUMER_MAX_DELIVER: i64 = 5; +/// How long NATS waits for an ack before redelivering. +const CONSUMER_ACK_WAIT_SECS: u64 = 30; +/// Timeout for spawned ack/nack async tasks. +const ACK_TASK_TIMEOUT_SECS: u64 = 5; + fn stream_config() -> StreamConfig { StreamConfig { name: STREAM_NAME.to_string(), @@ -121,6 +128,10 @@ impl MessageSource for NatsMessageSource { CONSUMER_NAME, jetstream::consumer::pull::Config { durable_name: Some(CONSUMER_NAME.to_string()), + deliver_policy: jetstream::consumer::DeliverPolicy::New, + ack_policy: jetstream::consumer::AckPolicy::Explicit, + ack_wait: std::time::Duration::from_secs(CONSUMER_ACK_WAIT_SECS), + max_deliver: CONSUMER_MAX_DELIVER, // No filter_subject — consume everything from the stream. // filter_subject matching the stream's own wildcard can be // inconsistent across NATS server versions. @@ -164,25 +175,48 @@ impl MessageSource for NatsMessageSource { let subject = msg.subject.to_string(); let payload = msg.payload.to_vec(); + let delivery_count = msg + .info() + .map(|info| info.delivered.max(0) as u64) + .unwrap_or(1); let msg = Arc::new(msg); let msg_nack = Arc::clone(&msg); let raw = RawMessage { subject, payload, + delivery_count, ack: Box::new(move || { let m = Arc::clone(&msg); tokio::spawn(async move { - if let Err(e) = m.ack().await { - tracing::warn!("NATS ack failed: {e}"); + let result = tokio::time::timeout( + std::time::Duration::from_secs(ACK_TASK_TIMEOUT_SECS), + m.ack(), + ) + .await; + match result { + Ok(Ok(())) => {} + Ok(Err(e)) => tracing::warn!("NATS ack failed: {e}"), + Err(_) => tracing::warn!( + "NATS ack timed out after {ACK_TASK_TIMEOUT_SECS}s" + ), } }); }), nack: Box::new(move || { let m = Arc::clone(&msg_nack); tokio::spawn(async move { - if let Err(e) = m.ack_with(AckKind::Nak(None)).await { - tracing::warn!("NATS nak failed: {e}"); + let result = tokio::time::timeout( + std::time::Duration::from_secs(ACK_TASK_TIMEOUT_SECS), + m.ack_with(AckKind::Nak(None)), + ) + .await; + match result { + Ok(Ok(())) => {} + Ok(Err(e)) => tracing::warn!("NATS nack failed: {e}"), + Err(_) => tracing::warn!( + "NATS nack timed out after {ACK_TASK_TIMEOUT_SECS}s" + ), } }); }), diff --git a/crates/domain/src/events.rs b/crates/domain/src/events.rs index 69936a4..81382bc 100644 --- a/crates/domain/src/events.rs +++ b/crates/domain/src/events.rs @@ -72,6 +72,7 @@ pub enum DomainEvent { pub struct EventEnvelope { pub event: DomainEvent, + pub delivery_count: u64, pub ack: Box, pub nack: Box, } @@ -79,6 +80,7 @@ impl std::fmt::Debug for EventEnvelope { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("EventEnvelope") .field("event", &self.event) + .field("delivery_count", &self.delivery_count) .finish() } }