refactor(movies): EnrichMovieDeps, ReindexSearchDeps, SyncPosterDeps, SearchReindexHandler, EnrichmentStalenessJob

This commit is contained in:
2026-06-11 22:13:25 +02:00
parent 66bd138927
commit 1e62f12903
15 changed files with 224 additions and 125 deletions

View File

@@ -1,17 +1,27 @@
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use domain::{errors::DomainError, events::DomainEvent, ports::PeriodicJob};
use crate::context::AppContext;
use domain::{
errors::DomainError,
events::DomainEvent,
ports::{EventPublisher, MovieProfileRepository, PeriodicJob},
};
pub struct EnrichmentStalenessJob {
ctx: AppContext,
movie_profile: Arc<dyn MovieProfileRepository>,
event_publisher: Arc<dyn EventPublisher>,
}
impl EnrichmentStalenessJob {
pub fn new(ctx: AppContext) -> Self {
Self { ctx }
pub fn new(
movie_profile: Arc<dyn MovieProfileRepository>,
event_publisher: Arc<dyn EventPublisher>,
) -> Self {
Self {
movie_profile,
event_publisher,
}
}
}
@@ -22,7 +32,7 @@ impl PeriodicJob for EnrichmentStalenessJob {
}
async fn run(&self) -> Result<(), DomainError> {
let stale = self.ctx.repos.movie_profile.list_stale().await?;
let stale = self.movie_profile.list_stale().await?;
if stale.is_empty() {
return Ok(());
}
@@ -32,7 +42,7 @@ impl PeriodicJob for EnrichmentStalenessJob {
movie_id,
external_metadata_id,
};
self.ctx.services.event_publisher.publish(&event).await?;
self.event_publisher.publish(&event).await?;
}
Ok(())
}

View File

@@ -0,0 +1,39 @@
use std::sync::Arc;
use domain::ports::{
EventPublisher, MetadataClient, MovieProfileRepository, MovieRepository, ObjectStorage,
PersonCommand, PersonQuery, PosterFetcherClient, SearchCommand,
};
pub struct GetMoviesDeps {
pub movie: Arc<dyn MovieRepository>,
}
pub struct GetMovieProfileDeps {
pub movie_profile: Arc<dyn MovieProfileRepository>,
}
pub struct SyncPosterDeps {
pub movie: Arc<dyn MovieRepository>,
pub movie_profile: Arc<dyn MovieProfileRepository>,
pub metadata: Arc<dyn MetadataClient>,
pub poster_fetcher: Arc<dyn PosterFetcherClient>,
pub object_storage: Arc<dyn ObjectStorage>,
pub event_publisher: Arc<dyn EventPublisher>,
pub search_command: Arc<dyn SearchCommand>,
}
pub struct EnrichMovieDeps {
pub movie: Arc<dyn MovieRepository>,
pub movie_profile: Arc<dyn MovieProfileRepository>,
pub person_command: Arc<dyn PersonCommand>,
pub search_command: Arc<dyn SearchCommand>,
}
pub struct ReindexSearchDeps {
pub movie: Arc<dyn MovieRepository>,
pub movie_profile: Arc<dyn MovieProfileRepository>,
pub search_command: Arc<dyn SearchCommand>,
pub person_command: Arc<dyn PersonCommand>,
pub person_query: Arc<dyn PersonQuery>,
}

View File

@@ -1,38 +1,30 @@
use std::collections::HashMap;
use std::sync::Arc;
use domain::{
errors::DomainError,
models::{CastMember, CrewMember, ExternalPersonId, IndexableDocument, Person, PersonId},
ports::{MovieProfileRepository, MovieRepository, PersonCommand, SearchCommand},
};
use crate::movies::commands::EnrichMovieCommand;
use crate::movies::{commands::EnrichMovieCommand, deps::EnrichMovieDeps};
pub async fn execute(
movie_repository: &Arc<dyn MovieRepository>,
profile_repository: &Arc<dyn MovieProfileRepository>,
person_command: &Arc<dyn PersonCommand>,
search_command: &Arc<dyn SearchCommand>,
cmd: EnrichMovieCommand,
) -> Result<(), DomainError> {
pub async fn execute(deps: &EnrichMovieDeps, cmd: EnrichMovieCommand) -> Result<(), DomainError> {
// 1. Persist the enriched profile (also handles movie_cast, movie_crew, genres, keywords)
profile_repository.upsert(&cmd.profile).await?;
deps.movie_profile.upsert(&cmd.profile).await?;
// 2. Upsert persons extracted from cast + crew (no reads — only upsert)
let persons = extract_persons(&cmd.profile.cast, &cmd.profile.crew);
if !persons.is_empty() {
person_command.upsert_batch(&persons).await?;
deps.person_command.upsert_batch(&persons).await?;
}
// 3. Fetch the movie for the search index document
let Some(movie) = movie_repository.get_movie_by_id(&cmd.movie_id).await? else {
let Some(movie) = deps.movie.get_movie_by_id(&cmd.movie_id).await? else {
tracing::warn!(movie_id = %cmd.movie_id.value(), "enrich_movie: movie not found after profile upsert");
return Ok(());
};
// 4. Index the movie in search
search_command
deps.search_command
.index(IndexableDocument::Movie {
id: cmd.movie_id.clone(),
movie: Box::new(movie),
@@ -42,7 +34,7 @@ pub async fn execute(
// 5. Index each unique person in search (no reads — persons built from in-memory data)
for person in &persons {
search_command
deps.search_command
.index(IndexableDocument::Person {
id: person.id().clone(),
person: Box::new(person.clone()),

View File

@@ -5,7 +5,7 @@ use domain::{
};
use uuid::Uuid;
use crate::context::AppContext;
use crate::movies::deps::GetMovieProfileDeps;
pub struct GetMovieProfileQuery {
pub movie_id: Uuid,
@@ -60,11 +60,11 @@ fn resolve_crew(member: &CrewMember) -> CrewMemberWithId {
}
pub async fn execute(
ctx: &AppContext,
deps: &GetMovieProfileDeps,
query: GetMovieProfileQuery,
) -> Result<Option<MovieProfileResult>, DomainError> {
let movie_id = MovieId::from_uuid(query.movie_id);
let profile = ctx.repos.movie_profile.get_by_movie_id(&movie_id).await?;
let profile = deps.movie_profile.get_by_movie_id(&movie_id).await?;
Ok(profile.map(|p| {
let cast = p.cast.iter().map(resolve_cast).collect();

View File

@@ -4,10 +4,10 @@ use domain::{
models::{MovieFilter, MovieSummary},
};
use crate::{context::AppContext, movies::queries::GetMoviesQuery};
use crate::movies::{deps::GetMoviesDeps, queries::GetMoviesQuery};
pub async fn execute(
ctx: &AppContext,
deps: &GetMoviesDeps,
query: GetMoviesQuery,
) -> Result<Paginated<MovieSummary>, DomainError> {
let page = PageParams::new(query.limit, query.offset)?;
@@ -16,7 +16,7 @@ pub async fn execute(
genre: query.genre,
language: query.language,
};
ctx.repos.movie.list_movies(&page, &filter).await
deps.movie.list_movies(&page, &filter).await
}
#[cfg(test)]

View File

@@ -1,4 +1,5 @@
pub mod commands;
pub mod deps;
pub mod discovery_indexer;
pub mod enrich_movie;
pub mod get_movie_profile;

View File

@@ -7,7 +7,7 @@ use domain::{
};
use std::sync::atomic::{AtomicBool, Ordering};
use crate::context::AppContext;
use crate::movies::deps::ReindexSearchDeps;
const BATCH_SIZE: u32 = 500;
@@ -17,10 +17,10 @@ pub struct ReindexResult {
pub persons_backfilled: u64,
}
pub async fn execute(ctx: &AppContext) -> Result<ReindexResult, DomainError> {
let movies_indexed = reindex_movies(ctx).await?;
let persons_backfilled = backfill_persons(ctx).await?;
let persons_indexed = reindex_persons(ctx).await?;
pub async fn execute(deps: &ReindexSearchDeps) -> Result<ReindexResult, DomainError> {
let movies_indexed = reindex_movies(deps).await?;
let persons_backfilled = backfill_persons(deps).await?;
let persons_indexed = reindex_persons(deps).await?;
Ok(ReindexResult {
movies_indexed,
@@ -29,12 +29,11 @@ pub async fn execute(ctx: &AppContext) -> Result<ReindexResult, DomainError> {
})
}
async fn reindex_movies(ctx: &AppContext) -> Result<u64, DomainError> {
async fn reindex_movies(deps: &ReindexSearchDeps) -> Result<u64, DomainError> {
let mut count: u64 = 0;
let mut offset: u32 = 0;
loop {
let page = ctx
.repos
let page = deps
.movie
.list_movies(
&PageParams {
@@ -47,10 +46,9 @@ async fn reindex_movies(ctx: &AppContext) -> Result<u64, DomainError> {
for summary in &page.items {
let movie_id = summary.movie.id().clone();
let profile = ctx.repos.movie_profile.get_by_movie_id(&movie_id).await?;
let profile = deps.movie_profile.get_by_movie_id(&movie_id).await?;
if let Err(e) = ctx
.repos
if let Err(e) = deps
.search_command
.index(IndexableDocument::Movie {
id: movie_id.clone(),
@@ -73,11 +71,10 @@ async fn reindex_movies(ctx: &AppContext) -> Result<u64, DomainError> {
Ok(count)
}
async fn backfill_persons(ctx: &AppContext) -> Result<u64, DomainError> {
async fn backfill_persons(deps: &ReindexSearchDeps) -> Result<u64, DomainError> {
let mut total = 0u64;
loop {
let (count, has_more) = ctx
.repos
let (count, has_more) = deps
.person_command
.backfill_from_credits_batch(BATCH_SIZE)
.await?;
@@ -90,15 +87,14 @@ async fn backfill_persons(ctx: &AppContext) -> Result<u64, DomainError> {
Ok(total)
}
async fn reindex_persons(ctx: &AppContext) -> Result<u64, DomainError> {
async fn reindex_persons(deps: &ReindexSearchDeps) -> Result<u64, DomainError> {
let mut count: u64 = 0;
let mut offset: u32 = 0;
loop {
let persons = ctx.repos.person_query.list_page(BATCH_SIZE, offset).await?;
let persons = deps.person_query.list_page(BATCH_SIZE, offset).await?;
for person in &persons {
if let Err(e) = ctx
.repos
if let Err(e) = deps
.search_command
.index(IndexableDocument::Person {
id: person.id().clone(),
@@ -121,14 +117,14 @@ async fn reindex_persons(ctx: &AppContext) -> Result<u64, DomainError> {
}
pub struct SearchReindexHandler {
ctx: AppContext,
deps: ReindexSearchDeps,
running: AtomicBool,
}
impl SearchReindexHandler {
pub fn new(ctx: AppContext) -> Self {
pub fn new(deps: ReindexSearchDeps) -> Self {
Self {
ctx,
deps,
running: AtomicBool::new(false),
}
}
@@ -147,7 +143,7 @@ impl EventHandler for SearchReindexHandler {
}
tracing::info!("search reindex started");
let result = execute(&self.ctx).await;
let result = execute(&self.deps).await;
self.running.store(false, Ordering::SeqCst);
let r = result?;

View File

@@ -5,12 +5,12 @@ use domain::{
value_objects::{MovieId, PosterPath},
};
use crate::{context::AppContext, diary::commands::SyncPosterCommand};
use crate::{diary::commands::SyncPosterCommand, movies::deps::SyncPosterDeps};
pub async fn execute(ctx: &AppContext, cmd: SyncPosterCommand) -> Result<(), DomainError> {
pub async fn execute(deps: &SyncPosterDeps, cmd: SyncPosterCommand) -> Result<(), DomainError> {
let movie_id = MovieId::from_uuid(cmd.movie_id);
let mut movie = match ctx.repos.movie.get_movie_by_id(&movie_id).await? {
let mut movie = match deps.movie.get_movie_by_id(&movie_id).await? {
Some(m) => m,
None => {
tracing::warn!(
@@ -30,8 +30,7 @@ pub async fn execute(ctx: &AppContext, cmd: SyncPosterCommand) -> Result<(), Dom
})?
.clone();
let poster_url = match ctx
.services
let poster_url = match deps
.metadata
.get_poster_url(&external_metadata_id)
.await
@@ -44,20 +43,17 @@ pub async fn execute(ctx: &AppContext, cmd: SyncPosterCommand) -> Result<(), Dom
}
};
let image_bytes = ctx
.services
let image_bytes = deps
.poster_fetcher
.fetch_poster_bytes(&poster_url)
.await?;
let stored_path = ctx
.services
let stored_path = deps
.object_storage
.store(&movie_id.value().to_string(), &image_bytes)
.await?;
if let Err(e) = ctx
.services
if let Err(e) = deps
.event_publisher
.publish(&DomainEvent::ImageStored {
key: stored_path.clone(),
@@ -70,19 +66,17 @@ pub async fn execute(ctx: &AppContext, cmd: SyncPosterCommand) -> Result<(), Dom
let poster_path = PosterPath::new(stored_path)?;
movie.update_poster(poster_path);
ctx.repos.movie.upsert_movie(&movie).await?;
deps.movie.upsert_movie(&movie).await?;
// Refresh search index so the new poster_path is reflected immediately.
// Fetch existing profile if available for a complete index document.
let profile = ctx
.repos
let profile = deps
.movie_profile
.get_by_movie_id(&movie_id)
.await
.ok()
.flatten();
if let Err(e) = ctx
.repos
if let Err(e) = deps
.search_command
.index(IndexableDocument::Movie {
id: movie_id.clone(),

View File

@@ -11,15 +11,13 @@ use domain::{
value_objects::{MovieId, MovieTitle, ReleaseYear},
};
use crate::movies::{commands::EnrichMovieCommand, enrich_movie};
use crate::movies::{commands::EnrichMovieCommand, deps::EnrichMovieDeps, enrich_movie};
#[tokio::test]
async fn stores_profile_and_indexes() {
let movie_repo = InMemoryMovieRepository::new();
let profile_repo = InMemoryMovieProfileRepository::new();
let search_cmd: Arc<dyn domain::ports::SearchCommand> = Arc::new(FakeSearchCommand);
// PanicPersonCommand is safe here — empty cast/crew means upsert_batch is never called
let person_cmd: Arc<dyn domain::ports::PersonCommand> = Arc::new(PanicPersonCommand);
let movie = Movie::new(
None,
@@ -51,11 +49,15 @@ async fn stores_profile_and_indexes() {
enriched_at: Utc::now(),
};
let deps = EnrichMovieDeps {
movie: movie_repo as Arc<_>,
movie_profile: Arc::clone(&profile_repo) as Arc<_>,
person_command: Arc::new(PanicPersonCommand),
search_command: Arc::new(FakeSearchCommand),
};
enrich_movie::execute(
&(movie_repo as Arc<_>),
&(profile_repo.clone() as Arc<_>),
&person_cmd,
&search_cmd,
&deps,
EnrichMovieCommand {
movie_id: movie_id.clone(),
profile,
@@ -96,8 +98,6 @@ impl domain::ports::PersonCommand for NoopPersonCommand {
async fn extracts_and_indexes_persons() {
let movie_repo = InMemoryMovieRepository::new();
let profile_repo = InMemoryMovieProfileRepository::new();
let search_cmd: Arc<dyn domain::ports::SearchCommand> = Arc::new(FakeSearchCommand);
let person_cmd: Arc<dyn domain::ports::PersonCommand> = Arc::new(NoopPersonCommand);
let movie = Movie::new(
None,
@@ -141,11 +141,15 @@ async fn extracts_and_indexes_persons() {
enriched_at: Utc::now(),
};
let deps = EnrichMovieDeps {
movie: movie_repo as Arc<_>,
movie_profile: Arc::clone(&profile_repo) as Arc<_>,
person_command: Arc::new(NoopPersonCommand),
search_command: Arc::new(FakeSearchCommand),
};
enrich_movie::execute(
&(movie_repo as Arc<_>),
&(profile_repo.clone() as Arc<_>),
&person_cmd,
&search_cmd,
&deps,
EnrichMovieCommand {
movie_id: movie_id.clone(),
profile,

View File

@@ -10,17 +10,19 @@ use domain::{
value_objects::MovieId,
};
use crate::{
movies::get_movie_profile::{self, GetMovieProfileQuery},
test_helpers::TestContextBuilder,
use crate::movies::{
deps::GetMovieProfileDeps,
get_movie_profile::{self, GetMovieProfileQuery},
};
#[tokio::test]
async fn returns_none_when_no_profile() {
let ctx = TestContextBuilder::new().build();
let deps = GetMovieProfileDeps {
movie_profile: InMemoryMovieProfileRepository::new(),
};
let result = get_movie_profile::execute(
&ctx,
&deps,
GetMovieProfileQuery {
movie_id: Uuid::new_v4(),
},
@@ -69,12 +71,12 @@ async fn returns_profile_with_cast_and_crew() {
};
profile_repo.upsert(&profile).await.unwrap();
let ctx = TestContextBuilder::new()
.with_movie_profiles(Arc::clone(&profile_repo) as _)
.build();
let deps = GetMovieProfileDeps {
movie_profile: Arc::clone(&profile_repo) as _,
};
let result = get_movie_profile::execute(
&ctx,
&deps,
GetMovieProfileQuery {
movie_id: movie_id.value(),
},

View File

@@ -1,14 +1,15 @@
use crate::{
movies::{get_movies, queries::GetMoviesQuery},
test_helpers::TestContextBuilder,
};
use domain::testing::InMemoryMovieRepository;
use crate::movies::{deps::GetMoviesDeps, get_movies, queries::GetMoviesQuery};
#[tokio::test]
async fn returns_empty_when_no_movies() {
let ctx = TestContextBuilder::new().build();
let deps = GetMoviesDeps {
movie: InMemoryMovieRepository::new(),
};
let result = get_movies::execute(
&ctx,
&deps,
GetMoviesQuery {
limit: None,
offset: None,

View File

@@ -6,20 +6,33 @@ use domain::{
errors::DomainError,
models::Movie,
ports::{MetadataClient, MovieRepository},
testing::InMemoryMovieRepository,
testing::{InMemoryMovieProfileRepository, InMemoryMovieRepository, NoopEventPublisher, NoopObjectStorage, FakeSearchCommand},
value_objects::{ExternalMetadataId, MovieTitle, PosterUrl, ReleaseYear},
};
use crate::{
diary::commands::SyncPosterCommand, movies::sync_poster, test_helpers::TestContextBuilder,
diary::commands::SyncPosterCommand,
movies::{deps::SyncPosterDeps, sync_poster},
};
fn default_deps() -> SyncPosterDeps {
SyncPosterDeps {
movie: InMemoryMovieRepository::new(),
movie_profile: InMemoryMovieProfileRepository::new(),
metadata: Arc::new(domain::testing::FakeMetadataClient),
poster_fetcher: Arc::new(domain::testing::FakePosterFetcher),
object_storage: Arc::new(NoopObjectStorage),
event_publisher: NoopEventPublisher::new(),
search_command: Arc::new(FakeSearchCommand),
}
}
#[tokio::test]
async fn fails_when_movie_not_found() {
let ctx = TestContextBuilder::new().build();
let deps = default_deps();
let result = sync_poster::execute(
&ctx,
&deps,
SyncPosterCommand {
movie_id: Uuid::new_v4(),
},
@@ -42,11 +55,12 @@ async fn fails_when_no_external_id() {
let movie_id = movie.id().value();
movies.upsert_movie(&movie).await.unwrap();
let ctx = TestContextBuilder::new()
.with_movies(Arc::clone(&movies) as _)
.build();
let deps = SyncPosterDeps {
movie: Arc::clone(&movies) as _,
..default_deps()
};
let result = sync_poster::execute(&ctx, SyncPosterCommand { movie_id }).await;
let result = sync_poster::execute(&deps, SyncPosterCommand { movie_id }).await;
assert!(result.is_err());
}
@@ -85,12 +99,13 @@ async fn syncs_poster_for_movie_with_external_id() {
let movie_id = movie.id().value();
movies.upsert_movie(&movie).await.unwrap();
let ctx = TestContextBuilder::new()
.with_movies(Arc::clone(&movies) as _)
.with_metadata_client(Arc::new(FakeMetaWithPoster) as _)
.build();
let deps = SyncPosterDeps {
movie: Arc::clone(&movies) as _,
metadata: Arc::new(FakeMetaWithPoster) as _,
..default_deps()
};
sync_poster::execute(&ctx, SyncPosterCommand { movie_id })
sync_poster::execute(&deps, SyncPosterCommand { movie_id })
.await
.unwrap();