From 6f34b7b5ec6a92de5c18ad2f5b4cbf559f3c4291 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Fri, 12 Jun 2026 00:30:06 +0200 Subject: [PATCH] fix(worker): nack on transient handler failures, ack on permanent --- crates/application/src/tests/worker.rs | 255 +++++++++++++++++-------- crates/application/src/worker.rs | 14 +- 2 files changed, 190 insertions(+), 79 deletions(-) diff --git a/crates/application/src/tests/worker.rs b/crates/application/src/tests/worker.rs index 48b0b44..641cb43 100644 --- a/crates/application/src/tests/worker.rs +++ b/crates/application/src/tests/worker.rs @@ -20,6 +20,23 @@ impl AckHandle for NoopAck { } } +struct TrackingAck { + acked: Arc>, + nacked: Arc>, +} + +#[async_trait] +impl AckHandle for TrackingAck { + async fn ack(&self) -> Result<(), DomainError> { + *self.acked.lock().unwrap() = true; + Ok(()) + } + async fn nack(&self) -> Result<(), DomainError> { + *self.nacked.lock().unwrap() = true; + Ok(()) + } +} + struct VecConsumer { events: Vec, } @@ -97,85 +114,15 @@ async fn dispatches_to_all_handlers() { assert_eq!(*calls.lock().unwrap(), vec!["movie_discovered"]); } -#[tokio::test] -async fn acks_even_when_handler_fails() { - let ack_called = Arc::new(Mutex::new(false)); - - struct TrackingAck { - ack_called: Arc>, - } - - #[async_trait] - impl AckHandle for TrackingAck { - async fn ack(&self) -> Result<(), DomainError> { - *self.ack_called.lock().unwrap() = true; - Ok(()) - } - async fn nack(&self) -> Result<(), DomainError> { - Ok(()) - } - } - - struct TrackingConsumer { - event: DomainEvent, - ack_called: Arc>, - } - - impl EventConsumer for TrackingConsumer { - fn consume(&self) -> BoxStream<'_, Result> { - let envelope = EventEnvelope::new( - self.event.clone(), - Box::new(TrackingAck { - ack_called: Arc::clone(&self.ack_called), - }), - ); - Box::pin(stream::iter(vec![Ok(envelope)])) - } - } - - struct FailingHandler; - - #[async_trait] - impl EventHandler for FailingHandler { - async fn handle(&self, _: &DomainEvent) -> Result<(), DomainError> { - Err(DomainError::InfrastructureError("boom".into())) - } - } - - let consumer = TrackingConsumer { - event: movie_discovered(), - ack_called: Arc::clone(&ack_called), - }; - - WorkerService::new(Arc::new(consumer), vec![Arc::new(FailingHandler)]) - .run(tokio::sync::watch::channel(false).1) - .await; - - assert!(*ack_called.lock().unwrap()); -} - #[tokio::test] async fn acks_when_all_handlers_succeed() { - let ack_called = Arc::new(Mutex::new(false)); - - struct TrackingAck { - ack_called: Arc>, - } - - #[async_trait] - impl AckHandle for TrackingAck { - async fn ack(&self) -> Result<(), DomainError> { - *self.ack_called.lock().unwrap() = true; - Ok(()) - } - async fn nack(&self) -> Result<(), DomainError> { - Ok(()) - } - } + let acked = Arc::new(Mutex::new(false)); + let nacked = Arc::new(Mutex::new(false)); struct TrackingConsumer { event: DomainEvent, - ack_called: Arc>, + acked: Arc>, + nacked: Arc>, } impl EventConsumer for TrackingConsumer { @@ -183,7 +130,8 @@ async fn acks_when_all_handlers_succeed() { let envelope = EventEnvelope::new( self.event.clone(), Box::new(TrackingAck { - ack_called: Arc::clone(&self.ack_called), + acked: Arc::clone(&self.acked), + nacked: Arc::clone(&self.nacked), }), ); Box::pin(stream::iter(vec![Ok(envelope)])) @@ -192,12 +140,165 @@ async fn acks_when_all_handlers_succeed() { let consumer = TrackingConsumer { event: movie_discovered(), - ack_called: Arc::clone(&ack_called), + acked: Arc::clone(&acked), + nacked: Arc::clone(&nacked), }; WorkerService::new(Arc::new(consumer), vec![]) .run(tokio::sync::watch::channel(false).1) .await; - assert!(*ack_called.lock().unwrap()); + assert!(*acked.lock().unwrap()); + assert!(!*nacked.lock().unwrap()); +} + +#[tokio::test] +async fn nacks_on_transient_handler_failure() { + let acked = Arc::new(Mutex::new(false)); + let nacked = Arc::new(Mutex::new(false)); + + struct TrackingConsumer { + event: DomainEvent, + acked: Arc>, + nacked: Arc>, + } + impl EventConsumer for TrackingConsumer { + fn consume(&self) -> BoxStream<'_, Result> { + let envelope = EventEnvelope::new( + self.event.clone(), + Box::new(TrackingAck { + acked: Arc::clone(&self.acked), + nacked: Arc::clone(&self.nacked), + }), + ); + Box::pin(stream::iter(vec![Ok(envelope)])) + } + } + struct TransientHandler; + #[async_trait] + impl EventHandler for TransientHandler { + async fn handle(&self, _: &DomainEvent) -> Result<(), DomainError> { + Err(DomainError::InfrastructureError("network timeout".into())) + } + } + + WorkerService::new( + Arc::new(TrackingConsumer { + event: movie_discovered(), + acked: Arc::clone(&acked), + nacked: Arc::clone(&nacked), + }), + vec![Arc::new(TransientHandler)], + ) + .run(tokio::sync::watch::channel(false).1) + .await; + + assert!(!*acked.lock().unwrap(), "should not ack on transient error"); + assert!(*nacked.lock().unwrap(), "should nack on transient error"); +} + +#[tokio::test] +async fn acks_on_permanent_handler_failure() { + let acked = Arc::new(Mutex::new(false)); + let nacked = Arc::new(Mutex::new(false)); + + struct TrackingConsumer { + event: DomainEvent, + acked: Arc>, + nacked: Arc>, + } + impl EventConsumer for TrackingConsumer { + fn consume(&self) -> BoxStream<'_, Result> { + let envelope = EventEnvelope::new( + self.event.clone(), + Box::new(TrackingAck { + acked: Arc::clone(&self.acked), + nacked: Arc::clone(&self.nacked), + }), + ); + Box::pin(stream::iter(vec![Ok(envelope)])) + } + } + struct PermanentHandler; + #[async_trait] + impl EventHandler for PermanentHandler { + async fn handle(&self, _: &DomainEvent) -> Result<(), DomainError> { + Err(DomainError::NotFound("movie not on tmdb".into())) + } + } + + WorkerService::new( + Arc::new(TrackingConsumer { + event: movie_discovered(), + acked: Arc::clone(&acked), + nacked: Arc::clone(&nacked), + }), + vec![Arc::new(PermanentHandler)], + ) + .run(tokio::sync::watch::channel(false).1) + .await; + + assert!(*acked.lock().unwrap(), "should ack on permanent error"); + assert!( + !*nacked.lock().unwrap(), + "should not nack on permanent error" + ); +} + +#[tokio::test] +async fn nacks_if_any_handler_is_transient() { + let acked = Arc::new(Mutex::new(false)); + let nacked = Arc::new(Mutex::new(false)); + + struct TrackingConsumer { + event: DomainEvent, + acked: Arc>, + nacked: Arc>, + } + impl EventConsumer for TrackingConsumer { + fn consume(&self) -> BoxStream<'_, Result> { + let envelope = EventEnvelope::new( + self.event.clone(), + Box::new(TrackingAck { + acked: Arc::clone(&self.acked), + nacked: Arc::clone(&self.nacked), + }), + ); + Box::pin(stream::iter(vec![Ok(envelope)])) + } + } + struct OkHandler; + #[async_trait] + impl EventHandler for OkHandler { + async fn handle(&self, _: &DomainEvent) -> Result<(), DomainError> { + Ok(()) + } + } + struct TransientHandler; + #[async_trait] + impl EventHandler for TransientHandler { + async fn handle(&self, _: &DomainEvent) -> Result<(), DomainError> { + Err(DomainError::InfrastructureError("db gone".into())) + } + } + + WorkerService::new( + Arc::new(TrackingConsumer { + event: movie_discovered(), + acked: Arc::clone(&acked), + nacked: Arc::clone(&nacked), + }), + vec![Arc::new(OkHandler), Arc::new(TransientHandler)], + ) + .run(tokio::sync::watch::channel(false).1) + .await; + + assert!( + !*acked.lock().unwrap(), + "should not ack when any handler is transient" + ); + assert!( + *nacked.lock().unwrap(), + "should nack when any handler is transient" + ); } diff --git a/crates/application/src/worker.rs b/crates/application/src/worker.rs index b30dc81..7daae4b 100644 --- a/crates/application/src/worker.rs +++ b/crates/application/src/worker.rs @@ -65,12 +65,22 @@ impl WorkerService { } async fn dispatch(handlers: Arc>>, envelope: EventEnvelope) { + let mut any_transient = false; for handler in handlers.iter() { if let Err(e) = handler.handle(&envelope.event).await { - tracing::warn!("event handler error (non-fatal): {e}"); + if e.is_transient() { + tracing::warn!("transient handler error, will retry: {e}"); + any_transient = true; + } else { + tracing::warn!("permanent handler error (not retrying): {e}"); + } } } - if let Err(e) = envelope.ack().await { + if any_transient { + if let Err(e) = envelope.nack().await { + tracing::error!("nack failed: {e}"); + } + } else if let Err(e) = envelope.ack().await { tracing::error!("ack failed: {e}"); } }