fix(worker): nack on transient handler failures, ack on permanent
Some checks failed
CI / Check / Test (push) Has been cancelled

This commit is contained in:
2026-06-12 00:30:06 +02:00
parent 17d4de461b
commit 6f34b7b5ec
2 changed files with 190 additions and 79 deletions

View File

@@ -20,6 +20,23 @@ impl AckHandle for NoopAck {
} }
} }
struct TrackingAck {
acked: Arc<Mutex<bool>>,
nacked: Arc<Mutex<bool>>,
}
#[async_trait]
impl AckHandle for TrackingAck {
async fn ack(&self) -> Result<(), DomainError> {
*self.acked.lock().unwrap() = true;
Ok(())
}
async fn nack(&self) -> Result<(), DomainError> {
*self.nacked.lock().unwrap() = true;
Ok(())
}
}
struct VecConsumer { struct VecConsumer {
events: Vec<DomainEvent>, events: Vec<DomainEvent>,
} }
@@ -97,85 +114,15 @@ async fn dispatches_to_all_handlers() {
assert_eq!(*calls.lock().unwrap(), vec!["movie_discovered"]); assert_eq!(*calls.lock().unwrap(), vec!["movie_discovered"]);
} }
#[tokio::test]
async fn acks_even_when_handler_fails() {
let ack_called = Arc::new(Mutex::new(false));
struct TrackingAck {
ack_called: Arc<Mutex<bool>>,
}
#[async_trait]
impl AckHandle for TrackingAck {
async fn ack(&self) -> Result<(), DomainError> {
*self.ack_called.lock().unwrap() = true;
Ok(())
}
async fn nack(&self) -> Result<(), DomainError> {
Ok(())
}
}
struct TrackingConsumer {
event: DomainEvent,
ack_called: Arc<Mutex<bool>>,
}
impl EventConsumer for TrackingConsumer {
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
let envelope = EventEnvelope::new(
self.event.clone(),
Box::new(TrackingAck {
ack_called: Arc::clone(&self.ack_called),
}),
);
Box::pin(stream::iter(vec![Ok(envelope)]))
}
}
struct FailingHandler;
#[async_trait]
impl EventHandler for FailingHandler {
async fn handle(&self, _: &DomainEvent) -> Result<(), DomainError> {
Err(DomainError::InfrastructureError("boom".into()))
}
}
let consumer = TrackingConsumer {
event: movie_discovered(),
ack_called: Arc::clone(&ack_called),
};
WorkerService::new(Arc::new(consumer), vec![Arc::new(FailingHandler)])
.run(tokio::sync::watch::channel(false).1)
.await;
assert!(*ack_called.lock().unwrap());
}
#[tokio::test] #[tokio::test]
async fn acks_when_all_handlers_succeed() { async fn acks_when_all_handlers_succeed() {
let ack_called = Arc::new(Mutex::new(false)); let acked = Arc::new(Mutex::new(false));
let nacked = Arc::new(Mutex::new(false));
struct TrackingAck {
ack_called: Arc<Mutex<bool>>,
}
#[async_trait]
impl AckHandle for TrackingAck {
async fn ack(&self) -> Result<(), DomainError> {
*self.ack_called.lock().unwrap() = true;
Ok(())
}
async fn nack(&self) -> Result<(), DomainError> {
Ok(())
}
}
struct TrackingConsumer { struct TrackingConsumer {
event: DomainEvent, event: DomainEvent,
ack_called: Arc<Mutex<bool>>, acked: Arc<Mutex<bool>>,
nacked: Arc<Mutex<bool>>,
} }
impl EventConsumer for TrackingConsumer { impl EventConsumer for TrackingConsumer {
@@ -183,7 +130,8 @@ async fn acks_when_all_handlers_succeed() {
let envelope = EventEnvelope::new( let envelope = EventEnvelope::new(
self.event.clone(), self.event.clone(),
Box::new(TrackingAck { Box::new(TrackingAck {
ack_called: Arc::clone(&self.ack_called), acked: Arc::clone(&self.acked),
nacked: Arc::clone(&self.nacked),
}), }),
); );
Box::pin(stream::iter(vec![Ok(envelope)])) Box::pin(stream::iter(vec![Ok(envelope)]))
@@ -192,12 +140,165 @@ async fn acks_when_all_handlers_succeed() {
let consumer = TrackingConsumer { let consumer = TrackingConsumer {
event: movie_discovered(), event: movie_discovered(),
ack_called: Arc::clone(&ack_called), acked: Arc::clone(&acked),
nacked: Arc::clone(&nacked),
}; };
WorkerService::new(Arc::new(consumer), vec![]) WorkerService::new(Arc::new(consumer), vec![])
.run(tokio::sync::watch::channel(false).1) .run(tokio::sync::watch::channel(false).1)
.await; .await;
assert!(*ack_called.lock().unwrap()); assert!(*acked.lock().unwrap());
assert!(!*nacked.lock().unwrap());
}
#[tokio::test]
async fn nacks_on_transient_handler_failure() {
let acked = Arc::new(Mutex::new(false));
let nacked = Arc::new(Mutex::new(false));
struct TrackingConsumer {
event: DomainEvent,
acked: Arc<Mutex<bool>>,
nacked: Arc<Mutex<bool>>,
}
impl EventConsumer for TrackingConsumer {
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
let envelope = EventEnvelope::new(
self.event.clone(),
Box::new(TrackingAck {
acked: Arc::clone(&self.acked),
nacked: Arc::clone(&self.nacked),
}),
);
Box::pin(stream::iter(vec![Ok(envelope)]))
}
}
struct TransientHandler;
#[async_trait]
impl EventHandler for TransientHandler {
async fn handle(&self, _: &DomainEvent) -> Result<(), DomainError> {
Err(DomainError::InfrastructureError("network timeout".into()))
}
}
WorkerService::new(
Arc::new(TrackingConsumer {
event: movie_discovered(),
acked: Arc::clone(&acked),
nacked: Arc::clone(&nacked),
}),
vec![Arc::new(TransientHandler)],
)
.run(tokio::sync::watch::channel(false).1)
.await;
assert!(!*acked.lock().unwrap(), "should not ack on transient error");
assert!(*nacked.lock().unwrap(), "should nack on transient error");
}
#[tokio::test]
async fn acks_on_permanent_handler_failure() {
let acked = Arc::new(Mutex::new(false));
let nacked = Arc::new(Mutex::new(false));
struct TrackingConsumer {
event: DomainEvent,
acked: Arc<Mutex<bool>>,
nacked: Arc<Mutex<bool>>,
}
impl EventConsumer for TrackingConsumer {
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
let envelope = EventEnvelope::new(
self.event.clone(),
Box::new(TrackingAck {
acked: Arc::clone(&self.acked),
nacked: Arc::clone(&self.nacked),
}),
);
Box::pin(stream::iter(vec![Ok(envelope)]))
}
}
struct PermanentHandler;
#[async_trait]
impl EventHandler for PermanentHandler {
async fn handle(&self, _: &DomainEvent) -> Result<(), DomainError> {
Err(DomainError::NotFound("movie not on tmdb".into()))
}
}
WorkerService::new(
Arc::new(TrackingConsumer {
event: movie_discovered(),
acked: Arc::clone(&acked),
nacked: Arc::clone(&nacked),
}),
vec![Arc::new(PermanentHandler)],
)
.run(tokio::sync::watch::channel(false).1)
.await;
assert!(*acked.lock().unwrap(), "should ack on permanent error");
assert!(
!*nacked.lock().unwrap(),
"should not nack on permanent error"
);
}
#[tokio::test]
async fn nacks_if_any_handler_is_transient() {
let acked = Arc::new(Mutex::new(false));
let nacked = Arc::new(Mutex::new(false));
struct TrackingConsumer {
event: DomainEvent,
acked: Arc<Mutex<bool>>,
nacked: Arc<Mutex<bool>>,
}
impl EventConsumer for TrackingConsumer {
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
let envelope = EventEnvelope::new(
self.event.clone(),
Box::new(TrackingAck {
acked: Arc::clone(&self.acked),
nacked: Arc::clone(&self.nacked),
}),
);
Box::pin(stream::iter(vec![Ok(envelope)]))
}
}
struct OkHandler;
#[async_trait]
impl EventHandler for OkHandler {
async fn handle(&self, _: &DomainEvent) -> Result<(), DomainError> {
Ok(())
}
}
struct TransientHandler;
#[async_trait]
impl EventHandler for TransientHandler {
async fn handle(&self, _: &DomainEvent) -> Result<(), DomainError> {
Err(DomainError::InfrastructureError("db gone".into()))
}
}
WorkerService::new(
Arc::new(TrackingConsumer {
event: movie_discovered(),
acked: Arc::clone(&acked),
nacked: Arc::clone(&nacked),
}),
vec![Arc::new(OkHandler), Arc::new(TransientHandler)],
)
.run(tokio::sync::watch::channel(false).1)
.await;
assert!(
!*acked.lock().unwrap(),
"should not ack when any handler is transient"
);
assert!(
*nacked.lock().unwrap(),
"should nack when any handler is transient"
);
} }

View File

@@ -65,12 +65,22 @@ impl WorkerService {
} }
async fn dispatch(handlers: Arc<Vec<Arc<dyn EventHandler>>>, envelope: EventEnvelope) { async fn dispatch(handlers: Arc<Vec<Arc<dyn EventHandler>>>, envelope: EventEnvelope) {
let mut any_transient = false;
for handler in handlers.iter() { for handler in handlers.iter() {
if let Err(e) = handler.handle(&envelope.event).await { if let Err(e) = handler.handle(&envelope.event).await {
tracing::warn!("event handler error (non-fatal): {e}"); if e.is_transient() {
tracing::warn!("transient handler error, will retry: {e}");
any_transient = true;
} else {
tracing::warn!("permanent handler error (not retrying): {e}");
} }
} }
if let Err(e) = envelope.ack().await { }
if any_transient {
if let Err(e) = envelope.nack().await {
tracing::error!("nack failed: {e}");
}
} else if let Err(e) = envelope.ack().await {
tracing::error!("ack failed: {e}"); tracing::error!("ack failed: {e}");
} }
} }