From 2fd8734d2386bd3d7278926a604a2721d9738e39 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Tue, 12 May 2026 19:05:22 +0200 Subject: [PATCH] fix: close search index consistency gaps (orphan cleanup, discovery indexing, poster sync) --- crates/adapters/postgres/src/persons.rs | 20 ++++++++ crates/adapters/sqlite/src/persons.rs | 20 ++++++++ crates/application/src/lib.rs | 2 + .../src/movie_discovery_indexer.rs | 51 +++++++++++++++++++ crates/application/src/search_cleanup.rs | 21 ++++++-- .../application/src/use_cases/sync_poster.rs | 15 ++++++ crates/domain/src/ports.rs | 3 ++ crates/presentation/src/tests/api_handlers.rs | 3 ++ crates/presentation/src/tests/extractors.rs | 1 + crates/presentation/tests/api_test.rs | 1 + crates/worker/src/main.rs | 16 +++--- 11 files changed, 141 insertions(+), 12 deletions(-) create mode 100644 crates/application/src/movie_discovery_indexer.rs diff --git a/crates/adapters/postgres/src/persons.rs b/crates/adapters/postgres/src/persons.rs index 2224a16..2fb4254 100644 --- a/crates/adapters/postgres/src/persons.rs +++ b/crates/adapters/postgres/src/persons.rs @@ -195,4 +195,24 @@ impl PersonQuery for PostgresPersonAdapter { Ok(PersonCredits { person, cast, crew }) } + + async fn list_orphaned_persons(&self) -> Result, DomainError> { + let rows: Vec<(String,)> = sqlx::query_as( + "SELECT id FROM persons + WHERE NOT EXISTS ( + SELECT 1 FROM movie_cast WHERE movie_cast.tmdb_person_id = persons.tmdb_person_id + ) + AND NOT EXISTS ( + SELECT 1 FROM movie_crew WHERE movie_crew.tmdb_person_id = persons.tmdb_person_id + )", + ) + .fetch_all(&self.pool) + .await + .map_err(map_err)?; + + Ok(rows + .into_iter() + .filter_map(|(id,)| uuid::Uuid::parse_str(&id).ok().map(PersonId::from_uuid)) + .collect()) + } } diff --git a/crates/adapters/sqlite/src/persons.rs b/crates/adapters/sqlite/src/persons.rs index eef62c8..8a8c0b7 100644 --- a/crates/adapters/sqlite/src/persons.rs +++ b/crates/adapters/sqlite/src/persons.rs @@ -145,6 +145,26 @@ impl PersonQuery for SqlitePersonAdapter { Ok(PersonCredits { person, cast, crew }) } + + async fn list_orphaned_persons(&self) -> Result, DomainError> { + let rows: Vec<(String,)> = sqlx::query_as( + "SELECT id FROM persons + WHERE NOT EXISTS ( + SELECT 1 FROM movie_cast WHERE movie_cast.tmdb_person_id = persons.tmdb_person_id + ) + AND NOT EXISTS ( + SELECT 1 FROM movie_crew WHERE movie_crew.tmdb_person_id = persons.tmdb_person_id + )", + ) + .fetch_all(&self.pool) + .await + .map_err(map_err)?; + + Ok(rows + .into_iter() + .filter_map(|(id,)| uuid::Uuid::parse_str(&id).ok().map(PersonId::from_uuid)) + .collect()) + } } // ── Row types ──────────────────────────────────────────────────────────────── diff --git a/crates/application/src/lib.rs b/crates/application/src/lib.rs index 33523c0..aa921c0 100644 --- a/crates/application/src/lib.rs +++ b/crates/application/src/lib.rs @@ -7,6 +7,8 @@ pub mod movie_resolver; pub mod ports; pub mod queries; pub mod use_cases; +pub mod movie_discovery_indexer; pub mod search_cleanup; +pub use movie_discovery_indexer::MovieDiscoveryIndexer; pub use search_cleanup::SearchCleanupHandler; diff --git a/crates/application/src/movie_discovery_indexer.rs b/crates/application/src/movie_discovery_indexer.rs new file mode 100644 index 0000000..893dff4 --- /dev/null +++ b/crates/application/src/movie_discovery_indexer.rs @@ -0,0 +1,51 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use domain::{ + errors::DomainError, + events::DomainEvent, + models::IndexableDocument, + ports::{EventHandler, MovieRepository, SearchCommand}, +}; + +/// Reacts to `MovieDiscovered` and inserts a bare search index entry immediately, +/// so movies are findable before TMDb enrichment runs. +/// Enrichment will later overwrite this with the full document (cast, genres, etc.). +pub struct MovieDiscoveryIndexer { + movie_repository: Arc, + search_command: Arc, +} + +impl MovieDiscoveryIndexer { + pub fn new(movie_repository: Arc, search_command: Arc) -> Self { + Self { movie_repository, search_command } + } +} + +#[async_trait] +impl EventHandler for MovieDiscoveryIndexer { + async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { + let movie_id = match event { + DomainEvent::MovieDiscovered { movie_id, .. } => movie_id.clone(), + _ => return Ok(()), + }; + + let Some(movie) = self.movie_repository.get_movie_by_id(&movie_id).await? else { + tracing::warn!(movie_id = %movie_id.value(), "MovieDiscoveryIndexer: movie not found"); + return Ok(()); + }; + + if let Err(e) = self.search_command + .index(IndexableDocument::Movie { + id: movie_id.clone(), + movie: Box::new(movie), + profile: None, + }) + .await + { + tracing::warn!(movie_id = %movie_id.value(), "failed to index movie on discovery: {e}"); + } + + Ok(()) + } +} diff --git a/crates/application/src/search_cleanup.rs b/crates/application/src/search_cleanup.rs index a28c561..c1d32b9 100644 --- a/crates/application/src/search_cleanup.rs +++ b/crates/application/src/search_cleanup.rs @@ -5,16 +5,17 @@ use domain::{ errors::DomainError, events::DomainEvent, models::EntityType, - ports::{EventHandler, SearchCommand}, + ports::{EventHandler, PersonQuery, SearchCommand}, }; pub struct SearchCleanupHandler { search_command: Arc, + person_query: Arc, } impl SearchCleanupHandler { - pub fn new(search_command: Arc) -> Self { - Self { search_command } + pub fn new(search_command: Arc, person_query: Arc) -> Self { + Self { search_command, person_query } } } @@ -29,6 +30,20 @@ impl EventHandler for SearchCleanupHandler { if let Err(e) = self.search_command.remove(EntityType::Movie, &movie_id).await { tracing::warn!("search cleanup failed for movie {movie_id}: {e}"); } + + // Remove persons who have no remaining movie credits (orphaned after cascade delete). + match self.person_query.list_orphaned_persons().await { + Ok(orphans) => { + for person_id in orphans { + let id = person_id.value().to_string(); + if let Err(e) = self.search_command.remove(EntityType::Person, &id).await { + tracing::warn!("search cleanup failed for orphaned person {id}: {e}"); + } + } + } + Err(e) => tracing::warn!("failed to list orphaned persons after movie {movie_id} deletion: {e}"), + } + Ok(()) } } diff --git a/crates/application/src/use_cases/sync_poster.rs b/crates/application/src/use_cases/sync_poster.rs index 1fc11e0..ac62798 100644 --- a/crates/application/src/use_cases/sync_poster.rs +++ b/crates/application/src/use_cases/sync_poster.rs @@ -1,6 +1,7 @@ use domain::{ errors::DomainError, events::DomainEvent, + models::IndexableDocument, value_objects::{ExternalMetadataId, MovieId, PosterPath}, }; @@ -53,5 +54,19 @@ pub async fn execute(ctx: &AppContext, cmd: SyncPosterCommand) -> Result<(), Dom movie.update_poster(poster_path); ctx.movie_repository.upsert_movie(&movie).await?; + // Refresh search index so the new poster_path is reflected immediately. + // Fetch existing profile if available for a complete index document. + let profile = ctx.movie_profile_repository.get_by_movie_id(&movie_id).await.ok().flatten(); + if let Err(e) = ctx.search_command + .index(IndexableDocument::Movie { + id: movie_id.clone(), + movie: Box::new(movie), + profile: profile.map(Box::new), + }) + .await + { + tracing::warn!(movie_id = %movie_id.value(), "failed to refresh search index after poster sync: {e}"); + } + Ok(()) } diff --git a/crates/domain/src/ports.rs b/crates/domain/src/ports.rs index 46aeb5b..ea7b9fe 100644 --- a/crates/domain/src/ports.rs +++ b/crates/domain/src/ports.rs @@ -291,6 +291,9 @@ pub trait PersonQuery: Send + Sync { async fn get_by_external_id(&self, id: &ExternalPersonId) -> Result, DomainError>; /// Returns the person's full cast and crew credit history across all indexed movies. async fn get_credits(&self, id: &PersonId) -> Result; + /// Returns persons who have no remaining entries in movie_cast or movie_crew. + /// Called after movie deletion to find index entries that can be pruned. + async fn list_orphaned_persons(&self) -> Result, DomainError>; } /// Read port — executes search queries. No mutations. diff --git a/crates/presentation/src/tests/api_handlers.rs b/crates/presentation/src/tests/api_handlers.rs index e4aa094..b6fd08d 100644 --- a/crates/presentation/src/tests/api_handlers.rs +++ b/crates/presentation/src/tests/api_handlers.rs @@ -45,6 +45,9 @@ impl domain::ports::PersonQuery for PersonQueryStub { async fn get_credits(&self, _: &domain::models::PersonId) -> Result { Err(DomainError::NotFound("Person not found".into())) } + async fn list_orphaned_persons(&self) -> Result, DomainError> { + Ok(vec![]) + } } // --- Search endpoint tests --- diff --git a/crates/presentation/src/tests/extractors.rs b/crates/presentation/src/tests/extractors.rs index b2c81fe..450f30e 100644 --- a/crates/presentation/src/tests/extractors.rs +++ b/crates/presentation/src/tests/extractors.rs @@ -362,6 +362,7 @@ impl PersonQuery for Panic { async fn get_by_id(&self, _: &PersonId) -> Result, DomainError> { panic!() } async fn get_by_external_id(&self, _: &domain::models::ExternalPersonId) -> Result, DomainError> { panic!() } async fn get_credits(&self, _: &PersonId) -> Result { panic!() } + async fn list_orphaned_persons(&self) -> Result, DomainError> { panic!() } } #[async_trait::async_trait] impl SearchPort for Panic { diff --git a/crates/presentation/tests/api_test.rs b/crates/presentation/tests/api_test.rs index 812274a..bc8f3ab 100644 --- a/crates/presentation/tests/api_test.rs +++ b/crates/presentation/tests/api_test.rs @@ -176,6 +176,7 @@ impl PersonQuery for PanicPersonQuery { async fn get_by_id(&self, _: &PersonId) -> Result, DomainError> { panic!() } async fn get_by_external_id(&self, _: &ExternalPersonId) -> Result, DomainError> { panic!() } async fn get_credits(&self, _: &PersonId) -> Result { panic!() } + async fn list_orphaned_persons(&self) -> Result, DomainError> { panic!() } } struct PanicSearchPort; diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index c6737c0..c45dd5e 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -4,7 +4,7 @@ mod event_bus; use std::sync::Arc; use anyhow::Context; -use application::{config::AppConfig, context::AppContext, worker::WorkerService, SearchCleanupHandler}; +use application::{config::AppConfig, context::AppContext, worker::WorkerService, MovieDiscoveryIndexer, SearchCleanupHandler}; use export::ExportAdapter; use importer::ImporterDocumentParser; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -145,10 +145,9 @@ async fn main() -> anyhow::Result<()> { #[cfg(not(feature = "federation"))] { - let search_cleanup = Arc::new(SearchCleanupHandler::new( - Arc::clone(&ctx.search_command), - )) as Arc; - let mut h = vec![poster, cleanup, search_cleanup]; + let search_cleanup = Arc::new(SearchCleanupHandler::new(Arc::clone(&ctx.search_command), Arc::clone(&ctx.person_query))) as Arc; + let discovery_indexer = Arc::new(MovieDiscoveryIndexer::new(Arc::clone(&ctx.movie_repository), Arc::clone(&ctx.search_command))) as Arc; + let mut h = vec![poster, cleanup, search_cleanup, discovery_indexer]; if let Some(e) = enrichment_handler { h.push(e); } if let Some((ref conv_handler, _)) = conversion { h.push(Arc::clone(conv_handler)); } h @@ -174,11 +173,10 @@ async fn main() -> anyhow::Result<()> { allow_registration, ).await?.event_handler; - let search_cleanup = Arc::new(SearchCleanupHandler::new( - Arc::clone(&ctx.search_command), - )) as Arc; + let search_cleanup = Arc::new(SearchCleanupHandler::new(Arc::clone(&ctx.search_command), Arc::clone(&ctx.person_query))) as Arc; + let discovery_indexer = Arc::new(MovieDiscoveryIndexer::new(Arc::clone(&ctx.movie_repository), Arc::clone(&ctx.search_command))) as Arc; tracing::info!("federation event handler registered"); - let mut h = vec![poster, cleanup, ap, search_cleanup]; + let mut h = vec![poster, cleanup, ap, search_cleanup, discovery_indexer]; if let Some(e) = enrichment_handler { h.push(e); } if let Some((ref conv_handler, _)) = conversion { h.push(Arc::clone(conv_handler)); } h