diff --git a/architecture.mmd b/architecture.mmd
index 0f838dc..149c14b 100644
--- a/architecture.mmd
+++ b/architecture.mmd
@@ -14,7 +14,7 @@ graph TB
subgraph UseCases["Use Cases"]
UC_AUTH["auth
login, register"]
UC_DIARY["diary
log_review, get_diary,
get_activity_feed, export"]
- UC_MOVIES["movies
get_movies, get_movie_profile,
enrich_movie, sync_poster,
reindex_search"]
+ UC_MOVIES["movies
get_movies, get_movie_profile,
enrich_movie, request_enrichment,
sync_poster, reindex_search"]
UC_IMPORT["import
create_session, apply_mapping,
execute, profiles"]
UC_USERS["users
get_users, get_profile,
update_profile"]
UC_WATCHLIST["watchlist
add, remove, get"]
diff --git a/crates/adapters/tmdb-enrichment/src/lib.rs b/crates/adapters/tmdb-enrichment/src/lib.rs
index 35da787..bb11ddb 100644
--- a/crates/adapters/tmdb-enrichment/src/lib.rs
+++ b/crates/adapters/tmdb-enrichment/src/lib.rs
@@ -1,6 +1,6 @@
use std::sync::Arc;
-use application::movies::{commands::EnrichMovieCommand, enrich_movie};
+use application::movies::{commands::EnrichMovieCommand, enrich_movie, request_enrichment};
use async_trait::async_trait;
use chrono::Utc;
use domain::{
@@ -288,42 +288,25 @@ impl EventHandler for EnrichmentHandler {
_ => return Ok(()),
};
- // Skip if profile is fresh (< 30 days old)
- if let Ok(Some(existing)) = self.profile_repo.get_by_movie_id(&movie_id).await {
- let age = Utc::now() - existing.enriched_at;
- if age.num_days() < 30 {
- tracing::debug!(
- movie_id = %movie_id.value(),
- "skipping enrichment — profile is {} days old",
- age.num_days()
- );
- return Ok(());
- }
- }
+ let Some(profile) = request_enrichment::fetch_if_stale(
+ self.enrichment_client.as_ref(),
+ &self.profile_repo,
+ movie_id.clone(),
+ &external_metadata_id,
+ )
+ .await?
+ else {
+ return Ok(());
+ };
- tracing::info!(movie_id = %movie_id.value(), external_id = %external_metadata_id, "enriching movie");
-
- match self
- .enrichment_client
- .fetch_profile(movie_id.clone(), &external_metadata_id)
- .await
- {
- Ok(profile) => {
- self.download_cast_photos(&profile).await;
- enrich_movie::execute(
- &self.movie_repository,
- &self.profile_repo,
- &self.person_command,
- &self.search_command,
- EnrichMovieCommand { movie_id, profile },
- )
- .await?;
- }
- Err(DomainError::NotFound(msg)) => {
- tracing::warn!(movie_id = %movie_id.value(), "TMDb lookup found nothing: {msg}");
- }
- Err(e) => return Err(e),
- }
- Ok(())
+ self.download_cast_photos(&profile).await;
+ enrich_movie::execute(
+ &self.movie_repository,
+ &self.profile_repo,
+ &self.person_command,
+ &self.search_command,
+ EnrichMovieCommand { movie_id, profile },
+ )
+ .await
}
}
diff --git a/crates/application/src/movies/mod.rs b/crates/application/src/movies/mod.rs
index 94a1fc3..7223ce1 100644
--- a/crates/application/src/movies/mod.rs
+++ b/crates/application/src/movies/mod.rs
@@ -5,6 +5,7 @@ pub mod get_movie_profile;
pub mod get_movies;
pub mod queries;
pub mod reindex_search;
+pub mod request_enrichment;
pub mod search_cleanup;
pub mod sync_poster;
diff --git a/crates/application/src/movies/reindex_search.rs b/crates/application/src/movies/reindex_search.rs
index 0447079..389e403 100644
--- a/crates/application/src/movies/reindex_search.rs
+++ b/crates/application/src/movies/reindex_search.rs
@@ -11,6 +11,115 @@ use crate::context::AppContext;
const BATCH_SIZE: u32 = 500;
+pub struct ReindexResult {
+ pub movies_indexed: u64,
+ pub persons_indexed: u64,
+ pub persons_backfilled: u64,
+}
+
+pub async fn execute(ctx: &AppContext) -> Result {
+ let movies_indexed = reindex_movies(ctx).await?;
+ let persons_backfilled = backfill_persons(ctx).await?;
+ let persons_indexed = reindex_persons(ctx).await?;
+
+ Ok(ReindexResult {
+ movies_indexed,
+ persons_indexed,
+ persons_backfilled,
+ })
+}
+
+async fn reindex_movies(ctx: &AppContext) -> Result {
+ let mut count: u64 = 0;
+ let mut offset: u32 = 0;
+ loop {
+ let page = ctx
+ .repos
+ .movie
+ .list_movies(
+ &PageParams {
+ limit: BATCH_SIZE,
+ offset,
+ },
+ &MovieFilter::default(),
+ )
+ .await?;
+
+ for summary in &page.items {
+ let movie_id = summary.movie.id().clone();
+ let profile = ctx.repos.movie_profile.get_by_movie_id(&movie_id).await?;
+
+ if let Err(e) = ctx
+ .repos
+ .search_command
+ .index(IndexableDocument::Movie {
+ id: movie_id.clone(),
+ movie: Box::new(summary.movie.clone()),
+ profile: profile.map(Box::new),
+ })
+ .await
+ {
+ tracing::warn!(movie_id = %movie_id.value(), "reindex movie failed: {e}");
+ }
+ count += 1;
+ }
+
+ if (page.items.len() as u32) < BATCH_SIZE {
+ break;
+ }
+ offset += BATCH_SIZE;
+ tokio::task::yield_now().await;
+ }
+ Ok(count)
+}
+
+async fn backfill_persons(ctx: &AppContext) -> Result {
+ let mut total = 0u64;
+ loop {
+ let (count, has_more) = ctx
+ .repos
+ .person_command
+ .backfill_from_credits_batch(BATCH_SIZE)
+ .await?;
+ total += count;
+ if !has_more {
+ break;
+ }
+ tokio::task::yield_now().await;
+ }
+ Ok(total)
+}
+
+async fn reindex_persons(ctx: &AppContext) -> Result {
+ let mut count: u64 = 0;
+ let mut offset: u32 = 0;
+ loop {
+ let persons = ctx.repos.person_query.list_page(BATCH_SIZE, offset).await?;
+
+ for person in &persons {
+ if let Err(e) = ctx
+ .repos
+ .search_command
+ .index(IndexableDocument::Person {
+ id: person.id().clone(),
+ person: Box::new(person.clone()),
+ })
+ .await
+ {
+ tracing::warn!(person = %person.name(), "reindex person failed: {e}");
+ }
+ count += 1;
+ }
+
+ if (persons.len() as u32) < BATCH_SIZE {
+ break;
+ }
+ offset += BATCH_SIZE;
+ tokio::task::yield_now().await;
+ }
+ Ok(count)
+}
+
pub struct SearchReindexHandler {
ctx: AppContext,
running: AtomicBool,
@@ -37,129 +146,22 @@ impl EventHandler for SearchReindexHandler {
return Ok(());
}
- let result = self.run_reindex().await;
- self.running.store(false, Ordering::SeqCst);
- result
- }
-}
-
-impl SearchReindexHandler {
- async fn run_reindex(&self) -> Result<(), DomainError> {
tracing::info!("search reindex started");
+ let result = execute(&self.ctx).await;
+ self.running.store(false, Ordering::SeqCst);
- let movies_indexed = self.reindex_movies().await?;
- let backfilled = self.backfill_persons().await?;
- if backfilled > 0 {
- tracing::info!(backfilled, "backfilled missing persons from credits");
+ let r = result?;
+ if r.persons_backfilled > 0 {
+ tracing::info!(
+ backfilled = r.persons_backfilled,
+ "backfilled missing persons from credits"
+ );
}
- let persons_indexed = self.reindex_persons().await?;
-
- tracing::info!(movies_indexed, persons_indexed, "search reindex completed");
+ tracing::info!(
+ movies_indexed = r.movies_indexed,
+ persons_indexed = r.persons_indexed,
+ "search reindex completed"
+ );
Ok(())
}
-
- async fn reindex_movies(&self) -> Result {
- let mut count: u64 = 0;
- let mut offset: u32 = 0;
- loop {
- let page = self
- .ctx
- .repos
- .movie
- .list_movies(
- &PageParams {
- limit: BATCH_SIZE,
- offset,
- },
- &MovieFilter::default(),
- )
- .await?;
-
- for summary in &page.items {
- let movie_id = summary.movie.id().clone();
- let profile = self
- .ctx
- .repos
- .movie_profile
- .get_by_movie_id(&movie_id)
- .await?;
-
- if let Err(e) = self
- .ctx
- .repos
- .search_command
- .index(IndexableDocument::Movie {
- id: movie_id.clone(),
- movie: Box::new(summary.movie.clone()),
- profile: profile.map(Box::new),
- })
- .await
- {
- tracing::warn!(movie_id = %movie_id.value(), "reindex movie failed: {e}");
- }
- count += 1;
- }
-
- if (page.items.len() as u32) < BATCH_SIZE {
- break;
- }
- offset += BATCH_SIZE;
- tokio::task::yield_now().await;
- }
- Ok(count)
- }
-
- async fn backfill_persons(&self) -> Result {
- let mut total = 0u64;
- loop {
- let (count, has_more) = self
- .ctx
- .repos
- .person_command
- .backfill_from_credits_batch(BATCH_SIZE)
- .await?;
- total += count;
- if !has_more {
- break;
- }
- tokio::task::yield_now().await;
- }
- Ok(total)
- }
-
- async fn reindex_persons(&self) -> Result {
- let mut count: u64 = 0;
- let mut offset: u32 = 0;
- loop {
- let persons = self
- .ctx
- .repos
- .person_query
- .list_page(BATCH_SIZE, offset)
- .await?;
-
- for person in &persons {
- if let Err(e) = self
- .ctx
- .repos
- .search_command
- .index(IndexableDocument::Person {
- id: person.id().clone(),
- person: Box::new(person.clone()),
- })
- .await
- {
- tracing::warn!(person = %person.name(), "reindex person failed: {e}");
- }
- count += 1;
- }
-
- if (persons.len() as u32) < BATCH_SIZE {
- break;
- }
- offset += BATCH_SIZE;
- tokio::task::yield_now().await;
- }
- Ok(count)
- }
}
diff --git a/crates/application/src/movies/request_enrichment.rs b/crates/application/src/movies/request_enrichment.rs
new file mode 100644
index 0000000..95faf7b
--- /dev/null
+++ b/crates/application/src/movies/request_enrichment.rs
@@ -0,0 +1,44 @@
+use std::sync::Arc;
+
+use chrono::Utc;
+use domain::{
+ errors::DomainError,
+ models::MovieProfile,
+ ports::{MovieEnrichmentClient, MovieProfileRepository},
+ value_objects::MovieId,
+};
+
+const STALENESS_DAYS: i64 = 30;
+
+pub async fn fetch_if_stale(
+ enrichment_client: &dyn MovieEnrichmentClient,
+ profile_repo: &Arc,
+ movie_id: MovieId,
+ external_metadata_id: &str,
+) -> Result