diff --git a/Cargo.lock b/Cargo.lock index 5da4424..1fc00bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3746,6 +3746,16 @@ dependencies = [ "uuid", ] +[[package]] +name = "postgres-search" +version = "0.1.0" +dependencies = [ + "async-trait", + "domain", + "sqlx", + "uuid", +] + [[package]] name = "potential_utf" version = "0.1.5" @@ -3797,12 +3807,14 @@ dependencies = [ "postgres", "postgres-event-queue", "postgres-federation", + "postgres-search", "rss 0.1.0", "serde", "serde_json", "sqlite", "sqlite-event-queue", "sqlite-federation", + "sqlite-search", "sqlx", "template-askama", "tokio", @@ -5002,6 +5014,17 @@ dependencies = [ "uuid", ] +[[package]] +name = "sqlite-search" +version = "0.1.0" +dependencies = [ + "async-trait", + "domain", + "sqlx", + "tokio", + "uuid", +] + [[package]] name = "sqlx" version = "0.8.6" @@ -5543,6 +5566,7 @@ name = "tmdb-enrichment" version = "0.1.0" dependencies = [ "anyhow", + "application", "async-trait", "chrono", "domain", @@ -6798,9 +6822,11 @@ dependencies = [ "postgres", "postgres-event-queue", "postgres-federation", + "postgres-search", "sqlite", "sqlite-event-queue", "sqlite-federation", + "sqlite-search", "sqlx", "tmdb-enrichment", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 8c39ed5..22f1b5b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,8 @@ members = [ "crates/tui", "crates/worker", "crates/adapters/importer", + "crates/adapters/sqlite-search", + "crates/adapters/postgres-search", ] resolver = "2" @@ -81,3 +83,5 @@ sqlite-event-queue = { path = "crates/adapters/sqlite-event-queue" } postgres-event-queue = { path = "crates/adapters/postgres-event-queue" } importer = { path = "crates/adapters/importer" } image-converter = { path = "crates/adapters/image-converter" } +sqlite-search = { path = "crates/adapters/sqlite-search" } +postgres-search = { path = "crates/adapters/postgres-search" } diff --git a/crates/adapters/nats/src/config.rs b/crates/adapters/nats/src/config.rs index 4e1b3c7..dc777ae 100644 --- a/crates/adapters/nats/src/config.rs +++ b/crates/adapters/nats/src/config.rs @@ -15,26 +15,35 @@ pub struct NatsConfig { impl NatsConfig { pub fn from_env() -> anyhow::Result { - let url = std::env::var("NATS_URL") - .map_err(|_| anyhow::anyhow!("NATS_URL is not set"))?; + Self::from_vars( + std::env::var("NATS_URL").ok().as_deref(), + std::env::var("NATS_MODE").ok().as_deref(), + std::env::var("NATS_SUBJECT_PREFIX").ok().as_deref(), + std::env::var("NATS_STREAM_NAME").ok().as_deref(), + std::env::var("NATS_CONSUMER_NAME").ok().as_deref(), + ) + } - let mode = match std::env::var("NATS_MODE") - .unwrap_or_else(|_| "jetstream".to_string()) - .as_str() - { + pub(crate) fn from_vars( + url: Option<&str>, + mode: Option<&str>, + subject_prefix: Option<&str>, + stream_name: Option<&str>, + consumer_name: Option<&str>, + ) -> anyhow::Result { + let url = url.ok_or_else(|| anyhow::anyhow!("NATS_URL is not set"))?; + + let mode = match mode.unwrap_or("jetstream") { "core" => NatsMode::Core, "jetstream" => NatsMode::JetStream, other => anyhow::bail!("unknown NATS_MODE: {other}"), }; - let subject_prefix = std::env::var("NATS_SUBJECT_PREFIX") - .unwrap_or_else(|_| "movies-diary.events".to_string()); - let stream_name = std::env::var("NATS_STREAM_NAME") - .unwrap_or_else(|_| "MOVIES_DIARY_EVENTS".to_string()); - let consumer_name = std::env::var("NATS_CONSUMER_NAME") - .unwrap_or_else(|_| "worker".to_string()); + let subject_prefix = subject_prefix.unwrap_or("movies-diary.events").to_string(); + let stream_name = stream_name.unwrap_or("MOVIES_DIARY_EVENTS").to_string(); + let consumer_name = consumer_name.unwrap_or("worker").to_string(); - Ok(Self { url, mode, subject_prefix, stream_name, consumer_name }) + Ok(Self { url: url.to_string(), mode, subject_prefix, stream_name, consumer_name }) } } diff --git a/crates/adapters/nats/src/tests/config.rs b/crates/adapters/nats/src/tests/config.rs index 4187074..10ba147 100644 --- a/crates/adapters/nats/src/tests/config.rs +++ b/crates/adapters/nats/src/tests/config.rs @@ -2,57 +2,26 @@ use super::*; #[test] fn errors_without_nats_url() { - unsafe { std::env::remove_var("NATS_URL"); } - assert!(NatsConfig::from_env().is_err()); + assert!(NatsConfig::from_vars(None, None, None, None, None).is_err()); } #[test] fn defaults_with_only_url() { - unsafe { - std::env::set_var("NATS_URL", "nats://localhost:4222"); - std::env::remove_var("NATS_MODE"); - std::env::remove_var("NATS_SUBJECT_PREFIX"); - std::env::remove_var("NATS_STREAM_NAME"); - std::env::remove_var("NATS_CONSUMER_NAME"); - } - - let cfg = NatsConfig::from_env().unwrap(); + let cfg = NatsConfig::from_vars(Some("nats://localhost:4222"), None, None, None, None).unwrap(); assert_eq!(cfg.url, "nats://localhost:4222"); assert_eq!(cfg.mode, NatsMode::JetStream); assert_eq!(cfg.subject_prefix, "movies-diary.events"); assert_eq!(cfg.stream_name, "MOVIES_DIARY_EVENTS"); assert_eq!(cfg.consumer_name, "worker"); - - unsafe { std::env::remove_var("NATS_URL"); } } #[test] fn core_mode_parsed() { - unsafe { - std::env::set_var("NATS_URL", "nats://test:4222"); - std::env::set_var("NATS_MODE", "core"); - } - - let cfg = NatsConfig::from_env().unwrap(); + let cfg = NatsConfig::from_vars(Some("nats://test:4222"), Some("core"), None, None, None).unwrap(); assert_eq!(cfg.mode, NatsMode::Core); - - unsafe { - std::env::remove_var("NATS_URL"); - std::env::remove_var("NATS_MODE"); - } } #[test] fn invalid_mode_errors() { - unsafe { - std::env::set_var("NATS_URL", "nats://test:4222"); - std::env::set_var("NATS_MODE", "kafka"); - } - - assert!(NatsConfig::from_env().is_err()); - - unsafe { - std::env::remove_var("NATS_URL"); - std::env::remove_var("NATS_MODE"); - } + assert!(NatsConfig::from_vars(Some("nats://test:4222"), Some("kafka"), None, None, None).is_err()); } diff --git a/crates/adapters/postgres-search/Cargo.toml b/crates/adapters/postgres-search/Cargo.toml new file mode 100644 index 0000000..8f22f2f --- /dev/null +++ b/crates/adapters/postgres-search/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "postgres-search" +version = "0.1.0" +edition = "2021" + +[dependencies] +domain = { workspace = true } +async-trait = { workspace = true } +sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "postgres", "uuid", "macros"] } +uuid = { workspace = true } diff --git a/crates/adapters/postgres-search/src/lib.rs b/crates/adapters/postgres-search/src/lib.rs new file mode 100644 index 0000000..9125714 --- /dev/null +++ b/crates/adapters/postgres-search/src/lib.rs @@ -0,0 +1,313 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use domain::{ + errors::DomainError, + models::{ + EntityType, IndexableDocument, MovieSearchHit, PersonSearchHit, + SearchQuery, SearchResults, + collections::Paginated, + }, + models::PersonId, + value_objects::MovieId, + ports::{SearchCommand, SearchPort}, +}; +use sqlx::PgPool; + +pub struct PostgresSearchAdapter { + pool: PgPool, +} + +impl PostgresSearchAdapter { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +pub fn create_search_adapter(pool: PgPool) -> (Arc, Arc) { + let adapter = Arc::new(PostgresSearchAdapter::new(pool)); + (Arc::clone(&adapter) as Arc, adapter as Arc) +} + +fn map_err(e: sqlx::Error) -> DomainError { + DomainError::InfrastructureError(e.to_string()) +} + +#[async_trait] +impl SearchCommand for PostgresSearchAdapter { + async fn index(&self, doc: IndexableDocument) -> Result<(), DomainError> { + match doc { + IndexableDocument::Movie { id, movie, profile } => { + let movie_id = id.value().to_string(); + let title = movie.title().value().to_string(); + let director = movie.director().unwrap_or("").to_string(); + let (overview, genres, keywords, cast_names, crew_names) = + match profile.as_deref() { + Some(p) => ( + p.overview.clone().unwrap_or_default(), + p.genres.iter().map(|g| g.name.as_str()).collect::>().join(" "), + p.keywords.iter().map(|k| k.name.as_str()).collect::>().join(" "), + p.cast.iter().map(|c| c.name.as_str()).collect::>().join(" "), + p.crew.iter().map(|c| c.name.as_str()).collect::>().join(" "), + ), + None => (String::new(), String::new(), String::new(), String::new(), String::new()), + }; + + let fts_input = format!( + "{} {} {} {} {} {} {}", + title, director, overview, genres, keywords, cast_names, crew_names + ); + + sqlx::query( + "INSERT INTO movies_search (movie_id, fts) + VALUES ($1, to_tsvector('english', $2)) + ON CONFLICT (movie_id) DO UPDATE SET fts = EXCLUDED.fts", + ) + .bind(&movie_id) + .bind(&fts_input) + .execute(&self.pool) + .await + .map_err(map_err)?; + + Ok(()) + } + + IndexableDocument::Person { id, person } => { + let person_id = id.value().to_string(); + let fts_input = format!( + "{} {}", + person.name(), + person.known_for_department().unwrap_or("") + ); + + sqlx::query( + "INSERT INTO people_search (person_id, fts) + VALUES ($1, to_tsvector('english', $2)) + ON CONFLICT (person_id) DO UPDATE SET fts = EXCLUDED.fts", + ) + .bind(&person_id) + .bind(&fts_input) + .execute(&self.pool) + .await + .map_err(map_err)?; + + Ok(()) + } + } + } + + async fn remove(&self, entity_type: EntityType, id: &str) -> Result<(), DomainError> { + match entity_type { + EntityType::Movie => { + sqlx::query("DELETE FROM movies_search WHERE movie_id = $1") + .bind(id) + .execute(&self.pool) + .await + .map_err(map_err)?; + } + EntityType::Person => { + sqlx::query("DELETE FROM people_search WHERE person_id = $1") + .bind(id) + .execute(&self.pool) + .await + .map_err(map_err)?; + } + } + Ok(()) + } +} + +#[async_trait] +impl SearchPort for PostgresSearchAdapter { + async fn search(&self, query: &SearchQuery) -> Result { + let movies = self.search_movies(query).await?; + let people = self.search_people(query).await?; + Ok(SearchResults { movies, people }) + } +} + +impl PostgresSearchAdapter { + async fn search_movies(&self, query: &SearchQuery) -> Result, DomainError> { + let limit = query.page.limit as i64; + let offset = query.page.offset as i64; + + #[derive(sqlx::FromRow)] + struct Row { + id: String, + title: String, + release_year: Option, + director: Option, + poster_path: Option, + genres: Option, + } + + let total: u64 = if let Some(text) = &query.text { + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(DISTINCT m.id) + FROM movies_search ms + JOIN movies m ON m.id = ms.movie_id + LEFT JOIN movie_genres mg ON mg.movie_id = m.id + WHERE ms.fts @@ plainto_tsquery('english', $1) + AND ($2::TEXT IS NULL OR EXISTS (SELECT 1 FROM movie_genres WHERE movie_id = m.id AND name = $2)) + AND ($3::INT IS NULL OR m.release_year = $3)", + ) + .bind(text) + .bind(&query.filters.genre) + .bind(query.filters.year.map(|y| y as i32)) + .fetch_one(&self.pool) + .await + .map_err(map_err)?; + count as u64 + } else { + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(DISTINCT m.id) FROM movies m + LEFT JOIN movie_genres mg ON mg.movie_id = m.id + WHERE ($1::TEXT IS NULL OR EXISTS (SELECT 1 FROM movie_genres WHERE movie_id = m.id AND name = $1)) + AND ($2::INT IS NULL OR m.release_year = $2)", + ) + .bind(&query.filters.genre) + .bind(query.filters.year.map(|y| y as i32)) + .fetch_one(&self.pool) + .await + .map_err(map_err)?; + count as u64 + }; + + let rows: Vec = if let Some(text) = &query.text { + sqlx::query_as::<_, Row>( + "SELECT m.id, m.title, m.release_year, m.director, m.poster_path, + STRING_AGG(DISTINCT mg.name, ',' ORDER BY mg.name) AS genres + FROM movies_search ms + JOIN movies m ON m.id = ms.movie_id + LEFT JOIN movie_genres mg ON mg.movie_id = m.id + WHERE ms.fts @@ plainto_tsquery('english', $1) + AND ($2::TEXT IS NULL OR EXISTS (SELECT 1 FROM movie_genres WHERE movie_id = m.id AND name = $2)) + AND ($3::INT IS NULL OR m.release_year = $3) + GROUP BY m.id, m.title, m.release_year, m.director, m.poster_path, ms.fts + ORDER BY ts_rank(ms.fts, plainto_tsquery('english', $1)) DESC + LIMIT $4 OFFSET $5", + ) + .bind(text) + .bind(&query.filters.genre) + .bind(query.filters.year.map(|y| y as i32)) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + .map_err(map_err)? + } else { + sqlx::query_as::<_, Row>( + "SELECT m.id, m.title, m.release_year, m.director, m.poster_path, + STRING_AGG(DISTINCT mg.name, ',' ORDER BY mg.name) AS genres + FROM movies m + LEFT JOIN movie_genres mg ON mg.movie_id = m.id + WHERE ($1::TEXT IS NULL OR EXISTS (SELECT 1 FROM movie_genres WHERE movie_id = m.id AND name = $1)) + AND ($2::INT IS NULL OR m.release_year = $2) + GROUP BY m.id ORDER BY m.title LIMIT $3 OFFSET $4", + ) + .bind(&query.filters.genre) + .bind(query.filters.year.map(|y| y as i32)) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + .map_err(map_err)? + }; + + let items = rows.into_iter().map(|r| MovieSearchHit { + movie_id: MovieId::from_uuid(uuid::Uuid::parse_str(&r.id).unwrap_or_default()), + title: r.title, + release_year: r.release_year.map(|y| y as u16), + director: r.director, + poster_path: r.poster_path, + genres: r.genres + .unwrap_or_default() + .split(',') + .filter(|s| !s.is_empty()) + .map(str::to_string) + .collect(), + }).collect::>(); + + Ok(Paginated { items, total_count: total, limit: query.page.limit, offset: query.page.offset }) + } + + async fn search_people(&self, query: &SearchQuery) -> Result, DomainError> { + let Some(text) = &query.text else { + return Ok(Paginated { + items: vec![], + total_count: 0, + limit: query.page.limit, + offset: query.page.offset, + }); + }; + + let limit = query.page.limit as i64; + let offset = query.page.offset as i64; + + let total: u64 = { + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM people_search WHERE fts @@ plainto_tsquery('english', $1)", + ) + .bind(text) + .fetch_one(&self.pool) + .await + .map_err(map_err)?; + count as u64 + }; + + #[derive(sqlx::FromRow)] + struct Row { + person_id: String, + name: String, + known_for_department: Option, + profile_path: Option, + tmdb_person_id: Option, + } + + let rows = sqlx::query_as::<_, Row>( + "SELECT ps.person_id, p.name, p.known_for_department, p.profile_path, p.tmdb_person_id + FROM people_search ps + JOIN persons p ON p.id = ps.person_id + WHERE ps.fts @@ plainto_tsquery('english', $1) + ORDER BY ts_rank(ps.fts, plainto_tsquery('english', $1)) DESC + LIMIT $2 OFFSET $3", + ) + .bind(text) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + .map_err(map_err)?; + + let mut items = Vec::with_capacity(rows.len()); + for row in rows { + let known_for_titles = if let Some(tid) = row.tmdb_person_id { + sqlx::query_scalar::<_, String>( + "SELECT m.title FROM movie_cast mc + JOIN movies m ON m.id = mc.movie_id + WHERE mc.tmdb_person_id = $1 + ORDER BY mc.billing_order + LIMIT 3", + ) + .bind(tid) + .fetch_all(&self.pool) + .await + .unwrap_or_default() + } else { + vec![] + }; + + items.push(PersonSearchHit { + person_id: PersonId::from_uuid( + uuid::Uuid::parse_str(&row.person_id).unwrap_or_default() + ), + name: row.name, + known_for_department: row.known_for_department, + profile_path: row.profile_path, + known_for_titles, + }); + } + + Ok(Paginated { items, total_count: total, limit: query.page.limit, offset: query.page.offset }) + } +} diff --git a/crates/adapters/postgres/migrations/0015_search.sql b/crates/adapters/postgres/migrations/0015_search.sql new file mode 100644 index 0000000..0a57b4b --- /dev/null +++ b/crates/adapters/postgres/migrations/0015_search.sql @@ -0,0 +1,25 @@ +CREATE TABLE IF NOT EXISTS persons ( + id TEXT PRIMARY KEY, + external_id TEXT NOT NULL UNIQUE, + tmdb_person_id BIGINT UNIQUE, + name TEXT NOT NULL, + known_for_department TEXT, + profile_path TEXT +); + +CREATE INDEX IF NOT EXISTS idx_persons_external ON persons (external_id); +CREATE INDEX IF NOT EXISTS idx_persons_tmdb_id ON persons (tmdb_person_id); + +-- tsvector-based search for movies (equivalent of SQLite FTS5) +CREATE TABLE IF NOT EXISTS movies_search ( + movie_id TEXT PRIMARY KEY REFERENCES movies(id) ON DELETE CASCADE, + fts TSVECTOR NOT NULL DEFAULT '' +); +CREATE INDEX IF NOT EXISTS idx_movies_search_fts ON movies_search USING GIN(fts); + +-- tsvector-based search for people +CREATE TABLE IF NOT EXISTS people_search ( + person_id TEXT PRIMARY KEY REFERENCES persons(id) ON DELETE CASCADE, + fts TSVECTOR NOT NULL DEFAULT '' +); +CREATE INDEX IF NOT EXISTS idx_people_search_fts ON people_search USING GIN(fts); diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs index 4fa9044..e824050 100644 --- a/crates/adapters/postgres/src/lib.rs +++ b/crates/adapters/postgres/src/lib.rs @@ -16,6 +16,7 @@ mod image_ref; mod import_profile; mod import_session; mod models; +mod persons; mod profile; mod users; @@ -27,6 +28,7 @@ use models::{ pub use image_ref::{PostgresImageRefAdapter, create_image_ref}; pub use import_profile::PostgresImportProfileRepository; pub use import_session::PostgresImportSessionRepository; +pub use persons::{PostgresPersonAdapter, create_person_adapter}; pub use profile::PostgresMovieProfileRepository; pub use users::PostgresUserRepository; diff --git a/crates/adapters/postgres/src/persons.rs b/crates/adapters/postgres/src/persons.rs new file mode 100644 index 0000000..2224a16 --- /dev/null +++ b/crates/adapters/postgres/src/persons.rs @@ -0,0 +1,198 @@ +use async_trait::async_trait; +use domain::{ + errors::DomainError, + models::{CastCredit, CrewCredit, ExternalPersonId, Person, PersonCredits, PersonId}, + ports::{PersonCommand, PersonQuery}, + value_objects::MovieId, +}; +use sqlx::PgPool; +use std::sync::Arc; + +pub struct PostgresPersonAdapter { + pool: PgPool, +} + +impl PostgresPersonAdapter { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +pub fn create_person_adapter(pool: PgPool) -> (Arc, Arc) { + let adapter = Arc::new(PostgresPersonAdapter::new(pool)); + (Arc::clone(&adapter) as Arc, adapter as Arc) +} + +fn map_err(e: sqlx::Error) -> DomainError { + DomainError::InfrastructureError(e.to_string()) +} + +#[async_trait] +impl PersonCommand for PostgresPersonAdapter { + async fn upsert_batch(&self, persons: &[Person]) -> Result<(), DomainError> { + for person in persons { + let tmdb_id = person.external_id().tmdb_id(); + sqlx::query( + "INSERT INTO persons (id, external_id, tmdb_person_id, name, known_for_department, profile_path) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT(id) DO UPDATE SET + external_id = EXCLUDED.external_id, + tmdb_person_id = EXCLUDED.tmdb_person_id, + name = EXCLUDED.name, + known_for_department = EXCLUDED.known_for_department, + profile_path = EXCLUDED.profile_path", + ) + .bind(person.id().value().to_string()) + .bind(person.external_id().value()) + .bind(tmdb_id) + .bind(person.name()) + .bind(person.known_for_department()) + .bind(person.profile_path()) + .execute(&self.pool) + .await + .map_err(map_err)?; + } + Ok(()) + } +} + +#[async_trait] +impl PersonQuery for PostgresPersonAdapter { + async fn get_by_id(&self, id: &PersonId) -> Result, DomainError> { + #[derive(sqlx::FromRow)] + struct Row { + id: String, + external_id: String, + name: String, + known_for_department: Option, + profile_path: Option, + } + + let row = sqlx::query_as::<_, Row>( + "SELECT id, external_id, name, known_for_department, profile_path FROM persons WHERE id = $1", + ) + .bind(id.value().to_string()) + .fetch_optional(&self.pool) + .await + .map_err(map_err)?; + + Ok(row.map(|r| { + let ext = ExternalPersonId::new(r.external_id); + Person::new( + PersonId::from_uuid(uuid::Uuid::parse_str(&r.id).unwrap_or_default()), + ext, + r.name, + r.known_for_department, + r.profile_path, + ) + })) + } + + async fn get_by_external_id(&self, id: &ExternalPersonId) -> Result, DomainError> { + #[derive(sqlx::FromRow)] + struct Row { + id: String, + external_id: String, + name: String, + known_for_department: Option, + profile_path: Option, + } + + let row = sqlx::query_as::<_, Row>( + "SELECT id, external_id, name, known_for_department, profile_path FROM persons WHERE external_id = $1", + ) + .bind(id.value()) + .fetch_optional(&self.pool) + .await + .map_err(map_err)?; + + Ok(row.map(|r| { + let ext = ExternalPersonId::new(r.external_id); + Person::new( + PersonId::from_uuid(uuid::Uuid::parse_str(&r.id).unwrap_or_default()), + ext, + r.name, + r.known_for_department, + r.profile_path, + ) + })) + } + + async fn get_credits(&self, id: &PersonId) -> Result { + let person = self.get_by_id(id).await?.ok_or_else(|| { + DomainError::NotFound(format!("Person {} not found", id.value())) + })?; + + let tmdb_id: Option = sqlx::query_scalar( + "SELECT tmdb_person_id FROM persons WHERE id = $1", + ) + .bind(id.value().to_string()) + .fetch_optional(&self.pool) + .await + .map_err(map_err)? + .flatten(); + + let Some(tmdb_id) = tmdb_id else { + return Ok(PersonCredits { person, cast: vec![], crew: vec![] }); + }; + + #[derive(sqlx::FromRow)] + struct CastRow { + id: String, + title: String, + release_year: Option, + character: String, + poster_path: Option, + } + #[derive(sqlx::FromRow)] + struct CrewRow { + id: String, + title: String, + release_year: Option, + job: String, + department: String, + poster_path: Option, + } + + let cast = sqlx::query_as::<_, CastRow>( + "SELECT m.id, m.title, m.release_year, mc.character, m.poster_path + FROM movie_cast mc JOIN movies m ON m.id = mc.movie_id + WHERE mc.tmdb_person_id = $1 ORDER BY mc.billing_order", + ) + .bind(tmdb_id) + .fetch_all(&self.pool) + .await + .map_err(map_err)? + .into_iter() + .map(|r| CastCredit { + movie_id: MovieId::from_uuid(uuid::Uuid::parse_str(&r.id).unwrap_or_default()), + title: r.title, + release_year: r.release_year.map(|y| y as u16), + character: r.character, + poster_path: r.poster_path, + }) + .collect(); + + let crew = sqlx::query_as::<_, CrewRow>( + "SELECT m.id, m.title, m.release_year, mc.job, mc.department, m.poster_path + FROM movie_crew mc JOIN movies m ON m.id = mc.movie_id + WHERE mc.tmdb_person_id = $1 ORDER BY m.title", + ) + .bind(tmdb_id) + .fetch_all(&self.pool) + .await + .map_err(map_err)? + .into_iter() + .map(|r| CrewCredit { + movie_id: MovieId::from_uuid(uuid::Uuid::parse_str(&r.id).unwrap_or_default()), + title: r.title, + release_year: r.release_year.map(|y| y as u16), + job: r.job, + department: r.department, + poster_path: r.poster_path, + }) + .collect(); + + Ok(PersonCredits { person, cast, crew }) + } +} diff --git a/crates/adapters/sqlite-search/Cargo.toml b/crates/adapters/sqlite-search/Cargo.toml new file mode 100644 index 0000000..a313e86 --- /dev/null +++ b/crates/adapters/sqlite-search/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "sqlite-search" +version = "0.1.0" +edition = "2021" + +[dependencies] +domain = { workspace = true } +async-trait = { workspace = true } +sqlx = { workspace = true } +uuid = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true } diff --git a/crates/adapters/sqlite-search/src/lib.rs b/crates/adapters/sqlite-search/src/lib.rs new file mode 100644 index 0000000..4469b1c --- /dev/null +++ b/crates/adapters/sqlite-search/src/lib.rs @@ -0,0 +1,356 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use domain::{ + errors::DomainError, + models::{ + EntityType, IndexableDocument, MovieSearchHit, PersonSearchHit, + SearchQuery, SearchResults, + collections::Paginated, + }, + models::PersonId, + value_objects::MovieId, + ports::{SearchCommand, SearchPort}, +}; +use sqlx::SqlitePool; + +pub struct SqliteSearchAdapter { + pool: SqlitePool, +} + +impl SqliteSearchAdapter { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } +} + +pub fn create_search_adapter(pool: SqlitePool) -> (Arc, Arc) { + let adapter = Arc::new(SqliteSearchAdapter::new(pool)); + (Arc::clone(&adapter) as Arc, adapter as Arc) +} + +fn map_err(e: sqlx::Error) -> DomainError { + DomainError::InfrastructureError(e.to_string()) +} + +#[async_trait] +impl SearchCommand for SqliteSearchAdapter { + async fn index(&self, doc: IndexableDocument) -> Result<(), DomainError> { + match doc { + IndexableDocument::Movie { id, movie, profile } => { + let movie_id = id.value().to_string(); + let title = movie.title().value().to_string(); + let director = movie.director().unwrap_or("").to_string(); + let release_year = movie.release_year().value() as i64; + let (overview, genres, keywords, cast_names, crew_names, language) = + match profile.as_deref() { + Some(p) => ( + p.overview.clone().unwrap_or_default(), + p.genres.iter().map(|g| g.name.as_str()).collect::>().join(" "), + p.keywords.iter().map(|k| k.name.as_str()).collect::>().join(" "), + p.cast.iter().map(|c| c.name.as_str()).collect::>().join(" "), + p.crew.iter().map(|c| c.name.as_str()).collect::>().join(" "), + p.original_language.clone().unwrap_or_default(), + ), + None => (String::new(), String::new(), String::new(), String::new(), String::new(), String::new()), + }; + + sqlx::query( + "DELETE FROM movies_fts WHERE rowid = (SELECT rowid FROM movies_fts WHERE movie_id = ? LIMIT 1)", + ) + .bind(&movie_id) + .execute(&self.pool) + .await + .map_err(map_err)?; + + sqlx::query( + "INSERT INTO movies_fts(movie_id, title, director, overview, genres, keywords, cast_names, crew_names, release_year, language) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(&movie_id) + .bind(&title) + .bind(&director) + .bind(&overview) + .bind(&genres) + .bind(&keywords) + .bind(&cast_names) + .bind(&crew_names) + .bind(release_year) + .bind(&language) + .execute(&self.pool) + .await + .map_err(map_err)?; + + Ok(()) + } + + IndexableDocument::Person { id, person } => { + let person_id = id.value().to_string(); + + sqlx::query( + "DELETE FROM people_fts WHERE rowid = (SELECT rowid FROM people_fts WHERE person_id = ? LIMIT 1)", + ) + .bind(&person_id) + .execute(&self.pool) + .await + .map_err(map_err)?; + + sqlx::query( + "INSERT INTO people_fts(person_id, name, known_for_department) VALUES (?, ?, ?)", + ) + .bind(&person_id) + .bind(person.name()) + .bind(person.known_for_department()) + .execute(&self.pool) + .await + .map_err(map_err)?; + + Ok(()) + } + } + } + + async fn remove(&self, entity_type: EntityType, id: &str) -> Result<(), DomainError> { + match entity_type { + EntityType::Movie => { + sqlx::query( + "DELETE FROM movies_fts WHERE rowid = (SELECT rowid FROM movies_fts WHERE movie_id = ? LIMIT 1)", + ) + .bind(id) + .execute(&self.pool) + .await + .map_err(map_err)?; + } + EntityType::Person => { + sqlx::query( + "DELETE FROM people_fts WHERE rowid = (SELECT rowid FROM people_fts WHERE person_id = ? LIMIT 1)", + ) + .bind(id) + .execute(&self.pool) + .await + .map_err(map_err)?; + } + } + Ok(()) + } +} + +#[async_trait] +impl SearchPort for SqliteSearchAdapter { + async fn search(&self, query: &SearchQuery) -> Result { + let movies = self.search_movies(query).await?; + let people = self.search_people(query).await?; + Ok(SearchResults { movies, people }) + } +} + +impl SqliteSearchAdapter { + async fn search_movies(&self, query: &SearchQuery) -> Result, DomainError> { + let limit = query.page.limit as i64; + let offset = query.page.offset as i64; + + #[derive(sqlx::FromRow)] + struct Row { + id: String, + title: String, + release_year: Option, + director: Option, + poster_path: Option, + genres: Option, + } + + let total: u64 = if let Some(text) = &query.text { + let fts_query = format!("{}*", text.replace('"', "").replace('*', "")); + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(DISTINCT m.id) + FROM movies_fts fts + JOIN movies m ON m.id = fts.movie_id + LEFT JOIN movie_genres mg ON mg.movie_id = m.id + WHERE movies_fts MATCH ? + AND (? IS NULL OR EXISTS (SELECT 1 FROM movie_genres WHERE movie_id = m.id AND name = ?)) + AND (? IS NULL OR m.release_year = ?)", + ) + .bind(&fts_query) + .bind(&query.filters.genre) + .bind(&query.filters.genre) + .bind(query.filters.year.map(|y| y as i64)) + .bind(query.filters.year.map(|y| y as i64)) + .fetch_one(&self.pool) + .await + .map_err(map_err)?; + count as u64 + } else { + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(DISTINCT m.id) + FROM movies m + LEFT JOIN movie_genres mg ON mg.movie_id = m.id + WHERE (? IS NULL OR EXISTS (SELECT 1 FROM movie_genres WHERE movie_id = m.id AND name = ?)) + AND (? IS NULL OR m.release_year = ?)", + ) + .bind(&query.filters.genre) + .bind(&query.filters.genre) + .bind(query.filters.year.map(|y| y as i64)) + .bind(query.filters.year.map(|y| y as i64)) + .fetch_one(&self.pool) + .await + .map_err(map_err)?; + count as u64 + }; + + let rows: Vec = if let Some(text) = &query.text { + let fts_query = format!("{}*", text.replace('"', "").replace('*', "")); + sqlx::query_as::<_, Row>( + "SELECT m.id, m.title, m.release_year, m.director, m.poster_path, + GROUP_CONCAT(DISTINCT mg.name) AS genres + FROM movies_fts fts + JOIN movies m ON m.id = fts.movie_id + LEFT JOIN movie_genres mg ON mg.movie_id = m.id + WHERE movies_fts MATCH ? + AND (? IS NULL OR EXISTS (SELECT 1 FROM movie_genres WHERE movie_id = m.id AND name = ?)) + AND (? IS NULL OR m.release_year = ?) + GROUP BY m.id + ORDER BY rank + LIMIT ? OFFSET ?", + ) + .bind(&fts_query) + .bind(&query.filters.genre) + .bind(&query.filters.genre) + .bind(query.filters.year.map(|y| y as i64)) + .bind(query.filters.year.map(|y| y as i64)) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + .map_err(map_err)? + } else { + sqlx::query_as::<_, Row>( + "SELECT m.id, m.title, m.release_year, m.director, m.poster_path, + GROUP_CONCAT(DISTINCT mg.name) AS genres + FROM movies m + LEFT JOIN movie_genres mg ON mg.movie_id = m.id + WHERE (? IS NULL OR EXISTS (SELECT 1 FROM movie_genres WHERE movie_id = m.id AND name = ?)) + AND (? IS NULL OR m.release_year = ?) + GROUP BY m.id + ORDER BY m.title + LIMIT ? OFFSET ?", + ) + .bind(&query.filters.genre) + .bind(&query.filters.genre) + .bind(query.filters.year.map(|y| y as i64)) + .bind(query.filters.year.map(|y| y as i64)) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + .map_err(map_err)? + }; + let items = rows.into_iter().map(|r| MovieSearchHit { + movie_id: MovieId::from_uuid(uuid::Uuid::parse_str(&r.id).unwrap_or_default()), + title: r.title, + release_year: r.release_year.map(|y| y as u16), + director: r.director, + poster_path: r.poster_path, + genres: r.genres + .unwrap_or_default() + .split(',') + .filter(|s| !s.is_empty()) + .map(str::to_string) + .collect(), + }).collect::>(); + + Ok(Paginated { items, total_count: total, limit: query.page.limit, offset: query.page.offset }) + } + + async fn search_people(&self, query: &SearchQuery) -> Result, DomainError> { + let Some(text) = &query.text else { + return Ok(Paginated { + items: vec![], + total_count: 0, + limit: query.page.limit, + offset: query.page.offset, + }); + }; + + let limit = query.page.limit as i64; + let offset = query.page.offset as i64; + let fts_query = format!("{}*", text.replace('"', "").replace('*', "")); + + let total: u64 = { + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM people_fts WHERE people_fts MATCH ?", + ) + .bind(&fts_query) + .fetch_one(&self.pool) + .await + .map_err(map_err)?; + count as u64 + }; + + #[derive(sqlx::FromRow)] + struct Row { + person_id: String, + name: String, + known_for_department: Option, + profile_path: Option, + } + + let rows = sqlx::query_as::<_, Row>( + "SELECT fts.person_id, p.name, p.known_for_department, p.profile_path + FROM people_fts fts + JOIN persons p ON p.id = fts.person_id + WHERE people_fts MATCH ? + ORDER BY rank + LIMIT ? OFFSET ?", + ) + .bind(&fts_query) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + .map_err(map_err)?; + + let mut items = Vec::with_capacity(rows.len()); + for row in rows { + let tmdb_id: Option = sqlx::query_scalar( + "SELECT tmdb_person_id FROM persons WHERE id = ?", + ) + .bind(&row.person_id) + .fetch_optional(&self.pool) + .await + .map_err(map_err)? + .flatten(); + + let known_for_titles = if let Some(tid) = tmdb_id { + sqlx::query_scalar::<_, String>( + "SELECT m.title FROM movie_cast mc + JOIN movies m ON m.id = mc.movie_id + WHERE mc.tmdb_person_id = ? + ORDER BY mc.billing_order + LIMIT 3", + ) + .bind(tid) + .fetch_all(&self.pool) + .await + .unwrap_or_default() + } else { + vec![] + }; + + items.push(PersonSearchHit { + person_id: PersonId::from_uuid( + uuid::Uuid::parse_str(&row.person_id).unwrap_or_default() + ), + name: row.name, + known_for_department: row.known_for_department, + profile_path: row.profile_path, + known_for_titles, + }); + } + + Ok(Paginated { items, total_count: total, limit: query.page.limit, offset: query.page.offset }) + } +} + +#[cfg(test)] +#[path = "tests/lib.rs"] +mod tests; diff --git a/crates/adapters/sqlite-search/src/tests/lib.rs b/crates/adapters/sqlite-search/src/tests/lib.rs new file mode 100644 index 0000000..5d552de --- /dev/null +++ b/crates/adapters/sqlite-search/src/tests/lib.rs @@ -0,0 +1,157 @@ +use super::{SqliteSearchAdapter, create_search_adapter}; +use domain::{ + models::{ + EntityType, IndexableDocument, Movie, + Person, PersonId, SearchFilters, SearchQuery, + ExternalPersonId, + collections::PageParams, + }, + value_objects::{MovieId, MovieTitle, ReleaseYear}, + ports::{SearchCommand, SearchPort}, +}; +use sqlx::SqlitePool; + +async fn pool_with_schema() -> SqlitePool { + let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); + sqlx::query( + "CREATE TABLE movies (id TEXT PRIMARY KEY, title TEXT NOT NULL, + release_year INTEGER, director TEXT, poster_path TEXT, external_metadata_id TEXT)", + ) + .execute(&pool).await.unwrap(); + sqlx::query( + "CREATE TABLE persons (id TEXT PRIMARY KEY, external_id TEXT UNIQUE, + tmdb_person_id INTEGER UNIQUE, name TEXT NOT NULL, + known_for_department TEXT, profile_path TEXT)", + ) + .execute(&pool).await.unwrap(); + sqlx::query( + "CREATE TABLE movie_cast (movie_id TEXT, tmdb_person_id INTEGER, + name TEXT, character TEXT, billing_order INTEGER, profile_path TEXT)", + ) + .execute(&pool).await.unwrap(); + sqlx::query( + "CREATE TABLE movie_genres (movie_id TEXT, tmdb_id INTEGER, name TEXT)", + ) + .execute(&pool).await.unwrap(); + sqlx::query( + "CREATE VIRTUAL TABLE movies_fts USING fts5( + movie_id UNINDEXED, title, director, overview, genres, keywords, + cast_names, crew_names, release_year UNINDEXED, language UNINDEXED)", + ) + .execute(&pool).await.unwrap(); + sqlx::query( + "CREATE VIRTUAL TABLE people_fts USING fts5( + person_id UNINDEXED, name, known_for_department UNINDEXED)", + ) + .execute(&pool).await.unwrap(); + pool +} + +fn test_movie(id: &str, title: &str, year: u16) -> Movie { + Movie::from_persistence( + MovieId::from_uuid(uuid::Uuid::parse_str(id).unwrap()), + None, + MovieTitle::new(title.into()).unwrap(), + ReleaseYear::new(year).unwrap(), + Some("Test Director".to_string()), + None, + ) +} + +fn default_page() -> PageParams { + PageParams::new(Some(10), Some(0)).unwrap() +} + +#[tokio::test] +async fn index_and_search_movie_by_title() { + let pool = pool_with_schema().await; + let (cmd, query) = create_search_adapter(pool.clone()); + + let id_str = "00000000-0000-0000-0000-000000000001"; + let movie = test_movie(id_str, "Interstellar", 2014); + let movie_id = movie.id().clone(); + + sqlx::query("INSERT INTO movies VALUES (?, ?, ?, ?, ?, ?)") + .bind(id_str).bind("Interstellar").bind(2014i32) + .bind("Christopher Nolan").bind::>(None).bind::>(None) + .execute(&pool).await.unwrap(); + + cmd.index(IndexableDocument::Movie { id: movie_id.clone(), movie: Box::new(movie), profile: None }) + .await.unwrap(); + + let results = query.search(&SearchQuery { + text: Some("Interstellar".to_string()), + filters: SearchFilters::default(), + page: default_page(), + }).await.unwrap(); + + assert_eq!(results.movies.items.len(), 1); + assert_eq!(results.movies.items[0].title, "Interstellar"); +} + +#[tokio::test] +async fn remove_movie_clears_from_index() { + let pool = pool_with_schema().await; + let (cmd, query) = create_search_adapter(pool.clone()); + + let id_str = "00000000-0000-0000-0000-000000000002"; + let movie = test_movie(id_str, "Inception", 2010); + let movie_id = movie.id().clone(); + + sqlx::query("INSERT INTO movies VALUES (?, ?, ?, ?, ?, ?)") + .bind(id_str).bind("Inception").bind(2010i32) + .bind("Christopher Nolan").bind::>(None).bind::>(None) + .execute(&pool).await.unwrap(); + + cmd.index(IndexableDocument::Movie { id: movie_id.clone(), movie: Box::new(movie), profile: None }) + .await.unwrap(); + cmd.remove(EntityType::Movie, id_str).await.unwrap(); + + let results = query.search(&SearchQuery { + text: Some("Inception".to_string()), + filters: SearchFilters::default(), + page: default_page(), + }).await.unwrap(); + + assert!(results.movies.items.is_empty()); +} + +#[tokio::test] +async fn search_with_genre_filter() { + let pool = pool_with_schema().await; + let (cmd, query) = create_search_adapter(pool.clone()); + + let id_str = "00000000-0000-0000-0000-000000000003"; + let movie = test_movie(id_str, "The Dark Knight", 2008); + let movie_id = movie.id().clone(); + + sqlx::query("INSERT INTO movies VALUES (?, ?, ?, ?, ?, ?)") + .bind(id_str).bind("The Dark Knight").bind(2008i32) + .bind("Christopher Nolan").bind::>(None).bind::>(None) + .execute(&pool).await.unwrap(); + sqlx::query("INSERT INTO movie_genres VALUES (?, 1, 'Action')") + .bind(id_str) + .execute(&pool).await.unwrap(); + + cmd.index(IndexableDocument::Movie { + id: movie_id.clone(), + movie: Box::new(movie), + profile: None, + }).await.unwrap(); + + // Matching genre — no text filter + let results = query.search(&SearchQuery { + text: None, + filters: SearchFilters { genre: Some("Action".to_string()), ..Default::default() }, + page: default_page(), + }).await.unwrap(); + assert_eq!(results.movies.items.len(), 1); + + // Non-matching genre + let results = query.search(&SearchQuery { + text: None, + filters: SearchFilters { genre: Some("Comedy".to_string()), ..Default::default() }, + page: default_page(), + }).await.unwrap(); + assert!(results.movies.items.is_empty()); +} diff --git a/crates/adapters/sqlite/migrations/0015_search.sql b/crates/adapters/sqlite/migrations/0015_search.sql new file mode 100644 index 0000000..a5ddd4f --- /dev/null +++ b/crates/adapters/sqlite/migrations/0015_search.sql @@ -0,0 +1,36 @@ +-- Persons table. tmdb_person_id is stored for efficient joins with existing +-- movie_cast and movie_crew tables (which use tmdb_person_id as their person key). +CREATE TABLE IF NOT EXISTS persons ( + id TEXT PRIMARY KEY, -- UUID (PersonId) + external_id TEXT NOT NULL UNIQUE, -- "tmdb:12345" + tmdb_person_id INTEGER UNIQUE, -- parsed from external_id for fast joins + name TEXT NOT NULL, + known_for_department TEXT, + profile_path TEXT +); + +CREATE INDEX IF NOT EXISTS idx_persons_external ON persons (external_id); +CREATE INDEX IF NOT EXISTS idx_persons_tmdb_id ON persons (tmdb_person_id); + +-- FTS5 full-text search table for movies. +-- movie_id is UNINDEXED (stored but not tokenised for text search). +-- release_year and language are UNINDEXED (used only for structured filters). +CREATE VIRTUAL TABLE IF NOT EXISTS movies_fts USING fts5( + movie_id UNINDEXED, + title, + director, + overview, + genres, + keywords, + cast_names, + crew_names, + release_year UNINDEXED, + language UNINDEXED +); + +-- FTS5 full-text search table for people. +CREATE VIRTUAL TABLE IF NOT EXISTS people_fts USING fts5( + person_id UNINDEXED, + name, + known_for_department UNINDEXED +); diff --git a/crates/adapters/sqlite/src/lib.rs b/crates/adapters/sqlite/src/lib.rs index ccd2aa5..1765719 100644 --- a/crates/adapters/sqlite/src/lib.rs +++ b/crates/adapters/sqlite/src/lib.rs @@ -17,6 +17,7 @@ mod import_profile; mod import_session; mod migrations; mod models; +mod persons; mod profile; mod users; @@ -28,6 +29,7 @@ use models::{ pub use image_ref::{SqliteImageRefAdapter, create_image_ref}; pub use import_profile::SqliteImportProfileRepository; pub use import_session::SqliteImportSessionRepository; +pub use persons::{SqlitePersonAdapter, create_person_adapter}; pub use profile::SqliteMovieProfileRepository; pub use users::SqliteUserRepository; diff --git a/crates/adapters/sqlite/src/persons.rs b/crates/adapters/sqlite/src/persons.rs new file mode 100644 index 0000000..eef62c8 --- /dev/null +++ b/crates/adapters/sqlite/src/persons.rs @@ -0,0 +1,195 @@ +use async_trait::async_trait; +use domain::{ + errors::DomainError, + models::{CastCredit, CrewCredit, ExternalPersonId, Person, PersonCredits, PersonId}, + ports::{PersonCommand, PersonQuery}, + value_objects::MovieId, +}; +use sqlx::SqlitePool; +use std::sync::Arc; + +pub struct SqlitePersonAdapter { + pool: SqlitePool, +} + +impl SqlitePersonAdapter { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } +} + +pub fn create_person_adapter(pool: SqlitePool) -> (Arc, Arc) { + let adapter = Arc::new(SqlitePersonAdapter::new(pool)); + (Arc::clone(&adapter) as Arc, adapter as Arc) +} + +fn map_err(e: sqlx::Error) -> DomainError { + DomainError::InfrastructureError(e.to_string()) +} + +#[async_trait] +impl PersonCommand for SqlitePersonAdapter { + async fn upsert_batch(&self, persons: &[Person]) -> Result<(), DomainError> { + for person in persons { + let tmdb_id = person.external_id().tmdb_id(); + sqlx::query( + "INSERT INTO persons (id, external_id, tmdb_person_id, name, known_for_department, profile_path) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + external_id = excluded.external_id, + tmdb_person_id = excluded.tmdb_person_id, + name = excluded.name, + known_for_department = excluded.known_for_department, + profile_path = excluded.profile_path", + ) + .bind(person.id().value().to_string()) + .bind(person.external_id().value()) + .bind(tmdb_id) + .bind(person.name()) + .bind(person.known_for_department()) + .bind(person.profile_path()) + .execute(&self.pool) + .await + .map_err(map_err)?; + } + Ok(()) + } +} + +#[async_trait] +impl PersonQuery for SqlitePersonAdapter { + async fn get_by_id(&self, id: &PersonId) -> Result, DomainError> { + let row = sqlx::query_as::<_, PersonRow>( + "SELECT id, external_id, name, known_for_department, profile_path FROM persons WHERE id = ?", + ) + .bind(id.value().to_string()) + .fetch_optional(&self.pool) + .await + .map_err(map_err)?; + + Ok(row.map(PersonRow::into_person)) + } + + async fn get_by_external_id(&self, id: &ExternalPersonId) -> Result, DomainError> { + let row = sqlx::query_as::<_, PersonRow>( + "SELECT id, external_id, name, known_for_department, profile_path FROM persons WHERE external_id = ?", + ) + .bind(id.value()) + .fetch_optional(&self.pool) + .await + .map_err(map_err)?; + + Ok(row.map(PersonRow::into_person)) + } + + async fn get_credits(&self, id: &PersonId) -> Result { + let person = self.get_by_id(id).await?.ok_or_else(|| { + DomainError::NotFound(format!("Person {} not found", id.value())) + })?; + + let tmdb_id: Option = sqlx::query_scalar( + "SELECT tmdb_person_id FROM persons WHERE id = ?", + ) + .bind(id.value().to_string()) + .fetch_optional(&self.pool) + .await + .map_err(map_err)? + .flatten(); + + let Some(tmdb_id) = tmdb_id else { + return Ok(PersonCredits { person, cast: vec![], crew: vec![] }); + }; + + let cast = sqlx::query_as::<_, CastRow>( + "SELECT m.id, m.title, m.release_year, mc.character, m.poster_path + FROM movie_cast mc + JOIN movies m ON m.id = mc.movie_id + WHERE mc.tmdb_person_id = ? + ORDER BY mc.billing_order", + ) + .bind(tmdb_id) + .fetch_all(&self.pool) + .await + .map_err(map_err)? + .into_iter() + .map(|r| CastCredit { + movie_id: MovieId::from_uuid(uuid::Uuid::parse_str(&r.id).unwrap_or_default()), + title: r.title, + release_year: r.release_year.map(|y| y as u16), + character: r.character, + poster_path: r.poster_path, + }) + .collect(); + + let crew = sqlx::query_as::<_, CrewRow>( + "SELECT m.id, m.title, m.release_year, mc.job, mc.department, m.poster_path + FROM movie_crew mc + JOIN movies m ON m.id = mc.movie_id + WHERE mc.tmdb_person_id = ? + ORDER BY m.title", + ) + .bind(tmdb_id) + .fetch_all(&self.pool) + .await + .map_err(map_err)? + .into_iter() + .map(|r| CrewCredit { + movie_id: MovieId::from_uuid(uuid::Uuid::parse_str(&r.id).unwrap_or_default()), + title: r.title, + release_year: r.release_year.map(|y| y as u16), + job: r.job, + department: r.department, + poster_path: r.poster_path, + }) + .collect(); + + Ok(PersonCredits { person, cast, crew }) + } +} + +// ── Row types ──────────────────────────────────────────────────────────────── + +#[derive(sqlx::FromRow)] +struct PersonRow { + id: String, + external_id: String, + name: String, + known_for_department: Option, + profile_path: Option, +} + +impl PersonRow { + fn into_person(self) -> Person { + let ext = ExternalPersonId::new(self.external_id); + Person::new( + PersonId::from_uuid(uuid::Uuid::parse_str(&self.id).unwrap_or_default()), + ext, + self.name, + self.known_for_department, + self.profile_path, + ) + } +} + +#[derive(sqlx::FromRow)] +struct CastRow { + id: String, + title: String, + release_year: Option, + character: String, + poster_path: Option, +} + +#[derive(sqlx::FromRow)] +struct CrewRow { + id: String, + title: String, + release_year: Option, + job: String, + department: String, + poster_path: Option, +} + +#[cfg(test)] +#[path = "tests/persons.rs"] +mod tests; diff --git a/crates/adapters/sqlite/src/tests/persons.rs b/crates/adapters/sqlite/src/tests/persons.rs new file mode 100644 index 0000000..36c9e74 --- /dev/null +++ b/crates/adapters/sqlite/src/tests/persons.rs @@ -0,0 +1,126 @@ +use super::super::persons::SqlitePersonAdapter; +use domain::{ + errors::DomainError, + models::{ExternalPersonId, Person, PersonId}, + ports::{PersonCommand, PersonQuery}, +}; +use sqlx::SqlitePool; + +async fn pool_with_schema() -> SqlitePool { + let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); + sqlx::query( + "CREATE TABLE persons ( + id TEXT PRIMARY KEY, external_id TEXT NOT NULL UNIQUE, + tmdb_person_id INTEGER UNIQUE, name TEXT NOT NULL, + known_for_department TEXT, profile_path TEXT + )", + ) + .execute(&pool) + .await + .unwrap(); + sqlx::query( + "CREATE TABLE movies (id TEXT PRIMARY KEY, title TEXT NOT NULL, + release_year INTEGER, director TEXT, poster_path TEXT, + external_metadata_id TEXT)", + ) + .execute(&pool) + .await + .unwrap(); + sqlx::query( + "CREATE TABLE movie_cast (movie_id TEXT, tmdb_person_id INTEGER, + name TEXT, character TEXT, billing_order INTEGER, profile_path TEXT, + PRIMARY KEY (movie_id, tmdb_person_id))", + ) + .execute(&pool) + .await + .unwrap(); + sqlx::query( + "CREATE TABLE movie_crew (movie_id TEXT, tmdb_person_id INTEGER, + name TEXT, job TEXT, department TEXT, profile_path TEXT, + PRIMARY KEY (movie_id, tmdb_person_id, job))", + ) + .execute(&pool) + .await + .unwrap(); + pool +} + +fn make_person(tmdb_id: i64, name: &str, dept: Option<&str>) -> Person { + let ext = ExternalPersonId::new(format!("tmdb:{tmdb_id}")); + Person::new( + PersonId::from_external(&ext), + ext, + name.to_string(), + dept.map(str::to_string), + None, + ) +} + +#[tokio::test] +async fn upsert_batch_inserts_persons() { + let pool = pool_with_schema().await; + let adapter = SqlitePersonAdapter::new(pool.clone()); + + let persons = vec![make_person(1, "Alice", Some("Acting")), make_person(2, "Bob", Some("Directing"))]; + adapter.upsert_batch(&persons).await.unwrap(); + + let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM persons") + .fetch_one(&pool).await.unwrap(); + assert_eq!(count.0, 2); +} + +#[tokio::test] +async fn upsert_batch_is_idempotent() { + let pool = pool_with_schema().await; + let adapter = SqlitePersonAdapter::new(pool.clone()); + + let persons = vec![make_person(1, "Alice", Some("Acting"))]; + adapter.upsert_batch(&persons).await.unwrap(); + adapter.upsert_batch(&persons).await.unwrap(); + + let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM persons") + .fetch_one(&pool).await.unwrap(); + assert_eq!(count.0, 1); +} + +#[tokio::test] +async fn get_by_id_returns_person() { + let pool = pool_with_schema().await; + let adapter = SqlitePersonAdapter::new(pool.clone()); + + let p = make_person(42, "Charlie", Some("Acting")); + adapter.upsert_batch(&[p.clone()]).await.unwrap(); + + let found = adapter.get_by_id(p.id()).await.unwrap().unwrap(); + assert_eq!(found.name(), "Charlie"); + assert_eq!(found.external_id().value(), "tmdb:42"); +} + +#[tokio::test] +async fn get_by_id_returns_none_for_unknown() { + let pool = pool_with_schema().await; + let adapter = SqlitePersonAdapter::new(pool); + let ext = ExternalPersonId::new("tmdb:999"); + let id = PersonId::from_external(&ext); + assert!(adapter.get_by_id(&id).await.unwrap().is_none()); +} + +#[tokio::test] +async fn get_credits_returns_cast_and_crew() { + let pool = pool_with_schema().await; + let adapter = SqlitePersonAdapter::new(pool.clone()); + + let p = make_person(7, "Diana", Some("Acting")); + adapter.upsert_batch(&[p.clone()]).await.unwrap(); + + sqlx::query("INSERT INTO movies VALUES ('m1', 'The Film', 2020, 'Dir', NULL, NULL)") + .execute(&pool).await.unwrap(); + sqlx::query("INSERT INTO movie_cast VALUES ('m1', 7, 'Diana', 'Hero', 1, NULL)") + .execute(&pool).await.unwrap(); + + let credits = adapter.get_credits(p.id()).await.unwrap(); + assert_eq!(credits.person.name(), "Diana"); + assert_eq!(credits.cast.len(), 1); + assert_eq!(credits.cast[0].character, "Hero"); + assert!(credits.crew.is_empty()); +} diff --git a/crates/adapters/tmdb-enrichment/Cargo.toml b/crates/adapters/tmdb-enrichment/Cargo.toml index b6d28d4..548a9ab 100644 --- a/crates/adapters/tmdb-enrichment/Cargo.toml +++ b/crates/adapters/tmdb-enrichment/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2024" [dependencies] +application = { workspace = true } domain = { workspace = true } reqwest = { workspace = true } serde = { workspace = true } diff --git a/crates/adapters/tmdb-enrichment/src/lib.rs b/crates/adapters/tmdb-enrichment/src/lib.rs index 1363078..72e2ab4 100644 --- a/crates/adapters/tmdb-enrichment/src/lib.rs +++ b/crates/adapters/tmdb-enrichment/src/lib.rs @@ -2,11 +2,12 @@ use std::sync::Arc; use async_trait::async_trait; use chrono::Utc; +use application::{commands::EnrichMovieCommand, use_cases::enrich_movie}; use domain::{ errors::DomainError, events::DomainEvent, models::{CastMember, CrewMember, Genre, Keyword, MovieProfile}, - ports::{EventHandler, MovieEnrichmentClient, MovieProfileRepository}, + ports::{EventHandler, MovieEnrichmentClient, MovieProfileRepository, MovieRepository, PersonCommand, SearchCommand}, value_objects::MovieId, }; use serde::Deserialize; @@ -163,7 +164,10 @@ impl MovieEnrichmentClient for TmdbEnrichmentClient { pub struct EnrichmentHandler { pub enrichment_client: Arc, - pub profile_repo: Arc, + pub movie_repository: Arc, + pub profile_repo: Arc, + pub person_command: Arc, + pub search_command: Arc, } #[async_trait] @@ -176,7 +180,7 @@ impl EventHandler for EnrichmentHandler { _ => return Ok(()), }; - // Skip if profile is fresh (checked by the repo's list_stale, but guard here too) + // Skip if profile is fresh (< 30 days old) if let Ok(Some(existing)) = self.profile_repo.get_by_movie_id(&movie_id).await { let age = Utc::now() - existing.enriched_at; if age.num_days() < 30 { @@ -190,16 +194,17 @@ impl EventHandler for EnrichmentHandler { } tracing::info!(movie_id = %movie_id.value(), external_id = %external_metadata_id, "enriching movie"); + match self.enrichment_client.fetch_profile(movie_id.clone(), &external_metadata_id).await { Ok(profile) => { - self.profile_repo.upsert(&profile).await?; - tracing::info!( - movie_id = %movie_id.value(), - genres = profile.genres.len(), - cast = profile.cast.len(), - crew = profile.crew.len(), - "enrichment stored" - ); + enrich_movie::execute( + &self.movie_repository, + &self.profile_repo, + &self.person_command, + &self.search_command, + EnrichMovieCommand { movie_id, profile }, + ) + .await?; } Err(DomainError::NotFound(msg)) => { tracing::warn!(movie_id = %movie_id.value(), "TMDb lookup found nothing: {msg}"); diff --git a/crates/api-types/src/lib.rs b/crates/api-types/src/lib.rs index 2d9823b..caa4180 100644 --- a/crates/api-types/src/lib.rs +++ b/crates/api-types/src/lib.rs @@ -3,6 +3,7 @@ pub mod common; pub mod diary; pub mod import; pub mod movies; +pub mod search; pub mod social; pub mod users; diff --git a/crates/api-types/src/search.rs b/crates/api-types/src/search.rs new file mode 100644 index 0000000..183f298 --- /dev/null +++ b/crates/api-types/src/search.rs @@ -0,0 +1,90 @@ +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive(Debug, Deserialize)] +pub struct SearchQueryParams { + pub q: Option, + pub genre: Option, + pub year: Option, + pub person_id: Option, + pub department: Option, + pub language: Option, + pub limit: Option, + pub offset: Option, +} + +#[derive(Debug, Serialize)] +pub struct SearchResponse { + pub movies: PaginatedMovieHits, + pub people: PaginatedPersonHits, +} + +#[derive(Debug, Serialize)] +pub struct PaginatedMovieHits { + pub items: Vec, + pub total_count: u64, + pub limit: u32, + pub offset: u32, +} + +#[derive(Debug, Serialize)] +pub struct PaginatedPersonHits { + pub items: Vec, + pub total_count: u64, + pub limit: u32, + pub offset: u32, +} + +#[derive(Debug, Serialize)] +pub struct MovieSearchHitDto { + pub movie_id: Uuid, + pub title: String, + pub release_year: Option, + pub director: Option, + pub poster_path: Option, + pub genres: Vec, +} + +#[derive(Debug, Serialize)] +pub struct PersonSearchHitDto { + pub person_id: Uuid, + pub name: String, + pub known_for_department: Option, + pub profile_path: Option, + pub known_for_titles: Vec, +} + +#[derive(Debug, Serialize)] +pub struct PersonDto { + pub id: Uuid, + pub external_id: String, + pub name: String, + pub known_for_department: Option, + pub profile_path: Option, +} + +#[derive(Debug, Serialize)] +pub struct PersonCreditsDto { + pub person: PersonDto, + pub cast: Vec, + pub crew: Vec, +} + +#[derive(Debug, Serialize)] +pub struct CastCreditDto { + pub movie_id: Uuid, + pub title: String, + pub release_year: Option, + pub character: String, + pub poster_path: Option, +} + +#[derive(Debug, Serialize)] +pub struct CrewCreditDto { + pub movie_id: Uuid, + pub title: String, + pub release_year: Option, + pub job: String, + pub department: String, + pub poster_path: Option, +} diff --git a/crates/application/src/commands.rs b/crates/application/src/commands.rs index 5186d63..313a067 100644 --- a/crates/application/src/commands.rs +++ b/crates/application/src/commands.rs @@ -76,3 +76,8 @@ pub struct UpdateProfileCommand { pub avatar_bytes: Option>, pub avatar_content_type: Option, } + +pub struct EnrichMovieCommand { + pub movie_id: domain::value_objects::MovieId, + pub profile: domain::models::MovieProfile, +} diff --git a/crates/application/src/context.rs b/crates/application/src/context.rs index f0a1e0c..046f0e8 100644 --- a/crates/application/src/context.rs +++ b/crates/application/src/context.rs @@ -5,6 +5,7 @@ use domain::ports::{ ImageStorage, ImportProfileRepository, ImportSessionRepository, MetadataClient, MovieProfileRepository, MovieRepository, PasswordHasher, PosterFetcherClient, + PersonCommand, PersonQuery, SearchCommand, SearchPort, ReviewRepository, StatsRepository, UserRepository, }; @@ -28,5 +29,9 @@ pub struct AppContext { pub import_session_repository: Arc, pub import_profile_repository: Arc, pub movie_profile_repository: Arc, + pub person_command: Arc, + pub person_query: Arc, + pub search_port: Arc, + pub search_command: Arc, pub config: AppConfig, } diff --git a/crates/application/src/lib.rs b/crates/application/src/lib.rs index f023f3d..33523c0 100644 --- a/crates/application/src/lib.rs +++ b/crates/application/src/lib.rs @@ -7,3 +7,6 @@ pub mod movie_resolver; pub mod ports; pub mod queries; pub mod use_cases; +pub mod search_cleanup; + +pub use search_cleanup::SearchCleanupHandler; diff --git a/crates/application/src/search_cleanup.rs b/crates/application/src/search_cleanup.rs new file mode 100644 index 0000000..a28c561 --- /dev/null +++ b/crates/application/src/search_cleanup.rs @@ -0,0 +1,34 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use domain::{ + errors::DomainError, + events::DomainEvent, + models::EntityType, + ports::{EventHandler, SearchCommand}, +}; + +pub struct SearchCleanupHandler { + search_command: Arc, +} + +impl SearchCleanupHandler { + pub fn new(search_command: Arc) -> Self { + Self { search_command } + } +} + +#[async_trait] +impl EventHandler for SearchCleanupHandler { + async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { + let movie_id = match event { + DomainEvent::MovieDeleted { movie_id, .. } => movie_id.value().to_string(), + _ => return Ok(()), + }; + + if let Err(e) = self.search_command.remove(EntityType::Movie, &movie_id).await { + tracing::warn!("search cleanup failed for movie {movie_id}: {e}"); + } + Ok(()) + } +} diff --git a/crates/application/src/use_cases/enrich_movie.rs b/crates/application/src/use_cases/enrich_movie.rs new file mode 100644 index 0000000..e1f6455 --- /dev/null +++ b/crates/application/src/use_cases/enrich_movie.rs @@ -0,0 +1,96 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use domain::{ + errors::DomainError, + models::{ + CastMember, CrewMember, ExternalPersonId, IndexableDocument, Person, PersonId, + }, + ports::{MovieProfileRepository, MovieRepository, PersonCommand, SearchCommand}, +}; + +use crate::commands::EnrichMovieCommand; + +pub async fn execute( + movie_repository: &Arc, + profile_repository: &Arc, + person_command: &Arc, + search_command: &Arc, + cmd: EnrichMovieCommand, +) -> Result<(), DomainError> { + // 1. Persist the enriched profile (also handles movie_cast, movie_crew, genres, keywords) + profile_repository.upsert(&cmd.profile).await?; + + // 2. Upsert persons extracted from cast + crew (no reads — only upsert) + let persons = extract_persons(&cmd.profile.cast, &cmd.profile.crew); + if !persons.is_empty() { + person_command.upsert_batch(&persons).await?; + } + + // 3. Fetch the movie for the search index document + let Some(movie) = movie_repository.get_movie_by_id(&cmd.movie_id).await? else { + tracing::warn!(movie_id = %cmd.movie_id.value(), "enrich_movie: movie not found after profile upsert"); + return Ok(()); + }; + + // 4. Index the movie in search + search_command + .index(IndexableDocument::Movie { + id: cmd.movie_id.clone(), + movie: Box::new(movie), + profile: Some(Box::new(cmd.profile.clone())), + }) + .await?; + + // 5. Index each unique person in search (no reads — persons built from in-memory data) + for person in &persons { + search_command + .index(IndexableDocument::Person { + id: person.id().clone(), + person: Box::new(person.clone()), + }) + .await?; + } + + tracing::info!( + movie_id = %cmd.movie_id.value(), + persons = persons.len(), + "enrich_movie: profile stored and search index updated" + ); + Ok(()) +} + +/// Build unique Person values from cast and crew. +/// Uses deterministic UUIDv5 so the same tmdb_person_id always maps to the same PersonId. +/// No DB reads — persons are built entirely from in-memory TMDb data. +fn extract_persons(cast: &[CastMember], crew: &[CrewMember]) -> Vec { + let mut seen: HashMap = HashMap::new(); + + for member in cast { + seen.entry(member.tmdb_person_id).or_insert_with(|| { + let ext = ExternalPersonId::new(format!("tmdb:{}", member.tmdb_person_id)); + Person::new( + PersonId::from_external(&ext), + ext, + member.name.clone(), + Some("Acting".to_string()), + member.profile_path.clone(), + ) + }); + } + + for member in crew { + seen.entry(member.tmdb_person_id).or_insert_with(|| { + let ext = ExternalPersonId::new(format!("tmdb:{}", member.tmdb_person_id)); + Person::new( + PersonId::from_external(&ext), + ext, + member.name.clone(), + Some(member.department.clone()), + member.profile_path.clone(), + ) + }); + } + + seen.into_values().collect() +} diff --git a/crates/application/src/use_cases/get_person.rs b/crates/application/src/use_cases/get_person.rs new file mode 100644 index 0000000..b010917 --- /dev/null +++ b/crates/application/src/use_cases/get_person.rs @@ -0,0 +1,6 @@ +use domain::{errors::DomainError, models::{Person, PersonId}}; +use crate::context::AppContext; + +pub async fn execute(ctx: &AppContext, id: PersonId) -> Result, DomainError> { + ctx.person_query.get_by_id(&id).await +} diff --git a/crates/application/src/use_cases/get_person_credits.rs b/crates/application/src/use_cases/get_person_credits.rs new file mode 100644 index 0000000..6ff194a --- /dev/null +++ b/crates/application/src/use_cases/get_person_credits.rs @@ -0,0 +1,6 @@ +use domain::{errors::DomainError, models::{PersonCredits, PersonId}}; +use crate::context::AppContext; + +pub async fn execute(ctx: &AppContext, id: PersonId) -> Result { + ctx.person_query.get_credits(&id).await +} diff --git a/crates/application/src/use_cases/mod.rs b/crates/application/src/use_cases/mod.rs index d319736..de4e579 100644 --- a/crates/application/src/use_cases/mod.rs +++ b/crates/application/src/use_cases/mod.rs @@ -1,3 +1,4 @@ +pub mod enrich_movie; pub mod apply_import_mapping; pub mod apply_import_profile; pub mod cleanup_expired_import_sessions; @@ -12,11 +13,14 @@ pub mod get_activity_feed; pub mod get_diary; pub mod get_movie_social_page; pub mod get_movies; +pub mod get_person; +pub mod get_person_credits; pub mod get_review_history; pub mod get_user_profile; pub mod get_users; pub mod log_review; pub mod login; pub mod register; +pub mod search; pub mod sync_poster; pub mod update_profile; diff --git a/crates/application/src/use_cases/search.rs b/crates/application/src/use_cases/search.rs new file mode 100644 index 0000000..1236449 --- /dev/null +++ b/crates/application/src/use_cases/search.rs @@ -0,0 +1,6 @@ +use domain::{errors::DomainError, models::{SearchQuery, SearchResults}}; +use crate::context::AppContext; + +pub async fn execute(ctx: &AppContext, query: SearchQuery) -> Result { + ctx.search_port.search(&query).await +} diff --git a/crates/domain/src/models/mod.rs b/crates/domain/src/models/mod.rs index ec5f403..d203c56 100644 --- a/crates/domain/src/models/mod.rs +++ b/crates/domain/src/models/mod.rs @@ -12,6 +12,8 @@ pub mod collections; pub mod import; pub mod import_session; pub mod import_profile; +pub mod person; +pub mod search; pub use import::{ AnnotatedRow, DomainField, FieldMapping, FileFormat, ImportError, @@ -19,6 +21,11 @@ pub use import::{ }; pub use import_session::ImportSession; pub use import_profile::ImportProfile; +pub use person::{CastCredit, CrewCredit, ExternalPersonId, Person, PersonCredits, PersonId}; +pub use search::{ + EntityType, IndexableDocument, MovieSearchHit, PersonSearchHit, + SearchFilters, SearchQuery, SearchResults, +}; #[derive(Clone, Debug, Default)] pub enum SortDirection { diff --git a/crates/domain/src/models/person.rs b/crates/domain/src/models/person.rs new file mode 100644 index 0000000..ac409c2 --- /dev/null +++ b/crates/domain/src/models/person.rs @@ -0,0 +1,107 @@ +use uuid::Uuid; + +use crate::models::MovieId; + +#[derive(Clone, Debug, PartialEq)] +pub struct PersonId(Uuid); + +impl PersonId { + pub fn from_uuid(uuid: Uuid) -> Self { + Self(uuid) + } + + /// Deterministic UUIDv5 from an external person ID string. + /// "tmdb:12345" always maps to the same PersonId. + pub fn from_external(external_id: &ExternalPersonId) -> Self { + Self(Uuid::new_v5(&Uuid::NAMESPACE_URL, external_id.0.as_bytes())) + } + + pub fn value(&self) -> Uuid { + self.0 + } +} + +#[derive(Clone, Debug, PartialEq)] +pub struct ExternalPersonId(String); + +impl ExternalPersonId { + pub fn new(s: impl Into) -> Self { + Self(s.into()) + } + + pub fn value(&self) -> &str { + &self.0 + } + + /// Parse the TMDb numeric ID from "tmdb:12345". Returns None for other formats. + pub fn tmdb_id(&self) -> Option { + self.0.strip_prefix("tmdb:").and_then(|s| s.parse().ok()) + } +} + +#[derive(Clone, Debug)] +pub struct Person { + id: PersonId, + external_id: ExternalPersonId, + name: String, + known_for_department: Option, + profile_path: Option, +} + +impl Person { + pub fn new( + id: PersonId, + external_id: ExternalPersonId, + name: String, + known_for_department: Option, + profile_path: Option, + ) -> Self { + Self { id, external_id, name, known_for_department, profile_path } + } + + pub fn id(&self) -> &PersonId { + &self.id + } + + pub fn external_id(&self) -> &ExternalPersonId { + &self.external_id + } + + pub fn name(&self) -> &str { + &self.name + } + + pub fn known_for_department(&self) -> Option<&str> { + self.known_for_department.as_deref() + } + + pub fn profile_path(&self) -> Option<&str> { + self.profile_path.as_deref() + } +} + +#[derive(Clone, Debug)] +pub struct PersonCredits { + pub person: Person, + pub cast: Vec, + pub crew: Vec, +} + +#[derive(Clone, Debug)] +pub struct CastCredit { + pub movie_id: MovieId, + pub title: String, + pub release_year: Option, + pub character: String, + pub poster_path: Option, +} + +#[derive(Clone, Debug)] +pub struct CrewCredit { + pub movie_id: MovieId, + pub title: String, + pub release_year: Option, + pub job: String, + pub department: String, + pub poster_path: Option, +} diff --git a/crates/domain/src/models/search.rs b/crates/domain/src/models/search.rs new file mode 100644 index 0000000..8389492 --- /dev/null +++ b/crates/domain/src/models/search.rs @@ -0,0 +1,68 @@ +use crate::models::{ + Movie, MovieId, MovieProfile, Person, PersonId, + collections::{PageParams, Paginated}, +}; + +#[derive(Clone, Debug, Default)] +pub struct SearchQuery { + pub text: Option, + pub filters: SearchFilters, + pub page: PageParams, +} + +#[derive(Clone, Debug, Default)] +pub struct SearchFilters { + pub genre: Option, + pub year: Option, + pub person_id: Option, + pub department: Option, + pub language: Option, +} + +#[derive(Clone, Debug)] +pub struct SearchResults { + pub movies: Paginated, + pub people: Paginated, +} + +#[derive(Clone, Debug)] +pub struct MovieSearchHit { + pub movie_id: MovieId, + pub title: String, + pub release_year: Option, + pub director: Option, + pub poster_path: Option, + pub genres: Vec, +} + +#[derive(Clone, Debug)] +pub struct PersonSearchHit { + pub person_id: PersonId, + pub name: String, + pub known_for_department: Option, + pub profile_path: Option, + /// Top movie titles this person is known for — populated at query time + /// by joining relational tables, never from the index. + pub known_for_titles: Vec, +} + +/// Document submitted to the search index. +/// Add a new variant here to make a new entity type searchable — the port never changes. +pub enum IndexableDocument { + Movie { + id: MovieId, + movie: Box, + profile: Option>, + }, + Person { + id: PersonId, + person: Box, + // known_for_titles intentionally absent — no reads inside a command flow + }, +} + +#[derive(Clone, Debug, PartialEq)] +pub enum EntityType { + Movie, + Person, +} diff --git a/crates/domain/src/ports.rs b/crates/domain/src/ports.rs index cbb1f9e..46aeb5b 100644 --- a/crates/domain/src/ports.rs +++ b/crates/domain/src/ports.rs @@ -8,6 +8,8 @@ use crate::{ AnnotatedRow, DiaryEntry, DiaryFilter, ExportFormat, FeedEntry, FieldMapping, FileFormat, ImportError, ImportProfile, ImportSession, Movie, MovieProfile, MovieStats, ParsedFile, Review, ReviewHistory, User, UserStats, UserSummary, UserTrends, + EntityType, ExternalPersonId, IndexableDocument, Person, PersonCredits, + PersonId, SearchQuery, SearchResults, collections::{self, PageParams, Paginated}, }, value_objects::{ @@ -274,3 +276,34 @@ pub trait ImageRefCommand: Send + Sync { pub trait ImageRefQuery: Send + Sync { async fn list_keys(&self) -> Result, DomainError>; } + +/// Write port — mutates the persons table. No reads. +#[async_trait] +pub trait PersonCommand: Send + Sync { + /// Upsert a batch of persons. Uses INSERT OR REPLACE (SQLite) / ON CONFLICT DO UPDATE (Postgres). + async fn upsert_batch(&self, persons: &[Person]) -> Result<(), DomainError>; +} + +/// Read port — queries persons and credits. No mutations. +#[async_trait] +pub trait PersonQuery: Send + Sync { + async fn get_by_id(&self, id: &PersonId) -> Result, DomainError>; + async fn get_by_external_id(&self, id: &ExternalPersonId) -> Result, DomainError>; + /// Returns the person's full cast and crew credit history across all indexed movies. + async fn get_credits(&self, id: &PersonId) -> Result; +} + +/// Read port — executes search queries. No mutations. +#[async_trait] +pub trait SearchPort: Send + Sync { + async fn search(&self, query: &SearchQuery) -> Result; +} + +/// Write port — manages the search index. No reads. +#[async_trait] +pub trait SearchCommand: Send + Sync { + /// Add or replace a document in the search index. + async fn index(&self, doc: IndexableDocument) -> Result<(), DomainError>; + /// Remove a document from the search index by entity type and internal ID string. + async fn remove(&self, entity_type: EntityType, id: &str) -> Result<(), DomainError>; +} diff --git a/crates/presentation/Cargo.toml b/crates/presentation/Cargo.toml index 42f881a..cff472d 100644 --- a/crates/presentation/Cargo.toml +++ b/crates/presentation/Cargo.toml @@ -5,8 +5,8 @@ edition = "2024" [features] default = ["sqlite", "sqlite-federation"] -sqlite = ["dep:sqlite", "dep:sqlite-event-queue"] -postgres = ["dep:postgres", "dep:postgres-event-queue"] +sqlite = ["dep:sqlite", "dep:sqlite-event-queue", "dep:sqlite-search"] +postgres = ["dep:postgres", "dep:postgres-event-queue", "dep:postgres-search"] nats = ["dep:nats"] # Meta-feature: true when any federation adapter is active — keeps all #[cfg(feature = "federation")] gates working federation = [] @@ -63,6 +63,8 @@ sqlite = { workspace = true, optional = true } postgres = { workspace = true, optional = true } sqlite-event-queue = { workspace = true, optional = true } postgres-event-queue = { workspace = true, optional = true } +sqlite-search = { workspace = true, optional = true } +postgres-search = { workspace = true, optional = true } # Optional — federation activitypub = { workspace = true, optional = true } diff --git a/crates/presentation/src/handlers/api.rs b/crates/presentation/src/handlers/api.rs index a415d8a..3bd1bf1 100644 --- a/crates/presentation/src/handlers/api.rs +++ b/crates/presentation/src/handlers/api.rs @@ -21,11 +21,12 @@ use application::{ get_diary, get_movie_social_page, get_movies, get_review_history, get_user_profile as get_user_profile_uc, get_users, log_review, login as login_uc, register as register_uc, sync_poster, update_profile, + search as search_uc, get_person, get_person_credits, }, }; use domain::{ errors::DomainError, - models::{DiaryEntry, ExportFormat, Movie, Review}, + models::{DiaryEntry, ExportFormat, Movie, Review, PersonId, collections::PageParams}, services::review_history::Trend, value_objects::{MovieId, UserId}, }; @@ -44,6 +45,10 @@ use api_types::{ ReviewDto, ReviewHistoryResponse, SocialFeedResponse, SocialReviewDto, UserProfileQueryParams, UserProfileResponse, UserStatsDto, UserSummaryDto, UserTrendsDto, UsersResponse, }; +use api_types::search::{ + CastCreditDto, CrewCreditDto, MovieSearchHitDto, PersonCreditsDto, PersonDto, + PersonSearchHitDto, PaginatedMovieHits, PaginatedPersonHits, SearchQueryParams, SearchResponse, +}; use crate::{ errors::ApiError, extractors::AuthenticatedUser, @@ -1088,3 +1093,117 @@ pub async fn export_diary( } } } + +// Search and person endpoints are intentionally public — browsing the catalog +// and people profiles does not require authentication. + +pub async fn get_search( + State(state): State, + Query(params): Query, +) -> impl IntoResponse { + let query = domain::models::SearchQuery { + text: params.q, + filters: domain::models::SearchFilters { + genre: params.genre, + year: params.year, + person_id: params.person_id.map(PersonId::from_uuid), + department: params.department, + language: params.language, + }, + page: PageParams { + limit: params.limit.unwrap_or(5), + offset: params.offset.unwrap_or(0), + }, + }; + + match search_uc::execute(&state.app_ctx, query).await { + Ok(results) => axum::Json(SearchResponse { + movies: PaginatedMovieHits { + items: results.movies.items.iter().map(|h| MovieSearchHitDto { + movie_id: h.movie_id.value(), + title: h.title.clone(), + release_year: h.release_year, + director: h.director.clone(), + poster_path: h.poster_path.clone(), + genres: h.genres.clone(), + }).collect(), + total_count: results.movies.total_count, + limit: results.movies.limit, + offset: results.movies.offset, + }, + people: PaginatedPersonHits { + items: results.people.items.iter().map(|h| PersonSearchHitDto { + person_id: h.person_id.value(), + name: h.name.clone(), + known_for_department: h.known_for_department.clone(), + profile_path: h.profile_path.clone(), + known_for_titles: h.known_for_titles.clone(), + }).collect(), + total_count: results.people.total_count, + limit: results.people.limit, + offset: results.people.offset, + }, + }).into_response(), + Err(e) => { + tracing::error!("search failed: {e}"); + StatusCode::INTERNAL_SERVER_ERROR.into_response() + } + } +} + +pub async fn get_person_handler( + State(state): State, + Path(id): Path, +) -> impl IntoResponse { + match get_person::execute(&state.app_ctx, PersonId::from_uuid(id)).await { + Ok(Some(person)) => axum::Json(PersonDto { + id: person.id().value(), + external_id: person.external_id().value().to_string(), + name: person.name().to_string(), + known_for_department: person.known_for_department().map(str::to_string), + profile_path: person.profile_path().map(str::to_string), + }).into_response(), + Ok(None) => StatusCode::NOT_FOUND.into_response(), + Err(e) => { + tracing::error!("get_person failed: {e}"); + StatusCode::INTERNAL_SERVER_ERROR.into_response() + } + } +} + +pub async fn get_person_credits_handler( + State(state): State, + Path(id): Path, +) -> impl IntoResponse { + match get_person_credits::execute(&state.app_ctx, PersonId::from_uuid(id)).await { + Ok(credits) => axum::Json(PersonCreditsDto { + person: PersonDto { + id: credits.person.id().value(), + external_id: credits.person.external_id().value().to_string(), + name: credits.person.name().to_string(), + known_for_department: credits.person.known_for_department().map(str::to_string), + profile_path: credits.person.profile_path().map(str::to_string), + }, + cast: credits.cast.iter().map(|c| CastCreditDto { + movie_id: c.movie_id.value(), + title: c.title.clone(), + release_year: c.release_year, + character: c.character.clone(), + poster_path: c.poster_path.clone(), + }).collect(), + crew: credits.crew.iter().map(|c| CrewCreditDto { + movie_id: c.movie_id.value(), + title: c.title.clone(), + release_year: c.release_year, + job: c.job.clone(), + department: c.department.clone(), + poster_path: c.poster_path.clone(), + }).collect(), + }).into_response(), + Err(DomainError::NotFound(_)) => StatusCode::NOT_FOUND.into_response(), + Err(e) => { + tracing::error!("get_person_credits failed: {e}"); + StatusCode::INTERNAL_SERVER_ERROR.into_response() + } + } +} diff --git a/crates/presentation/src/lib.rs b/crates/presentation/src/lib.rs index 4928585..9397be6 100644 --- a/crates/presentation/src/lib.rs +++ b/crates/presentation/src/lib.rs @@ -7,3 +7,6 @@ pub mod openapi; pub mod ports; pub mod routes; pub mod state; + +#[cfg(test)] +mod tests; diff --git a/crates/presentation/src/main.rs b/crates/presentation/src/main.rs index eee1429..ed1dd81 100644 --- a/crates/presentation/src/main.rs +++ b/crates/presentation/src/main.rs @@ -15,6 +15,11 @@ use presentation::{openapi, routes, state::AppState}; use domain::ports::{DiaryExporter, DocumentParser, EventPublisher, ImportProfileRepository, ImportSessionRepository}; +#[cfg(feature = "sqlite")] +use sqlite_search; +#[cfg(feature = "postgres")] +use postgres_search; + #[cfg(not(any(feature = "sqlite", feature = "postgres")))] compile_error!("At least one database backend must be enabled. Use --features sqlite or --features postgres"); @@ -49,17 +54,21 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { let poster_fetcher = poster_fetcher::create()?; let image_storage = image_storage::create()?; - let (movie_repository, review_repository, diary_repository, stats_repository, user_repository, import_session_repository, import_profile_repository, movie_profile_repository, db_pool) = + let (movie_repository, review_repository, diary_repository, stats_repository, user_repository, import_session_repository, import_profile_repository, movie_profile_repository, person_command, person_query, search_command, search_port, db_pool) = match backend.as_str() { #[cfg(feature = "postgres")] "postgres" => { let (pool, m, r, d, s, u, is, ip, mp) = postgres::wire(&database_url).await?; - (m, r, d, s, u, is, ip, mp, DbPool::Postgres(pool)) + let (pc, pq) = postgres::create_person_adapter(pool.clone()); + let (sc, sp) = postgres_search::create_search_adapter(pool.clone()); + (m, r, d, s, u, is, ip, mp, pc, pq, sc, sp, DbPool::Postgres(pool)) } #[cfg(feature = "sqlite")] _ => { let (pool, m, r, d, s, u, is, ip, mp) = sqlite::wire(&database_url).await?; - (m, r, d, s, u, is, ip, mp, DbPool::Sqlite(pool)) + let (pc, pq) = sqlite::create_person_adapter(pool.clone()); + let (sc, sp) = sqlite_search::create_search_adapter(pool.clone()); + (m, r, d, s, u, is, ip, mp, pc, pq, sc, sp, DbPool::Sqlite(pool)) } #[cfg(not(feature = "sqlite"))] _ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build (sqlite feature is not enabled)"), @@ -121,14 +130,14 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { let event_publisher_arc: Arc = match event_bus { EventBusBackend::Db => { tracing::info!("event bus: DB queue"); - match backend.as_str() { + match &db_pool { #[cfg(feature = "postgres")] - "postgres" => postgres_event_queue::PostgresEventQueue::create_publisher( - pg_pool.as_ref().unwrap().clone() + DbPool::Postgres(pool) => postgres_event_queue::PostgresEventQueue::create_publisher( + pool.clone() ).await?, #[cfg(feature = "sqlite")] - _ => sqlite_event_queue::SqliteEventQueue::create_publisher( - sqlite_pool.as_ref().unwrap().clone() + DbPool::Sqlite(pool) => sqlite_event_queue::SqliteEventQueue::create_publisher( + pool.clone() ).await?, #[cfg(not(feature = "sqlite"))] _ => anyhow::bail!("EVENT_BUS_BACKEND=db has no adapter for DATABASE_BACKEND={backend}; enable the sqlite or postgres feature"), @@ -162,6 +171,10 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { import_session_repository: import_session_repository as Arc, import_profile_repository: import_profile_repository as Arc, movie_profile_repository, + person_command, + person_query, + search_port, + search_command, config: app_config, }; diff --git a/crates/presentation/src/routes.rs b/crates/presentation/src/routes.rs index 88e7112..2610151 100644 --- a/crates/presentation/src/routes.rs +++ b/crates/presentation/src/routes.rs @@ -210,7 +210,10 @@ fn api_routes(rate_limit: u64) -> Router { .route("/import/sessions/{id}/confirm", routing::post(handlers::import::api_post_confirm)) .route("/import/profiles", routing::get(handlers::import::api_get_profiles).post(handlers::import::api_post_profile)) .route("/import/profiles/{id}", routing::delete(handlers::import::api_delete_profile)) - .route("/profile", routing::get(handlers::api::get_profile).put(handlers::api::update_profile_handler)); + .route("/profile", routing::get(handlers::api::get_profile).put(handlers::api::update_profile_handler)) + .route("/search", routing::get(handlers::api::get_search)) + .route("/people/{id}", routing::get(handlers::api::get_person_handler)) + .route("/people/{id}/credits", routing::get(handlers::api::get_person_credits_handler)); #[cfg(feature = "federation")] let base = base.merge(federation_api_routes()); diff --git a/crates/presentation/src/tests/api_handlers.rs b/crates/presentation/src/tests/api_handlers.rs new file mode 100644 index 0000000..e4aa094 --- /dev/null +++ b/crates/presentation/src/tests/api_handlers.rs @@ -0,0 +1,142 @@ +use super::extractors::{make_test_state, Panic}; +use axum::{ + Router, + body::Body, + http::{Request, StatusCode}, + routing::get, +}; +use domain::errors::DomainError; +use std::sync::Arc; +use tower::ServiceExt; +use uuid::Uuid; + +// Custom stub for SearchPort that returns empty results instead of panicking +struct SearchPortStub; +#[async_trait::async_trait] +impl domain::ports::SearchPort for SearchPortStub { + async fn search(&self, _: &domain::models::SearchQuery) -> Result { + Ok(domain::models::SearchResults { + movies: domain::models::collections::Paginated { + items: vec![], + total_count: 0, + limit: 10, + offset: 0, + }, + people: domain::models::collections::Paginated { + items: vec![], + total_count: 0, + limit: 10, + offset: 0, + }, + }) + } +} + +// Custom stub for PersonQuery that returns 404 instead of panicking +struct PersonQueryStub; +#[async_trait::async_trait] +impl domain::ports::PersonQuery for PersonQueryStub { + async fn get_by_id(&self, _: &domain::models::PersonId) -> Result, DomainError> { + Ok(None) // Return None to trigger 404 + } + async fn get_by_external_id(&self, _: &domain::models::ExternalPersonId) -> Result, DomainError> { + Ok(None) + } + async fn get_credits(&self, _: &domain::models::PersonId) -> Result { + Err(DomainError::NotFound("Person not found".into())) + } +} + +// --- Search endpoint tests --- + +#[tokio::test] +async fn search_endpoint_returns_200_with_empty_results() { + let mut state = make_test_state(Arc::new(Panic)); + // Override the search_port with our stub + state.app_ctx.search_port = Arc::new(SearchPortStub); + let app = Router::new() + .route("/api/v1/search", get(crate::handlers::api::get_search)) + .with_state(state); + + let resp = app + .oneshot( + Request::builder() + .uri("/api/v1/search?q=test&limit=10&offset=0") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(resp.status(), StatusCode::OK); +} + +#[tokio::test] +async fn search_endpoint_with_no_query_returns_200() { + let mut state = make_test_state(Arc::new(Panic)); + // Override the search_port with our stub + state.app_ctx.search_port = Arc::new(SearchPortStub); + let app = Router::new() + .route("/api/v1/search", get(crate::handlers::api::get_search)) + .with_state(state); + + let resp = app + .oneshot( + Request::builder() + .uri("/api/v1/search?q=&limit=5&offset=0") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(resp.status(), StatusCode::OK); +} + +// --- Person endpoint tests --- + +#[tokio::test] +async fn person_endpoint_returns_404_for_unknown_id() { + let mut state = make_test_state(Arc::new(Panic)); + // Override the person_query with our stub + state.app_ctx.person_query = Arc::new(PersonQueryStub); + let app = Router::new() + .route("/api/v1/people/{id}", get(crate::handlers::api::get_person_handler)) + .with_state(state); + + let unknown_id = Uuid::new_v4(); + let resp = app + .oneshot( + Request::builder() + .uri(&format!("/api/v1/people/{}", unknown_id)) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(resp.status(), StatusCode::NOT_FOUND); +} + +#[tokio::test] +async fn person_credits_endpoint_returns_404_for_unknown_id() { + let mut state = make_test_state(Arc::new(Panic)); + // Override the person_query with our stub + state.app_ctx.person_query = Arc::new(PersonQueryStub); + let app = Router::new() + .route("/api/v1/people/{id}/credits", get(crate::handlers::api::get_person_credits_handler)) + .with_state(state); + + let unknown_id = Uuid::new_v4(); + let resp = app + .oneshot( + Request::builder() + .uri(&format!("/api/v1/people/{}/credits", unknown_id)) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(resp.status(), StatusCode::NOT_FOUND); +} diff --git a/crates/presentation/src/tests/extractors.rs b/crates/presentation/src/tests/extractors.rs index 8c793eb..b2c81fe 100644 --- a/crates/presentation/src/tests/extractors.rs +++ b/crates/presentation/src/tests/extractors.rs @@ -13,11 +13,14 @@ use domain::{ DiaryEntry, DiaryFilter, FeedEntry, Movie, Review, ReviewHistory, UserStats, UserTrends, collections::{PageParams, Paginated}, + PersonId, EntityType, IndexableDocument, Person, PersonCredits, + SearchQuery, SearchResults, }, ports::{ AuthService, DiaryRepository, EventPublisher, GeneratedToken, ImageStorage, MetadataClient, MovieRepository, PasswordHasher, PosterFetcherClient, ReviewRepository, StatsRepository, UserRepository, + PersonCommand, PersonQuery, SearchPort, SearchCommand, }, value_objects::{ Email, ExternalMetadataId, MovieId, MovieTitle, PasswordHash, PosterUrl, @@ -29,7 +32,7 @@ use tower::ServiceExt; // --- Panic stubs (defined once) --- -struct Panic; +pub struct Panic; #[async_trait::async_trait] impl MovieRepository for Panic { @@ -350,9 +353,29 @@ impl AuthService for RejectingAuth { } } +#[async_trait::async_trait] +impl PersonCommand for Panic { + async fn upsert_batch(&self, _: &[Person]) -> Result<(), DomainError> { panic!() } +} +#[async_trait::async_trait] +impl PersonQuery for Panic { + async fn get_by_id(&self, _: &PersonId) -> Result, DomainError> { panic!() } + async fn get_by_external_id(&self, _: &domain::models::ExternalPersonId) -> Result, DomainError> { panic!() } + async fn get_credits(&self, _: &PersonId) -> Result { panic!() } +} +#[async_trait::async_trait] +impl SearchPort for Panic { + async fn search(&self, _: &SearchQuery) -> Result { panic!() } +} +#[async_trait::async_trait] +impl SearchCommand for Panic { + async fn index(&self, _: IndexableDocument) -> Result<(), DomainError> { panic!() } + async fn remove(&self, _: EntityType, _: &str) -> Result<(), DomainError> { panic!() } +} + // --- Single state factory — only auth_service varies --- -fn make_test_state(auth_service: Arc) -> crate::state::AppState { +pub fn make_test_state(auth_service: Arc) -> crate::state::AppState { let repo = Arc::new(Panic); crate::state::AppState { app_ctx: AppContext { @@ -371,6 +394,10 @@ fn make_test_state(auth_service: Arc) -> crate::state::AppState import_session_repository: Arc::clone(&repo) as _, import_profile_repository: Arc::clone(&repo) as _, movie_profile_repository: Arc::clone(&repo) as _, + person_command: Arc::clone(&repo) as _, + person_query: Arc::clone(&repo) as _, + search_port: Arc::clone(&repo) as _, + search_command: Arc::clone(&repo) as _, auth_service, config: AppConfig { allow_registration: false, diff --git a/crates/presentation/src/tests/mod.rs b/crates/presentation/src/tests/mod.rs new file mode 100644 index 0000000..3b0a423 --- /dev/null +++ b/crates/presentation/src/tests/mod.rs @@ -0,0 +1,45 @@ +// Re-export imports needed by subtest modules +pub use application::{config::AppConfig, context::AppContext}; +pub use axum::{ + Router, + body::Body, + http::{Request, StatusCode}, + routing::get, +}; +pub use domain::{ + errors::DomainError, + events::DomainEvent, + models::{ + DiaryEntry, DiaryFilter, FeedEntry, Movie, Review, ReviewHistory, UserStats, + UserTrends, + collections::{PageParams, Paginated}, + PersonId, EntityType, IndexableDocument, Person, PersonCredits, + SearchQuery, SearchResults, + }, + ports::{ + AuthService, DiaryRepository, EventPublisher, GeneratedToken, ImageStorage, + MetadataClient, MovieRepository, PasswordHasher, PosterFetcherClient, ReviewRepository, + StatsRepository, UserRepository, + PersonCommand, PersonQuery, SearchPort, SearchCommand, + }, + value_objects::{ + Email, ExternalMetadataId, MovieId, MovieTitle, PasswordHash, PosterUrl, + ReleaseYear, ReviewId, UserId, + }, +}; +pub use std::sync::Arc; +pub use tower::ServiceExt; + +// API types for tests +pub use api_types::{ + LoginRequest, LogReviewRequest, DiaryQueryParams, +}; +pub use crate::{ + extractors::{AuthenticatedUser, OptionalCookieUser, RequiredCookieUser}, + forms::{LogReviewData, LogReviewForm, to_diary_query}, + state::AppState, +}; + +mod extractors; +mod forms; +mod api_handlers; diff --git a/crates/presentation/tests/api_test.rs b/crates/presentation/tests/api_test.rs index 3e56f04..812274a 100644 --- a/crates/presentation/tests/api_test.rs +++ b/crates/presentation/tests/api_test.rs @@ -10,10 +10,11 @@ use axum::{ use domain::{ errors::DomainError, events::DomainEvent, - models::{Movie, User}, + models::{Movie, User, PersonId, Person, PersonCredits, EntityType, IndexableDocument, SearchQuery, SearchResults, ExternalPersonId}, ports::{ AuthService, EventPublisher, GeneratedToken, ImageStorage, MetadataClient, MetadataSearchCriteria, PasswordHasher, PosterFetcherClient, UserRepository, + PersonCommand, PersonQuery, SearchPort, SearchCommand, }, value_objects::{ Email, ExternalMetadataId, PasswordHash, PosterUrl, UserId, @@ -163,6 +164,33 @@ impl domain::ports::ImportProfileRepository for PanicImportProfile { async fn delete(&self, _: &domain::value_objects::ImportProfileId) -> Result<(), DomainError> { panic!() } } +struct PanicPersonCommand; +#[async_trait] +impl PersonCommand for PanicPersonCommand { + async fn upsert_batch(&self, _: &[Person]) -> Result<(), DomainError> { panic!() } +} + +struct PanicPersonQuery; +#[async_trait] +impl PersonQuery for PanicPersonQuery { + async fn get_by_id(&self, _: &PersonId) -> Result, DomainError> { panic!() } + async fn get_by_external_id(&self, _: &ExternalPersonId) -> Result, DomainError> { panic!() } + async fn get_credits(&self, _: &PersonId) -> Result { panic!() } +} + +struct PanicSearchPort; +#[async_trait] +impl SearchPort for PanicSearchPort { + async fn search(&self, _: &SearchQuery) -> Result { panic!() } +} + +struct PanicSearchCommand; +#[async_trait] +impl SearchCommand for PanicSearchCommand { + async fn index(&self, _: IndexableDocument) -> Result<(), DomainError> { panic!() } + async fn remove(&self, _: EntityType, _: &str) -> Result<(), DomainError> { panic!() } +} + #[cfg(feature = "federation")] struct PanicSocialQuery; #[cfg(feature = "federation")] @@ -207,6 +235,10 @@ async fn test_app() -> Router { import_session_repository: Arc::new(PanicImportSession), import_profile_repository: Arc::new(PanicImportProfile), movie_profile_repository: Arc::new(PanicMovieProfile), + person_command: Arc::new(PanicPersonCommand), + person_query: Arc::new(PanicPersonQuery), + search_port: Arc::new(PanicSearchPort), + search_command: Arc::new(PanicSearchCommand), config: AppConfig { allow_registration: false, base_url: "http://localhost:3000".to_string(), diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index b5f1bc2..63134a1 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -5,8 +5,8 @@ edition = "2024" [features] default = ["sqlite"] -sqlite = ["dep:sqlite", "dep:sqlite-event-queue"] -postgres = ["dep:postgres", "dep:postgres-event-queue"] +sqlite = ["dep:sqlite", "dep:sqlite-event-queue", "dep:sqlite-search"] +postgres = ["dep:postgres", "dep:postgres-event-queue", "dep:postgres-search"] nats = ["dep:nats"] federation = [] sqlite-federation = ["sqlite", "dep:sqlite-federation", "dep:activitypub", "federation"] @@ -37,6 +37,8 @@ sqlite = { workspace = true, optional = true } postgres = { workspace = true, optional = true } sqlite-event-queue = { workspace = true, optional = true } postgres-event-queue = { workspace = true, optional = true } +sqlite-search = { workspace = true, optional = true } +postgres-search = { workspace = true, optional = true } # Optional — federation activitypub = { workspace = true, optional = true } diff --git a/crates/worker/src/db.rs b/crates/worker/src/db.rs index 5639a7b..f6fefc9 100644 --- a/crates/worker/src/db.rs +++ b/crates/worker/src/db.rs @@ -3,8 +3,8 @@ use std::sync::Arc; use anyhow::Context; use domain::ports::{ DiaryRepository, ImageRefCommand, ImageRefQuery, ImportProfileRepository, - ImportSessionRepository, MovieProfileRepository, MovieRepository, ReviewRepository, - StatsRepository, UserRepository, + ImportSessionRepository, MovieProfileRepository, MovieRepository, PersonCommand, PersonQuery, + ReviewRepository, SearchCommand, SearchPort, StatsRepository, UserRepository, }; pub enum DbPool { @@ -24,7 +24,11 @@ pub struct Repos { pub import_profile: Arc, pub movie_profile: Arc, pub image_ref_command: Arc, - pub image_ref_query: Arc, + pub image_ref_query: Arc, + pub person_command: Arc, + pub person_query: Arc, + pub search_command: Arc, + pub search_port: Arc, } pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<(Repos, DbPool)> { @@ -34,18 +38,26 @@ pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<(Repos let (pool, m, r, d, s, u, is, ip, mp) = postgres::wire(database_url).await.context("PostgreSQL connection failed")?; let (image_ref_command, image_ref_query) = postgres::create_image_ref(pool.clone()); + let (person_command, person_query) = postgres::create_person_adapter(pool.clone()); + let (search_command, search_port) = postgres_search::create_search_adapter(pool.clone()); Ok((Repos { movie: m, review: r, diary: d, stats: s, user: u, import_session: is, import_profile: ip, movie_profile: mp, - image_ref_command, image_ref_query }, DbPool::Postgres(pool))) + image_ref_command, image_ref_query, + person_command, person_query, search_command, search_port }, + DbPool::Postgres(pool))) } #[cfg(feature = "sqlite")] _ => { let (pool, m, r, d, s, u, is, ip, mp) = sqlite::wire(database_url).await.context("SQLite connection failed")?; let (image_ref_command, image_ref_query) = sqlite::create_image_ref(pool.clone()); + let (person_command, person_query) = sqlite::create_person_adapter(pool.clone()); + let (search_command, search_port) = sqlite_search::create_search_adapter(pool.clone()); Ok((Repos { movie: m, review: r, diary: d, stats: s, user: u, import_session: is, import_profile: ip, movie_profile: mp, - image_ref_command, image_ref_query }, DbPool::Sqlite(pool))) + image_ref_command, image_ref_query, + person_command, person_query, search_command, search_port }, + DbPool::Sqlite(pool))) } #[cfg(not(feature = "sqlite"))] _ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build"), diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index f1de06f..c6737c0 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -4,7 +4,7 @@ mod event_bus; use std::sync::Arc; use anyhow::Context; -use application::{config::AppConfig, context::AppContext, worker::WorkerService}; +use application::{config::AppConfig, context::AppContext, worker::WorkerService, SearchCleanupHandler}; use export::ExportAdapter; use importer::ImporterDocumentParser; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -32,7 +32,11 @@ async fn main() -> anyhow::Result<()> { let (event_publisher_arc, consumer_arc) = event_bus::create(&db_pool).await?; let image_ref_command = Arc::clone(&repos.image_ref_command); - let image_ref_query = Arc::clone(&repos.image_ref_query); + let image_ref_query = Arc::clone(&repos.image_ref_query); + let person_command = Arc::clone(&repos.person_command); + let person_query = Arc::clone(&repos.person_query); + let search_command = Arc::clone(&repos.search_command); + let search_port = Arc::clone(&repos.search_port); // Clone refs federation handler needs before ctx consumes them. #[cfg(feature = "federation")] @@ -62,6 +66,10 @@ async fn main() -> anyhow::Result<()> { import_session_repository: repos.import_session, import_profile_repository: repos.import_profile, movie_profile_repository: repos.movie_profile, + person_command: Arc::clone(&person_command), + person_query: Arc::clone(&person_query), + search_port: Arc::clone(&search_port), + search_command: Arc::clone(&search_command), config: app_config, }; @@ -75,7 +83,10 @@ async fn main() -> anyhow::Result<()> { tracing::info!("TMDb enrichment enabled"); let handler = Arc::new(tmdb_enrichment::EnrichmentHandler { enrichment_client: Arc::new(client), - profile_repo: Arc::clone(&ctx.movie_profile_repository), + movie_repository: Arc::clone(&ctx.movie_repository), + profile_repo: Arc::clone(&ctx.movie_profile_repository), + person_command: Arc::clone(&ctx.person_command), + search_command: Arc::clone(&ctx.search_command), }) as Arc; let job = Arc::new(application::jobs::EnrichmentStalenessJob::new(ctx.clone())) as Arc; @@ -134,7 +145,10 @@ async fn main() -> anyhow::Result<()> { #[cfg(not(feature = "federation"))] { - let mut h = vec![poster, cleanup]; + let search_cleanup = Arc::new(SearchCleanupHandler::new( + Arc::clone(&ctx.search_command), + )) as Arc; + let mut h = vec![poster, cleanup, search_cleanup]; if let Some(e) = enrichment_handler { h.push(e); } if let Some((ref conv_handler, _)) = conversion { h.push(Arc::clone(conv_handler)); } h @@ -160,8 +174,11 @@ async fn main() -> anyhow::Result<()> { allow_registration, ).await?.event_handler; + let search_cleanup = Arc::new(SearchCleanupHandler::new( + Arc::clone(&ctx.search_command), + )) as Arc; tracing::info!("federation event handler registered"); - let mut h = vec![poster, cleanup, ap]; + let mut h = vec![poster, cleanup, ap, search_cleanup]; if let Some(e) = enrichment_handler { h.push(e); } if let Some((ref conv_handler, _)) = conversion { h.push(Arc::clone(conv_handler)); } h