From 38d13fbff10220cb1e259dce1b5e8bb19d2a3d45 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Tue, 12 May 2026 13:23:41 +0200 Subject: [PATCH] feat: implement TMDb enrichment for movie profiles - Add SqliteMovieProfileRepository for managing movie profiles in SQLite. - Create TmdbEnrichmentClient to fetch movie details from TMDb API. - Implement enrichment event handling with EnrichmentHandler. - Introduce periodic jobs for cleaning up expired import sessions and checking for stale movie profiles. - Update application context to include movie profile repository. - Add API endpoint to retrieve movie profiles. - Extend domain models with new structures for movie enrichment (Genre, Keyword, CastMember, CrewMember, MovieProfile). - Modify event system to include MovieEnrichmentRequested event. - Enhance tests to cover new functionality and ensure stability. --- Cargo.lock | 15 ++ Cargo.toml | 2 + Dockerfile | 1 + README.md | 5 + crates/adapters/event-payload/src/lib.rs | 17 ++ crates/adapters/nats/src/subject.rs | 5 +- .../migrations/0014_movie_profiles.sql | 54 ++++ crates/adapters/postgres/src/lib.rs | 5 + crates/adapters/postgres/src/profile.rs | 236 +++++++++++++++++ .../sqlite/migrations/0014_movie_profiles.sql | 54 ++++ crates/adapters/sqlite/src/lib.rs | 5 + crates/adapters/sqlite/src/profile.rs | 240 ++++++++++++++++++ crates/adapters/tmdb-enrichment/Cargo.toml | 14 + crates/adapters/tmdb-enrichment/src/lib.rs | 211 +++++++++++++++ crates/api-types/src/movies.rs | 52 ++++ crates/application/src/context.rs | 3 +- crates/application/src/jobs.rs | 60 +++++ crates/application/src/lib.rs | 1 + .../application/src/use_cases/log_review.rs | 8 + crates/application/src/worker.rs | 1 + crates/domain/src/events.rs | 4 + crates/domain/src/models/mod.rs | 55 +++- crates/domain/src/ports.rs | 29 ++- crates/presentation/src/extractors.rs | 7 + crates/presentation/src/handlers/api.rs | 53 +++- crates/presentation/src/main.rs | 12 +- crates/presentation/src/routes.rs | 4 + crates/presentation/tests/api_test.rs | 9 + crates/worker/Cargo.toml | 1 + crates/worker/src/main.rs | 60 +++-- 30 files changed, 1193 insertions(+), 30 deletions(-) create mode 100644 crates/adapters/postgres/migrations/0014_movie_profiles.sql create mode 100644 crates/adapters/postgres/src/profile.rs create mode 100644 crates/adapters/sqlite/migrations/0014_movie_profiles.sql create mode 100644 crates/adapters/sqlite/src/profile.rs create mode 100644 crates/adapters/tmdb-enrichment/Cargo.toml create mode 100644 crates/adapters/tmdb-enrichment/src/lib.rs create mode 100644 crates/application/src/jobs.rs diff --git a/Cargo.lock b/Cargo.lock index fca9271..25d0737 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5152,6 +5152,20 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" +[[package]] +name = "tmdb-enrichment" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "chrono", + "domain", + "reqwest 0.13.3", + "serde", + "serde_json", + "tracing", +] + [[package]] name = "tokio" version = "1.52.1" @@ -6331,6 +6345,7 @@ dependencies = [ "sqlite-event-queue", "sqlite-federation", "sqlx", + "tmdb-enrichment", "tokio", "tracing", "tracing-subscriber", diff --git a/Cargo.toml b/Cargo.toml index 6b6593b..1517ca1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ members = [ "crates/adapters/nats", "crates/api-types", "crates/application", + "crates/adapters/tmdb-enrichment", "crates/domain", "crates/presentation", "crates/tui", @@ -55,6 +56,7 @@ csv = "1" api-types = { path = "crates/api-types" } domain = { path = "crates/domain" } +tmdb-enrichment = { path = "crates/adapters/tmdb-enrichment" } application = { path = "crates/application" } presentation = { path = "crates/presentation" } auth = { path = "crates/adapters/auth" } diff --git a/Dockerfile b/Dockerfile index 0f8ab41..87d1aa3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -29,6 +29,7 @@ COPY crates/adapters/postgres-event-queue/Cargo.toml crates/adapters/postgres-ev COPY crates/adapters/template-askama/Cargo.toml crates/adapters/template-askama/Cargo.toml COPY crates/api-types/Cargo.toml crates/api-types/Cargo.toml COPY crates/application/Cargo.toml crates/application/Cargo.toml +COPY crates/adapters/tmdb-enrichment/Cargo.toml crates/adapters/tmdb-enrichment/Cargo.toml COPY crates/domain/Cargo.toml crates/domain/Cargo.toml COPY crates/presentation/Cargo.toml crates/presentation/Cargo.toml COPY crates/tui/Cargo.toml crates/tui/Cargo.toml diff --git a/README.md b/README.md index 914cdde..31255e0 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,7 @@ A self-hosted, server-side rendered movie logging system with a full REST API. B - Log movies with a TMDB/OMDb ID or manual title/year/director, with a 0–5 rating - Immutable append-only viewing ledger (tracks re-watches) - Background poster fetching and storage (local filesystem or S3-compatible) +- Movie enrichment via TMDb — full cast, crew, genres, keywords, runtime, budget/revenue, ratings; fetched automatically on movie discovery and refreshed every 30 days; exposed via `GET /api/v1/movies/{id}/profile` - RSS/Atom feed for public subscription (global and per-user) - JWT authentication via cookie (HTML) or Bearer token (REST API) - ActivityPub federation — follow/unfollow remote users, accept/reject/remove followers, federated reviews broadcast as `Note` objects with `#MoviesDiary` + `#MovieTitle` hashtags, paginated outbox, boost/Announce tracking, NodeInfo discovery endpoint, shared inbox delivery, actor profile sync (bio, avatar, discoverable) @@ -37,6 +38,7 @@ adapters/ poster-fetcher — downloads poster images image-storage — stores images (posters + user avatars) on local filesystem or S3-compatible storage poster-sync — event handler: triggers poster fetch+store on MovieDiscovered + tmdb-enrichment — event handler: fetches full movie profile (cast, crew, genres, keywords, box office) from TMDb on MovieEnrichmentRequested; resolves IMDb IDs automatically template-askama — Askama HTML rendering rss — RSS/Atom feed generation export — CSV and JSON diary serialization @@ -74,6 +76,9 @@ JWT_SECRET=change-me # OMDb metadata OMDB_API_KEY=your-key +# TMDb metadata + enrichment (optional — enables full cast/crew/genre data) +# TMDB_API_KEY=your-key + # Public base URL (used for ActivityPub actor URLs and canonical links) BASE_URL=https://yourdomain.example.com diff --git a/crates/adapters/event-payload/src/lib.rs b/crates/adapters/event-payload/src/lib.rs index fa10434..7db1e5f 100644 --- a/crates/adapters/event-payload/src/lib.rs +++ b/crates/adapters/event-payload/src/lib.rs @@ -39,6 +39,10 @@ pub enum EventPayload { review_id: String, user_id: String, }, + MovieEnrichmentRequested { + movie_id: String, + external_metadata_id: String, + }, } impl EventPayload { @@ -50,6 +54,7 @@ impl EventPayload { EventPayload::MovieDeleted { .. } => "MovieDeleted", EventPayload::UserUpdated { .. } => "UserUpdated", EventPayload::ReviewDeleted { .. } => "ReviewDeleted", + EventPayload::MovieEnrichmentRequested { .. } => "MovieEnrichmentRequested", } } } @@ -103,6 +108,12 @@ impl From<&DomainEvent> for EventPayload { review_id: review_id.value().to_string(), user_id: user_id.value().to_string(), }, + DomainEvent::MovieEnrichmentRequested { movie_id, external_metadata_id } => { + EventPayload::MovieEnrichmentRequested { + movie_id: movie_id.value().to_string(), + external_metadata_id: external_metadata_id.clone(), + } + } } } } @@ -154,6 +165,12 @@ impl TryFrom for DomainEvent { user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), }) } + EventPayload::MovieEnrichmentRequested { movie_id, external_metadata_id } => { + Ok(DomainEvent::MovieEnrichmentRequested { + movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), + external_metadata_id, + }) + } } } } diff --git a/crates/adapters/nats/src/subject.rs b/crates/adapters/nats/src/subject.rs index dd9d2bb..c991607 100644 --- a/crates/adapters/nats/src/subject.rs +++ b/crates/adapters/nats/src/subject.rs @@ -6,8 +6,9 @@ pub fn event_to_subject(prefix: &str, event: &DomainEvent) -> String { DomainEvent::ReviewUpdated { .. } => "review.updated", DomainEvent::ReviewDeleted { .. } => "review.deleted", DomainEvent::MovieDiscovered { .. } => "movie.discovered", - DomainEvent::MovieDeleted { .. } => "movie.deleted", - DomainEvent::UserUpdated { .. } => "user.updated", + DomainEvent::MovieDeleted { .. } => "movie.deleted", + DomainEvent::UserUpdated { .. } => "user.updated", + DomainEvent::MovieEnrichmentRequested { .. } => "movie.enrichment.requested", }; format!("{prefix}.{suffix}") } diff --git a/crates/adapters/postgres/migrations/0014_movie_profiles.sql b/crates/adapters/postgres/migrations/0014_movie_profiles.sql new file mode 100644 index 0000000..c3bdfc8 --- /dev/null +++ b/crates/adapters/postgres/migrations/0014_movie_profiles.sql @@ -0,0 +1,54 @@ +CREATE TABLE IF NOT EXISTS movie_profiles ( + movie_id TEXT PRIMARY KEY NOT NULL REFERENCES movies(id) ON DELETE CASCADE, + tmdb_id BIGINT NOT NULL, + imdb_id TEXT, + overview TEXT, + tagline TEXT, + runtime_minutes INTEGER, + budget_usd BIGINT, + revenue_usd BIGINT, + vote_average DOUBLE PRECISION, + vote_count INTEGER, + original_language TEXT, + collection_name TEXT, + enriched_at TIMESTAMPTZ NOT NULL +); + +CREATE TABLE IF NOT EXISTS movie_genres ( + movie_id TEXT NOT NULL REFERENCES movies(id) ON DELETE CASCADE, + tmdb_id INTEGER NOT NULL, + name TEXT NOT NULL, + PRIMARY KEY (movie_id, tmdb_id) +); + +CREATE TABLE IF NOT EXISTS movie_keywords ( + movie_id TEXT NOT NULL REFERENCES movies(id) ON DELETE CASCADE, + tmdb_id INTEGER NOT NULL, + name TEXT NOT NULL, + PRIMARY KEY (movie_id, tmdb_id) +); + +CREATE TABLE IF NOT EXISTS movie_cast ( + movie_id TEXT NOT NULL REFERENCES movies(id) ON DELETE CASCADE, + tmdb_person_id BIGINT NOT NULL, + name TEXT NOT NULL, + character TEXT NOT NULL, + billing_order INTEGER NOT NULL, + profile_path TEXT, + PRIMARY KEY (movie_id, tmdb_person_id) +); + +CREATE TABLE IF NOT EXISTS movie_crew ( + movie_id TEXT NOT NULL REFERENCES movies(id) ON DELETE CASCADE, + tmdb_person_id BIGINT NOT NULL, + name TEXT NOT NULL, + job TEXT NOT NULL, + department TEXT NOT NULL, + profile_path TEXT, + PRIMARY KEY (movie_id, tmdb_person_id, job) +); + +CREATE INDEX IF NOT EXISTS idx_movie_cast_person ON movie_cast (tmdb_person_id); +CREATE INDEX IF NOT EXISTS idx_movie_crew_person ON movie_crew (tmdb_person_id); +CREATE INDEX IF NOT EXISTS idx_movie_genres_name ON movie_genres (name); +CREATE INDEX IF NOT EXISTS idx_movie_keywords_name ON movie_keywords (name); diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs index c92355f..94c0ec7 100644 --- a/crates/adapters/postgres/src/lib.rs +++ b/crates/adapters/postgres/src/lib.rs @@ -15,6 +15,7 @@ use sqlx::PgPool; mod import_profile; mod import_session; mod models; +mod profile; mod users; use models::{ @@ -24,6 +25,7 @@ use models::{ pub use import_profile::PostgresImportProfileRepository; pub use import_session::PostgresImportSessionRepository; +pub use profile::PostgresMovieProfileRepository; pub use users::PostgresUserRepository; fn format_year_month(ym: &str) -> String { @@ -865,6 +867,7 @@ pub async fn wire(database_url: &str) -> anyhow::Result<( std::sync::Arc, std::sync::Arc, std::sync::Arc, + std::sync::Arc, )> { use anyhow::Context; @@ -880,6 +883,7 @@ pub async fn wire(database_url: &str) -> anyhow::Result<( let import_session_repo = std::sync::Arc::new(PostgresImportSessionRepository::new(pool.clone())); let import_profile_repo = std::sync::Arc::new(PostgresImportProfileRepository::new(pool.clone())); + let movie_profile_repo = std::sync::Arc::new(PostgresMovieProfileRepository::new(pool.clone())); Ok(( pool.clone(), @@ -890,5 +894,6 @@ pub async fn wire(database_url: &str) -> anyhow::Result<( std::sync::Arc::new(PostgresUserRepository::new(pool)) as _, import_session_repo as _, import_profile_repo as _, + movie_profile_repo as _, )) } diff --git a/crates/adapters/postgres/src/profile.rs b/crates/adapters/postgres/src/profile.rs new file mode 100644 index 0000000..45ec0cd --- /dev/null +++ b/crates/adapters/postgres/src/profile.rs @@ -0,0 +1,236 @@ +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use domain::{ + errors::DomainError, + models::{CastMember, CrewMember, Genre, Keyword, MovieProfile}, + ports::MovieProfileRepository, + value_objects::MovieId, +}; +use sqlx::{PgPool, Row}; + +pub struct PostgresMovieProfileRepository { + pool: PgPool, +} + +impl PostgresMovieProfileRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } + + fn map_err(e: sqlx::Error) -> DomainError { + tracing::error!("Database error: {:?}", e); + DomainError::InfrastructureError("Database operation failed".into()) + } +} + +#[async_trait] +impl MovieProfileRepository for PostgresMovieProfileRepository { + async fn upsert(&self, p: &MovieProfile) -> Result<(), DomainError> { + let movie_id = p.movie_id.value().to_string(); + + let mut tx = self.pool.begin().await.map_err(Self::map_err)?; + + sqlx::query( + r#"INSERT INTO movie_profiles + (movie_id, tmdb_id, imdb_id, overview, tagline, runtime_minutes, + budget_usd, revenue_usd, vote_average, vote_count, + original_language, collection_name, enriched_at) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13) + ON CONFLICT(movie_id) DO UPDATE SET + tmdb_id=EXCLUDED.tmdb_id, imdb_id=EXCLUDED.imdb_id, + overview=EXCLUDED.overview, tagline=EXCLUDED.tagline, + runtime_minutes=EXCLUDED.runtime_minutes, + budget_usd=EXCLUDED.budget_usd, revenue_usd=EXCLUDED.revenue_usd, + vote_average=EXCLUDED.vote_average, vote_count=EXCLUDED.vote_count, + original_language=EXCLUDED.original_language, + collection_name=EXCLUDED.collection_name, + enriched_at=EXCLUDED.enriched_at"#, + ) + .bind(&movie_id) + .bind(p.tmdb_id as i64) + .bind(&p.imdb_id) + .bind(&p.overview) + .bind(&p.tagline) + .bind(p.runtime_minutes.map(|v| v as i32)) + .bind(p.budget_usd) + .bind(p.revenue_usd) + .bind(p.vote_average) + .bind(p.vote_count.map(|v| v as i32)) + .bind(&p.original_language) + .bind(&p.collection_name) + .bind(p.enriched_at) + .execute(&mut *tx) + .await + .map_err(Self::map_err)?; + + sqlx::query("DELETE FROM movie_genres WHERE movie_id = $1") + .bind(&movie_id) + .execute(&mut *tx).await.map_err(Self::map_err)?; + for g in &p.genres { + sqlx::query("INSERT INTO movie_genres (movie_id, tmdb_id, name) VALUES ($1,$2,$3) ON CONFLICT DO NOTHING") + .bind(&movie_id).bind(g.tmdb_id as i32).bind(&g.name) + .execute(&mut *tx).await.map_err(Self::map_err)?; + } + + sqlx::query("DELETE FROM movie_keywords WHERE movie_id = $1") + .bind(&movie_id) + .execute(&mut *tx).await.map_err(Self::map_err)?; + for k in &p.keywords { + sqlx::query("INSERT INTO movie_keywords (movie_id, tmdb_id, name) VALUES ($1,$2,$3) ON CONFLICT DO NOTHING") + .bind(&movie_id).bind(k.tmdb_id as i32).bind(&k.name) + .execute(&mut *tx).await.map_err(Self::map_err)?; + } + + sqlx::query("DELETE FROM movie_cast WHERE movie_id = $1") + .bind(&movie_id) + .execute(&mut *tx).await.map_err(Self::map_err)?; + for c in &p.cast { + sqlx::query( + "INSERT INTO movie_cast \ + (movie_id, tmdb_person_id, name, character, billing_order, profile_path) \ + VALUES ($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING", + ) + .bind(&movie_id).bind(c.tmdb_person_id as i64).bind(&c.name) + .bind(&c.character).bind(c.billing_order as i32).bind(&c.profile_path) + .execute(&mut *tx).await.map_err(Self::map_err)?; + } + + sqlx::query("DELETE FROM movie_crew WHERE movie_id = $1") + .bind(&movie_id) + .execute(&mut *tx).await.map_err(Self::map_err)?; + for cr in &p.crew { + sqlx::query( + "INSERT INTO movie_crew \ + (movie_id, tmdb_person_id, name, job, department, profile_path) \ + VALUES ($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING", + ) + .bind(&movie_id).bind(cr.tmdb_person_id as i64).bind(&cr.name) + .bind(&cr.job).bind(&cr.department).bind(&cr.profile_path) + .execute(&mut *tx).await.map_err(Self::map_err)?; + } + + tx.commit().await.map_err(Self::map_err) + } + + async fn get_by_movie_id(&self, id: &MovieId) -> Result, DomainError> { + let movie_id = id.value().to_string(); + + let row = sqlx::query( + "SELECT tmdb_id, imdb_id, overview, tagline, runtime_minutes, budget_usd, + revenue_usd, vote_average, vote_count, original_language, + collection_name, enriched_at + FROM movie_profiles WHERE movie_id = $1", + ) + .bind(&movie_id) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)?; + + let row = match row { + Some(r) => r, + None => return Ok(None), + }; + + let enriched_at: DateTime = row.try_get("enriched_at") + .map_err(|_| DomainError::InfrastructureError("invalid enriched_at".into()))?; + + let genres = sqlx::query("SELECT tmdb_id, name FROM movie_genres WHERE movie_id = $1") + .bind(&movie_id) + .fetch_all(&self.pool).await.map_err(Self::map_err)? + .into_iter() + .map(|r| Genre { + tmdb_id: r.try_get::("tmdb_id").unwrap_or(0) as u32, + name: r.try_get("name").unwrap_or_default(), + }) + .collect(); + + let keywords = sqlx::query("SELECT tmdb_id, name FROM movie_keywords WHERE movie_id = $1") + .bind(&movie_id) + .fetch_all(&self.pool).await.map_err(Self::map_err)? + .into_iter() + .map(|r| Keyword { + tmdb_id: r.try_get::("tmdb_id").unwrap_or(0) as u32, + name: r.try_get("name").unwrap_or_default(), + }) + .collect(); + + let cast = sqlx::query( + "SELECT tmdb_person_id, name, character, billing_order, profile_path \ + FROM movie_cast WHERE movie_id = $1 ORDER BY billing_order", + ) + .bind(&movie_id) + .fetch_all(&self.pool).await.map_err(Self::map_err)? + .into_iter() + .map(|r| CastMember { + tmdb_person_id: r.try_get::("tmdb_person_id").unwrap_or(0) as u64, + name: r.try_get("name").unwrap_or_default(), + character: r.try_get("character").unwrap_or_default(), + billing_order: r.try_get::("billing_order").unwrap_or(0) as u32, + profile_path: r.try_get("profile_path").ok(), + }) + .collect(); + + let crew = sqlx::query( + "SELECT tmdb_person_id, name, job, department, profile_path \ + FROM movie_crew WHERE movie_id = $1", + ) + .bind(&movie_id) + .fetch_all(&self.pool).await.map_err(Self::map_err)? + .into_iter() + .map(|r| CrewMember { + tmdb_person_id: r.try_get::("tmdb_person_id").unwrap_or(0) as u64, + name: r.try_get("name").unwrap_or_default(), + job: r.try_get("job").unwrap_or_default(), + department: r.try_get("department").unwrap_or_default(), + profile_path: r.try_get("profile_path").ok(), + }) + .collect(); + + Ok(Some(MovieProfile { + movie_id: id.clone(), + tmdb_id: row.try_get::("tmdb_id").unwrap_or(0) as u64, + imdb_id: row.try_get("imdb_id").ok(), + overview: row.try_get("overview").ok(), + tagline: row.try_get("tagline").ok(), + runtime_minutes: row.try_get::, _>("runtime_minutes").ok().flatten().map(|v| v as u32), + budget_usd: row.try_get("budget_usd").ok(), + revenue_usd: row.try_get("revenue_usd").ok(), + vote_average: row.try_get("vote_average").ok(), + vote_count: row.try_get::, _>("vote_count").ok().flatten().map(|v| v as u32), + original_language: row.try_get("original_language").ok(), + collection_name: row.try_get("collection_name").ok(), + genres, + keywords, + cast, + crew, + enriched_at, + })) + } + + async fn list_stale(&self) -> Result, DomainError> { + let threshold = Utc::now() - chrono::Duration::days(30); + let rows = sqlx::query( + r#"SELECT m.id, m.external_metadata_id + FROM movies m + LEFT JOIN movie_profiles p ON p.movie_id = m.id + WHERE m.external_metadata_id IS NOT NULL + AND (p.movie_id IS NULL OR p.enriched_at < $1) + ORDER BY p.enriched_at ASC NULLS FIRST"#, + ) + .bind(threshold) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + + Ok(rows + .into_iter() + .filter_map(|r| { + let ext_id: Option = r.try_get("external_metadata_id").ok()?; + let ext_id = ext_id?; + let id_str: String = r.try_get("id").ok()?; + let movie_id = id_str.parse::().ok().map(MovieId::from_uuid)?; + Some((movie_id, ext_id)) + }) + .collect()) + } +} diff --git a/crates/adapters/sqlite/migrations/0014_movie_profiles.sql b/crates/adapters/sqlite/migrations/0014_movie_profiles.sql new file mode 100644 index 0000000..e1329c2 --- /dev/null +++ b/crates/adapters/sqlite/migrations/0014_movie_profiles.sql @@ -0,0 +1,54 @@ +CREATE TABLE IF NOT EXISTS movie_profiles ( + movie_id TEXT PRIMARY KEY NOT NULL REFERENCES movies(id) ON DELETE CASCADE, + tmdb_id INTEGER NOT NULL, + imdb_id TEXT, + overview TEXT, + tagline TEXT, + runtime_minutes INTEGER, + budget_usd INTEGER, + revenue_usd INTEGER, + vote_average REAL, + vote_count INTEGER, + original_language TEXT, + collection_name TEXT, + enriched_at TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS movie_genres ( + movie_id TEXT NOT NULL REFERENCES movies(id) ON DELETE CASCADE, + tmdb_id INTEGER NOT NULL, + name TEXT NOT NULL, + PRIMARY KEY (movie_id, tmdb_id) +); + +CREATE TABLE IF NOT EXISTS movie_keywords ( + movie_id TEXT NOT NULL REFERENCES movies(id) ON DELETE CASCADE, + tmdb_id INTEGER NOT NULL, + name TEXT NOT NULL, + PRIMARY KEY (movie_id, tmdb_id) +); + +CREATE TABLE IF NOT EXISTS movie_cast ( + movie_id TEXT NOT NULL REFERENCES movies(id) ON DELETE CASCADE, + tmdb_person_id INTEGER NOT NULL, + name TEXT NOT NULL, + character TEXT NOT NULL, + billing_order INTEGER NOT NULL, + profile_path TEXT, + PRIMARY KEY (movie_id, tmdb_person_id) +); + +CREATE TABLE IF NOT EXISTS movie_crew ( + movie_id TEXT NOT NULL REFERENCES movies(id) ON DELETE CASCADE, + tmdb_person_id INTEGER NOT NULL, + name TEXT NOT NULL, + job TEXT NOT NULL, + department TEXT NOT NULL, + profile_path TEXT, + PRIMARY KEY (movie_id, tmdb_person_id, job) +); + +CREATE INDEX IF NOT EXISTS idx_movie_cast_person ON movie_cast (tmdb_person_id); +CREATE INDEX IF NOT EXISTS idx_movie_crew_person ON movie_crew (tmdb_person_id); +CREATE INDEX IF NOT EXISTS idx_movie_genres_name ON movie_genres (name); +CREATE INDEX IF NOT EXISTS idx_movie_keywords_name ON movie_keywords (name); diff --git a/crates/adapters/sqlite/src/lib.rs b/crates/adapters/sqlite/src/lib.rs index 44d3fcd..05c4a46 100644 --- a/crates/adapters/sqlite/src/lib.rs +++ b/crates/adapters/sqlite/src/lib.rs @@ -16,6 +16,7 @@ mod import_profile; mod import_session; mod migrations; mod models; +mod profile; mod users; use models::{ @@ -25,6 +26,7 @@ use models::{ pub use import_profile::SqliteImportProfileRepository; pub use import_session::SqliteImportSessionRepository; +pub use profile::SqliteMovieProfileRepository; pub use users::SqliteUserRepository; fn format_year_month(ym: &str) -> String { @@ -854,6 +856,7 @@ pub async fn wire(database_url: &str) -> anyhow::Result<( std::sync::Arc, std::sync::Arc, std::sync::Arc, + std::sync::Arc, )> { use std::str::FromStr; use anyhow::Context; @@ -876,6 +879,7 @@ pub async fn wire(database_url: &str) -> anyhow::Result<( let import_session_repo = std::sync::Arc::new(SqliteImportSessionRepository::new(pool.clone())); let import_profile_repo = std::sync::Arc::new(SqliteImportProfileRepository::new(pool.clone())); + let movie_profile_repo = std::sync::Arc::new(SqliteMovieProfileRepository::new(pool.clone())); Ok(( pool.clone(), @@ -886,6 +890,7 @@ pub async fn wire(database_url: &str) -> anyhow::Result<( std::sync::Arc::new(SqliteUserRepository::new(pool)) as _, import_session_repo as _, import_profile_repo as _, + movie_profile_repo as _, )) } diff --git a/crates/adapters/sqlite/src/profile.rs b/crates/adapters/sqlite/src/profile.rs new file mode 100644 index 0000000..8fd7617 --- /dev/null +++ b/crates/adapters/sqlite/src/profile.rs @@ -0,0 +1,240 @@ +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use domain::{ + errors::DomainError, + models::{CastMember, CrewMember, Genre, Keyword, MovieProfile}, + ports::MovieProfileRepository, + value_objects::MovieId, +}; +use sqlx::{Row, SqlitePool}; + +pub struct SqliteMovieProfileRepository { + pool: SqlitePool, +} + +impl SqliteMovieProfileRepository { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } + + fn map_err(e: sqlx::Error) -> DomainError { + tracing::error!("Database error: {:?}", e); + DomainError::InfrastructureError("Database operation failed".into()) + } +} + +#[async_trait] +impl MovieProfileRepository for SqliteMovieProfileRepository { + async fn upsert(&self, p: &MovieProfile) -> Result<(), DomainError> { + let movie_id = p.movie_id.value().to_string(); + let enriched_at = p.enriched_at.to_rfc3339(); + + let mut tx = self.pool.begin().await.map_err(Self::map_err)?; + + sqlx::query( + r#"INSERT INTO movie_profiles + (movie_id, tmdb_id, imdb_id, overview, tagline, runtime_minutes, + budget_usd, revenue_usd, vote_average, vote_count, + original_language, collection_name, enriched_at) + VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?) + ON CONFLICT(movie_id) DO UPDATE SET + tmdb_id=excluded.tmdb_id, imdb_id=excluded.imdb_id, + overview=excluded.overview, tagline=excluded.tagline, + runtime_minutes=excluded.runtime_minutes, + budget_usd=excluded.budget_usd, revenue_usd=excluded.revenue_usd, + vote_average=excluded.vote_average, vote_count=excluded.vote_count, + original_language=excluded.original_language, + collection_name=excluded.collection_name, + enriched_at=excluded.enriched_at"#, + ) + .bind(&movie_id) + .bind(p.tmdb_id as i64) + .bind(&p.imdb_id) + .bind(&p.overview) + .bind(&p.tagline) + .bind(p.runtime_minutes.map(|v| v as i64)) + .bind(p.budget_usd) + .bind(p.revenue_usd) + .bind(p.vote_average) + .bind(p.vote_count.map(|v| v as i64)) + .bind(&p.original_language) + .bind(&p.collection_name) + .bind(&enriched_at) + .execute(&mut *tx) + .await + .map_err(Self::map_err)?; + + sqlx::query("DELETE FROM movie_genres WHERE movie_id = ?") + .bind(&movie_id) + .execute(&mut *tx).await.map_err(Self::map_err)?; + for g in &p.genres { + sqlx::query("INSERT OR IGNORE INTO movie_genres (movie_id, tmdb_id, name) VALUES (?,?,?)") + .bind(&movie_id).bind(g.tmdb_id as i64).bind(&g.name) + .execute(&mut *tx).await.map_err(Self::map_err)?; + } + + sqlx::query("DELETE FROM movie_keywords WHERE movie_id = ?") + .bind(&movie_id) + .execute(&mut *tx).await.map_err(Self::map_err)?; + for k in &p.keywords { + sqlx::query("INSERT OR IGNORE INTO movie_keywords (movie_id, tmdb_id, name) VALUES (?,?,?)") + .bind(&movie_id).bind(k.tmdb_id as i64).bind(&k.name) + .execute(&mut *tx).await.map_err(Self::map_err)?; + } + + sqlx::query("DELETE FROM movie_cast WHERE movie_id = ?") + .bind(&movie_id) + .execute(&mut *tx).await.map_err(Self::map_err)?; + for c in &p.cast { + sqlx::query( + "INSERT OR IGNORE INTO movie_cast \ + (movie_id, tmdb_person_id, name, character, billing_order, profile_path) \ + VALUES (?,?,?,?,?,?)", + ) + .bind(&movie_id).bind(c.tmdb_person_id as i64).bind(&c.name) + .bind(&c.character).bind(c.billing_order as i64).bind(&c.profile_path) + .execute(&mut *tx).await.map_err(Self::map_err)?; + } + + sqlx::query("DELETE FROM movie_crew WHERE movie_id = ?") + .bind(&movie_id) + .execute(&mut *tx).await.map_err(Self::map_err)?; + for cr in &p.crew { + sqlx::query( + "INSERT OR IGNORE INTO movie_crew \ + (movie_id, tmdb_person_id, name, job, department, profile_path) \ + VALUES (?,?,?,?,?,?)", + ) + .bind(&movie_id).bind(cr.tmdb_person_id as i64).bind(&cr.name) + .bind(&cr.job).bind(&cr.department).bind(&cr.profile_path) + .execute(&mut *tx).await.map_err(Self::map_err)?; + } + + tx.commit().await.map_err(Self::map_err) + } + + async fn get_by_movie_id(&self, id: &MovieId) -> Result, DomainError> { + let movie_id = id.value().to_string(); + + let row = sqlx::query( + "SELECT tmdb_id, imdb_id, overview, tagline, runtime_minutes, budget_usd, + revenue_usd, vote_average, vote_count, original_language, + collection_name, enriched_at + FROM movie_profiles WHERE movie_id = ?", + ) + .bind(&movie_id) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)?; + + let row = match row { + Some(r) => r, + None => return Ok(None), + }; + + let enriched_at_str: String = row.try_get("enriched_at") + .map_err(|_| DomainError::InfrastructureError("invalid enriched_at".into()))?; + let enriched_at: DateTime = enriched_at_str + .parse() + .map_err(|_| DomainError::InfrastructureError("invalid enriched_at".into()))?; + + let genres = sqlx::query("SELECT tmdb_id, name FROM movie_genres WHERE movie_id = ?") + .bind(&movie_id) + .fetch_all(&self.pool).await.map_err(Self::map_err)? + .into_iter() + .map(|r| Genre { + tmdb_id: r.try_get::("tmdb_id").unwrap_or(0) as u32, + name: r.try_get("name").unwrap_or_default(), + }) + .collect(); + + let keywords = sqlx::query("SELECT tmdb_id, name FROM movie_keywords WHERE movie_id = ?") + .bind(&movie_id) + .fetch_all(&self.pool).await.map_err(Self::map_err)? + .into_iter() + .map(|r| Keyword { + tmdb_id: r.try_get::("tmdb_id").unwrap_or(0) as u32, + name: r.try_get("name").unwrap_or_default(), + }) + .collect(); + + let cast = sqlx::query( + "SELECT tmdb_person_id, name, character, billing_order, profile_path \ + FROM movie_cast WHERE movie_id = ? ORDER BY billing_order", + ) + .bind(&movie_id) + .fetch_all(&self.pool).await.map_err(Self::map_err)? + .into_iter() + .map(|r| CastMember { + tmdb_person_id: r.try_get::("tmdb_person_id").unwrap_or(0) as u64, + name: r.try_get("name").unwrap_or_default(), + character: r.try_get("character").unwrap_or_default(), + billing_order: r.try_get::("billing_order").unwrap_or(0) as u32, + profile_path: r.try_get("profile_path").ok(), + }) + .collect(); + + let crew = sqlx::query( + "SELECT tmdb_person_id, name, job, department, profile_path \ + FROM movie_crew WHERE movie_id = ?", + ) + .bind(&movie_id) + .fetch_all(&self.pool).await.map_err(Self::map_err)? + .into_iter() + .map(|r| CrewMember { + tmdb_person_id: r.try_get::("tmdb_person_id").unwrap_or(0) as u64, + name: r.try_get("name").unwrap_or_default(), + job: r.try_get("job").unwrap_or_default(), + department: r.try_get("department").unwrap_or_default(), + profile_path: r.try_get("profile_path").ok(), + }) + .collect(); + + Ok(Some(MovieProfile { + movie_id: id.clone(), + tmdb_id: row.try_get::("tmdb_id").unwrap_or(0) as u64, + imdb_id: row.try_get("imdb_id").ok(), + overview: row.try_get("overview").ok(), + tagline: row.try_get("tagline").ok(), + runtime_minutes: row.try_get::, _>("runtime_minutes").ok().flatten().map(|v| v as u32), + budget_usd: row.try_get("budget_usd").ok(), + revenue_usd: row.try_get("revenue_usd").ok(), + vote_average: row.try_get("vote_average").ok(), + vote_count: row.try_get::, _>("vote_count").ok().flatten().map(|v| v as u32), + original_language: row.try_get("original_language").ok(), + collection_name: row.try_get("collection_name").ok(), + genres, + keywords, + cast, + crew, + enriched_at, + })) + } + + async fn list_stale(&self) -> Result, DomainError> { + let threshold = (Utc::now() - chrono::Duration::days(30)).to_rfc3339(); + let rows = sqlx::query( + r#"SELECT m.id, m.external_metadata_id + FROM movies m + LEFT JOIN movie_profiles p ON p.movie_id = m.id + WHERE m.external_metadata_id IS NOT NULL + AND (p.movie_id IS NULL OR p.enriched_at < ?) + ORDER BY p.enriched_at ASC"#, + ) + .bind(&threshold) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + + Ok(rows + .into_iter() + .filter_map(|r| { + let ext_id: Option = r.try_get("external_metadata_id").ok()?; + let ext_id = ext_id?; + let id_str: String = r.try_get("id").ok()?; + let movie_id = id_str.parse::().ok().map(MovieId::from_uuid)?; + Some((movie_id, ext_id)) + }) + .collect()) + } +} diff --git a/crates/adapters/tmdb-enrichment/Cargo.toml b/crates/adapters/tmdb-enrichment/Cargo.toml new file mode 100644 index 0000000..b6d28d4 --- /dev/null +++ b/crates/adapters/tmdb-enrichment/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "tmdb-enrichment" +version = "0.1.0" +edition = "2024" + +[dependencies] +domain = { workspace = true } +reqwest = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +anyhow = { workspace = true } +async-trait = { workspace = true } +tracing = { workspace = true } +chrono = { workspace = true } diff --git a/crates/adapters/tmdb-enrichment/src/lib.rs b/crates/adapters/tmdb-enrichment/src/lib.rs new file mode 100644 index 0000000..1363078 --- /dev/null +++ b/crates/adapters/tmdb-enrichment/src/lib.rs @@ -0,0 +1,211 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use chrono::Utc; +use domain::{ + errors::DomainError, + events::DomainEvent, + models::{CastMember, CrewMember, Genre, Keyword, MovieProfile}, + ports::{EventHandler, MovieEnrichmentClient, MovieProfileRepository}, + value_objects::MovieId, +}; +use serde::Deserialize; + +// ── TMDb enrichment client ─────────────────────────────────────────────────── + +pub struct TmdbEnrichmentClient { + api_key: String, + http: reqwest::Client, +} + +impl TmdbEnrichmentClient { + pub fn from_env() -> Result { + let api_key = std::env::var("TMDB_API_KEY").map_err(|_| { + DomainError::InfrastructureError("TMDB_API_KEY is not set".into()) + })?; + Ok(Self { api_key, http: reqwest::Client::new() }) + } + + fn base(&self, path: &str) -> String { + format!("https://api.themoviedb.org/3{}", path) + } + + async fn get Deserialize<'de>>(&self, url: &str, extra: &[(&str, &str)]) -> Result { + let mut req = self.http.get(url).query(&[("api_key", self.api_key.as_str())]); + for (k, v) in extra { + req = req.query(&[(k, v)]); + } + req.send().await + .map_err(|e| DomainError::InfrastructureError(e.to_string()))? + .error_for_status() + .map_err(|e| DomainError::InfrastructureError(e.to_string()))? + .json::().await + .map_err(|e| DomainError::InfrastructureError(e.to_string())) + } + + async fn resolve_tmdb_id(&self, external_id: &str) -> Result { + if let Some(numeric) = external_id.strip_prefix("tmdb:") { + return numeric.parse::() + .map_err(|_| DomainError::InfrastructureError(format!("Invalid tmdb id: {numeric}"))); + } + + // Assume IMDb ID (tt…) — use /find + #[derive(Deserialize)] + struct FindResult { id: u64 } + #[derive(Deserialize)] + struct FindResponse { movie_results: Vec } + + let url = self.base(&format!("/find/{}", external_id)); + let resp: FindResponse = self.get(&url, &[("external_source", "imdb_id")]).await?; + resp.movie_results + .into_iter() + .next() + .map(|r| r.id) + .ok_or_else(|| DomainError::NotFound(format!("TMDb: no movie for {external_id}"))) + } +} + +#[async_trait] +impl MovieEnrichmentClient for TmdbEnrichmentClient { + async fn fetch_profile(&self, movie_id: MovieId, external_metadata_id: &str) -> Result { + let tmdb_id = self.resolve_tmdb_id(external_metadata_id).await?; + + #[derive(Deserialize)] + struct GenreDto { id: u32, name: String } + + #[derive(Deserialize)] + struct CollectionDto { name: String } + + #[derive(Deserialize)] + struct CastDto { + id: u64, + name: String, + character: String, + order: u32, + profile_path: Option, + } + + #[derive(Deserialize)] + struct CrewDto { + id: u64, + name: String, + job: String, + department: String, + profile_path: Option, + } + + #[derive(Deserialize)] + struct Credits { cast: Vec, crew: Vec } + + #[derive(Deserialize)] + struct KeywordDto { id: u32, name: String } + + #[derive(Deserialize)] + struct Keywords { keywords: Vec } + + #[derive(Deserialize)] + struct Details { + imdb_id: Option, + overview: Option, + tagline: Option, + runtime: Option, + budget: Option, + revenue: Option, + vote_average: Option, + vote_count: Option, + original_language: Option, + genres: Vec, + belongs_to_collection: Option, + credits: Credits, + keywords: Keywords, + } + + let url = self.base(&format!("/movie/{}", tmdb_id)); + let d: Details = self.get(&url, &[("append_to_response", "credits,keywords")]).await?; + + Ok(MovieProfile { + movie_id, + tmdb_id, + imdb_id: d.imdb_id.filter(|s| !s.is_empty()), + overview: d.overview.filter(|s| !s.is_empty()), + tagline: d.tagline.filter(|s| !s.is_empty()), + runtime_minutes: d.runtime, + budget_usd: d.budget.filter(|&v| v > 0), + revenue_usd: d.revenue.filter(|&v| v > 0), + vote_average: d.vote_average, + vote_count: d.vote_count, + original_language: d.original_language, + collection_name: d.belongs_to_collection.map(|c| c.name), + genres: d.genres.into_iter().map(|g| Genre { tmdb_id: g.id, name: g.name }).collect(), + keywords: d.keywords.keywords.into_iter() + .map(|k| Keyword { tmdb_id: k.id, name: k.name }) + .collect(), + cast: d.credits.cast.into_iter().map(|c| CastMember { + tmdb_person_id: c.id, + name: c.name, + character: c.character, + billing_order: c.order, + profile_path: c.profile_path, + }).collect(), + crew: d.credits.crew.into_iter().map(|c| CrewMember { + tmdb_person_id: c.id, + name: c.name, + job: c.job, + department: c.department, + profile_path: c.profile_path, + }).collect(), + enriched_at: Utc::now(), + }) + } +} + +// ── Enrichment event handler ───────────────────────────────────────────────── + +pub struct EnrichmentHandler { + pub enrichment_client: Arc, + pub profile_repo: Arc, +} + +#[async_trait] +impl EventHandler for EnrichmentHandler { + async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { + let (movie_id, external_metadata_id) = match event { + DomainEvent::MovieEnrichmentRequested { movie_id, external_metadata_id } => { + (movie_id.clone(), external_metadata_id.clone()) + } + _ => return Ok(()), + }; + + // Skip if profile is fresh (checked by the repo's list_stale, but guard here too) + if let Ok(Some(existing)) = self.profile_repo.get_by_movie_id(&movie_id).await { + let age = Utc::now() - existing.enriched_at; + if age.num_days() < 30 { + tracing::debug!( + movie_id = %movie_id.value(), + "skipping enrichment — profile is {} days old", + age.num_days() + ); + return Ok(()); + } + } + + tracing::info!(movie_id = %movie_id.value(), external_id = %external_metadata_id, "enriching movie"); + match self.enrichment_client.fetch_profile(movie_id.clone(), &external_metadata_id).await { + Ok(profile) => { + self.profile_repo.upsert(&profile).await?; + tracing::info!( + movie_id = %movie_id.value(), + genres = profile.genres.len(), + cast = profile.cast.len(), + crew = profile.crew.len(), + "enrichment stored" + ); + } + Err(DomainError::NotFound(msg)) => { + tracing::warn!(movie_id = %movie_id.value(), "TMDb lookup found nothing: {msg}"); + } + Err(e) => return Err(e), + } + Ok(()) + } +} diff --git a/crates/api-types/src/movies.rs b/crates/api-types/src/movies.rs index 18d6ab9..23bb1ae 100644 --- a/crates/api-types/src/movies.rs +++ b/crates/api-types/src/movies.rs @@ -1,6 +1,58 @@ use serde::{Deserialize, Serialize}; use uuid::Uuid; +// ── Movie profile (enrichment) ──────────────────────────────────────────────── + +#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] +pub struct GenreDto { + pub tmdb_id: u32, + pub name: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] +pub struct KeywordDto { + pub tmdb_id: u32, + pub name: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] +pub struct CastMemberDto { + pub tmdb_person_id: u64, + pub name: String, + pub character: String, + pub billing_order: u32, + pub profile_path: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] +pub struct CrewMemberDto { + pub tmdb_person_id: u64, + pub name: String, + pub job: String, + pub department: String, + pub profile_path: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] +pub struct MovieProfileResponse { + pub tmdb_id: u64, + pub imdb_id: Option, + pub overview: Option, + pub tagline: Option, + pub runtime_minutes: Option, + pub budget_usd: Option, + pub revenue_usd: Option, + pub vote_average: Option, + pub vote_count: Option, + pub original_language: Option, + pub collection_name: Option, + pub genres: Vec, + pub keywords: Vec, + pub cast: Vec, + pub crew: Vec, + pub enriched_at: String, +} + #[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] pub struct MovieDto { pub id: Uuid, diff --git a/crates/application/src/context.rs b/crates/application/src/context.rs index 0561a99..f0a1e0c 100644 --- a/crates/application/src/context.rs +++ b/crates/application/src/context.rs @@ -4,7 +4,7 @@ use domain::ports::{ AuthService, DiaryExporter, DiaryRepository, DocumentParser, EventPublisher, ImageStorage, ImportProfileRepository, ImportSessionRepository, - MetadataClient, MovieRepository, PasswordHasher, PosterFetcherClient, + MetadataClient, MovieProfileRepository, MovieRepository, PasswordHasher, PosterFetcherClient, ReviewRepository, StatsRepository, UserRepository, }; @@ -27,5 +27,6 @@ pub struct AppContext { pub user_repository: Arc, pub import_session_repository: Arc, pub import_profile_repository: Arc, + pub movie_profile_repository: Arc, pub config: AppConfig, } diff --git a/crates/application/src/jobs.rs b/crates/application/src/jobs.rs new file mode 100644 index 0000000..eb39646 --- /dev/null +++ b/crates/application/src/jobs.rs @@ -0,0 +1,60 @@ +use std::time::Duration; + +use async_trait::async_trait; +use domain::{errors::DomainError, events::DomainEvent, ports::PeriodicJob}; + +use crate::context::AppContext; + +pub struct ImportSessionCleanupJob { + ctx: AppContext, +} + +impl ImportSessionCleanupJob { + pub fn new(ctx: AppContext) -> Self { + Self { ctx } + } +} + +#[async_trait] +impl PeriodicJob for ImportSessionCleanupJob { + fn interval(&self) -> Duration { + Duration::from_secs(3600) + } + + async fn run(&self) -> Result<(), DomainError> { + let n = crate::use_cases::cleanup_expired_import_sessions::execute(&self.ctx).await?; + tracing::info!("import session cleanup: removed {} expired sessions", n); + Ok(()) + } +} + +pub struct EnrichmentStalenessJob { + ctx: AppContext, +} + +impl EnrichmentStalenessJob { + pub fn new(ctx: AppContext) -> Self { + Self { ctx } + } +} + +#[async_trait] +impl PeriodicJob for EnrichmentStalenessJob { + fn interval(&self) -> Duration { + Duration::from_secs(3600) + } + + async fn run(&self) -> Result<(), DomainError> { + let stale = self.ctx.movie_profile_repository.list_stale().await?; + if stale.is_empty() { + return Ok(()); + } + tracing::info!("enrichment scan: {} stale movies", stale.len()); + for (movie_id, external_metadata_id) in stale { + let event = DomainEvent::MovieEnrichmentRequested { movie_id, external_metadata_id }; + self.ctx.event_publisher.publish(&event).await?; + } + Ok(()) + } +} + diff --git a/crates/application/src/lib.rs b/crates/application/src/lib.rs index 87dbbb5..f023f3d 100644 --- a/crates/application/src/lib.rs +++ b/crates/application/src/lib.rs @@ -1,4 +1,5 @@ pub mod commands; +pub mod jobs; pub mod worker; pub mod config; pub mod context; diff --git a/crates/application/src/use_cases/log_review.rs b/crates/application/src/use_cases/log_review.rs index 24340b7..e566ab5 100644 --- a/crates/application/src/use_cases/log_review.rs +++ b/crates/application/src/use_cases/log_review.rs @@ -50,6 +50,14 @@ async fn publish_events( } } + if let Some(ext_id) = movie.external_metadata_id() { + let enrichment_event = DomainEvent::MovieEnrichmentRequested { + movie_id: movie.id().clone(), + external_metadata_id: ext_id.value().to_string(), + }; + ctx.event_publisher.publish(&enrichment_event).await?; + } + ctx.event_publisher.publish(&review_event).await?; Ok(()) } diff --git a/crates/application/src/worker.rs b/crates/application/src/worker.rs index 7343ddd..6ff74ea 100644 --- a/crates/application/src/worker.rs +++ b/crates/application/src/worker.rs @@ -96,6 +96,7 @@ mod tests { DomainEvent::ReviewDeleted { .. } => "review_deleted", DomainEvent::MovieDeleted { .. } => "movie_deleted", DomainEvent::UserUpdated { .. } => "user_updated", + DomainEvent::MovieEnrichmentRequested { .. } => "movie_enrichment_requested", }; self.calls.lock().unwrap().push(label); Ok(()) diff --git a/crates/domain/src/events.rs b/crates/domain/src/events.rs index 95c3655..5406c30 100644 --- a/crates/domain/src/events.rs +++ b/crates/domain/src/events.rs @@ -37,6 +37,10 @@ pub enum DomainEvent { review_id: ReviewId, user_id: UserId, }, + MovieEnrichmentRequested { + movie_id: MovieId, + external_metadata_id: String, + }, } #[async_trait] diff --git a/crates/domain/src/models/mod.rs b/crates/domain/src/models/mod.rs index 360ab6e..88045a9 100644 --- a/crates/domain/src/models/mod.rs +++ b/crates/domain/src/models/mod.rs @@ -1,4 +1,4 @@ -use chrono::{NaiveDateTime, Utc}; +use chrono::{DateTime, NaiveDateTime, Utc}; use crate::{ errors::DomainError, @@ -490,3 +490,56 @@ mod tests { assert_eq!(user.avatar_path(), None); } } + +// ── Movie enrichment ─────────────────────────────────────────────────────────── + +#[derive(Clone, Debug)] +pub struct Genre { + pub tmdb_id: u32, + pub name: String, +} + +#[derive(Clone, Debug)] +pub struct Keyword { + pub tmdb_id: u32, + pub name: String, +} + +#[derive(Clone, Debug)] +pub struct CastMember { + pub tmdb_person_id: u64, + pub name: String, + pub character: String, + pub billing_order: u32, + pub profile_path: Option, +} + +#[derive(Clone, Debug)] +pub struct CrewMember { + pub tmdb_person_id: u64, + pub name: String, + pub job: String, + pub department: String, + pub profile_path: Option, +} + +#[derive(Clone, Debug)] +pub struct MovieProfile { + pub movie_id: MovieId, + pub tmdb_id: u64, + pub imdb_id: Option, + pub overview: Option, + pub tagline: Option, + pub runtime_minutes: Option, + pub budget_usd: Option, + pub revenue_usd: Option, + pub vote_average: Option, + pub vote_count: Option, + pub original_language: Option, + pub collection_name: Option, + pub genres: Vec, + pub keywords: Vec, + pub cast: Vec, + pub crew: Vec, + pub enriched_at: DateTime, +} diff --git a/crates/domain/src/ports.rs b/crates/domain/src/ports.rs index 9dc0ba8..a9c44d1 100644 --- a/crates/domain/src/ports.rs +++ b/crates/domain/src/ports.rs @@ -6,8 +6,8 @@ use crate::{ events::{DomainEvent, EventEnvelope}, models::{ AnnotatedRow, DiaryEntry, DiaryFilter, ExportFormat, FeedEntry, FieldMapping, - FileFormat, ImportError, ImportProfile, ImportSession, Movie, MovieStats, ParsedFile, - Review, ReviewHistory, User, UserStats, UserSummary, UserTrends, + FileFormat, ImportError, ImportProfile, ImportSession, Movie, MovieProfile, MovieStats, + ParsedFile, Review, ReviewHistory, User, UserStats, UserSummary, UserTrends, collections::{PageParams, Paginated}, }, value_objects::{ @@ -217,6 +217,31 @@ pub trait EventHandler: Send + Sync { async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError>; } +#[async_trait] +pub trait PeriodicJob: Send + Sync { + fn interval(&self) -> std::time::Duration; + async fn run(&self) -> Result<(), DomainError>; +} + +#[async_trait] +pub trait MovieProfileRepository: Send + Sync { + async fn upsert(&self, profile: &MovieProfile) -> Result<(), DomainError>; + async fn get_by_movie_id(&self, id: &MovieId) -> Result, DomainError>; + /// Returns (movie_id, external_metadata_id) for movies with no profile or a stale one + /// (enriched_at older than 30 days). + async fn list_stale(&self) -> Result, DomainError>; +} + +#[async_trait] +pub trait MovieEnrichmentClient: Send + Sync { + /// Resolves an external ID (TMDb or IMDb) and fetches the full movie profile. + async fn fetch_profile( + &self, + movie_id: MovieId, + external_metadata_id: &str, + ) -> Result; +} + #[async_trait] pub trait ImportSessionRepository: Send + Sync { async fn create(&self, session: &ImportSession) -> Result<(), DomainError>; diff --git a/crates/presentation/src/extractors.rs b/crates/presentation/src/extractors.rs index b6df723..72f8578 100644 --- a/crates/presentation/src/extractors.rs +++ b/crates/presentation/src/extractors.rs @@ -358,6 +358,12 @@ mod tests { async fn delete(&self, _: &domain::value_objects::ImportProfileId) -> Result<(), DomainError> { panic!() } } #[async_trait::async_trait] + impl domain::ports::MovieProfileRepository for Panic { + async fn upsert(&self, _: &domain::models::MovieProfile) -> Result<(), DomainError> { panic!() } + async fn get_by_movie_id(&self, _: &domain::value_objects::MovieId) -> Result, DomainError> { Ok(None) } + async fn list_stale(&self) -> Result, DomainError> { Ok(vec![]) } + } + #[async_trait::async_trait] impl domain::ports::DiaryExporter for Panic { async fn serialize_entries( &self, @@ -483,6 +489,7 @@ mod tests { user_repository: Arc::clone(&repo) as _, import_session_repository: Arc::clone(&repo) as _, import_profile_repository: Arc::clone(&repo) as _, + movie_profile_repository: Arc::clone(&repo) as _, auth_service, config: AppConfig { allow_registration: false, diff --git a/crates/presentation/src/handlers/api.rs b/crates/presentation/src/handlers/api.rs index 07f37fb..c1dbc6e 100644 --- a/crates/presentation/src/handlers/api.rs +++ b/crates/presentation/src/handlers/api.rs @@ -36,9 +36,10 @@ use api_types::{ BlockedDomainResponse, FollowRequest, RemoteActorDto, }; use api_types::{ - ActivityFeedQueryParams, ActivityFeedResponse, DiaryEntryDto, DiaryQueryParams, DiaryResponse, - DirectorStatDto, ExportQueryParams, FeedEntryDto, LogReviewRequest, LoginRequest, LoginResponse, - MonthActivityDto, MonthlyRatingDto, MovieDetailResponse, MovieDto, MovieStatsDto, + ActivityFeedQueryParams, ActivityFeedResponse, CastMemberDto, CrewMemberDto, DiaryEntryDto, + DiaryQueryParams, DiaryResponse, DirectorStatDto, ExportQueryParams, FeedEntryDto, + GenreDto, KeywordDto, LogReviewRequest, LoginRequest, LoginResponse, MonthActivityDto, + MonthlyRatingDto, MovieDetailResponse, MovieDto, MovieProfileResponse, MovieStatsDto, PaginationQueryParams, ProfileResponse, RegisterRequest, ReviewDto, ReviewHistoryResponse, SocialFeedResponse, SocialReviewDto, UserProfileQueryParams, UserProfileResponse, UserStatsDto, UserSummaryDto, UserTrendsDto, UsersResponse, @@ -293,6 +294,52 @@ pub async fn get_movie_detail( })) } +#[utoipa::path( + get, path = "/api/v1/movies/{id}/profile", + params(("id" = Uuid, Path, description = "Movie ID")), + responses( + (status = 200, body = MovieProfileResponse), + (status = 404, description = "No profile found for this movie"), + ) +)] +pub async fn get_movie_profile( + State(state): State, + Path(movie_id): Path, +) -> impl IntoResponse { + let id = domain::value_objects::MovieId::from_uuid(movie_id); + match state.app_ctx.movie_profile_repository.get_by_movie_id(&id).await { + Ok(Some(p)) => Json(MovieProfileResponse { + tmdb_id: p.tmdb_id, + imdb_id: p.imdb_id, + overview: p.overview, + tagline: p.tagline, + runtime_minutes: p.runtime_minutes, + budget_usd: p.budget_usd, + revenue_usd: p.revenue_usd, + vote_average: p.vote_average, + vote_count: p.vote_count, + original_language: p.original_language, + collection_name: p.collection_name, + genres: p.genres.into_iter().map(|g| GenreDto { tmdb_id: g.tmdb_id, name: g.name }).collect(), + keywords: p.keywords.into_iter().map(|k| KeywordDto { tmdb_id: k.tmdb_id, name: k.name }).collect(), + cast: p.cast.into_iter().map(|c| CastMemberDto { + tmdb_person_id: c.tmdb_person_id, name: c.name, character: c.character, + billing_order: c.billing_order, profile_path: c.profile_path, + }).collect(), + crew: p.crew.into_iter().map(|c| CrewMemberDto { + tmdb_person_id: c.tmdb_person_id, name: c.name, job: c.job, + department: c.department, profile_path: c.profile_path, + }).collect(), + enriched_at: p.enriched_at.to_rfc3339(), + }).into_response(), + Ok(None) => StatusCode::NOT_FOUND.into_response(), + Err(e) => { + tracing::error!("get_movie_profile: {:?}", e); + StatusCode::INTERNAL_SERVER_ERROR.into_response() + } + } +} + #[utoipa::path( get, path = "/api/v1/profile", responses( diff --git a/crates/presentation/src/main.rs b/crates/presentation/src/main.rs index f4be111..eee1429 100644 --- a/crates/presentation/src/main.rs +++ b/crates/presentation/src/main.rs @@ -49,17 +49,17 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { let poster_fetcher = poster_fetcher::create()?; let image_storage = image_storage::create()?; - let (movie_repository, review_repository, diary_repository, stats_repository, user_repository, import_session_repository, import_profile_repository, db_pool) = + let (movie_repository, review_repository, diary_repository, stats_repository, user_repository, import_session_repository, import_profile_repository, movie_profile_repository, db_pool) = match backend.as_str() { #[cfg(feature = "postgres")] "postgres" => { - let (pool, m, r, d, s, u, is, ip) = postgres::wire(&database_url).await?; - (m, r, d, s, u, is, ip, DbPool::Postgres(pool)) + let (pool, m, r, d, s, u, is, ip, mp) = postgres::wire(&database_url).await?; + (m, r, d, s, u, is, ip, mp, DbPool::Postgres(pool)) } #[cfg(feature = "sqlite")] _ => { - let (pool, m, r, d, s, u, is, ip) = sqlite::wire(&database_url).await?; - (m, r, d, s, u, is, ip, DbPool::Sqlite(pool)) + let (pool, m, r, d, s, u, is, ip, mp) = sqlite::wire(&database_url).await?; + (m, r, d, s, u, is, ip, mp, DbPool::Sqlite(pool)) } #[cfg(not(feature = "sqlite"))] _ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build (sqlite feature is not enabled)"), @@ -161,6 +161,7 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { user_repository, import_session_repository: import_session_repository as Arc, import_profile_repository: import_profile_repository as Arc, + movie_profile_repository, config: app_config, }; @@ -185,6 +186,7 @@ enum DbPool { Postgres(sqlx::PgPool), } + #[derive(Clone, Copy)] enum EventBusBackend { Db, diff --git a/crates/presentation/src/routes.rs b/crates/presentation/src/routes.rs index e08197f..73a8eef 100644 --- a/crates/presentation/src/routes.rs +++ b/crates/presentation/src/routes.rs @@ -181,6 +181,10 @@ fn api_routes(rate_limit: u64) -> Router { "/movies/{id}", routing::get(handlers::api::get_movie_detail), ) + .route( + "/movies/{id}/profile", + routing::get(handlers::api::get_movie_profile), + ) .route("/reviews", routing::post(handlers::api::post_review)) .route( "/reviews/{id}", diff --git a/crates/presentation/tests/api_test.rs b/crates/presentation/tests/api_test.rs index f6d6e52..3e56f04 100644 --- a/crates/presentation/tests/api_test.rs +++ b/crates/presentation/tests/api_test.rs @@ -147,6 +147,14 @@ impl domain::ports::DocumentParser for PanicDocumentParser { } struct PanicImportProfile; + +struct PanicMovieProfile; +#[async_trait] +impl domain::ports::MovieProfileRepository for PanicMovieProfile { + async fn upsert(&self, _: &domain::models::MovieProfile) -> Result<(), DomainError> { panic!() } + async fn get_by_movie_id(&self, _: &domain::value_objects::MovieId) -> Result, DomainError> { Ok(None) } + async fn list_stale(&self) -> Result, DomainError> { Ok(vec![]) } +} #[async_trait] impl domain::ports::ImportProfileRepository for PanicImportProfile { async fn save(&self, _: &domain::models::ImportProfile) -> Result<(), DomainError> { panic!() } @@ -198,6 +206,7 @@ async fn test_app() -> Router { user_repository: Arc::new(NobodyUserRepo), import_session_repository: Arc::new(PanicImportSession), import_profile_repository: Arc::new(PanicImportProfile), + movie_profile_repository: Arc::new(PanicMovieProfile), config: AppConfig { allow_registration: false, base_url: "http://localhost:3000".to_string(), diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 78b9f54..a9c8f5f 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -26,6 +26,7 @@ poster-fetcher = { workspace = true } image-storage = { workspace = true } poster-sync = { workspace = true } export = { workspace = true } +tmdb-enrichment = { workspace = true } importer = { workspace = true } nats = { workspace = true, optional = true } sqlx = { workspace = true } diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index ca3d01a..2a6ddaa 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -6,7 +6,7 @@ use export::ExportAdapter; use importer::ImporterDocumentParser; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; -use domain::ports::{DiaryExporter, DocumentParser, EventHandler}; +use domain::ports::{DiaryExporter, DocumentParser, EventHandler, PeriodicJob}; #[cfg(not(any(feature = "sqlite", feature = "postgres")))] compile_error!("At least one database backend must be enabled. Use --features sqlite or --features postgres"); @@ -25,17 +25,17 @@ async fn main() -> anyhow::Result<()> { let poster_fetcher = poster_fetcher::create()?; let image_storage = image_storage::create()?; - let (movie_repository, review_repository, diary_repository, stats_repository, user_repository, import_session_repository, import_profile_repository, db_pool) = + let (movie_repository, review_repository, diary_repository, stats_repository, user_repository, import_session_repository, import_profile_repository, movie_profile_repository, db_pool) = match backend.as_str() { #[cfg(feature = "postgres")] "postgres" => { - let (pool, m, r, d, s, u, is, ip) = postgres::wire(&database_url).await?; - (m, r, d, s, u, is, ip, DbPool::Postgres(pool)) + let (pool, m, r, d, s, u, is, ip, mp) = postgres::wire(&database_url).await?; + (m, r, d, s, u, is, ip, mp, DbPool::Postgres(pool)) } #[cfg(feature = "sqlite")] _ => { - let (pool, m, r, d, s, u, is, ip) = sqlite::wire(&database_url).await?; - (m, r, d, s, u, is, ip, DbPool::Sqlite(pool)) + let (pool, m, r, d, s, u, is, ip, mp) = sqlite::wire(&database_url).await?; + (m, r, d, s, u, is, ip, mp, DbPool::Sqlite(pool)) } #[cfg(not(feature = "sqlite"))] _ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build"), @@ -63,6 +63,8 @@ async fn main() -> anyhow::Result<()> { } }; + let profile_repo = movie_profile_repository; + // Clone what federation handler needs before ctx and app_config are consumed. #[cfg(feature = "federation")] let (fed_movie_repo, fed_review_repo, fed_diary_repo, fed_user_repo, base_url, allow_registration) = ( @@ -90,19 +92,37 @@ async fn main() -> anyhow::Result<()> { user_repository, import_session_repository, import_profile_repository, + movie_profile_repository: Arc::clone(&profile_repo) as _, config: app_config, }; - // Spawn periodic import session cleanup (hourly) - { - let cleanup_ctx = ctx.clone(); + let enrichment_handler: Option> = + match tmdb_enrichment::TmdbEnrichmentClient::from_env() { + Ok(client) => { + tracing::info!("TMDb enrichment enabled"); + Some(Arc::new(tmdb_enrichment::EnrichmentHandler { + enrichment_client: Arc::new(client), + profile_repo: Arc::clone(&profile_repo), + })) + } + Err(e) => { + tracing::warn!("TMDb enrichment disabled: {e}"); + None + } + }; + + let periodic_jobs: Vec> = vec![ + Arc::new(application::jobs::ImportSessionCleanupJob::new(ctx.clone())), + Arc::new(application::jobs::EnrichmentStalenessJob::new(ctx.clone())), + ]; + + for job in periodic_jobs { tokio::spawn(async move { - let mut interval = tokio::time::interval(std::time::Duration::from_secs(3600)); + let mut tick = tokio::time::interval(job.interval()); loop { - interval.tick().await; - match application::use_cases::cleanup_expired_import_sessions::execute(&cleanup_ctx).await { - Ok(n) => tracing::info!("import session cleanup: removed {} expired sessions", n), - Err(e) => tracing::error!("import session cleanup failed: {:?}", e), + tick.tick().await; + if let Err(e) = job.run().await { + tracing::error!("periodic job failed: {e}"); } } }); @@ -121,8 +141,14 @@ async fn main() -> anyhow::Result<()> { Arc::clone(&ctx.image_storage), )) as Arc; + let enrichment = enrichment_handler; + #[cfg(not(feature = "federation"))] - { vec![poster, cleanup] } + { + let mut h: Vec> = vec![poster, cleanup]; + if let Some(e) = enrichment { h.push(e); } + h + } #[cfg(feature = "federation")] { @@ -145,7 +171,9 @@ async fn main() -> anyhow::Result<()> { ).await?.event_handler; tracing::info!("federation event handler registered"); - vec![poster, cleanup, ap] + let mut h: Vec> = vec![poster, cleanup, ap]; + if let Some(e) = enrichment { h.push(e); } + h } };