background worker
This commit is contained in:
@@ -1,5 +1,12 @@
|
||||
use async_trait::async_trait;
|
||||
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::{AckHandle, DomainEvent, EventEnvelope},
|
||||
ports::{EventConsumer, EventPublisher},
|
||||
};
|
||||
use futures::stream::{self, BoxStream};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
pub use domain::ports::EventHandler;
|
||||
@@ -32,65 +39,26 @@ impl EventPublisher for ChannelEventPublisher {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct EventWorker {
|
||||
receiver: mpsc::Receiver<DomainEvent>,
|
||||
handlers: Vec<Box<dyn EventHandler>>,
|
||||
struct NoopAck;
|
||||
|
||||
#[async_trait]
|
||||
impl AckHandle for NoopAck {
|
||||
async fn ack(&self) -> Result<(), DomainError> { Ok(()) }
|
||||
async fn nack(&self) -> Result<(), DomainError> { Ok(()) }
|
||||
}
|
||||
|
||||
impl EventWorker {
|
||||
pub async fn run(mut self) {
|
||||
while let Some(event) = self.receiver.recv().await {
|
||||
match &event {
|
||||
DomainEvent::ReviewLogged {
|
||||
review_id,
|
||||
movie_id,
|
||||
user_id,
|
||||
rating,
|
||||
watched_at,
|
||||
} => {
|
||||
tracing::info!(
|
||||
review_id = %review_id.value(),
|
||||
movie_id = %movie_id.value(),
|
||||
user_id = %user_id.value(),
|
||||
rating = rating.value(),
|
||||
watched_at = %watched_at,
|
||||
"event: review_logged"
|
||||
);
|
||||
}
|
||||
DomainEvent::ReviewUpdated {
|
||||
review_id,
|
||||
movie_id,
|
||||
user_id,
|
||||
rating,
|
||||
watched_at,
|
||||
} => {
|
||||
tracing::info!(
|
||||
review_id = %review_id.value(),
|
||||
movie_id = %movie_id.value(),
|
||||
user_id = %user_id.value(),
|
||||
rating = rating.value(),
|
||||
watched_at = %watched_at,
|
||||
"event: review_updated"
|
||||
);
|
||||
}
|
||||
DomainEvent::MovieDiscovered {
|
||||
movie_id,
|
||||
external_metadata_id,
|
||||
} => {
|
||||
tracing::info!(
|
||||
movie_id = %movie_id.value(),
|
||||
external_id = external_metadata_id.value(),
|
||||
"event: movie_discovered"
|
||||
);
|
||||
}
|
||||
}
|
||||
for handler in &self.handlers {
|
||||
if let Err(e) = handler.handle(&event).await {
|
||||
tracing::error!("event handler error: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
tracing::info!("event worker shut down");
|
||||
pub struct ChannelEventConsumer {
|
||||
receiver: Arc<Mutex<mpsc::Receiver<DomainEvent>>>,
|
||||
}
|
||||
|
||||
impl EventConsumer for ChannelEventConsumer {
|
||||
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
||||
let receiver = Arc::clone(&self.receiver);
|
||||
Box::pin(stream::unfold(receiver, |rx| async move {
|
||||
let event = rx.lock().await.recv().await?;
|
||||
let envelope = EventEnvelope::new(event, Box::new(NoopAck));
|
||||
Some((Ok(envelope), rx))
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -105,14 +73,12 @@ impl EventPublisher for NoopEventPublisher {
|
||||
|
||||
pub fn create_event_channel(
|
||||
config: EventPublisherConfig,
|
||||
handlers: Vec<Box<dyn EventHandler>>,
|
||||
) -> (ChannelEventPublisher, EventWorker) {
|
||||
) -> (ChannelEventPublisher, ChannelEventConsumer) {
|
||||
let (tx, rx) = mpsc::channel(config.channel_buffer);
|
||||
(
|
||||
ChannelEventPublisher { sender: tx },
|
||||
EventWorker {
|
||||
receiver: rx,
|
||||
handlers,
|
||||
ChannelEventConsumer {
|
||||
receiver: Arc::new(Mutex::new(rx)),
|
||||
},
|
||||
)
|
||||
}
|
||||
@@ -121,107 +87,56 @@ pub fn create_event_channel(
|
||||
mod tests {
|
||||
use super::*;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
value_objects::{ExternalMetadataId, MovieId},
|
||||
};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use futures::StreamExt;
|
||||
|
||||
struct RecordingHandler {
|
||||
calls: Arc<Mutex<Vec<String>>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl EventHandler for RecordingHandler {
|
||||
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||
let label = match event {
|
||||
DomainEvent::ReviewLogged { .. } => "review_logged",
|
||||
DomainEvent::ReviewUpdated { .. } => "review_updated",
|
||||
DomainEvent::MovieDiscovered { .. } => "movie_discovered",
|
||||
};
|
||||
self.calls.lock().unwrap().push(label.to_string());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn single_handler_receives_event() {
|
||||
let calls = Arc::new(Mutex::new(vec![]));
|
||||
let handler = RecordingHandler {
|
||||
calls: Arc::clone(&calls),
|
||||
};
|
||||
let config = EventPublisherConfig { channel_buffer: 8 };
|
||||
let (publisher, worker) = create_event_channel(config, vec![Box::new(handler)]);
|
||||
|
||||
let handle = tokio::spawn(worker.run());
|
||||
|
||||
let event = DomainEvent::MovieDiscovered {
|
||||
fn movie_discovered() -> DomainEvent {
|
||||
DomainEvent::MovieDiscovered {
|
||||
movie_id: MovieId::generate(),
|
||||
external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(),
|
||||
};
|
||||
publisher.publish(&event).await.unwrap();
|
||||
drop(publisher);
|
||||
handle.await.unwrap();
|
||||
|
||||
assert_eq!(*calls.lock().unwrap(), vec!["movie_discovered"]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn multiple_handlers_all_receive_event() {
|
||||
let calls1 = Arc::new(Mutex::new(vec![]));
|
||||
let calls2 = Arc::new(Mutex::new(vec![]));
|
||||
let handler1 = RecordingHandler {
|
||||
calls: Arc::clone(&calls1),
|
||||
};
|
||||
let handler2 = RecordingHandler {
|
||||
calls: Arc::clone(&calls2),
|
||||
};
|
||||
let config = EventPublisherConfig { channel_buffer: 8 };
|
||||
let (publisher, worker) =
|
||||
create_event_channel(config, vec![Box::new(handler1), Box::new(handler2)]);
|
||||
|
||||
let handle = tokio::spawn(worker.run());
|
||||
|
||||
let event = DomainEvent::MovieDiscovered {
|
||||
movie_id: MovieId::generate(),
|
||||
external_metadata_id: ExternalMetadataId::new("tt9999999".into()).unwrap(),
|
||||
};
|
||||
publisher.publish(&event).await.unwrap();
|
||||
drop(publisher);
|
||||
handle.await.unwrap();
|
||||
|
||||
assert_eq!(calls1.lock().unwrap().len(), 1);
|
||||
assert_eq!(calls2.lock().unwrap().len(), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn handler_error_does_not_stop_worker() {
|
||||
struct FailingHandler;
|
||||
#[async_trait]
|
||||
impl EventHandler for FailingHandler {
|
||||
async fn handle(&self, _: &DomainEvent) -> Result<(), DomainError> {
|
||||
Err(DomainError::InfrastructureError("boom".into()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let calls = Arc::new(Mutex::new(vec![]));
|
||||
let good = RecordingHandler {
|
||||
calls: Arc::clone(&calls),
|
||||
};
|
||||
#[tokio::test]
|
||||
async fn consumer_yields_published_events() {
|
||||
let config = EventPublisherConfig { channel_buffer: 8 };
|
||||
let (publisher, worker) =
|
||||
create_event_channel(config, vec![Box::new(FailingHandler), Box::new(good)]);
|
||||
let (publisher, consumer) = create_event_channel(config);
|
||||
|
||||
let handle = tokio::spawn(worker.run());
|
||||
|
||||
let event = DomainEvent::MovieDiscovered {
|
||||
movie_id: MovieId::generate(),
|
||||
external_metadata_id: ExternalMetadataId::new("tt0000001".into()).unwrap(),
|
||||
};
|
||||
publisher.publish(&event).await.unwrap();
|
||||
publisher.publish(&movie_discovered()).await.unwrap();
|
||||
drop(publisher);
|
||||
handle.await.unwrap();
|
||||
|
||||
assert_eq!(calls.lock().unwrap().len(), 1);
|
||||
let mut stream = consumer.consume();
|
||||
let envelope = stream.next().await.unwrap().unwrap();
|
||||
assert!(matches!(envelope.event, DomainEvent::MovieDiscovered { .. }));
|
||||
assert!(stream.next().await.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn consumer_yields_multiple_events_in_order() {
|
||||
let config = EventPublisherConfig { channel_buffer: 8 };
|
||||
let (publisher, consumer) = create_event_channel(config);
|
||||
|
||||
publisher.publish(&movie_discovered()).await.unwrap();
|
||||
publisher.publish(&movie_discovered()).await.unwrap();
|
||||
drop(publisher);
|
||||
|
||||
let mut stream = consumer.consume();
|
||||
let first = stream.next().await.unwrap().unwrap();
|
||||
let second = stream.next().await.unwrap().unwrap();
|
||||
assert!(matches!(first.event, DomainEvent::MovieDiscovered { .. }));
|
||||
assert!(matches!(second.event, DomainEvent::MovieDiscovered { .. }));
|
||||
assert!(stream.next().await.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn stream_ends_when_publisher_dropped() {
|
||||
let config = EventPublisherConfig { channel_buffer: 8 };
|
||||
let (publisher, consumer) = create_event_channel(config);
|
||||
drop(publisher);
|
||||
|
||||
let mut stream = consumer.consume();
|
||||
assert!(stream.next().await.is_none());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user