Compare commits
2 Commits
40cb15e7cb
...
6f34b7b5ec
| Author | SHA1 | Date | |
|---|---|---|---|
| 6f34b7b5ec | |||
| 17d4de461b |
@@ -20,6 +20,23 @@ impl AckHandle for NoopAck {
|
||||
}
|
||||
}
|
||||
|
||||
struct TrackingAck {
|
||||
acked: Arc<Mutex<bool>>,
|
||||
nacked: Arc<Mutex<bool>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AckHandle for TrackingAck {
|
||||
async fn ack(&self) -> Result<(), DomainError> {
|
||||
*self.acked.lock().unwrap() = true;
|
||||
Ok(())
|
||||
}
|
||||
async fn nack(&self) -> Result<(), DomainError> {
|
||||
*self.nacked.lock().unwrap() = true;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct VecConsumer {
|
||||
events: Vec<DomainEvent>,
|
||||
}
|
||||
@@ -97,85 +114,15 @@ async fn dispatches_to_all_handlers() {
|
||||
assert_eq!(*calls.lock().unwrap(), vec!["movie_discovered"]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn acks_even_when_handler_fails() {
|
||||
let ack_called = Arc::new(Mutex::new(false));
|
||||
|
||||
struct TrackingAck {
|
||||
ack_called: Arc<Mutex<bool>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AckHandle for TrackingAck {
|
||||
async fn ack(&self) -> Result<(), DomainError> {
|
||||
*self.ack_called.lock().unwrap() = true;
|
||||
Ok(())
|
||||
}
|
||||
async fn nack(&self) -> Result<(), DomainError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct TrackingConsumer {
|
||||
event: DomainEvent,
|
||||
ack_called: Arc<Mutex<bool>>,
|
||||
}
|
||||
|
||||
impl EventConsumer for TrackingConsumer {
|
||||
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
||||
let envelope = EventEnvelope::new(
|
||||
self.event.clone(),
|
||||
Box::new(TrackingAck {
|
||||
ack_called: Arc::clone(&self.ack_called),
|
||||
}),
|
||||
);
|
||||
Box::pin(stream::iter(vec![Ok(envelope)]))
|
||||
}
|
||||
}
|
||||
|
||||
struct FailingHandler;
|
||||
|
||||
#[async_trait]
|
||||
impl EventHandler for FailingHandler {
|
||||
async fn handle(&self, _: &DomainEvent) -> Result<(), DomainError> {
|
||||
Err(DomainError::InfrastructureError("boom".into()))
|
||||
}
|
||||
}
|
||||
|
||||
let consumer = TrackingConsumer {
|
||||
event: movie_discovered(),
|
||||
ack_called: Arc::clone(&ack_called),
|
||||
};
|
||||
|
||||
WorkerService::new(Arc::new(consumer), vec![Arc::new(FailingHandler)])
|
||||
.run(tokio::sync::watch::channel(false).1)
|
||||
.await;
|
||||
|
||||
assert!(*ack_called.lock().unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn acks_when_all_handlers_succeed() {
|
||||
let ack_called = Arc::new(Mutex::new(false));
|
||||
|
||||
struct TrackingAck {
|
||||
ack_called: Arc<Mutex<bool>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AckHandle for TrackingAck {
|
||||
async fn ack(&self) -> Result<(), DomainError> {
|
||||
*self.ack_called.lock().unwrap() = true;
|
||||
Ok(())
|
||||
}
|
||||
async fn nack(&self) -> Result<(), DomainError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
let acked = Arc::new(Mutex::new(false));
|
||||
let nacked = Arc::new(Mutex::new(false));
|
||||
|
||||
struct TrackingConsumer {
|
||||
event: DomainEvent,
|
||||
ack_called: Arc<Mutex<bool>>,
|
||||
acked: Arc<Mutex<bool>>,
|
||||
nacked: Arc<Mutex<bool>>,
|
||||
}
|
||||
|
||||
impl EventConsumer for TrackingConsumer {
|
||||
@@ -183,7 +130,8 @@ async fn acks_when_all_handlers_succeed() {
|
||||
let envelope = EventEnvelope::new(
|
||||
self.event.clone(),
|
||||
Box::new(TrackingAck {
|
||||
ack_called: Arc::clone(&self.ack_called),
|
||||
acked: Arc::clone(&self.acked),
|
||||
nacked: Arc::clone(&self.nacked),
|
||||
}),
|
||||
);
|
||||
Box::pin(stream::iter(vec![Ok(envelope)]))
|
||||
@@ -192,12 +140,165 @@ async fn acks_when_all_handlers_succeed() {
|
||||
|
||||
let consumer = TrackingConsumer {
|
||||
event: movie_discovered(),
|
||||
ack_called: Arc::clone(&ack_called),
|
||||
acked: Arc::clone(&acked),
|
||||
nacked: Arc::clone(&nacked),
|
||||
};
|
||||
|
||||
WorkerService::new(Arc::new(consumer), vec![])
|
||||
.run(tokio::sync::watch::channel(false).1)
|
||||
.await;
|
||||
|
||||
assert!(*ack_called.lock().unwrap());
|
||||
assert!(*acked.lock().unwrap());
|
||||
assert!(!*nacked.lock().unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn nacks_on_transient_handler_failure() {
|
||||
let acked = Arc::new(Mutex::new(false));
|
||||
let nacked = Arc::new(Mutex::new(false));
|
||||
|
||||
struct TrackingConsumer {
|
||||
event: DomainEvent,
|
||||
acked: Arc<Mutex<bool>>,
|
||||
nacked: Arc<Mutex<bool>>,
|
||||
}
|
||||
impl EventConsumer for TrackingConsumer {
|
||||
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
||||
let envelope = EventEnvelope::new(
|
||||
self.event.clone(),
|
||||
Box::new(TrackingAck {
|
||||
acked: Arc::clone(&self.acked),
|
||||
nacked: Arc::clone(&self.nacked),
|
||||
}),
|
||||
);
|
||||
Box::pin(stream::iter(vec![Ok(envelope)]))
|
||||
}
|
||||
}
|
||||
struct TransientHandler;
|
||||
#[async_trait]
|
||||
impl EventHandler for TransientHandler {
|
||||
async fn handle(&self, _: &DomainEvent) -> Result<(), DomainError> {
|
||||
Err(DomainError::InfrastructureError("network timeout".into()))
|
||||
}
|
||||
}
|
||||
|
||||
WorkerService::new(
|
||||
Arc::new(TrackingConsumer {
|
||||
event: movie_discovered(),
|
||||
acked: Arc::clone(&acked),
|
||||
nacked: Arc::clone(&nacked),
|
||||
}),
|
||||
vec![Arc::new(TransientHandler)],
|
||||
)
|
||||
.run(tokio::sync::watch::channel(false).1)
|
||||
.await;
|
||||
|
||||
assert!(!*acked.lock().unwrap(), "should not ack on transient error");
|
||||
assert!(*nacked.lock().unwrap(), "should nack on transient error");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn acks_on_permanent_handler_failure() {
|
||||
let acked = Arc::new(Mutex::new(false));
|
||||
let nacked = Arc::new(Mutex::new(false));
|
||||
|
||||
struct TrackingConsumer {
|
||||
event: DomainEvent,
|
||||
acked: Arc<Mutex<bool>>,
|
||||
nacked: Arc<Mutex<bool>>,
|
||||
}
|
||||
impl EventConsumer for TrackingConsumer {
|
||||
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
||||
let envelope = EventEnvelope::new(
|
||||
self.event.clone(),
|
||||
Box::new(TrackingAck {
|
||||
acked: Arc::clone(&self.acked),
|
||||
nacked: Arc::clone(&self.nacked),
|
||||
}),
|
||||
);
|
||||
Box::pin(stream::iter(vec![Ok(envelope)]))
|
||||
}
|
||||
}
|
||||
struct PermanentHandler;
|
||||
#[async_trait]
|
||||
impl EventHandler for PermanentHandler {
|
||||
async fn handle(&self, _: &DomainEvent) -> Result<(), DomainError> {
|
||||
Err(DomainError::NotFound("movie not on tmdb".into()))
|
||||
}
|
||||
}
|
||||
|
||||
WorkerService::new(
|
||||
Arc::new(TrackingConsumer {
|
||||
event: movie_discovered(),
|
||||
acked: Arc::clone(&acked),
|
||||
nacked: Arc::clone(&nacked),
|
||||
}),
|
||||
vec![Arc::new(PermanentHandler)],
|
||||
)
|
||||
.run(tokio::sync::watch::channel(false).1)
|
||||
.await;
|
||||
|
||||
assert!(*acked.lock().unwrap(), "should ack on permanent error");
|
||||
assert!(
|
||||
!*nacked.lock().unwrap(),
|
||||
"should not nack on permanent error"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn nacks_if_any_handler_is_transient() {
|
||||
let acked = Arc::new(Mutex::new(false));
|
||||
let nacked = Arc::new(Mutex::new(false));
|
||||
|
||||
struct TrackingConsumer {
|
||||
event: DomainEvent,
|
||||
acked: Arc<Mutex<bool>>,
|
||||
nacked: Arc<Mutex<bool>>,
|
||||
}
|
||||
impl EventConsumer for TrackingConsumer {
|
||||
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
||||
let envelope = EventEnvelope::new(
|
||||
self.event.clone(),
|
||||
Box::new(TrackingAck {
|
||||
acked: Arc::clone(&self.acked),
|
||||
nacked: Arc::clone(&self.nacked),
|
||||
}),
|
||||
);
|
||||
Box::pin(stream::iter(vec![Ok(envelope)]))
|
||||
}
|
||||
}
|
||||
struct OkHandler;
|
||||
#[async_trait]
|
||||
impl EventHandler for OkHandler {
|
||||
async fn handle(&self, _: &DomainEvent) -> Result<(), DomainError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
struct TransientHandler;
|
||||
#[async_trait]
|
||||
impl EventHandler for TransientHandler {
|
||||
async fn handle(&self, _: &DomainEvent) -> Result<(), DomainError> {
|
||||
Err(DomainError::InfrastructureError("db gone".into()))
|
||||
}
|
||||
}
|
||||
|
||||
WorkerService::new(
|
||||
Arc::new(TrackingConsumer {
|
||||
event: movie_discovered(),
|
||||
acked: Arc::clone(&acked),
|
||||
nacked: Arc::clone(&nacked),
|
||||
}),
|
||||
vec![Arc::new(OkHandler), Arc::new(TransientHandler)],
|
||||
)
|
||||
.run(tokio::sync::watch::channel(false).1)
|
||||
.await;
|
||||
|
||||
assert!(
|
||||
!*acked.lock().unwrap(),
|
||||
"should not ack when any handler is transient"
|
||||
);
|
||||
assert!(
|
||||
*nacked.lock().unwrap(),
|
||||
"should nack when any handler is transient"
|
||||
);
|
||||
}
|
||||
|
||||
@@ -65,12 +65,22 @@ impl WorkerService {
|
||||
}
|
||||
|
||||
async fn dispatch(handlers: Arc<Vec<Arc<dyn EventHandler>>>, envelope: EventEnvelope) {
|
||||
let mut any_transient = false;
|
||||
for handler in handlers.iter() {
|
||||
if let Err(e) = handler.handle(&envelope.event).await {
|
||||
tracing::warn!("event handler error (non-fatal): {e}");
|
||||
if e.is_transient() {
|
||||
tracing::warn!("transient handler error, will retry: {e}");
|
||||
any_transient = true;
|
||||
} else {
|
||||
tracing::warn!("permanent handler error (not retrying): {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Err(e) = envelope.ack().await {
|
||||
if any_transient {
|
||||
if let Err(e) = envelope.nack().await {
|
||||
tracing::error!("nack failed: {e}");
|
||||
}
|
||||
} else if let Err(e) = envelope.ack().await {
|
||||
tracing::error!("ack failed: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,3 +20,44 @@ pub enum DomainError {
|
||||
#[error("Forbidden: {0}")]
|
||||
Forbidden(String),
|
||||
}
|
||||
|
||||
impl DomainError {
|
||||
pub fn is_transient(&self) -> bool {
|
||||
matches!(self, DomainError::InfrastructureError(_))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn infrastructure_error_is_transient() {
|
||||
assert!(DomainError::InfrastructureError("network timeout".into()).is_transient());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn not_found_is_not_transient() {
|
||||
assert!(!DomainError::NotFound("thing".into()).is_transient());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validation_error_is_not_transient() {
|
||||
assert!(!DomainError::ValidationError("bad input".into()).is_transient());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unauthorized_is_not_transient() {
|
||||
assert!(!DomainError::Unauthorized("token expired".into()).is_transient());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn forbidden_is_not_transient() {
|
||||
assert!(!DomainError::Forbidden("no access".into()).is_transient());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn invalid_rating_is_not_transient() {
|
||||
assert!(!DomainError::InvalidRating { max: 5, given: 9 }.is_transient());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user