refactor: extract reindex + enrichment logic from handlers into use cases
Some checks failed
CI / Check / Test (push) Failing after 6m45s
Some checks failed
CI / Check / Test (push) Failing after 6m45s
This commit is contained in:
@@ -5,6 +5,7 @@ pub mod get_movie_profile;
|
||||
pub mod get_movies;
|
||||
pub mod queries;
|
||||
pub mod reindex_search;
|
||||
pub mod request_enrichment;
|
||||
pub mod search_cleanup;
|
||||
pub mod sync_poster;
|
||||
|
||||
|
||||
@@ -11,6 +11,115 @@ use crate::context::AppContext;
|
||||
|
||||
const BATCH_SIZE: u32 = 500;
|
||||
|
||||
pub struct ReindexResult {
|
||||
pub movies_indexed: u64,
|
||||
pub persons_indexed: u64,
|
||||
pub persons_backfilled: u64,
|
||||
}
|
||||
|
||||
pub async fn execute(ctx: &AppContext) -> Result<ReindexResult, DomainError> {
|
||||
let movies_indexed = reindex_movies(ctx).await?;
|
||||
let persons_backfilled = backfill_persons(ctx).await?;
|
||||
let persons_indexed = reindex_persons(ctx).await?;
|
||||
|
||||
Ok(ReindexResult {
|
||||
movies_indexed,
|
||||
persons_indexed,
|
||||
persons_backfilled,
|
||||
})
|
||||
}
|
||||
|
||||
async fn reindex_movies(ctx: &AppContext) -> Result<u64, DomainError> {
|
||||
let mut count: u64 = 0;
|
||||
let mut offset: u32 = 0;
|
||||
loop {
|
||||
let page = ctx
|
||||
.repos
|
||||
.movie
|
||||
.list_movies(
|
||||
&PageParams {
|
||||
limit: BATCH_SIZE,
|
||||
offset,
|
||||
},
|
||||
&MovieFilter::default(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
for summary in &page.items {
|
||||
let movie_id = summary.movie.id().clone();
|
||||
let profile = ctx.repos.movie_profile.get_by_movie_id(&movie_id).await?;
|
||||
|
||||
if let Err(e) = ctx
|
||||
.repos
|
||||
.search_command
|
||||
.index(IndexableDocument::Movie {
|
||||
id: movie_id.clone(),
|
||||
movie: Box::new(summary.movie.clone()),
|
||||
profile: profile.map(Box::new),
|
||||
})
|
||||
.await
|
||||
{
|
||||
tracing::warn!(movie_id = %movie_id.value(), "reindex movie failed: {e}");
|
||||
}
|
||||
count += 1;
|
||||
}
|
||||
|
||||
if (page.items.len() as u32) < BATCH_SIZE {
|
||||
break;
|
||||
}
|
||||
offset += BATCH_SIZE;
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
async fn backfill_persons(ctx: &AppContext) -> Result<u64, DomainError> {
|
||||
let mut total = 0u64;
|
||||
loop {
|
||||
let (count, has_more) = ctx
|
||||
.repos
|
||||
.person_command
|
||||
.backfill_from_credits_batch(BATCH_SIZE)
|
||||
.await?;
|
||||
total += count;
|
||||
if !has_more {
|
||||
break;
|
||||
}
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
Ok(total)
|
||||
}
|
||||
|
||||
async fn reindex_persons(ctx: &AppContext) -> Result<u64, DomainError> {
|
||||
let mut count: u64 = 0;
|
||||
let mut offset: u32 = 0;
|
||||
loop {
|
||||
let persons = ctx.repos.person_query.list_page(BATCH_SIZE, offset).await?;
|
||||
|
||||
for person in &persons {
|
||||
if let Err(e) = ctx
|
||||
.repos
|
||||
.search_command
|
||||
.index(IndexableDocument::Person {
|
||||
id: person.id().clone(),
|
||||
person: Box::new(person.clone()),
|
||||
})
|
||||
.await
|
||||
{
|
||||
tracing::warn!(person = %person.name(), "reindex person failed: {e}");
|
||||
}
|
||||
count += 1;
|
||||
}
|
||||
|
||||
if (persons.len() as u32) < BATCH_SIZE {
|
||||
break;
|
||||
}
|
||||
offset += BATCH_SIZE;
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
pub struct SearchReindexHandler {
|
||||
ctx: AppContext,
|
||||
running: AtomicBool,
|
||||
@@ -37,129 +146,22 @@ impl EventHandler for SearchReindexHandler {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let result = self.run_reindex().await;
|
||||
self.running.store(false, Ordering::SeqCst);
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
impl SearchReindexHandler {
|
||||
async fn run_reindex(&self) -> Result<(), DomainError> {
|
||||
tracing::info!("search reindex started");
|
||||
let result = execute(&self.ctx).await;
|
||||
self.running.store(false, Ordering::SeqCst);
|
||||
|
||||
let movies_indexed = self.reindex_movies().await?;
|
||||
let backfilled = self.backfill_persons().await?;
|
||||
if backfilled > 0 {
|
||||
tracing::info!(backfilled, "backfilled missing persons from credits");
|
||||
let r = result?;
|
||||
if r.persons_backfilled > 0 {
|
||||
tracing::info!(
|
||||
backfilled = r.persons_backfilled,
|
||||
"backfilled missing persons from credits"
|
||||
);
|
||||
}
|
||||
let persons_indexed = self.reindex_persons().await?;
|
||||
|
||||
tracing::info!(movies_indexed, persons_indexed, "search reindex completed");
|
||||
tracing::info!(
|
||||
movies_indexed = r.movies_indexed,
|
||||
persons_indexed = r.persons_indexed,
|
||||
"search reindex completed"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn reindex_movies(&self) -> Result<u64, DomainError> {
|
||||
let mut count: u64 = 0;
|
||||
let mut offset: u32 = 0;
|
||||
loop {
|
||||
let page = self
|
||||
.ctx
|
||||
.repos
|
||||
.movie
|
||||
.list_movies(
|
||||
&PageParams {
|
||||
limit: BATCH_SIZE,
|
||||
offset,
|
||||
},
|
||||
&MovieFilter::default(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
for summary in &page.items {
|
||||
let movie_id = summary.movie.id().clone();
|
||||
let profile = self
|
||||
.ctx
|
||||
.repos
|
||||
.movie_profile
|
||||
.get_by_movie_id(&movie_id)
|
||||
.await?;
|
||||
|
||||
if let Err(e) = self
|
||||
.ctx
|
||||
.repos
|
||||
.search_command
|
||||
.index(IndexableDocument::Movie {
|
||||
id: movie_id.clone(),
|
||||
movie: Box::new(summary.movie.clone()),
|
||||
profile: profile.map(Box::new),
|
||||
})
|
||||
.await
|
||||
{
|
||||
tracing::warn!(movie_id = %movie_id.value(), "reindex movie failed: {e}");
|
||||
}
|
||||
count += 1;
|
||||
}
|
||||
|
||||
if (page.items.len() as u32) < BATCH_SIZE {
|
||||
break;
|
||||
}
|
||||
offset += BATCH_SIZE;
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
async fn backfill_persons(&self) -> Result<u64, DomainError> {
|
||||
let mut total = 0u64;
|
||||
loop {
|
||||
let (count, has_more) = self
|
||||
.ctx
|
||||
.repos
|
||||
.person_command
|
||||
.backfill_from_credits_batch(BATCH_SIZE)
|
||||
.await?;
|
||||
total += count;
|
||||
if !has_more {
|
||||
break;
|
||||
}
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
Ok(total)
|
||||
}
|
||||
|
||||
async fn reindex_persons(&self) -> Result<u64, DomainError> {
|
||||
let mut count: u64 = 0;
|
||||
let mut offset: u32 = 0;
|
||||
loop {
|
||||
let persons = self
|
||||
.ctx
|
||||
.repos
|
||||
.person_query
|
||||
.list_page(BATCH_SIZE, offset)
|
||||
.await?;
|
||||
|
||||
for person in &persons {
|
||||
if let Err(e) = self
|
||||
.ctx
|
||||
.repos
|
||||
.search_command
|
||||
.index(IndexableDocument::Person {
|
||||
id: person.id().clone(),
|
||||
person: Box::new(person.clone()),
|
||||
})
|
||||
.await
|
||||
{
|
||||
tracing::warn!(person = %person.name(), "reindex person failed: {e}");
|
||||
}
|
||||
count += 1;
|
||||
}
|
||||
|
||||
if (persons.len() as u32) < BATCH_SIZE {
|
||||
break;
|
||||
}
|
||||
offset += BATCH_SIZE;
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
Ok(count)
|
||||
}
|
||||
}
|
||||
|
||||
44
crates/application/src/movies/request_enrichment.rs
Normal file
44
crates/application/src/movies/request_enrichment.rs
Normal file
@@ -0,0 +1,44 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use chrono::Utc;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
models::MovieProfile,
|
||||
ports::{MovieEnrichmentClient, MovieProfileRepository},
|
||||
value_objects::MovieId,
|
||||
};
|
||||
|
||||
const STALENESS_DAYS: i64 = 30;
|
||||
|
||||
pub async fn fetch_if_stale(
|
||||
enrichment_client: &dyn MovieEnrichmentClient,
|
||||
profile_repo: &Arc<dyn MovieProfileRepository>,
|
||||
movie_id: MovieId,
|
||||
external_metadata_id: &str,
|
||||
) -> Result<Option<MovieProfile>, DomainError> {
|
||||
if let Ok(Some(existing)) = profile_repo.get_by_movie_id(&movie_id).await {
|
||||
let age = Utc::now() - existing.enriched_at;
|
||||
if age.num_days() < STALENESS_DAYS {
|
||||
tracing::debug!(
|
||||
movie_id = %movie_id.value(),
|
||||
"skipping enrichment — profile is {} days old",
|
||||
age.num_days()
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(movie_id = %movie_id.value(), external_id = %external_metadata_id, "enriching movie");
|
||||
|
||||
match enrichment_client
|
||||
.fetch_profile(movie_id, external_metadata_id)
|
||||
.await
|
||||
{
|
||||
Ok(profile) => Ok(Some(profile)),
|
||||
Err(DomainError::NotFound(msg)) => {
|
||||
tracing::warn!("TMDb lookup found nothing: {msg}");
|
||||
Ok(None)
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user