fix: close search index consistency gaps (orphan cleanup, discovery indexing, poster sync)
This commit is contained in:
@@ -195,4 +195,24 @@ impl PersonQuery for PostgresPersonAdapter {
|
|||||||
|
|
||||||
Ok(PersonCredits { person, cast, crew })
|
Ok(PersonCredits { person, cast, crew })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn list_orphaned_persons(&self) -> Result<Vec<PersonId>, DomainError> {
|
||||||
|
let rows: Vec<(String,)> = sqlx::query_as(
|
||||||
|
"SELECT id FROM persons
|
||||||
|
WHERE NOT EXISTS (
|
||||||
|
SELECT 1 FROM movie_cast WHERE movie_cast.tmdb_person_id = persons.tmdb_person_id
|
||||||
|
)
|
||||||
|
AND NOT EXISTS (
|
||||||
|
SELECT 1 FROM movie_crew WHERE movie_crew.tmdb_person_id = persons.tmdb_person_id
|
||||||
|
)",
|
||||||
|
)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(map_err)?;
|
||||||
|
|
||||||
|
Ok(rows
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|(id,)| uuid::Uuid::parse_str(&id).ok().map(PersonId::from_uuid))
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -145,6 +145,26 @@ impl PersonQuery for SqlitePersonAdapter {
|
|||||||
|
|
||||||
Ok(PersonCredits { person, cast, crew })
|
Ok(PersonCredits { person, cast, crew })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn list_orphaned_persons(&self) -> Result<Vec<PersonId>, DomainError> {
|
||||||
|
let rows: Vec<(String,)> = sqlx::query_as(
|
||||||
|
"SELECT id FROM persons
|
||||||
|
WHERE NOT EXISTS (
|
||||||
|
SELECT 1 FROM movie_cast WHERE movie_cast.tmdb_person_id = persons.tmdb_person_id
|
||||||
|
)
|
||||||
|
AND NOT EXISTS (
|
||||||
|
SELECT 1 FROM movie_crew WHERE movie_crew.tmdb_person_id = persons.tmdb_person_id
|
||||||
|
)",
|
||||||
|
)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(map_err)?;
|
||||||
|
|
||||||
|
Ok(rows
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|(id,)| uuid::Uuid::parse_str(&id).ok().map(PersonId::from_uuid))
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Row types ────────────────────────────────────────────────────────────────
|
// ── Row types ────────────────────────────────────────────────────────────────
|
||||||
|
|||||||
@@ -7,6 +7,8 @@ pub mod movie_resolver;
|
|||||||
pub mod ports;
|
pub mod ports;
|
||||||
pub mod queries;
|
pub mod queries;
|
||||||
pub mod use_cases;
|
pub mod use_cases;
|
||||||
|
pub mod movie_discovery_indexer;
|
||||||
pub mod search_cleanup;
|
pub mod search_cleanup;
|
||||||
|
|
||||||
|
pub use movie_discovery_indexer::MovieDiscoveryIndexer;
|
||||||
pub use search_cleanup::SearchCleanupHandler;
|
pub use search_cleanup::SearchCleanupHandler;
|
||||||
|
|||||||
51
crates/application/src/movie_discovery_indexer.rs
Normal file
51
crates/application/src/movie_discovery_indexer.rs
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use domain::{
|
||||||
|
errors::DomainError,
|
||||||
|
events::DomainEvent,
|
||||||
|
models::IndexableDocument,
|
||||||
|
ports::{EventHandler, MovieRepository, SearchCommand},
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Reacts to `MovieDiscovered` and inserts a bare search index entry immediately,
|
||||||
|
/// so movies are findable before TMDb enrichment runs.
|
||||||
|
/// Enrichment will later overwrite this with the full document (cast, genres, etc.).
|
||||||
|
pub struct MovieDiscoveryIndexer {
|
||||||
|
movie_repository: Arc<dyn MovieRepository>,
|
||||||
|
search_command: Arc<dyn SearchCommand>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MovieDiscoveryIndexer {
|
||||||
|
pub fn new(movie_repository: Arc<dyn MovieRepository>, search_command: Arc<dyn SearchCommand>) -> Self {
|
||||||
|
Self { movie_repository, search_command }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl EventHandler for MovieDiscoveryIndexer {
|
||||||
|
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||||
|
let movie_id = match event {
|
||||||
|
DomainEvent::MovieDiscovered { movie_id, .. } => movie_id.clone(),
|
||||||
|
_ => return Ok(()),
|
||||||
|
};
|
||||||
|
|
||||||
|
let Some(movie) = self.movie_repository.get_movie_by_id(&movie_id).await? else {
|
||||||
|
tracing::warn!(movie_id = %movie_id.value(), "MovieDiscoveryIndexer: movie not found");
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = self.search_command
|
||||||
|
.index(IndexableDocument::Movie {
|
||||||
|
id: movie_id.clone(),
|
||||||
|
movie: Box::new(movie),
|
||||||
|
profile: None,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::warn!(movie_id = %movie_id.value(), "failed to index movie on discovery: {e}");
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -5,16 +5,17 @@ use domain::{
|
|||||||
errors::DomainError,
|
errors::DomainError,
|
||||||
events::DomainEvent,
|
events::DomainEvent,
|
||||||
models::EntityType,
|
models::EntityType,
|
||||||
ports::{EventHandler, SearchCommand},
|
ports::{EventHandler, PersonQuery, SearchCommand},
|
||||||
};
|
};
|
||||||
|
|
||||||
pub struct SearchCleanupHandler {
|
pub struct SearchCleanupHandler {
|
||||||
search_command: Arc<dyn SearchCommand>,
|
search_command: Arc<dyn SearchCommand>,
|
||||||
|
person_query: Arc<dyn PersonQuery>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SearchCleanupHandler {
|
impl SearchCleanupHandler {
|
||||||
pub fn new(search_command: Arc<dyn SearchCommand>) -> Self {
|
pub fn new(search_command: Arc<dyn SearchCommand>, person_query: Arc<dyn PersonQuery>) -> Self {
|
||||||
Self { search_command }
|
Self { search_command, person_query }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -29,6 +30,20 @@ impl EventHandler for SearchCleanupHandler {
|
|||||||
if let Err(e) = self.search_command.remove(EntityType::Movie, &movie_id).await {
|
if let Err(e) = self.search_command.remove(EntityType::Movie, &movie_id).await {
|
||||||
tracing::warn!("search cleanup failed for movie {movie_id}: {e}");
|
tracing::warn!("search cleanup failed for movie {movie_id}: {e}");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remove persons who have no remaining movie credits (orphaned after cascade delete).
|
||||||
|
match self.person_query.list_orphaned_persons().await {
|
||||||
|
Ok(orphans) => {
|
||||||
|
for person_id in orphans {
|
||||||
|
let id = person_id.value().to_string();
|
||||||
|
if let Err(e) = self.search_command.remove(EntityType::Person, &id).await {
|
||||||
|
tracing::warn!("search cleanup failed for orphaned person {id}: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => tracing::warn!("failed to list orphaned persons after movie {movie_id} deletion: {e}"),
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
use domain::{
|
use domain::{
|
||||||
errors::DomainError,
|
errors::DomainError,
|
||||||
events::DomainEvent,
|
events::DomainEvent,
|
||||||
|
models::IndexableDocument,
|
||||||
value_objects::{ExternalMetadataId, MovieId, PosterPath},
|
value_objects::{ExternalMetadataId, MovieId, PosterPath},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -53,5 +54,19 @@ pub async fn execute(ctx: &AppContext, cmd: SyncPosterCommand) -> Result<(), Dom
|
|||||||
movie.update_poster(poster_path);
|
movie.update_poster(poster_path);
|
||||||
ctx.movie_repository.upsert_movie(&movie).await?;
|
ctx.movie_repository.upsert_movie(&movie).await?;
|
||||||
|
|
||||||
|
// Refresh search index so the new poster_path is reflected immediately.
|
||||||
|
// Fetch existing profile if available for a complete index document.
|
||||||
|
let profile = ctx.movie_profile_repository.get_by_movie_id(&movie_id).await.ok().flatten();
|
||||||
|
if let Err(e) = ctx.search_command
|
||||||
|
.index(IndexableDocument::Movie {
|
||||||
|
id: movie_id.clone(),
|
||||||
|
movie: Box::new(movie),
|
||||||
|
profile: profile.map(Box::new),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::warn!(movie_id = %movie_id.value(), "failed to refresh search index after poster sync: {e}");
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -291,6 +291,9 @@ pub trait PersonQuery: Send + Sync {
|
|||||||
async fn get_by_external_id(&self, id: &ExternalPersonId) -> Result<Option<Person>, DomainError>;
|
async fn get_by_external_id(&self, id: &ExternalPersonId) -> Result<Option<Person>, DomainError>;
|
||||||
/// Returns the person's full cast and crew credit history across all indexed movies.
|
/// Returns the person's full cast and crew credit history across all indexed movies.
|
||||||
async fn get_credits(&self, id: &PersonId) -> Result<PersonCredits, DomainError>;
|
async fn get_credits(&self, id: &PersonId) -> Result<PersonCredits, DomainError>;
|
||||||
|
/// Returns persons who have no remaining entries in movie_cast or movie_crew.
|
||||||
|
/// Called after movie deletion to find index entries that can be pruned.
|
||||||
|
async fn list_orphaned_persons(&self) -> Result<Vec<PersonId>, DomainError>;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Read port — executes search queries. No mutations.
|
/// Read port — executes search queries. No mutations.
|
||||||
|
|||||||
@@ -45,6 +45,9 @@ impl domain::ports::PersonQuery for PersonQueryStub {
|
|||||||
async fn get_credits(&self, _: &domain::models::PersonId) -> Result<domain::models::PersonCredits, DomainError> {
|
async fn get_credits(&self, _: &domain::models::PersonId) -> Result<domain::models::PersonCredits, DomainError> {
|
||||||
Err(DomainError::NotFound("Person not found".into()))
|
Err(DomainError::NotFound("Person not found".into()))
|
||||||
}
|
}
|
||||||
|
async fn list_orphaned_persons(&self) -> Result<Vec<domain::models::PersonId>, DomainError> {
|
||||||
|
Ok(vec![])
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Search endpoint tests ---
|
// --- Search endpoint tests ---
|
||||||
|
|||||||
@@ -362,6 +362,7 @@ impl PersonQuery for Panic {
|
|||||||
async fn get_by_id(&self, _: &PersonId) -> Result<Option<Person>, DomainError> { panic!() }
|
async fn get_by_id(&self, _: &PersonId) -> Result<Option<Person>, DomainError> { panic!() }
|
||||||
async fn get_by_external_id(&self, _: &domain::models::ExternalPersonId) -> Result<Option<Person>, DomainError> { panic!() }
|
async fn get_by_external_id(&self, _: &domain::models::ExternalPersonId) -> Result<Option<Person>, DomainError> { panic!() }
|
||||||
async fn get_credits(&self, _: &PersonId) -> Result<PersonCredits, DomainError> { panic!() }
|
async fn get_credits(&self, _: &PersonId) -> Result<PersonCredits, DomainError> { panic!() }
|
||||||
|
async fn list_orphaned_persons(&self) -> Result<Vec<PersonId>, DomainError> { panic!() }
|
||||||
}
|
}
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl SearchPort for Panic {
|
impl SearchPort for Panic {
|
||||||
|
|||||||
@@ -176,6 +176,7 @@ impl PersonQuery for PanicPersonQuery {
|
|||||||
async fn get_by_id(&self, _: &PersonId) -> Result<Option<Person>, DomainError> { panic!() }
|
async fn get_by_id(&self, _: &PersonId) -> Result<Option<Person>, DomainError> { panic!() }
|
||||||
async fn get_by_external_id(&self, _: &ExternalPersonId) -> Result<Option<Person>, DomainError> { panic!() }
|
async fn get_by_external_id(&self, _: &ExternalPersonId) -> Result<Option<Person>, DomainError> { panic!() }
|
||||||
async fn get_credits(&self, _: &PersonId) -> Result<PersonCredits, DomainError> { panic!() }
|
async fn get_credits(&self, _: &PersonId) -> Result<PersonCredits, DomainError> { panic!() }
|
||||||
|
async fn list_orphaned_persons(&self) -> Result<Vec<PersonId>, DomainError> { panic!() }
|
||||||
}
|
}
|
||||||
|
|
||||||
struct PanicSearchPort;
|
struct PanicSearchPort;
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ mod event_bus;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use application::{config::AppConfig, context::AppContext, worker::WorkerService, SearchCleanupHandler};
|
use application::{config::AppConfig, context::AppContext, worker::WorkerService, MovieDiscoveryIndexer, SearchCleanupHandler};
|
||||||
use export::ExportAdapter;
|
use export::ExportAdapter;
|
||||||
use importer::ImporterDocumentParser;
|
use importer::ImporterDocumentParser;
|
||||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||||
@@ -145,10 +145,9 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
|
|
||||||
#[cfg(not(feature = "federation"))]
|
#[cfg(not(feature = "federation"))]
|
||||||
{
|
{
|
||||||
let search_cleanup = Arc::new(SearchCleanupHandler::new(
|
let search_cleanup = Arc::new(SearchCleanupHandler::new(Arc::clone(&ctx.search_command), Arc::clone(&ctx.person_query))) as Arc<dyn EventHandler>;
|
||||||
Arc::clone(&ctx.search_command),
|
let discovery_indexer = Arc::new(MovieDiscoveryIndexer::new(Arc::clone(&ctx.movie_repository), Arc::clone(&ctx.search_command))) as Arc<dyn EventHandler>;
|
||||||
)) as Arc<dyn EventHandler>;
|
let mut h = vec![poster, cleanup, search_cleanup, discovery_indexer];
|
||||||
let mut h = vec![poster, cleanup, search_cleanup];
|
|
||||||
if let Some(e) = enrichment_handler { h.push(e); }
|
if let Some(e) = enrichment_handler { h.push(e); }
|
||||||
if let Some((ref conv_handler, _)) = conversion { h.push(Arc::clone(conv_handler)); }
|
if let Some((ref conv_handler, _)) = conversion { h.push(Arc::clone(conv_handler)); }
|
||||||
h
|
h
|
||||||
@@ -174,11 +173,10 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
allow_registration,
|
allow_registration,
|
||||||
).await?.event_handler;
|
).await?.event_handler;
|
||||||
|
|
||||||
let search_cleanup = Arc::new(SearchCleanupHandler::new(
|
let search_cleanup = Arc::new(SearchCleanupHandler::new(Arc::clone(&ctx.search_command), Arc::clone(&ctx.person_query))) as Arc<dyn EventHandler>;
|
||||||
Arc::clone(&ctx.search_command),
|
let discovery_indexer = Arc::new(MovieDiscoveryIndexer::new(Arc::clone(&ctx.movie_repository), Arc::clone(&ctx.search_command))) as Arc<dyn EventHandler>;
|
||||||
)) as Arc<dyn EventHandler>;
|
|
||||||
tracing::info!("federation event handler registered");
|
tracing::info!("federation event handler registered");
|
||||||
let mut h = vec![poster, cleanup, ap, search_cleanup];
|
let mut h = vec![poster, cleanup, ap, search_cleanup, discovery_indexer];
|
||||||
if let Some(e) = enrichment_handler { h.push(e); }
|
if let Some(e) = enrichment_handler { h.push(e); }
|
||||||
if let Some((ref conv_handler, _)) = conversion { h.push(Arc::clone(conv_handler)); }
|
if let Some((ref conv_handler, _)) = conversion { h.push(Arc::clone(conv_handler)); }
|
||||||
h
|
h
|
||||||
|
|||||||
Reference in New Issue
Block a user