feat(event-publisher): add EventHandler trait and fan-out in EventWorker

This commit is contained in:
2026-05-04 12:32:44 +02:00
parent 17f90726e8
commit a38f78d261

View File

@@ -16,6 +16,11 @@ impl EventPublisherConfig {
} }
} }
#[async_trait]
pub trait EventHandler: Send + Sync {
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError>;
}
pub struct ChannelEventPublisher { pub struct ChannelEventPublisher {
sender: mpsc::Sender<DomainEvent>, sender: mpsc::Sender<DomainEvent>,
} }
@@ -32,6 +37,7 @@ impl EventPublisher for ChannelEventPublisher {
pub struct EventWorker { pub struct EventWorker {
receiver: mpsc::Receiver<DomainEvent>, receiver: mpsc::Receiver<DomainEvent>,
handlers: Vec<Box<dyn EventHandler>>,
} }
impl EventWorker { impl EventWorker {
@@ -65,15 +71,128 @@ impl EventWorker {
); );
} }
} }
for handler in &self.handlers {
if let Err(e) = handler.handle(&event).await {
tracing::error!("event handler error: {e}");
}
}
} }
tracing::info!("event worker shut down"); tracing::info!("event worker shut down");
} }
} }
pub fn create_event_channel(config: EventPublisherConfig) -> (ChannelEventPublisher, EventWorker) { pub fn create_event_channel(
config: EventPublisherConfig,
handlers: Vec<Box<dyn EventHandler>>,
) -> (ChannelEventPublisher, EventWorker) {
let (tx, rx) = mpsc::channel(config.channel_buffer); let (tx, rx) = mpsc::channel(config.channel_buffer);
( (
ChannelEventPublisher { sender: tx }, ChannelEventPublisher { sender: tx },
EventWorker { receiver: rx }, EventWorker {
receiver: rx,
handlers,
},
) )
} }
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use domain::{
errors::DomainError,
events::DomainEvent,
value_objects::{ExternalMetadataId, MovieId},
};
struct RecordingHandler {
calls: Arc<Mutex<Vec<String>>>,
}
#[async_trait]
impl EventHandler for RecordingHandler {
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
let label = match event {
DomainEvent::MovieDiscovered { .. } => "movie_discovered",
DomainEvent::ReviewLogged { .. } => "review_logged",
};
self.calls.lock().unwrap().push(label.to_string());
Ok(())
}
}
#[tokio::test]
async fn single_handler_receives_event() {
let calls = Arc::new(Mutex::new(vec![]));
let handler = RecordingHandler { calls: Arc::clone(&calls) };
let config = EventPublisherConfig { channel_buffer: 8 };
let (publisher, worker) = create_event_channel(config, vec![Box::new(handler)]);
tokio::spawn(worker.run());
let event = DomainEvent::MovieDiscovered {
movie_id: MovieId::generate(),
external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(),
};
publisher.publish(&event).await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
assert_eq!(*calls.lock().unwrap(), vec!["movie_discovered"]);
}
#[tokio::test]
async fn multiple_handlers_all_receive_event() {
let calls1 = Arc::new(Mutex::new(vec![]));
let calls2 = Arc::new(Mutex::new(vec![]));
let handler1 = RecordingHandler { calls: Arc::clone(&calls1) };
let handler2 = RecordingHandler { calls: Arc::clone(&calls2) };
let config = EventPublisherConfig { channel_buffer: 8 };
let (publisher, worker) = create_event_channel(
config,
vec![Box::new(handler1), Box::new(handler2)],
);
tokio::spawn(worker.run());
let event = DomainEvent::MovieDiscovered {
movie_id: MovieId::generate(),
external_metadata_id: ExternalMetadataId::new("tt9999999".into()).unwrap(),
};
publisher.publish(&event).await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
assert_eq!(calls1.lock().unwrap().len(), 1);
assert_eq!(calls2.lock().unwrap().len(), 1);
}
#[tokio::test]
async fn handler_error_does_not_stop_worker() {
struct FailingHandler;
#[async_trait]
impl EventHandler for FailingHandler {
async fn handle(&self, _: &DomainEvent) -> Result<(), DomainError> {
Err(DomainError::InfrastructureError("boom".into()))
}
}
let calls = Arc::new(Mutex::new(vec![]));
let good = RecordingHandler { calls: Arc::clone(&calls) };
let config = EventPublisherConfig { channel_buffer: 8 };
let (publisher, worker) = create_event_channel(
config,
vec![Box::new(FailingHandler), Box::new(good)],
);
tokio::spawn(worker.run());
let event = DomainEvent::MovieDiscovered {
movie_id: MovieId::generate(),
external_metadata_id: ExternalMetadataId::new("tt0000001".into()).unwrap(),
};
publisher.publish(&event).await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
assert_eq!(calls.lock().unwrap().len(), 1);
}
}