From 5334312d648584a8284f70561f69bc25326904c1 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 4 Jun 2026 16:09:35 +0200 Subject: [PATCH] refactor: extract reindex + enrichment logic from handlers into use cases --- architecture.mmd | 2 +- crates/adapters/tmdb-enrichment/src/lib.rs | 57 ++--- crates/application/src/movies/mod.rs | 1 + .../application/src/movies/reindex_search.rs | 242 +++++++++--------- .../src/movies/request_enrichment.rs | 44 ++++ 5 files changed, 188 insertions(+), 158 deletions(-) create mode 100644 crates/application/src/movies/request_enrichment.rs diff --git a/architecture.mmd b/architecture.mmd index 0f838dc..149c14b 100644 --- a/architecture.mmd +++ b/architecture.mmd @@ -14,7 +14,7 @@ graph TB subgraph UseCases["Use Cases"] UC_AUTH["auth
login, register"] UC_DIARY["diary
log_review, get_diary,
get_activity_feed, export"] - UC_MOVIES["movies
get_movies, get_movie_profile,
enrich_movie, sync_poster,
reindex_search"] + UC_MOVIES["movies
get_movies, get_movie_profile,
enrich_movie, request_enrichment,
sync_poster, reindex_search"] UC_IMPORT["import
create_session, apply_mapping,
execute, profiles"] UC_USERS["users
get_users, get_profile,
update_profile"] UC_WATCHLIST["watchlist
add, remove, get"] diff --git a/crates/adapters/tmdb-enrichment/src/lib.rs b/crates/adapters/tmdb-enrichment/src/lib.rs index 35da787..bb11ddb 100644 --- a/crates/adapters/tmdb-enrichment/src/lib.rs +++ b/crates/adapters/tmdb-enrichment/src/lib.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use application::movies::{commands::EnrichMovieCommand, enrich_movie}; +use application::movies::{commands::EnrichMovieCommand, enrich_movie, request_enrichment}; use async_trait::async_trait; use chrono::Utc; use domain::{ @@ -288,42 +288,25 @@ impl EventHandler for EnrichmentHandler { _ => return Ok(()), }; - // Skip if profile is fresh (< 30 days old) - if let Ok(Some(existing)) = self.profile_repo.get_by_movie_id(&movie_id).await { - let age = Utc::now() - existing.enriched_at; - if age.num_days() < 30 { - tracing::debug!( - movie_id = %movie_id.value(), - "skipping enrichment — profile is {} days old", - age.num_days() - ); - return Ok(()); - } - } + let Some(profile) = request_enrichment::fetch_if_stale( + self.enrichment_client.as_ref(), + &self.profile_repo, + movie_id.clone(), + &external_metadata_id, + ) + .await? + else { + return Ok(()); + }; - tracing::info!(movie_id = %movie_id.value(), external_id = %external_metadata_id, "enriching movie"); - - match self - .enrichment_client - .fetch_profile(movie_id.clone(), &external_metadata_id) - .await - { - Ok(profile) => { - self.download_cast_photos(&profile).await; - enrich_movie::execute( - &self.movie_repository, - &self.profile_repo, - &self.person_command, - &self.search_command, - EnrichMovieCommand { movie_id, profile }, - ) - .await?; - } - Err(DomainError::NotFound(msg)) => { - tracing::warn!(movie_id = %movie_id.value(), "TMDb lookup found nothing: {msg}"); - } - Err(e) => return Err(e), - } - Ok(()) + self.download_cast_photos(&profile).await; + enrich_movie::execute( + &self.movie_repository, + &self.profile_repo, + &self.person_command, + &self.search_command, + EnrichMovieCommand { movie_id, profile }, + ) + .await } } diff --git a/crates/application/src/movies/mod.rs b/crates/application/src/movies/mod.rs index 94a1fc3..7223ce1 100644 --- a/crates/application/src/movies/mod.rs +++ b/crates/application/src/movies/mod.rs @@ -5,6 +5,7 @@ pub mod get_movie_profile; pub mod get_movies; pub mod queries; pub mod reindex_search; +pub mod request_enrichment; pub mod search_cleanup; pub mod sync_poster; diff --git a/crates/application/src/movies/reindex_search.rs b/crates/application/src/movies/reindex_search.rs index 0447079..389e403 100644 --- a/crates/application/src/movies/reindex_search.rs +++ b/crates/application/src/movies/reindex_search.rs @@ -11,6 +11,115 @@ use crate::context::AppContext; const BATCH_SIZE: u32 = 500; +pub struct ReindexResult { + pub movies_indexed: u64, + pub persons_indexed: u64, + pub persons_backfilled: u64, +} + +pub async fn execute(ctx: &AppContext) -> Result { + let movies_indexed = reindex_movies(ctx).await?; + let persons_backfilled = backfill_persons(ctx).await?; + let persons_indexed = reindex_persons(ctx).await?; + + Ok(ReindexResult { + movies_indexed, + persons_indexed, + persons_backfilled, + }) +} + +async fn reindex_movies(ctx: &AppContext) -> Result { + let mut count: u64 = 0; + let mut offset: u32 = 0; + loop { + let page = ctx + .repos + .movie + .list_movies( + &PageParams { + limit: BATCH_SIZE, + offset, + }, + &MovieFilter::default(), + ) + .await?; + + for summary in &page.items { + let movie_id = summary.movie.id().clone(); + let profile = ctx.repos.movie_profile.get_by_movie_id(&movie_id).await?; + + if let Err(e) = ctx + .repos + .search_command + .index(IndexableDocument::Movie { + id: movie_id.clone(), + movie: Box::new(summary.movie.clone()), + profile: profile.map(Box::new), + }) + .await + { + tracing::warn!(movie_id = %movie_id.value(), "reindex movie failed: {e}"); + } + count += 1; + } + + if (page.items.len() as u32) < BATCH_SIZE { + break; + } + offset += BATCH_SIZE; + tokio::task::yield_now().await; + } + Ok(count) +} + +async fn backfill_persons(ctx: &AppContext) -> Result { + let mut total = 0u64; + loop { + let (count, has_more) = ctx + .repos + .person_command + .backfill_from_credits_batch(BATCH_SIZE) + .await?; + total += count; + if !has_more { + break; + } + tokio::task::yield_now().await; + } + Ok(total) +} + +async fn reindex_persons(ctx: &AppContext) -> Result { + let mut count: u64 = 0; + let mut offset: u32 = 0; + loop { + let persons = ctx.repos.person_query.list_page(BATCH_SIZE, offset).await?; + + for person in &persons { + if let Err(e) = ctx + .repos + .search_command + .index(IndexableDocument::Person { + id: person.id().clone(), + person: Box::new(person.clone()), + }) + .await + { + tracing::warn!(person = %person.name(), "reindex person failed: {e}"); + } + count += 1; + } + + if (persons.len() as u32) < BATCH_SIZE { + break; + } + offset += BATCH_SIZE; + tokio::task::yield_now().await; + } + Ok(count) +} + pub struct SearchReindexHandler { ctx: AppContext, running: AtomicBool, @@ -37,129 +146,22 @@ impl EventHandler for SearchReindexHandler { return Ok(()); } - let result = self.run_reindex().await; - self.running.store(false, Ordering::SeqCst); - result - } -} - -impl SearchReindexHandler { - async fn run_reindex(&self) -> Result<(), DomainError> { tracing::info!("search reindex started"); + let result = execute(&self.ctx).await; + self.running.store(false, Ordering::SeqCst); - let movies_indexed = self.reindex_movies().await?; - let backfilled = self.backfill_persons().await?; - if backfilled > 0 { - tracing::info!(backfilled, "backfilled missing persons from credits"); + let r = result?; + if r.persons_backfilled > 0 { + tracing::info!( + backfilled = r.persons_backfilled, + "backfilled missing persons from credits" + ); } - let persons_indexed = self.reindex_persons().await?; - - tracing::info!(movies_indexed, persons_indexed, "search reindex completed"); + tracing::info!( + movies_indexed = r.movies_indexed, + persons_indexed = r.persons_indexed, + "search reindex completed" + ); Ok(()) } - - async fn reindex_movies(&self) -> Result { - let mut count: u64 = 0; - let mut offset: u32 = 0; - loop { - let page = self - .ctx - .repos - .movie - .list_movies( - &PageParams { - limit: BATCH_SIZE, - offset, - }, - &MovieFilter::default(), - ) - .await?; - - for summary in &page.items { - let movie_id = summary.movie.id().clone(); - let profile = self - .ctx - .repos - .movie_profile - .get_by_movie_id(&movie_id) - .await?; - - if let Err(e) = self - .ctx - .repos - .search_command - .index(IndexableDocument::Movie { - id: movie_id.clone(), - movie: Box::new(summary.movie.clone()), - profile: profile.map(Box::new), - }) - .await - { - tracing::warn!(movie_id = %movie_id.value(), "reindex movie failed: {e}"); - } - count += 1; - } - - if (page.items.len() as u32) < BATCH_SIZE { - break; - } - offset += BATCH_SIZE; - tokio::task::yield_now().await; - } - Ok(count) - } - - async fn backfill_persons(&self) -> Result { - let mut total = 0u64; - loop { - let (count, has_more) = self - .ctx - .repos - .person_command - .backfill_from_credits_batch(BATCH_SIZE) - .await?; - total += count; - if !has_more { - break; - } - tokio::task::yield_now().await; - } - Ok(total) - } - - async fn reindex_persons(&self) -> Result { - let mut count: u64 = 0; - let mut offset: u32 = 0; - loop { - let persons = self - .ctx - .repos - .person_query - .list_page(BATCH_SIZE, offset) - .await?; - - for person in &persons { - if let Err(e) = self - .ctx - .repos - .search_command - .index(IndexableDocument::Person { - id: person.id().clone(), - person: Box::new(person.clone()), - }) - .await - { - tracing::warn!(person = %person.name(), "reindex person failed: {e}"); - } - count += 1; - } - - if (persons.len() as u32) < BATCH_SIZE { - break; - } - offset += BATCH_SIZE; - tokio::task::yield_now().await; - } - Ok(count) - } } diff --git a/crates/application/src/movies/request_enrichment.rs b/crates/application/src/movies/request_enrichment.rs new file mode 100644 index 0000000..95faf7b --- /dev/null +++ b/crates/application/src/movies/request_enrichment.rs @@ -0,0 +1,44 @@ +use std::sync::Arc; + +use chrono::Utc; +use domain::{ + errors::DomainError, + models::MovieProfile, + ports::{MovieEnrichmentClient, MovieProfileRepository}, + value_objects::MovieId, +}; + +const STALENESS_DAYS: i64 = 30; + +pub async fn fetch_if_stale( + enrichment_client: &dyn MovieEnrichmentClient, + profile_repo: &Arc, + movie_id: MovieId, + external_metadata_id: &str, +) -> Result, DomainError> { + if let Ok(Some(existing)) = profile_repo.get_by_movie_id(&movie_id).await { + let age = Utc::now() - existing.enriched_at; + if age.num_days() < STALENESS_DAYS { + tracing::debug!( + movie_id = %movie_id.value(), + "skipping enrichment — profile is {} days old", + age.num_days() + ); + return Ok(None); + } + } + + tracing::info!(movie_id = %movie_id.value(), external_id = %external_metadata_id, "enriching movie"); + + match enrichment_client + .fetch_profile(movie_id, external_metadata_id) + .await + { + Ok(profile) => Ok(Some(profile)), + Err(DomainError::NotFound(msg)) => { + tracing::warn!("TMDb lookup found nothing: {msg}"); + Ok(None) + } + Err(e) => Err(e), + } +}