20 KiB
Event-Driven Poster Sync Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: Add an EventHandler trait to the event-publisher adapter and implement PosterSyncHandler so that a MovieDiscovered event automatically triggers the existing sync_poster use case with exponential-backoff retry.
Architecture: EventWorker gains a Vec<Box<dyn EventHandler>> and fans out each received event to all registered handlers sequentially. PosterSyncHandler lives in the presentation crate (composition root), holds AppContext, and calls sync_poster::execute on MovieDiscovered events — ignoring all others. Retry is up to 3 retries (4 total attempts) with delays 1s → 2s → 4s.
Tech Stack: Rust, tokio::sync::mpsc, async-trait, existing sync_poster use case
File Map
| File | Status | Responsibility |
|---|---|---|
crates/adapters/event-publisher/src/lib.rs |
Modify | Add EventHandler trait; extend EventWorker and create_event_channel |
crates/application/src/commands.rs |
Modify | Add #[derive(Clone)] to SyncPosterCommand |
crates/presentation/src/lib.rs |
Modify | Expose pub mod event_handlers |
crates/presentation/src/event_handlers.rs |
Create | PosterSyncHandler implementation |
crates/presentation/src/main.rs |
Modify | Wire PosterSyncHandler into create_event_channel |
Task 1: Add EventHandler trait and update EventWorker
Files:
-
Modify:
crates/adapters/event-publisher/src/lib.rs -
Step 1: Write the failing test
Add to the bottom of crates/adapters/event-publisher/src/lib.rs:
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use domain::{
errors::DomainError,
events::DomainEvent,
value_objects::{ExternalMetadataId, MovieId},
};
struct RecordingHandler {
calls: Arc<Mutex<Vec<String>>>,
}
#[async_trait]
impl EventHandler for RecordingHandler {
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
let label = match event {
DomainEvent::MovieDiscovered { .. } => "movie_discovered",
DomainEvent::ReviewLogged { .. } => "review_logged",
};
self.calls.lock().unwrap().push(label.to_string());
Ok(())
}
}
#[tokio::test]
async fn single_handler_receives_event() {
let calls = Arc::new(Mutex::new(vec![]));
let handler = RecordingHandler { calls: Arc::clone(&calls) };
let config = EventPublisherConfig { channel_buffer: 8 };
let (publisher, worker) = create_event_channel(config, vec![Box::new(handler)]);
tokio::spawn(worker.run());
let event = DomainEvent::MovieDiscovered {
movie_id: MovieId::generate(),
external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(),
};
publisher.publish(&event).await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
assert_eq!(*calls.lock().unwrap(), vec!["movie_discovered"]);
}
#[tokio::test]
async fn multiple_handlers_all_receive_event() {
let calls1 = Arc::new(Mutex::new(vec![]));
let calls2 = Arc::new(Mutex::new(vec![]));
let handler1 = RecordingHandler { calls: Arc::clone(&calls1) };
let handler2 = RecordingHandler { calls: Arc::clone(&calls2) };
let config = EventPublisherConfig { channel_buffer: 8 };
let (publisher, worker) = create_event_channel(
config,
vec![Box::new(handler1), Box::new(handler2)],
);
tokio::spawn(worker.run());
let event = DomainEvent::MovieDiscovered {
movie_id: MovieId::generate(),
external_metadata_id: ExternalMetadataId::new("tt9999999".into()).unwrap(),
};
publisher.publish(&event).await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
assert_eq!(calls1.lock().unwrap().len(), 1);
assert_eq!(calls2.lock().unwrap().len(), 1);
}
#[tokio::test]
async fn handler_error_does_not_stop_worker() {
struct FailingHandler;
#[async_trait]
impl EventHandler for FailingHandler {
async fn handle(&self, _: &DomainEvent) -> Result<(), DomainError> {
Err(DomainError::InfrastructureError("boom".into()))
}
}
let calls = Arc::new(Mutex::new(vec![]));
let good = RecordingHandler { calls: Arc::clone(&calls) };
let config = EventPublisherConfig { channel_buffer: 8 };
let (publisher, worker) = create_event_channel(
config,
vec![Box::new(FailingHandler), Box::new(good)],
);
tokio::spawn(worker.run());
let event = DomainEvent::MovieDiscovered {
movie_id: MovieId::generate(),
external_metadata_id: ExternalMetadataId::new("tt0000001".into()).unwrap(),
};
publisher.publish(&event).await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
// good handler still ran despite failing handler before it
assert_eq!(calls.lock().unwrap().len(), 1);
}
}
- Step 2: Run tests to verify they fail
cargo test -p event-publisher 2>&1 | tail -20
Expected: compile errors — EventHandler not defined, create_event_channel wrong arity.
- Step 3: Replace
lib.rswith updated implementation
Replace the full content of crates/adapters/event-publisher/src/lib.rs with:
use async_trait::async_trait;
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
use tokio::sync::mpsc;
pub struct EventPublisherConfig {
pub channel_buffer: usize,
}
impl EventPublisherConfig {
pub fn from_env() -> Self {
let channel_buffer = std::env::var("EVENT_CHANNEL_BUFFER")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(128);
Self { channel_buffer }
}
}
#[async_trait]
pub trait EventHandler: Send + Sync {
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError>;
}
pub struct ChannelEventPublisher {
sender: mpsc::Sender<DomainEvent>,
}
#[async_trait]
impl EventPublisher for ChannelEventPublisher {
async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> {
self.sender
.send(event.clone())
.await
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
}
}
pub struct EventWorker {
receiver: mpsc::Receiver<DomainEvent>,
handlers: Vec<Box<dyn EventHandler>>,
}
impl EventWorker {
pub async fn run(mut self) {
while let Some(event) = self.receiver.recv().await {
match &event {
DomainEvent::ReviewLogged {
review_id,
movie_id,
user_id,
rating,
watched_at,
} => {
tracing::info!(
review_id = %review_id.value(),
movie_id = %movie_id.value(),
user_id = %user_id.value(),
rating = rating.value(),
watched_at = %watched_at,
"event: review_logged"
);
}
DomainEvent::MovieDiscovered {
movie_id,
external_metadata_id,
} => {
tracing::info!(
movie_id = %movie_id.value(),
external_id = external_metadata_id.value(),
"event: movie_discovered"
);
}
}
for handler in &self.handlers {
if let Err(e) = handler.handle(&event).await {
tracing::error!("event handler error: {e}");
}
}
}
tracing::info!("event worker shut down");
}
}
pub fn create_event_channel(
config: EventPublisherConfig,
handlers: Vec<Box<dyn EventHandler>>,
) -> (ChannelEventPublisher, EventWorker) {
let (tx, rx) = mpsc::channel(config.channel_buffer);
(
ChannelEventPublisher { sender: tx },
EventWorker {
receiver: rx,
handlers,
},
)
}
#[cfg(test)]
mod tests {
// paste the test module from Step 1 here
}
- Step 4: Run tests to verify they pass
cargo test -p event-publisher 2>&1 | tail -20
Expected: test result: ok. 3 passed
- Step 5: Commit
git add crates/adapters/event-publisher/src/lib.rs
git commit -m "feat(event-publisher): add EventHandler trait and fan-out in EventWorker"
Task 2: Derive Clone on SyncPosterCommand
Files:
- Modify:
crates/application/src/commands.rs
The PosterSyncHandler retry loop reconstructs the command on each attempt, which requires Clone on String (already impl'd) and Uuid (Copy) — but it's cleaner to #[derive(Clone)] directly.
- Step 1: Add
#[derive(Clone)]toSyncPosterCommand
In crates/application/src/commands.rs, find the SyncPosterCommand struct (line ~17) and add the derive:
#[derive(Clone)]
pub struct SyncPosterCommand {
pub movie_id: Uuid,
pub external_metadata_id: String,
}
- Step 2: Verify it compiles
cargo build -p application 2>&1 | tail -10
Expected: clean build.
- Step 3: Commit
git add crates/application/src/commands.rs
git commit -m "feat(application): derive Clone on SyncPosterCommand"
Task 3: Implement PosterSyncHandler
Files:
-
Create:
crates/presentation/src/event_handlers.rs -
Modify:
crates/presentation/src/lib.rs -
Step 1: Write the failing test first — create
event_handlers.rswith tests only
Create crates/presentation/src/event_handlers.rs:
use std::time::Duration;
use application::{commands::SyncPosterCommand, context::AppContext, use_cases::sync_poster};
use async_trait::async_trait;
use domain::{errors::DomainError, events::DomainEvent};
use event_publisher::EventHandler;
pub struct PosterSyncHandler {
ctx: AppContext,
max_retries: u32,
}
impl PosterSyncHandler {
pub fn new(ctx: AppContext, max_retries: u32) -> Self {
Self { ctx, max_retries }
}
}
#[async_trait]
impl EventHandler for PosterSyncHandler {
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
todo!()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use async_trait::async_trait;
use application::config::AppConfig;
use domain::{
errors::DomainError,
events::DomainEvent,
models::{DiaryEntry, DiaryFilter, Movie, Review, ReviewHistory, User, collections::Paginated},
ports::{
AuthService, EventPublisher, GeneratedToken, MetadataClient, MetadataSearchCriteria,
MovieRepository, PasswordHasher, PosterFetcherClient, PosterStorage, UserRepository,
},
value_objects::{
Email, ExternalMetadataId, MovieId, MovieTitle, PasswordHash, PosterPath, PosterUrl,
Rating, ReleaseYear, ReviewId, UserId,
},
};
// Panic stubs — never called in the "ignored event" test path
struct PanicRepo;
struct PanicMetadata;
struct PanicFetcher;
struct PanicStorage;
struct PanicAuth;
struct PanicHasher;
struct PanicUserRepo;
struct NoopPublisher;
#[async_trait]
impl MovieRepository for PanicRepo {
async fn get_movie_by_external_id(&self, _: &ExternalMetadataId) -> Result<Option<Movie>, DomainError> { panic!("unexpected") }
async fn get_movie_by_id(&self, _: &MovieId) -> Result<Option<Movie>, DomainError> { panic!("unexpected") }
async fn get_movies_by_title_and_year(&self, _: &MovieTitle, _: &ReleaseYear) -> Result<Vec<Movie>, DomainError> { panic!("unexpected") }
async fn upsert_movie(&self, _: &Movie) -> Result<(), DomainError> { panic!("unexpected") }
async fn save_review(&self, _: &Review) -> Result<DomainEvent, DomainError> { panic!("unexpected") }
async fn query_diary(&self, _: &DiaryFilter) -> Result<Paginated<DiaryEntry>, DomainError> { panic!("unexpected") }
async fn get_review_history(&self, _: &MovieId) -> Result<ReviewHistory, DomainError> { panic!("unexpected") }
}
#[async_trait]
impl MetadataClient for PanicMetadata {
async fn fetch_movie_metadata(&self, _: &MetadataSearchCriteria) -> Result<Movie, DomainError> { panic!("unexpected") }
async fn get_poster_url(&self, _: &ExternalMetadataId) -> Result<Option<PosterUrl>, DomainError> { panic!("unexpected") }
}
#[async_trait]
impl PosterFetcherClient for PanicFetcher {
async fn fetch_poster_bytes(&self, _: &PosterUrl) -> Result<Vec<u8>, DomainError> { panic!("unexpected") }
}
#[async_trait]
impl PosterStorage for PanicStorage {
async fn store_poster(&self, _: &MovieId, _: &[u8]) -> Result<PosterPath, DomainError> { panic!("unexpected") }
async fn get_poster(&self, _: &PosterPath) -> Result<Vec<u8>, DomainError> { panic!("unexpected") }
}
#[async_trait]
impl AuthService for PanicAuth {
async fn generate_token(&self, _: &UserId) -> Result<GeneratedToken, DomainError> { panic!("unexpected") }
async fn validate_token(&self, _: &str) -> Result<UserId, DomainError> { panic!("unexpected") }
}
#[async_trait]
impl PasswordHasher for PanicHasher {
async fn hash(&self, _: &str) -> Result<PasswordHash, DomainError> { panic!("unexpected") }
async fn verify(&self, _: &str, _: &PasswordHash) -> Result<bool, DomainError> { panic!("unexpected") }
}
#[async_trait]
impl UserRepository for PanicUserRepo {
async fn find_by_email(&self, _: &Email) -> Result<Option<User>, DomainError> { panic!("unexpected") }
async fn save(&self, _: &User) -> Result<(), DomainError> { panic!("unexpected") }
}
#[async_trait]
impl EventPublisher for NoopPublisher {
async fn publish(&self, _: &DomainEvent) -> Result<(), DomainError> { Ok(()) }
}
fn panic_ctx() -> AppContext {
AppContext {
repository: Arc::new(PanicRepo),
metadata_client: Arc::new(PanicMetadata),
poster_fetcher: Arc::new(PanicFetcher),
poster_storage: Arc::new(PanicStorage),
event_publisher: Arc::new(NoopPublisher),
auth_service: Arc::new(PanicAuth),
password_hasher: Arc::new(PanicHasher),
user_repository: Arc::new(PanicUserRepo),
config: AppConfig { allow_registration: false },
}
}
#[tokio::test]
async fn review_logged_is_ignored() {
let handler = PosterSyncHandler::new(panic_ctx(), 3);
let event = DomainEvent::ReviewLogged {
review_id: ReviewId::generate(),
movie_id: MovieId::generate(),
user_id: UserId::generate(),
rating: Rating::new(4).unwrap(),
watched_at: chrono::NaiveDateTime::from_timestamp_opt(0, 0).unwrap(),
};
// returns Ok without touching any panic stubs
assert!(handler.handle(&event).await.is_ok());
}
}
- Step 2: Expose the module in
lib.rs
Add to crates/presentation/src/lib.rs:
pub mod event_handlers;
- Step 3: Run the test to verify it fails
cargo test -p presentation event_handlers 2>&1 | tail -20
Expected: compile error or test failure because handle is todo!().
- Step 4: Implement
handleinPosterSyncHandler
Replace the todo!() body in crates/presentation/src/event_handlers.rs:
#[async_trait]
impl EventHandler for PosterSyncHandler {
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
let (movie_id, external_metadata_id) = match event {
DomainEvent::MovieDiscovered {
movie_id,
external_metadata_id,
} => (movie_id.value(), external_metadata_id.value().to_owned()),
_ => return Ok(()),
};
let mut last_err: Option<DomainError> = None;
for attempt in 0..=self.max_retries {
let cmd = SyncPosterCommand {
movie_id,
external_metadata_id: external_metadata_id.clone(),
};
match sync_poster::execute(&self.ctx, cmd).await {
Ok(()) => return Ok(()),
Err(e) => {
if attempt < self.max_retries {
let delay = Duration::from_secs(2u64.pow(attempt));
tracing::warn!(
attempt = attempt + 1,
max_attempts = self.max_retries + 1,
delay_secs = delay.as_secs(),
"poster sync failed, retrying: {e}"
);
tokio::time::sleep(delay).await;
}
last_err = Some(e);
}
}
}
let err = last_err.unwrap();
tracing::error!(
attempts = self.max_retries + 1,
"poster sync failed after all attempts: {err}"
);
Err(err)
}
}
- Step 5: Run the test to verify it passes
cargo test -p presentation event_handlers 2>&1 | tail -20
Expected: test result: ok. 1 passed
- Step 6: Commit
git add crates/presentation/src/event_handlers.rs crates/presentation/src/lib.rs
git commit -m "feat(presentation): implement PosterSyncHandler with retry"
Task 4: Wire PosterSyncHandler in main.rs
Files:
-
Modify:
crates/presentation/src/main.rs -
Step 1: Add the import
In crates/presentation/src/main.rs, update the import block. The existing line is:
use event_publisher::{EventPublisherConfig, create_event_channel};
Add below it:
use presentation::event_handlers::PosterSyncHandler;
- Step 2: Wire the handler
In wire_dependencies, find the two existing lines:
let (event_publisher, event_worker) = create_event_channel(EventPublisherConfig::from_env());
tokio::spawn(event_worker.run());
Replace with:
let poster_handler = PosterSyncHandler::new(app_ctx.clone(), 3); // 3 retries = 4 total attempts
let (event_publisher, event_worker) = create_event_channel(
EventPublisherConfig::from_env(),
vec![Box::new(poster_handler)],
);
tokio::spawn(event_worker.run());
Note: app_ctx.clone() is cheap — all fields are Arc<dyn Trait>.
- Step 3: Build the full workspace
cargo build 2>&1 | tail -20
Expected: clean build with no errors.
- Step 4: Run all tests
cargo test 2>&1 | tail -20
Expected: all tests pass.
- Step 5: Commit
git add crates/presentation/src/main.rs
git commit -m "feat(presentation): wire PosterSyncHandler into event worker"
Verification
After all tasks complete, smoke-test end-to-end:
# Start the server
RUST_LOG=info cargo run -p presentation
# In another terminal: log a review for a movie not yet in the DB
# (requires valid JWT — use the existing login endpoint first)
# Watch the server logs for:
# event: movie_discovered movie_id=<uuid> external_id=tt...
# poster sync attempt logs (or success with no retries needed)
To confirm the poster was stored, check the configured object store bucket/directory for a file named with the movie's UUID.