feat: update dependencies and improve code formatting in worker tests

Co-authored-by: Copilot <copilot@github.com>
This commit is contained in:
2026-05-13 01:38:33 +02:00
parent bc6c767c29
commit 10fcc27339
2 changed files with 36 additions and 13 deletions

3
Cargo.lock generated
View File

@@ -29,7 +29,9 @@ dependencies = [
"async-trait", "async-trait",
"axum", "axum",
"chrono", "chrono",
"domain",
"enum_delegate", "enum_delegate",
"reqwest 0.13.3",
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
@@ -6808,6 +6810,7 @@ dependencies = [
"activitypub", "activitypub",
"anyhow", "anyhow",
"application", "application",
"async-trait",
"auth", "auth",
"domain", "domain",
"dotenvy", "dotenvy",

View File

@@ -1,7 +1,10 @@
use super::*; use super::*;
use async_trait::async_trait; use async_trait::async_trait;
use domain::{errors::DomainError, events::{AckHandle, DomainEvent}};
use domain::value_objects::{ExternalMetadataId, MovieId}; use domain::value_objects::{ExternalMetadataId, MovieId};
use domain::{
errors::DomainError,
events::{AckHandle, DomainEvent},
};
use futures::{stream, stream::BoxStream}; use futures::{stream, stream::BoxStream};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@@ -9,8 +12,12 @@ struct NoopAck;
#[async_trait] #[async_trait]
impl AckHandle for NoopAck { impl AckHandle for NoopAck {
async fn ack(&self) -> Result<(), DomainError> { Ok(()) } async fn ack(&self) -> Result<(), DomainError> {
async fn nack(&self) -> Result<(), DomainError> { Ok(()) } Ok(())
}
async fn nack(&self) -> Result<(), DomainError> {
Ok(())
}
} }
struct VecConsumer { struct VecConsumer {
@@ -45,7 +52,10 @@ impl EventHandler for RecordingHandler {
DomainEvent::UserUpdated { .. } => "user_updated", DomainEvent::UserUpdated { .. } => "user_updated",
DomainEvent::MovieEnrichmentRequested { .. } => "movie_enrichment_requested", DomainEvent::MovieEnrichmentRequested { .. } => "movie_enrichment_requested",
DomainEvent::ImageStored { .. } => "image_stored", DomainEvent::ImageStored { .. } => "image_stored",
DomainEvent::WatchlistEntryAdded { .. } | DomainEvent::WatchlistEntryRemoved { .. } => "watchlist", DomainEvent::WatchlistEntryAdded { .. } | DomainEvent::WatchlistEntryRemoved { .. } => {
"watchlist"
}
DomainEvent::FollowAccepted { .. } => "follow_accepted",
}; };
self.calls.lock().unwrap().push(label); self.calls.lock().unwrap().push(label);
Ok(()) Ok(())
@@ -62,8 +72,12 @@ fn movie_discovered() -> DomainEvent {
#[tokio::test] #[tokio::test]
async fn dispatches_to_all_handlers() { async fn dispatches_to_all_handlers() {
let calls = Arc::new(Mutex::new(vec![])); let calls = Arc::new(Mutex::new(vec![]));
let consumer = VecConsumer { events: vec![movie_discovered()] }; let consumer = VecConsumer {
let handler = RecordingHandler { calls: Arc::clone(&calls) }; events: vec![movie_discovered()],
};
let handler = RecordingHandler {
calls: Arc::clone(&calls),
};
WorkerService::new(Arc::new(consumer), vec![Arc::new(handler)]) WorkerService::new(Arc::new(consumer), vec![Arc::new(handler)])
.run() .run()
@@ -82,7 +96,9 @@ async fn nacks_when_handler_fails() {
#[async_trait] #[async_trait]
impl AckHandle for TrackingAck { impl AckHandle for TrackingAck {
async fn ack(&self) -> Result<(), DomainError> { Ok(()) } async fn ack(&self) -> Result<(), DomainError> {
Ok(())
}
async fn nack(&self) -> Result<(), DomainError> { async fn nack(&self) -> Result<(), DomainError> {
*self.nack_called.lock().unwrap() = true; *self.nack_called.lock().unwrap() = true;
Ok(()) Ok(())
@@ -98,7 +114,9 @@ async fn nacks_when_handler_fails() {
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> { fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
let envelope = EventEnvelope::new( let envelope = EventEnvelope::new(
self.event.clone(), self.event.clone(),
Box::new(TrackingAck { nack_called: Arc::clone(&self.nack_called) }), Box::new(TrackingAck {
nack_called: Arc::clone(&self.nack_called),
}),
); );
Box::pin(stream::iter(vec![Ok(envelope)])) Box::pin(stream::iter(vec![Ok(envelope)]))
} }
@@ -139,7 +157,9 @@ async fn acks_when_all_handlers_succeed() {
*self.ack_called.lock().unwrap() = true; *self.ack_called.lock().unwrap() = true;
Ok(()) Ok(())
} }
async fn nack(&self) -> Result<(), DomainError> { Ok(()) } async fn nack(&self) -> Result<(), DomainError> {
Ok(())
}
} }
struct TrackingConsumer { struct TrackingConsumer {
@@ -151,7 +171,9 @@ async fn acks_when_all_handlers_succeed() {
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> { fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
let envelope = EventEnvelope::new( let envelope = EventEnvelope::new(
self.event.clone(), self.event.clone(),
Box::new(TrackingAck { ack_called: Arc::clone(&self.ack_called) }), Box::new(TrackingAck {
ack_called: Arc::clone(&self.ack_called),
}),
); );
Box::pin(stream::iter(vec![Ok(envelope)])) Box::pin(stream::iter(vec![Ok(envelope)]))
} }
@@ -162,9 +184,7 @@ async fn acks_when_all_handlers_succeed() {
ack_called: Arc::clone(&ack_called), ack_called: Arc::clone(&ack_called),
}; };
WorkerService::new(Arc::new(consumer), vec![]) WorkerService::new(Arc::new(consumer), vec![]).run().await;
.run()
.await;
assert!(*ack_called.lock().unwrap()); assert!(*ack_called.lock().unwrap());
} }