fix(nats): explicit consumer config, ack timeouts, unknown-event acking, delivery_count
This commit is contained in:
@@ -48,6 +48,7 @@ impl<T: Transport> EventPublisher for EventPublisherAdapter<T> {
|
|||||||
pub struct RawMessage {
|
pub struct RawMessage {
|
||||||
pub subject: String,
|
pub subject: String,
|
||||||
pub payload: Vec<u8>,
|
pub payload: Vec<u8>,
|
||||||
|
pub delivery_count: u64,
|
||||||
pub ack: Box<dyn Fn() + Send + Sync>,
|
pub ack: Box<dyn Fn() + Send + Sync>,
|
||||||
pub nack: Box<dyn Fn() + Send + Sync>,
|
pub nack: Box<dyn Fn() + Send + Sync>,
|
||||||
}
|
}
|
||||||
@@ -83,19 +84,22 @@ impl<S: MessageSource> EventConsumer for EventConsumerAdapter<S> {
|
|||||||
let payload = match serde_json::from_slice::<EventPayload>(&msg.payload) {
|
let payload = match serde_json::from_slice::<EventPayload>(&msg.payload) {
|
||||||
Ok(p) => p,
|
Ok(p) => p,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::warn!("failed to deserialize event payload: {e}");
|
tracing::warn!("failed to deserialize event payload — acking to prevent orphan: {e}");
|
||||||
|
(msg.ack)();
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let event = match DomainEvent::try_from(payload) {
|
let event = match DomainEvent::try_from(payload) {
|
||||||
Ok(e) => e,
|
Ok(e) => e,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::warn!("unknown event type: {e}");
|
tracing::warn!("unknown or malformed event type — acking to prevent orphan: {e}");
|
||||||
|
(msg.ack)();
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
Some(Ok(EventEnvelope {
|
Some(Ok(EventEnvelope {
|
||||||
event,
|
event,
|
||||||
|
delivery_count: msg.delivery_count,
|
||||||
ack: msg.ack,
|
ack: msg.ack,
|
||||||
nack: msg.nack,
|
nack: msg.nack,
|
||||||
}))
|
}))
|
||||||
@@ -192,6 +196,7 @@ mod tests {
|
|||||||
let msg = RawMessage {
|
let msg = RawMessage {
|
||||||
subject: "thoughts.created".to_string(),
|
subject: "thoughts.created".to_string(),
|
||||||
payload: self.bytes.clone(),
|
payload: self.bytes.clone(),
|
||||||
|
delivery_count: 1,
|
||||||
ack: Box::new(|| {}),
|
ack: Box::new(|| {}),
|
||||||
nack: Box::new(|| {}),
|
nack: Box::new(|| {}),
|
||||||
};
|
};
|
||||||
@@ -216,6 +221,7 @@ mod tests {
|
|||||||
let msg = RawMessage {
|
let msg = RawMessage {
|
||||||
subject: "bad".to_string(),
|
subject: "bad".to_string(),
|
||||||
payload: b"not valid json".to_vec(),
|
payload: b"not valid json".to_vec(),
|
||||||
|
delivery_count: 1,
|
||||||
ack: Box::new(|| {}),
|
ack: Box::new(|| {}),
|
||||||
nack: Box::new(|| {}),
|
nack: Box::new(|| {}),
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -10,6 +10,13 @@ const STREAM_SUBJECT: &str = "thoughts-events.>";
|
|||||||
const CONSUMER_NAME: &str = "worker";
|
const CONSUMER_NAME: &str = "worker";
|
||||||
const MAX_MESSAGES: i64 = 100_000;
|
const MAX_MESSAGES: i64 = 100_000;
|
||||||
|
|
||||||
|
/// Maximum NATS delivery attempts before a message is considered exhausted.
|
||||||
|
pub const CONSUMER_MAX_DELIVER: i64 = 5;
|
||||||
|
/// How long NATS waits for an ack before redelivering.
|
||||||
|
const CONSUMER_ACK_WAIT_SECS: u64 = 30;
|
||||||
|
/// Timeout for spawned ack/nack async tasks.
|
||||||
|
const ACK_TASK_TIMEOUT_SECS: u64 = 5;
|
||||||
|
|
||||||
fn stream_config() -> StreamConfig {
|
fn stream_config() -> StreamConfig {
|
||||||
StreamConfig {
|
StreamConfig {
|
||||||
name: STREAM_NAME.to_string(),
|
name: STREAM_NAME.to_string(),
|
||||||
@@ -121,6 +128,10 @@ impl MessageSource for NatsMessageSource {
|
|||||||
CONSUMER_NAME,
|
CONSUMER_NAME,
|
||||||
jetstream::consumer::pull::Config {
|
jetstream::consumer::pull::Config {
|
||||||
durable_name: Some(CONSUMER_NAME.to_string()),
|
durable_name: Some(CONSUMER_NAME.to_string()),
|
||||||
|
deliver_policy: jetstream::consumer::DeliverPolicy::New,
|
||||||
|
ack_policy: jetstream::consumer::AckPolicy::Explicit,
|
||||||
|
ack_wait: std::time::Duration::from_secs(CONSUMER_ACK_WAIT_SECS),
|
||||||
|
max_deliver: CONSUMER_MAX_DELIVER,
|
||||||
// No filter_subject — consume everything from the stream.
|
// No filter_subject — consume everything from the stream.
|
||||||
// filter_subject matching the stream's own wildcard can be
|
// filter_subject matching the stream's own wildcard can be
|
||||||
// inconsistent across NATS server versions.
|
// inconsistent across NATS server versions.
|
||||||
@@ -164,25 +175,48 @@ impl MessageSource for NatsMessageSource {
|
|||||||
|
|
||||||
let subject = msg.subject.to_string();
|
let subject = msg.subject.to_string();
|
||||||
let payload = msg.payload.to_vec();
|
let payload = msg.payload.to_vec();
|
||||||
|
let delivery_count = msg
|
||||||
|
.info()
|
||||||
|
.map(|info| info.delivered.max(0) as u64)
|
||||||
|
.unwrap_or(1);
|
||||||
let msg = Arc::new(msg);
|
let msg = Arc::new(msg);
|
||||||
let msg_nack = Arc::clone(&msg);
|
let msg_nack = Arc::clone(&msg);
|
||||||
|
|
||||||
let raw = RawMessage {
|
let raw = RawMessage {
|
||||||
subject,
|
subject,
|
||||||
payload,
|
payload,
|
||||||
|
delivery_count,
|
||||||
ack: Box::new(move || {
|
ack: Box::new(move || {
|
||||||
let m = Arc::clone(&msg);
|
let m = Arc::clone(&msg);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = m.ack().await {
|
let result = tokio::time::timeout(
|
||||||
tracing::warn!("NATS ack failed: {e}");
|
std::time::Duration::from_secs(ACK_TASK_TIMEOUT_SECS),
|
||||||
|
m.ack(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
match result {
|
||||||
|
Ok(Ok(())) => {}
|
||||||
|
Ok(Err(e)) => tracing::warn!("NATS ack failed: {e}"),
|
||||||
|
Err(_) => tracing::warn!(
|
||||||
|
"NATS ack timed out after {ACK_TASK_TIMEOUT_SECS}s"
|
||||||
|
),
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}),
|
}),
|
||||||
nack: Box::new(move || {
|
nack: Box::new(move || {
|
||||||
let m = Arc::clone(&msg_nack);
|
let m = Arc::clone(&msg_nack);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = m.ack_with(AckKind::Nak(None)).await {
|
let result = tokio::time::timeout(
|
||||||
tracing::warn!("NATS nak failed: {e}");
|
std::time::Duration::from_secs(ACK_TASK_TIMEOUT_SECS),
|
||||||
|
m.ack_with(AckKind::Nak(None)),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
match result {
|
||||||
|
Ok(Ok(())) => {}
|
||||||
|
Ok(Err(e)) => tracing::warn!("NATS nack failed: {e}"),
|
||||||
|
Err(_) => tracing::warn!(
|
||||||
|
"NATS nack timed out after {ACK_TASK_TIMEOUT_SECS}s"
|
||||||
|
),
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}),
|
}),
|
||||||
|
|||||||
@@ -72,6 +72,7 @@ pub enum DomainEvent {
|
|||||||
|
|
||||||
pub struct EventEnvelope {
|
pub struct EventEnvelope {
|
||||||
pub event: DomainEvent,
|
pub event: DomainEvent,
|
||||||
|
pub delivery_count: u64,
|
||||||
pub ack: Box<dyn Fn() + Send + Sync>,
|
pub ack: Box<dyn Fn() + Send + Sync>,
|
||||||
pub nack: Box<dyn Fn() + Send + Sync>,
|
pub nack: Box<dyn Fn() + Send + Sync>,
|
||||||
}
|
}
|
||||||
@@ -79,6 +80,7 @@ impl std::fmt::Debug for EventEnvelope {
|
|||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
f.debug_struct("EventEnvelope")
|
f.debug_struct("EventEnvelope")
|
||||||
.field("event", &self.event)
|
.field("event", &self.event)
|
||||||
|
.field("delivery_count", &self.delivery_count)
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user