diff --git a/docs/superpowers/plans/2026-05-04-event-driven-poster-sync.md b/docs/superpowers/plans/2026-05-04-event-driven-poster-sync.md new file mode 100644 index 0000000..66da1d1 --- /dev/null +++ b/docs/superpowers/plans/2026-05-04-event-driven-poster-sync.md @@ -0,0 +1,620 @@ +# Event-Driven Poster Sync Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add an `EventHandler` trait to the event-publisher adapter and implement `PosterSyncHandler` so that a `MovieDiscovered` event automatically triggers the existing `sync_poster` use case with exponential-backoff retry. + +**Architecture:** `EventWorker` gains a `Vec>` and fans out each received event to all registered handlers sequentially. `PosterSyncHandler` lives in the `presentation` crate (composition root), holds `AppContext`, and calls `sync_poster::execute` on `MovieDiscovered` events — ignoring all others. Retry is up to 3 retries (4 total attempts) with delays 1s → 2s → 4s. + +**Tech Stack:** Rust, tokio::sync::mpsc, async-trait, existing `sync_poster` use case + +--- + +## File Map + +| File | Status | Responsibility | +|---|---|---| +| `crates/adapters/event-publisher/src/lib.rs` | Modify | Add `EventHandler` trait; extend `EventWorker` and `create_event_channel` | +| `crates/application/src/commands.rs` | Modify | Add `#[derive(Clone)]` to `SyncPosterCommand` | +| `crates/presentation/src/lib.rs` | Modify | Expose `pub mod event_handlers` | +| `crates/presentation/src/event_handlers.rs` | Create | `PosterSyncHandler` implementation | +| `crates/presentation/src/main.rs` | Modify | Wire `PosterSyncHandler` into `create_event_channel` | + +--- + +## Task 1: Add `EventHandler` trait and update `EventWorker` + +**Files:** +- Modify: `crates/adapters/event-publisher/src/lib.rs` + +- [ ] **Step 1: Write the failing test** + +Add to the bottom of `crates/adapters/event-publisher/src/lib.rs`: + +```rust +#[cfg(test)] +mod tests { + use super::*; + use std::sync::{Arc, Mutex}; + use async_trait::async_trait; + use domain::{ + errors::DomainError, + events::DomainEvent, + value_objects::{ExternalMetadataId, MovieId}, + }; + + struct RecordingHandler { + calls: Arc>>, + } + + #[async_trait] + impl EventHandler for RecordingHandler { + async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { + let label = match event { + DomainEvent::MovieDiscovered { .. } => "movie_discovered", + DomainEvent::ReviewLogged { .. } => "review_logged", + }; + self.calls.lock().unwrap().push(label.to_string()); + Ok(()) + } + } + + #[tokio::test] + async fn single_handler_receives_event() { + let calls = Arc::new(Mutex::new(vec![])); + let handler = RecordingHandler { calls: Arc::clone(&calls) }; + let config = EventPublisherConfig { channel_buffer: 8 }; + let (publisher, worker) = create_event_channel(config, vec![Box::new(handler)]); + + tokio::spawn(worker.run()); + + let event = DomainEvent::MovieDiscovered { + movie_id: MovieId::generate(), + external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(), + }; + publisher.publish(&event).await.unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + assert_eq!(*calls.lock().unwrap(), vec!["movie_discovered"]); + } + + #[tokio::test] + async fn multiple_handlers_all_receive_event() { + let calls1 = Arc::new(Mutex::new(vec![])); + let calls2 = Arc::new(Mutex::new(vec![])); + let handler1 = RecordingHandler { calls: Arc::clone(&calls1) }; + let handler2 = RecordingHandler { calls: Arc::clone(&calls2) }; + let config = EventPublisherConfig { channel_buffer: 8 }; + let (publisher, worker) = create_event_channel( + config, + vec![Box::new(handler1), Box::new(handler2)], + ); + + tokio::spawn(worker.run()); + + let event = DomainEvent::MovieDiscovered { + movie_id: MovieId::generate(), + external_metadata_id: ExternalMetadataId::new("tt9999999".into()).unwrap(), + }; + publisher.publish(&event).await.unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + assert_eq!(calls1.lock().unwrap().len(), 1); + assert_eq!(calls2.lock().unwrap().len(), 1); + } + + #[tokio::test] + async fn handler_error_does_not_stop_worker() { + struct FailingHandler; + #[async_trait] + impl EventHandler for FailingHandler { + async fn handle(&self, _: &DomainEvent) -> Result<(), DomainError> { + Err(DomainError::InfrastructureError("boom".into())) + } + } + + let calls = Arc::new(Mutex::new(vec![])); + let good = RecordingHandler { calls: Arc::clone(&calls) }; + let config = EventPublisherConfig { channel_buffer: 8 }; + let (publisher, worker) = create_event_channel( + config, + vec![Box::new(FailingHandler), Box::new(good)], + ); + + tokio::spawn(worker.run()); + + let event = DomainEvent::MovieDiscovered { + movie_id: MovieId::generate(), + external_metadata_id: ExternalMetadataId::new("tt0000001".into()).unwrap(), + }; + publisher.publish(&event).await.unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // good handler still ran despite failing handler before it + assert_eq!(calls.lock().unwrap().len(), 1); + } +} +``` + +- [ ] **Step 2: Run tests to verify they fail** + +```bash +cargo test -p event-publisher 2>&1 | tail -20 +``` + +Expected: compile errors — `EventHandler` not defined, `create_event_channel` wrong arity. + +- [ ] **Step 3: Replace `lib.rs` with updated implementation** + +Replace the full content of `crates/adapters/event-publisher/src/lib.rs` with: + +```rust +use async_trait::async_trait; +use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher}; +use tokio::sync::mpsc; + +pub struct EventPublisherConfig { + pub channel_buffer: usize, +} + +impl EventPublisherConfig { + pub fn from_env() -> Self { + let channel_buffer = std::env::var("EVENT_CHANNEL_BUFFER") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(128); + Self { channel_buffer } + } +} + +#[async_trait] +pub trait EventHandler: Send + Sync { + async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError>; +} + +pub struct ChannelEventPublisher { + sender: mpsc::Sender, +} + +#[async_trait] +impl EventPublisher for ChannelEventPublisher { + async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> { + self.sender + .send(event.clone()) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string())) + } +} + +pub struct EventWorker { + receiver: mpsc::Receiver, + handlers: Vec>, +} + +impl EventWorker { + pub async fn run(mut self) { + while let Some(event) = self.receiver.recv().await { + match &event { + DomainEvent::ReviewLogged { + review_id, + movie_id, + user_id, + rating, + watched_at, + } => { + tracing::info!( + review_id = %review_id.value(), + movie_id = %movie_id.value(), + user_id = %user_id.value(), + rating = rating.value(), + watched_at = %watched_at, + "event: review_logged" + ); + } + DomainEvent::MovieDiscovered { + movie_id, + external_metadata_id, + } => { + tracing::info!( + movie_id = %movie_id.value(), + external_id = external_metadata_id.value(), + "event: movie_discovered" + ); + } + } + for handler in &self.handlers { + if let Err(e) = handler.handle(&event).await { + tracing::error!("event handler error: {e}"); + } + } + } + tracing::info!("event worker shut down"); + } +} + +pub fn create_event_channel( + config: EventPublisherConfig, + handlers: Vec>, +) -> (ChannelEventPublisher, EventWorker) { + let (tx, rx) = mpsc::channel(config.channel_buffer); + ( + ChannelEventPublisher { sender: tx }, + EventWorker { + receiver: rx, + handlers, + }, + ) +} + +#[cfg(test)] +mod tests { + // paste the test module from Step 1 here +} +``` + +- [ ] **Step 4: Run tests to verify they pass** + +```bash +cargo test -p event-publisher 2>&1 | tail -20 +``` + +Expected: `test result: ok. 3 passed` + +- [ ] **Step 5: Commit** + +```bash +git add crates/adapters/event-publisher/src/lib.rs +git commit -m "feat(event-publisher): add EventHandler trait and fan-out in EventWorker" +``` + +--- + +## Task 2: Derive `Clone` on `SyncPosterCommand` + +**Files:** +- Modify: `crates/application/src/commands.rs` + +The `PosterSyncHandler` retry loop reconstructs the command on each attempt, which requires `Clone` on `String` (already impl'd) and `Uuid` (Copy) — but it's cleaner to `#[derive(Clone)]` directly. + +- [ ] **Step 1: Add `#[derive(Clone)]` to `SyncPosterCommand`** + +In `crates/application/src/commands.rs`, find the `SyncPosterCommand` struct (line ~17) and add the derive: + +```rust +#[derive(Clone)] +pub struct SyncPosterCommand { + pub movie_id: Uuid, + pub external_metadata_id: String, +} +``` + +- [ ] **Step 2: Verify it compiles** + +```bash +cargo build -p application 2>&1 | tail -10 +``` + +Expected: clean build. + +- [ ] **Step 3: Commit** + +```bash +git add crates/application/src/commands.rs +git commit -m "feat(application): derive Clone on SyncPosterCommand" +``` + +--- + +## Task 3: Implement `PosterSyncHandler` + +**Files:** +- Create: `crates/presentation/src/event_handlers.rs` +- Modify: `crates/presentation/src/lib.rs` + +- [ ] **Step 1: Write the failing test first — create `event_handlers.rs` with tests only** + +Create `crates/presentation/src/event_handlers.rs`: + +```rust +use std::time::Duration; + +use application::{commands::SyncPosterCommand, context::AppContext, use_cases::sync_poster}; +use async_trait::async_trait; +use domain::{errors::DomainError, events::DomainEvent}; +use event_publisher::EventHandler; + +pub struct PosterSyncHandler { + ctx: AppContext, + max_retries: u32, +} + +impl PosterSyncHandler { + pub fn new(ctx: AppContext, max_retries: u32) -> Self { + Self { ctx, max_retries } + } +} + +#[async_trait] +impl EventHandler for PosterSyncHandler { + async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { + todo!() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + use async_trait::async_trait; + use application::config::AppConfig; + use domain::{ + errors::DomainError, + events::DomainEvent, + models::{DiaryEntry, DiaryFilter, Movie, Review, ReviewHistory, User, collections::Paginated}, + ports::{ + AuthService, EventPublisher, GeneratedToken, MetadataClient, MetadataSearchCriteria, + MovieRepository, PasswordHasher, PosterFetcherClient, PosterStorage, UserRepository, + }, + value_objects::{ + Email, ExternalMetadataId, MovieId, MovieTitle, PasswordHash, PosterPath, PosterUrl, + Rating, ReleaseYear, ReviewId, UserId, + }, + }; + + // Panic stubs — never called in the "ignored event" test path + struct PanicRepo; + struct PanicMetadata; + struct PanicFetcher; + struct PanicStorage; + struct PanicAuth; + struct PanicHasher; + struct PanicUserRepo; + struct NoopPublisher; + + #[async_trait] + impl MovieRepository for PanicRepo { + async fn get_movie_by_external_id(&self, _: &ExternalMetadataId) -> Result, DomainError> { panic!("unexpected") } + async fn get_movie_by_id(&self, _: &MovieId) -> Result, DomainError> { panic!("unexpected") } + async fn get_movies_by_title_and_year(&self, _: &MovieTitle, _: &ReleaseYear) -> Result, DomainError> { panic!("unexpected") } + async fn upsert_movie(&self, _: &Movie) -> Result<(), DomainError> { panic!("unexpected") } + async fn save_review(&self, _: &Review) -> Result { panic!("unexpected") } + async fn query_diary(&self, _: &DiaryFilter) -> Result, DomainError> { panic!("unexpected") } + async fn get_review_history(&self, _: &MovieId) -> Result { panic!("unexpected") } + } + + #[async_trait] + impl MetadataClient for PanicMetadata { + async fn fetch_movie_metadata(&self, _: &MetadataSearchCriteria) -> Result { panic!("unexpected") } + async fn get_poster_url(&self, _: &ExternalMetadataId) -> Result, DomainError> { panic!("unexpected") } + } + + #[async_trait] + impl PosterFetcherClient for PanicFetcher { + async fn fetch_poster_bytes(&self, _: &PosterUrl) -> Result, DomainError> { panic!("unexpected") } + } + + #[async_trait] + impl PosterStorage for PanicStorage { + async fn store_poster(&self, _: &MovieId, _: &[u8]) -> Result { panic!("unexpected") } + async fn get_poster(&self, _: &PosterPath) -> Result, DomainError> { panic!("unexpected") } + } + + #[async_trait] + impl AuthService for PanicAuth { + async fn generate_token(&self, _: &UserId) -> Result { panic!("unexpected") } + async fn validate_token(&self, _: &str) -> Result { panic!("unexpected") } + } + + #[async_trait] + impl PasswordHasher for PanicHasher { + async fn hash(&self, _: &str) -> Result { panic!("unexpected") } + async fn verify(&self, _: &str, _: &PasswordHash) -> Result { panic!("unexpected") } + } + + #[async_trait] + impl UserRepository for PanicUserRepo { + async fn find_by_email(&self, _: &Email) -> Result, DomainError> { panic!("unexpected") } + async fn save(&self, _: &User) -> Result<(), DomainError> { panic!("unexpected") } + } + + #[async_trait] + impl EventPublisher for NoopPublisher { + async fn publish(&self, _: &DomainEvent) -> Result<(), DomainError> { Ok(()) } + } + + fn panic_ctx() -> AppContext { + AppContext { + repository: Arc::new(PanicRepo), + metadata_client: Arc::new(PanicMetadata), + poster_fetcher: Arc::new(PanicFetcher), + poster_storage: Arc::new(PanicStorage), + event_publisher: Arc::new(NoopPublisher), + auth_service: Arc::new(PanicAuth), + password_hasher: Arc::new(PanicHasher), + user_repository: Arc::new(PanicUserRepo), + config: AppConfig { allow_registration: false }, + } + } + + #[tokio::test] + async fn review_logged_is_ignored() { + let handler = PosterSyncHandler::new(panic_ctx(), 3); + let event = DomainEvent::ReviewLogged { + review_id: ReviewId::generate(), + movie_id: MovieId::generate(), + user_id: UserId::generate(), + rating: Rating::new(4).unwrap(), + watched_at: chrono::NaiveDateTime::from_timestamp_opt(0, 0).unwrap(), + }; + // returns Ok without touching any panic stubs + assert!(handler.handle(&event).await.is_ok()); + } +} +``` + +- [ ] **Step 2: Expose the module in `lib.rs`** + +Add to `crates/presentation/src/lib.rs`: + +```rust +pub mod event_handlers; +``` + +- [ ] **Step 3: Run the test to verify it fails** + +```bash +cargo test -p presentation event_handlers 2>&1 | tail -20 +``` + +Expected: compile error or test failure because `handle` is `todo!()`. + +- [ ] **Step 4: Implement `handle` in `PosterSyncHandler`** + +Replace the `todo!()` body in `crates/presentation/src/event_handlers.rs`: + +```rust +#[async_trait] +impl EventHandler for PosterSyncHandler { + async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { + let (movie_id, external_metadata_id) = match event { + DomainEvent::MovieDiscovered { + movie_id, + external_metadata_id, + } => (movie_id.value(), external_metadata_id.value().to_owned()), + _ => return Ok(()), + }; + + let mut last_err: Option = None; + for attempt in 0..=self.max_retries { + let cmd = SyncPosterCommand { + movie_id, + external_metadata_id: external_metadata_id.clone(), + }; + match sync_poster::execute(&self.ctx, cmd).await { + Ok(()) => return Ok(()), + Err(e) => { + if attempt < self.max_retries { + let delay = Duration::from_secs(2u64.pow(attempt)); + tracing::warn!( + attempt = attempt + 1, + max_attempts = self.max_retries + 1, + delay_secs = delay.as_secs(), + "poster sync failed, retrying: {e}" + ); + tokio::time::sleep(delay).await; + } + last_err = Some(e); + } + } + } + + let err = last_err.unwrap(); + tracing::error!( + attempts = self.max_retries + 1, + "poster sync failed after all attempts: {err}" + ); + Err(err) + } +} +``` + +- [ ] **Step 5: Run the test to verify it passes** + +```bash +cargo test -p presentation event_handlers 2>&1 | tail -20 +``` + +Expected: `test result: ok. 1 passed` + +- [ ] **Step 6: Commit** + +```bash +git add crates/presentation/src/event_handlers.rs crates/presentation/src/lib.rs +git commit -m "feat(presentation): implement PosterSyncHandler with retry" +``` + +--- + +## Task 4: Wire `PosterSyncHandler` in `main.rs` + +**Files:** +- Modify: `crates/presentation/src/main.rs` + +- [ ] **Step 1: Add the import** + +In `crates/presentation/src/main.rs`, update the import block. The existing line is: + +```rust +use event_publisher::{EventPublisherConfig, create_event_channel}; +``` + +Add below it: + +```rust +use presentation::event_handlers::PosterSyncHandler; +``` + +- [ ] **Step 2: Wire the handler** + +In `wire_dependencies`, find the two existing lines: + +```rust +let (event_publisher, event_worker) = create_event_channel(EventPublisherConfig::from_env()); +tokio::spawn(event_worker.run()); +``` + +Replace with: + +```rust +let poster_handler = PosterSyncHandler::new(app_ctx.clone(), 3); // 3 retries = 4 total attempts +let (event_publisher, event_worker) = create_event_channel( + EventPublisherConfig::from_env(), + vec![Box::new(poster_handler)], +); +tokio::spawn(event_worker.run()); +``` + +Note: `app_ctx.clone()` is cheap — all fields are `Arc`. + +- [ ] **Step 3: Build the full workspace** + +```bash +cargo build 2>&1 | tail -20 +``` + +Expected: clean build with no errors. + +- [ ] **Step 4: Run all tests** + +```bash +cargo test 2>&1 | tail -20 +``` + +Expected: all tests pass. + +- [ ] **Step 5: Commit** + +```bash +git add crates/presentation/src/main.rs +git commit -m "feat(presentation): wire PosterSyncHandler into event worker" +``` + +--- + +## Verification + +After all tasks complete, smoke-test end-to-end: + +```bash +# Start the server +RUST_LOG=info cargo run -p presentation + +# In another terminal: log a review for a movie not yet in the DB +# (requires valid JWT — use the existing login endpoint first) + +# Watch the server logs for: +# event: movie_discovered movie_id= external_id=tt... +# poster sync attempt logs (or success with no retries needed) +``` + +To confirm the poster was stored, check the configured object store bucket/directory for a file named with the movie's UUID.