From e8fa24bf9b931b2bc3b9098feb8069a09ee3a7d7 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 11 Jun 2026 13:46:00 +0200 Subject: [PATCH] refactor: split tmdb-enrichment into client, movie_handler, person_handler --- crates/adapters/tmdb-enrichment/src/client.rs | 212 +++++++++ crates/adapters/tmdb-enrichment/src/lib.rs | 416 +----------------- .../tmdb-enrichment/src/movie_handler.rs | 101 +++++ .../tmdb-enrichment/src/person_handler.rs | 62 +++ 4 files changed, 381 insertions(+), 410 deletions(-) create mode 100644 crates/adapters/tmdb-enrichment/src/client.rs create mode 100644 crates/adapters/tmdb-enrichment/src/movie_handler.rs create mode 100644 crates/adapters/tmdb-enrichment/src/person_handler.rs diff --git a/crates/adapters/tmdb-enrichment/src/client.rs b/crates/adapters/tmdb-enrichment/src/client.rs new file mode 100644 index 0000000..96632f4 --- /dev/null +++ b/crates/adapters/tmdb-enrichment/src/client.rs @@ -0,0 +1,212 @@ +use async_trait::async_trait; +use chrono::Utc; +use domain::{ + errors::DomainError, + models::{CastMember, CrewMember, Genre, Keyword, MovieProfile, PersonEnrichmentData}, + ports::{MovieEnrichmentClient, PersonEnrichmentClient}, + value_objects::MovieId, +}; +use serde::Deserialize; + +pub struct TmdbEnrichmentClient { + api_key: String, + http: reqwest::Client, +} + +impl TmdbEnrichmentClient { + pub fn from_env() -> Result { + let api_key = std::env::var("TMDB_API_KEY") + .map_err(|_| DomainError::InfrastructureError("TMDB_API_KEY is not set".into()))?; + Ok(Self { + api_key, + http: reqwest::Client::new(), + }) + } + + pub(crate) fn base(&self, path: &str) -> String { + format!("https://api.themoviedb.org/3{}", path) + } + + pub(crate) async fn get Deserialize<'de>>( + &self, + url: &str, + extra: &[(&str, &str)], + ) -> Result { + let mut req = self + .http + .get(url) + .query(&[("api_key", self.api_key.as_str())]); + for (k, v) in extra { + req = req.query(&[(k, v)]); + } + req.send() + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string()))? + .error_for_status() + .map_err(|e| DomainError::InfrastructureError(e.to_string()))? + .json::() + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string())) + } + + async fn resolve_tmdb_id(&self, external_id: &str) -> Result { + if let Some(numeric) = external_id.strip_prefix("tmdb:") { + return numeric.parse::().map_err(|_| { + DomainError::InfrastructureError(format!("Invalid tmdb id: {numeric}")) + }); + } + + #[derive(Deserialize)] + struct FindResult { + id: u64, + } + #[derive(Deserialize)] + struct FindResponse { + movie_results: Vec, + } + + let url = self.base(&format!("/find/{}", external_id)); + let resp: FindResponse = self.get(&url, &[("external_source", "imdb_id")]).await?; + resp.movie_results + .into_iter() + .next() + .map(|r| r.id) + .ok_or_else(|| DomainError::NotFound(format!("TMDb: no movie for {external_id}"))) + } +} + +#[async_trait] +impl MovieEnrichmentClient for TmdbEnrichmentClient { + async fn fetch_profile( + &self, + movie_id: MovieId, + external_metadata_id: &str, + ) -> Result { + let tmdb_id = self.resolve_tmdb_id(external_metadata_id).await?; + + #[derive(Deserialize)] + struct GenreDto { + id: u32, + name: String, + } + #[derive(Deserialize)] + struct CollectionDto { + name: String, + } + #[derive(Deserialize)] + struct CastDto { + id: u64, + name: String, + character: String, + order: u32, + profile_path: Option, + } + #[derive(Deserialize)] + struct CrewDto { + id: u64, + name: String, + job: String, + department: String, + profile_path: Option, + } + #[derive(Deserialize)] + struct Credits { + cast: Vec, + crew: Vec, + } + #[derive(Deserialize)] + struct KeywordDto { + id: u32, + name: String, + } + #[derive(Deserialize)] + struct Keywords { + keywords: Vec, + } + #[derive(Deserialize)] + struct Details { + imdb_id: Option, + overview: Option, + tagline: Option, + runtime: Option, + budget: Option, + revenue: Option, + vote_average: Option, + vote_count: Option, + original_language: Option, + genres: Vec, + belongs_to_collection: Option, + credits: Credits, + keywords: Keywords, + } + + let url = self.base(&format!("/movie/{}", tmdb_id)); + let d: Details = self + .get(&url, &[("append_to_response", "credits,keywords")]) + .await?; + + Ok(MovieProfile { + movie_id, + tmdb_id, + imdb_id: d.imdb_id.filter(|s| !s.is_empty()), + overview: d.overview.filter(|s| !s.is_empty()), + tagline: d.tagline.filter(|s| !s.is_empty()), + runtime_minutes: d.runtime, + budget_usd: d.budget.filter(|&v| v > 0), + revenue_usd: d.revenue.filter(|&v| v > 0), + vote_average: d.vote_average, + vote_count: d.vote_count, + original_language: d.original_language, + collection_name: d.belongs_to_collection.map(|c| c.name), + genres: d.genres.into_iter().map(|g| Genre { tmdb_id: g.id, name: g.name }).collect(), + keywords: d.keywords.keywords.into_iter().map(|k| Keyword { tmdb_id: k.id, name: k.name }).collect(), + cast: d.credits.cast.into_iter().map(|c| CastMember { + tmdb_person_id: c.id, name: c.name, character: c.character, + billing_order: c.order, profile_path: c.profile_path, + }).collect(), + crew: d.credits.crew.into_iter().map(|c| CrewMember { + tmdb_person_id: c.id, name: c.name, job: c.job, + department: c.department, profile_path: c.profile_path, + }).collect(), + enriched_at: Utc::now(), + }) + } +} + +#[async_trait] +impl PersonEnrichmentClient for TmdbEnrichmentClient { + async fn fetch_details(&self, external_id: &str) -> Result { + let tmdb_id = external_id + .strip_prefix("tmdb:") + .and_then(|s| s.parse::().ok()) + .ok_or_else(|| { + DomainError::InfrastructureError(format!( + "Cannot parse person external_id: {external_id}" + )) + })?; + + #[derive(Deserialize)] + struct PersonDetails { + biography: Option, + birthday: Option, + deathday: Option, + place_of_birth: Option, + also_known_as: Option>, + homepage: Option, + imdb_id: Option, + } + + let url = self.base(&format!("/person/{tmdb_id}")); + let d: PersonDetails = self.get(&url, &[]).await?; + + Ok(PersonEnrichmentData { + biography: d.biography.filter(|s| !s.is_empty()), + birthday: d.birthday.and_then(|s| chrono::NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()), + deathday: d.deathday.and_then(|s| chrono::NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()), + place_of_birth: d.place_of_birth.filter(|s| !s.is_empty()), + also_known_as: d.also_known_as.unwrap_or_default(), + homepage: d.homepage.filter(|s| !s.is_empty()), + imdb_id: d.imdb_id.filter(|s| !s.is_empty()), + }) + } +} diff --git a/crates/adapters/tmdb-enrichment/src/lib.rs b/crates/adapters/tmdb-enrichment/src/lib.rs index c940cf5..f9a0317 100644 --- a/crates/adapters/tmdb-enrichment/src/lib.rs +++ b/crates/adapters/tmdb-enrichment/src/lib.rs @@ -1,411 +1,7 @@ -use std::sync::Arc; +mod client; +mod movie_handler; +mod person_handler; -use application::movies::{commands::EnrichMovieCommand, enrich_movie, request_enrichment}; -use async_trait::async_trait; -use chrono::Utc; -use domain::{ - errors::DomainError, - events::DomainEvent, - models::{CastMember, CrewMember, Genre, Keyword, MovieProfile, PersonEnrichmentData}, - ports::{ - EventHandler, MovieEnrichmentClient, MovieProfileRepository, MovieRepository, - ObjectStorage, PersonCommand, PersonEnrichmentClient, PersonQuery, SearchCommand, - }, - value_objects::MovieId, -}; -use serde::Deserialize; - -// ── TMDb enrichment client ─────────────────────────────────────────────────── - -pub struct TmdbEnrichmentClient { - api_key: String, - http: reqwest::Client, -} - -impl TmdbEnrichmentClient { - pub fn from_env() -> Result { - let api_key = std::env::var("TMDB_API_KEY") - .map_err(|_| DomainError::InfrastructureError("TMDB_API_KEY is not set".into()))?; - Ok(Self { - api_key, - http: reqwest::Client::new(), - }) - } - - fn base(&self, path: &str) -> String { - format!("https://api.themoviedb.org/3{}", path) - } - - async fn get Deserialize<'de>>( - &self, - url: &str, - extra: &[(&str, &str)], - ) -> Result { - let mut req = self - .http - .get(url) - .query(&[("api_key", self.api_key.as_str())]); - for (k, v) in extra { - req = req.query(&[(k, v)]); - } - req.send() - .await - .map_err(|e| DomainError::InfrastructureError(e.to_string()))? - .error_for_status() - .map_err(|e| DomainError::InfrastructureError(e.to_string()))? - .json::() - .await - .map_err(|e| DomainError::InfrastructureError(e.to_string())) - } - - async fn resolve_tmdb_id(&self, external_id: &str) -> Result { - if let Some(numeric) = external_id.strip_prefix("tmdb:") { - return numeric.parse::().map_err(|_| { - DomainError::InfrastructureError(format!("Invalid tmdb id: {numeric}")) - }); - } - - // Assume IMDb ID (tt…) — use /find - #[derive(Deserialize)] - struct FindResult { - id: u64, - } - #[derive(Deserialize)] - struct FindResponse { - movie_results: Vec, - } - - let url = self.base(&format!("/find/{}", external_id)); - let resp: FindResponse = self.get(&url, &[("external_source", "imdb_id")]).await?; - resp.movie_results - .into_iter() - .next() - .map(|r| r.id) - .ok_or_else(|| DomainError::NotFound(format!("TMDb: no movie for {external_id}"))) - } -} - -#[async_trait] -impl MovieEnrichmentClient for TmdbEnrichmentClient { - async fn fetch_profile( - &self, - movie_id: MovieId, - external_metadata_id: &str, - ) -> Result { - let tmdb_id = self.resolve_tmdb_id(external_metadata_id).await?; - - #[derive(Deserialize)] - struct GenreDto { - id: u32, - name: String, - } - - #[derive(Deserialize)] - struct CollectionDto { - name: String, - } - - #[derive(Deserialize)] - struct CastDto { - id: u64, - name: String, - character: String, - order: u32, - profile_path: Option, - } - - #[derive(Deserialize)] - struct CrewDto { - id: u64, - name: String, - job: String, - department: String, - profile_path: Option, - } - - #[derive(Deserialize)] - struct Credits { - cast: Vec, - crew: Vec, - } - - #[derive(Deserialize)] - struct KeywordDto { - id: u32, - name: String, - } - - #[derive(Deserialize)] - struct Keywords { - keywords: Vec, - } - - #[derive(Deserialize)] - struct Details { - imdb_id: Option, - overview: Option, - tagline: Option, - runtime: Option, - budget: Option, - revenue: Option, - vote_average: Option, - vote_count: Option, - original_language: Option, - genres: Vec, - belongs_to_collection: Option, - credits: Credits, - keywords: Keywords, - } - - let url = self.base(&format!("/movie/{}", tmdb_id)); - let d: Details = self - .get(&url, &[("append_to_response", "credits,keywords")]) - .await?; - - Ok(MovieProfile { - movie_id, - tmdb_id, - imdb_id: d.imdb_id.filter(|s| !s.is_empty()), - overview: d.overview.filter(|s| !s.is_empty()), - tagline: d.tagline.filter(|s| !s.is_empty()), - runtime_minutes: d.runtime, - budget_usd: d.budget.filter(|&v| v > 0), - revenue_usd: d.revenue.filter(|&v| v > 0), - vote_average: d.vote_average, - vote_count: d.vote_count, - original_language: d.original_language, - collection_name: d.belongs_to_collection.map(|c| c.name), - genres: d - .genres - .into_iter() - .map(|g| Genre { - tmdb_id: g.id, - name: g.name, - }) - .collect(), - keywords: d - .keywords - .keywords - .into_iter() - .map(|k| Keyword { - tmdb_id: k.id, - name: k.name, - }) - .collect(), - cast: d - .credits - .cast - .into_iter() - .map(|c| CastMember { - tmdb_person_id: c.id, - name: c.name, - character: c.character, - billing_order: c.order, - profile_path: c.profile_path, - }) - .collect(), - crew: d - .credits - .crew - .into_iter() - .map(|c| CrewMember { - tmdb_person_id: c.id, - name: c.name, - job: c.job, - department: c.department, - profile_path: c.profile_path, - }) - .collect(), - enriched_at: Utc::now(), - }) - } -} - -// ── Person enrichment client ──────────────────────────────────────────────── - -#[async_trait] -impl PersonEnrichmentClient for TmdbEnrichmentClient { - async fn fetch_details(&self, external_id: &str) -> Result { - let tmdb_id = external_id - .strip_prefix("tmdb:") - .and_then(|s| s.parse::().ok()) - .ok_or_else(|| { - DomainError::InfrastructureError(format!( - "Cannot parse person external_id: {external_id}" - )) - })?; - - #[derive(Deserialize)] - struct PersonDetails { - biography: Option, - birthday: Option, - deathday: Option, - place_of_birth: Option, - also_known_as: Option>, - homepage: Option, - imdb_id: Option, - } - - let url = self.base(&format!("/person/{tmdb_id}")); - let d: PersonDetails = self.get(&url, &[]).await?; - - Ok(PersonEnrichmentData { - biography: d.biography.filter(|s| !s.is_empty()), - birthday: d - .birthday - .and_then(|s| chrono::NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()), - deathday: d - .deathday - .and_then(|s| chrono::NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()), - place_of_birth: d.place_of_birth.filter(|s| !s.is_empty()), - also_known_as: d.also_known_as.unwrap_or_default(), - homepage: d.homepage.filter(|s| !s.is_empty()), - imdb_id: d.imdb_id.filter(|s| !s.is_empty()), - }) - } -} - -// ── Movie enrichment event handler ────────────────────────────────────────── - -pub struct EnrichmentHandler { - pub enrichment_client: Arc, - pub movie_repository: Arc, - pub profile_repo: Arc, - pub person_command: Arc, - pub search_command: Arc, - pub object_storage: Arc, - http: reqwest::Client, -} - -impl EnrichmentHandler { - pub fn new( - enrichment_client: Arc, - movie_repository: Arc, - profile_repo: Arc, - person_command: Arc, - search_command: Arc, - object_storage: Arc, - ) -> Self { - Self { - enrichment_client, - movie_repository, - profile_repo, - person_command, - search_command, - object_storage, - http: reqwest::Client::new(), - } - } - - async fn download_cast_photos(&self, profile: &MovieProfile) { - for member in profile.cast.iter().take(5) { - let Some(ref path) = member.profile_path else { - continue; - }; - let key = format!("cast{path}"); - if self.object_storage.get(&key).await.is_ok() { - continue; - } - let url = format!("https://image.tmdb.org/t/p/w185{path}"); - match self.http.get(&url).send().await { - Ok(resp) if resp.status().is_success() => { - if let Ok(bytes) = resp.bytes().await - && let Err(e) = self.object_storage.store(&key, &bytes).await - { - tracing::debug!("cast photo store failed for {path}: {e}"); - } - } - _ => tracing::debug!("cast photo download failed for {path}"), - } - } - } -} - -#[async_trait] -impl EventHandler for EnrichmentHandler { - async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { - let (movie_id, external_metadata_id) = match event { - DomainEvent::MovieEnrichmentRequested { - movie_id, - external_metadata_id, - } => (movie_id.clone(), external_metadata_id.clone()), - _ => return Ok(()), - }; - - let Some(profile) = request_enrichment::fetch_if_stale( - self.enrichment_client.as_ref(), - &self.profile_repo, - movie_id.clone(), - &external_metadata_id, - ) - .await? - else { - return Ok(()); - }; - - self.download_cast_photos(&profile).await; - enrich_movie::execute( - &self.movie_repository, - &self.profile_repo, - &self.person_command, - &self.search_command, - EnrichMovieCommand { movie_id, profile }, - ) - .await - } -} - -// ── Person enrichment event handler ───────────────────────────────────────── - -pub struct PersonEnrichmentHandler { - enrichment_client: Arc, - person_query: Arc, - person_command: Arc, -} - -impl PersonEnrichmentHandler { - pub fn new( - enrichment_client: Arc, - person_query: Arc, - person_command: Arc, - ) -> Self { - Self { - enrichment_client, - person_query, - person_command, - } - } -} - -const PERSON_STALENESS_DAYS: i64 = 90; - -#[async_trait] -impl EventHandler for PersonEnrichmentHandler { - async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { - let (person_id, external_person_id) = match event { - DomainEvent::PersonEnrichmentRequested { - person_id, - external_person_id, - } => (person_id.clone(), external_person_id.clone()), - _ => return Ok(()), - }; - - if let Some(person) = self.person_query.get_by_id(&person_id).await? { - if let Some(at) = person.enriched_at() { - if (Utc::now() - at).num_days() < PERSON_STALENESS_DAYS { - tracing::debug!(person_id = %person_id.value(), "person enrichment still fresh"); - return Ok(()); - } - } - } - - tracing::info!(person_id = %person_id.value(), "enriching person from TMDb"); - let data = self - .enrichment_client - .fetch_details(&external_person_id) - .await?; - self.person_command - .update_enrichment(&person_id, &data) - .await - } -} +pub use client::TmdbEnrichmentClient; +pub use movie_handler::MovieEnrichmentHandler as EnrichmentHandler; +pub use person_handler::PersonEnrichmentHandler; diff --git a/crates/adapters/tmdb-enrichment/src/movie_handler.rs b/crates/adapters/tmdb-enrichment/src/movie_handler.rs new file mode 100644 index 0000000..b90f795 --- /dev/null +++ b/crates/adapters/tmdb-enrichment/src/movie_handler.rs @@ -0,0 +1,101 @@ +use std::sync::Arc; + +use application::movies::{commands::EnrichMovieCommand, enrich_movie, request_enrichment}; +use async_trait::async_trait; +use domain::{ + errors::DomainError, + events::DomainEvent, + models::MovieProfile, + ports::{ + EventHandler, MovieEnrichmentClient, MovieProfileRepository, MovieRepository, + ObjectStorage, PersonCommand, SearchCommand, + }, +}; + +pub struct MovieEnrichmentHandler { + enrichment_client: Arc, + movie_repository: Arc, + profile_repo: Arc, + person_command: Arc, + search_command: Arc, + object_storage: Arc, + http: reqwest::Client, +} + +impl MovieEnrichmentHandler { + pub fn new( + enrichment_client: Arc, + movie_repository: Arc, + profile_repo: Arc, + person_command: Arc, + search_command: Arc, + object_storage: Arc, + ) -> Self { + Self { + enrichment_client, + movie_repository, + profile_repo, + person_command, + search_command, + object_storage, + http: reqwest::Client::new(), + } + } + + async fn download_cast_photos(&self, profile: &MovieProfile) { + for member in profile.cast.iter().take(5) { + let Some(ref path) = member.profile_path else { + continue; + }; + let key = format!("cast{path}"); + if self.object_storage.get(&key).await.is_ok() { + continue; + } + let url = format!("https://image.tmdb.org/t/p/w185{path}"); + match self.http.get(&url).send().await { + Ok(resp) if resp.status().is_success() => { + if let Ok(bytes) = resp.bytes().await + && let Err(e) = self.object_storage.store(&key, &bytes).await + { + tracing::debug!("cast photo store failed for {path}: {e}"); + } + } + _ => tracing::debug!("cast photo download failed for {path}"), + } + } + } +} + +#[async_trait] +impl EventHandler for MovieEnrichmentHandler { + async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { + let (movie_id, external_metadata_id) = match event { + DomainEvent::MovieEnrichmentRequested { + movie_id, + external_metadata_id, + } => (movie_id.clone(), external_metadata_id.clone()), + _ => return Ok(()), + }; + + let Some(profile) = request_enrichment::fetch_if_stale( + self.enrichment_client.as_ref(), + &self.profile_repo, + movie_id.clone(), + &external_metadata_id, + ) + .await? + else { + return Ok(()); + }; + + self.download_cast_photos(&profile).await; + enrich_movie::execute( + &self.movie_repository, + &self.profile_repo, + &self.person_command, + &self.search_command, + EnrichMovieCommand { movie_id, profile }, + ) + .await + } +} diff --git a/crates/adapters/tmdb-enrichment/src/person_handler.rs b/crates/adapters/tmdb-enrichment/src/person_handler.rs new file mode 100644 index 0000000..07469be --- /dev/null +++ b/crates/adapters/tmdb-enrichment/src/person_handler.rs @@ -0,0 +1,62 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use chrono::Utc; +use domain::{ + errors::DomainError, + events::DomainEvent, + ports::{EventHandler, PersonCommand, PersonEnrichmentClient, PersonQuery}, +}; + +const STALENESS_DAYS: i64 = 90; + +pub struct PersonEnrichmentHandler { + enrichment_client: Arc, + person_query: Arc, + person_command: Arc, +} + +impl PersonEnrichmentHandler { + pub fn new( + enrichment_client: Arc, + person_query: Arc, + person_command: Arc, + ) -> Self { + Self { + enrichment_client, + person_query, + person_command, + } + } +} + +#[async_trait] +impl EventHandler for PersonEnrichmentHandler { + async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { + let (person_id, external_person_id) = match event { + DomainEvent::PersonEnrichmentRequested { + person_id, + external_person_id, + } => (person_id.clone(), external_person_id.clone()), + _ => return Ok(()), + }; + + if let Some(person) = self.person_query.get_by_id(&person_id).await? { + if let Some(at) = person.enriched_at() { + if (Utc::now() - at).num_days() < STALENESS_DAYS { + tracing::debug!(person_id = %person_id.value(), "person enrichment still fresh"); + return Ok(()); + } + } + } + + tracing::info!(person_id = %person_id.value(), "enriching person from TMDb"); + let data = self + .enrichment_client + .fetch_details(&external_person_id) + .await?; + self.person_command + .update_enrichment(&person_id, &data) + .await + } +}