feat: extensible search engine with person entities (FTS5/tsvector)

This commit is contained in:
2026-05-12 18:45:24 +02:00
parent 763d622601
commit c6770659c5
45 changed files with 2421 additions and 86 deletions

View File

@@ -15,26 +15,35 @@ pub struct NatsConfig {
impl NatsConfig {
pub fn from_env() -> anyhow::Result<Self> {
let url = std::env::var("NATS_URL")
.map_err(|_| anyhow::anyhow!("NATS_URL is not set"))?;
Self::from_vars(
std::env::var("NATS_URL").ok().as_deref(),
std::env::var("NATS_MODE").ok().as_deref(),
std::env::var("NATS_SUBJECT_PREFIX").ok().as_deref(),
std::env::var("NATS_STREAM_NAME").ok().as_deref(),
std::env::var("NATS_CONSUMER_NAME").ok().as_deref(),
)
}
let mode = match std::env::var("NATS_MODE")
.unwrap_or_else(|_| "jetstream".to_string())
.as_str()
{
pub(crate) fn from_vars(
url: Option<&str>,
mode: Option<&str>,
subject_prefix: Option<&str>,
stream_name: Option<&str>,
consumer_name: Option<&str>,
) -> anyhow::Result<Self> {
let url = url.ok_or_else(|| anyhow::anyhow!("NATS_URL is not set"))?;
let mode = match mode.unwrap_or("jetstream") {
"core" => NatsMode::Core,
"jetstream" => NatsMode::JetStream,
other => anyhow::bail!("unknown NATS_MODE: {other}"),
};
let subject_prefix = std::env::var("NATS_SUBJECT_PREFIX")
.unwrap_or_else(|_| "movies-diary.events".to_string());
let stream_name = std::env::var("NATS_STREAM_NAME")
.unwrap_or_else(|_| "MOVIES_DIARY_EVENTS".to_string());
let consumer_name = std::env::var("NATS_CONSUMER_NAME")
.unwrap_or_else(|_| "worker".to_string());
let subject_prefix = subject_prefix.unwrap_or("movies-diary.events").to_string();
let stream_name = stream_name.unwrap_or("MOVIES_DIARY_EVENTS").to_string();
let consumer_name = consumer_name.unwrap_or("worker").to_string();
Ok(Self { url, mode, subject_prefix, stream_name, consumer_name })
Ok(Self { url: url.to_string(), mode, subject_prefix, stream_name, consumer_name })
}
}

View File

@@ -2,57 +2,26 @@ use super::*;
#[test]
fn errors_without_nats_url() {
unsafe { std::env::remove_var("NATS_URL"); }
assert!(NatsConfig::from_env().is_err());
assert!(NatsConfig::from_vars(None, None, None, None, None).is_err());
}
#[test]
fn defaults_with_only_url() {
unsafe {
std::env::set_var("NATS_URL", "nats://localhost:4222");
std::env::remove_var("NATS_MODE");
std::env::remove_var("NATS_SUBJECT_PREFIX");
std::env::remove_var("NATS_STREAM_NAME");
std::env::remove_var("NATS_CONSUMER_NAME");
}
let cfg = NatsConfig::from_env().unwrap();
let cfg = NatsConfig::from_vars(Some("nats://localhost:4222"), None, None, None, None).unwrap();
assert_eq!(cfg.url, "nats://localhost:4222");
assert_eq!(cfg.mode, NatsMode::JetStream);
assert_eq!(cfg.subject_prefix, "movies-diary.events");
assert_eq!(cfg.stream_name, "MOVIES_DIARY_EVENTS");
assert_eq!(cfg.consumer_name, "worker");
unsafe { std::env::remove_var("NATS_URL"); }
}
#[test]
fn core_mode_parsed() {
unsafe {
std::env::set_var("NATS_URL", "nats://test:4222");
std::env::set_var("NATS_MODE", "core");
}
let cfg = NatsConfig::from_env().unwrap();
let cfg = NatsConfig::from_vars(Some("nats://test:4222"), Some("core"), None, None, None).unwrap();
assert_eq!(cfg.mode, NatsMode::Core);
unsafe {
std::env::remove_var("NATS_URL");
std::env::remove_var("NATS_MODE");
}
}
#[test]
fn invalid_mode_errors() {
unsafe {
std::env::set_var("NATS_URL", "nats://test:4222");
std::env::set_var("NATS_MODE", "kafka");
}
assert!(NatsConfig::from_env().is_err());
unsafe {
std::env::remove_var("NATS_URL");
std::env::remove_var("NATS_MODE");
}
assert!(NatsConfig::from_vars(Some("nats://test:4222"), Some("kafka"), None, None, None).is_err());
}

View File

@@ -0,0 +1,10 @@
[package]
name = "postgres-search"
version = "0.1.0"
edition = "2021"
[dependencies]
domain = { workspace = true }
async-trait = { workspace = true }
sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "postgres", "uuid", "macros"] }
uuid = { workspace = true }

View File

@@ -0,0 +1,313 @@
use std::sync::Arc;
use async_trait::async_trait;
use domain::{
errors::DomainError,
models::{
EntityType, IndexableDocument, MovieSearchHit, PersonSearchHit,
SearchQuery, SearchResults,
collections::Paginated,
},
models::PersonId,
value_objects::MovieId,
ports::{SearchCommand, SearchPort},
};
use sqlx::PgPool;
/// Search adapter that stores and queries `tsvector` documents in PostgreSQL.
///
/// The same value implements both [`SearchCommand`] (index maintenance) and
/// [`SearchPort`] (querying).
pub struct PostgresSearchAdapter {
    /// Connection pool every statement of this adapter is executed on.
    pool: PgPool,
}

impl PostgresSearchAdapter {
    /// Wraps `pool` in a new adapter; no connection is touched here.
    pub fn new(pool: PgPool) -> Self {
        PostgresSearchAdapter { pool }
    }
}
/// Builds a single [`PostgresSearchAdapter`] and hands it out behind both
/// port traits.
///
/// The two returned handles point at the same adapter instance (and therefore
/// the same pool): the first is the write side, the second the read side.
pub fn create_search_adapter(pool: PgPool) -> (Arc<dyn SearchCommand>, Arc<dyn SearchPort>) {
    let shared = Arc::new(PostgresSearchAdapter::new(pool));
    let command: Arc<dyn SearchCommand> = shared.clone();
    let port: Arc<dyn SearchPort> = shared;
    (command, port)
}
/// Converts a `sqlx` failure into [`DomainError::InfrastructureError`],
/// keeping only the error's display text.
fn map_err(e: sqlx::Error) -> DomainError {
    let message = e.to_string();
    DomainError::InfrastructureError(message)
}
#[async_trait]
impl SearchCommand for PostgresSearchAdapter {
    /// Inserts or replaces the search entry for `doc`.
    ///
    /// The searchable text of the document is flattened into one
    /// space-separated string and converted with
    /// `to_tsvector('english', …)`; an existing entry for the same id is
    /// overwritten.
    ///
    /// # Errors
    /// Returns [`DomainError::InfrastructureError`] when the statement fails.
    async fn index(&self, doc: IndexableDocument) -> Result<(), DomainError> {
        // Both document kinds reduce to: which upsert to run, the key, and the text.
        let (statement, key, fts_input) = match doc {
            IndexableDocument::Movie { id, movie, profile } => {
                let mut sections: Vec<String> = Vec::with_capacity(7);
                sections.push(movie.title().value().to_string());
                sections.push(movie.director().unwrap_or("").to_string());
                if let Some(p) = profile.as_deref() {
                    sections.push(p.overview.clone().unwrap_or_default());
                    sections.push(
                        p.genres.iter().map(|g| g.name.as_str()).collect::<Vec<_>>().join(" "),
                    );
                    sections.push(
                        p.keywords.iter().map(|k| k.name.as_str()).collect::<Vec<_>>().join(" "),
                    );
                    sections.push(
                        p.cast.iter().map(|c| c.name.as_str()).collect::<Vec<_>>().join(" "),
                    );
                    sections.push(
                        p.crew.iter().map(|c| c.name.as_str()).collect::<Vec<_>>().join(" "),
                    );
                } else {
                    // Without a profile the five profile-derived sections are empty.
                    sections.extend(std::iter::repeat_with(String::new).take(5));
                }
                (
                    "INSERT INTO movies_search (movie_id, fts)
                     VALUES ($1, to_tsvector('english', $2))
                     ON CONFLICT (movie_id) DO UPDATE SET fts = EXCLUDED.fts",
                    id.value().to_string(),
                    sections.join(" "),
                )
            }
            IndexableDocument::Person { id, person } => (
                "INSERT INTO people_search (person_id, fts)
                 VALUES ($1, to_tsvector('english', $2))
                 ON CONFLICT (person_id) DO UPDATE SET fts = EXCLUDED.fts",
                id.value().to_string(),
                format!(
                    "{} {}",
                    person.name(),
                    person.known_for_department().unwrap_or("")
                ),
            ),
        };

        sqlx::query(statement)
            .bind(&key)
            .bind(&fts_input)
            .execute(&self.pool)
            .await
            .map_err(map_err)?;
        Ok(())
    }

    /// Deletes the search entry with the given `id` from the table that
    /// belongs to `entity_type`. Deleting an id that is not indexed succeeds.
    ///
    /// # Errors
    /// Returns [`DomainError::InfrastructureError`] when the statement fails.
    async fn remove(&self, entity_type: EntityType, id: &str) -> Result<(), DomainError> {
        let statement = match entity_type {
            EntityType::Movie => "DELETE FROM movies_search WHERE movie_id = $1",
            EntityType::Person => "DELETE FROM people_search WHERE person_id = $1",
        };
        sqlx::query(statement)
            .bind(id)
            .execute(&self.pool)
            .await
            .map_err(map_err)?;
        Ok(())
    }
}
#[async_trait]
impl SearchPort for PostgresSearchAdapter {
    /// Runs the movie search and then the people search for `query`,
    /// returning both result pages. The first failing sub-search aborts the
    /// call with its error.
    async fn search(&self, query: &SearchQuery) -> Result<SearchResults, DomainError> {
        Ok(SearchResults {
            movies: self.search_movies(query).await?,
            people: self.search_people(query).await?,
        })
    }
}
impl PostgresSearchAdapter {
    /// Parses an id column into a UUID.
    ///
    /// A value that is not a valid UUID is reported as an
    /// [`DomainError::InfrastructureError`] instead of being replaced by the
    /// nil UUID, which would make the hit point at a non-existent entity.
    fn parse_id(raw: &str, entity: &str) -> Result<uuid::Uuid, DomainError> {
        uuid::Uuid::parse_str(raw).map_err(|e| {
            DomainError::InfrastructureError(format!("invalid {entity} id {raw:?}: {e}"))
        })
    }

    /// Returns one page of movies matching `query`.
    ///
    /// With `query.text` set, rows are matched against `movies_search.fts`
    /// and ordered by `ts_rank`; without text, all movies passing the filters
    /// are listed by title. The optional genre and year filters apply in both
    /// modes. `total_count` is the number of matches before pagination.
    ///
    /// # Errors
    /// Returns [`DomainError::InfrastructureError`] when a query fails or a
    /// stored movie id is not a valid UUID.
    async fn search_movies(&self, query: &SearchQuery) -> Result<Paginated<MovieSearchHit>, DomainError> {
        let limit = query.page.limit as i64;
        let offset = query.page.offset as i64;
        let genre = &query.filters.genre;
        let year = query.filters.year.map(|y| y as i32);

        #[derive(sqlx::FromRow)]
        struct Row {
            id: String,
            title: String,
            release_year: Option<i32>,
            director: Option<String>,
            poster_path: Option<String>,
            genres: Option<String>,
        }

        // The genre filter is an EXISTS sub-query, so counting needs no join
        // on movie_genres; every remaining row is exactly one movie.
        let count = if let Some(text) = &query.text {
            sqlx::query_scalar::<_, i64>(
                "SELECT COUNT(*)
                 FROM movies_search ms
                 JOIN movies m ON m.id = ms.movie_id
                 WHERE ms.fts @@ plainto_tsquery('english', $1)
                 AND ($2::TEXT IS NULL OR EXISTS (SELECT 1 FROM movie_genres WHERE movie_id = m.id AND name = $2))
                 AND ($3::INT IS NULL OR m.release_year = $3)",
            )
            .bind(text)
            .bind(genre)
            .bind(year)
            .fetch_one(&self.pool)
            .await
            .map_err(map_err)?
        } else {
            sqlx::query_scalar::<_, i64>(
                "SELECT COUNT(*) FROM movies m
                 WHERE ($1::TEXT IS NULL OR EXISTS (SELECT 1 FROM movie_genres WHERE movie_id = m.id AND name = $1))
                 AND ($2::INT IS NULL OR m.release_year = $2)",
            )
            .bind(genre)
            .bind(year)
            .fetch_one(&self.pool)
            .await
            .map_err(map_err)?
        };
        let total = u64::try_from(count).unwrap_or_default();

        // `m.id` is appended to every ORDER BY: rank and title ties would
        // otherwise be returned in an unspecified order, which lets rows
        // repeat or go missing between pages.
        let rows: Vec<Row> = if let Some(text) = &query.text {
            sqlx::query_as::<_, Row>(
                "SELECT m.id, m.title, m.release_year, m.director, m.poster_path,
                 STRING_AGG(DISTINCT mg.name, ',' ORDER BY mg.name) AS genres
                 FROM movies_search ms
                 JOIN movies m ON m.id = ms.movie_id
                 LEFT JOIN movie_genres mg ON mg.movie_id = m.id
                 WHERE ms.fts @@ plainto_tsquery('english', $1)
                 AND ($2::TEXT IS NULL OR EXISTS (SELECT 1 FROM movie_genres WHERE movie_id = m.id AND name = $2))
                 AND ($3::INT IS NULL OR m.release_year = $3)
                 GROUP BY m.id, m.title, m.release_year, m.director, m.poster_path, ms.fts
                 ORDER BY ts_rank(ms.fts, plainto_tsquery('english', $1)) DESC, m.id
                 LIMIT $4 OFFSET $5",
            )
            .bind(text)
            .bind(genre)
            .bind(year)
            .bind(limit)
            .bind(offset)
            .fetch_all(&self.pool)
            .await
            .map_err(map_err)?
        } else {
            sqlx::query_as::<_, Row>(
                "SELECT m.id, m.title, m.release_year, m.director, m.poster_path,
                 STRING_AGG(DISTINCT mg.name, ',' ORDER BY mg.name) AS genres
                 FROM movies m
                 LEFT JOIN movie_genres mg ON mg.movie_id = m.id
                 WHERE ($1::TEXT IS NULL OR EXISTS (SELECT 1 FROM movie_genres WHERE movie_id = m.id AND name = $1))
                 AND ($2::INT IS NULL OR m.release_year = $2)
                 GROUP BY m.id ORDER BY m.title, m.id LIMIT $3 OFFSET $4",
            )
            .bind(genre)
            .bind(year)
            .bind(limit)
            .bind(offset)
            .fetch_all(&self.pool)
            .await
            .map_err(map_err)?
        };

        let items = rows
            .into_iter()
            .map(|r| {
                Ok(MovieSearchHit {
                    movie_id: MovieId::from_uuid(Self::parse_id(&r.id, "movie")?),
                    title: r.title,
                    // A year outside the u16 range is treated as unknown rather than wrapped.
                    release_year: r.release_year.and_then(|y| u16::try_from(y).ok()),
                    director: r.director,
                    poster_path: r.poster_path,
                    genres: r
                        .genres
                        .unwrap_or_default()
                        .split(',')
                        .filter(|s| !s.is_empty())
                        .map(str::to_string)
                        .collect(),
                })
            })
            .collect::<Result<Vec<_>, DomainError>>()?;

        Ok(Paginated { items, total_count: total, limit: query.page.limit, offset: query.page.offset })
    }

    /// Returns one page of people whose search document matches `query.text`,
    /// best `ts_rank` first. Without text the page is empty.
    ///
    /// Each hit carries up to three movie titles from `movie_cast`, ordered
    /// by billing order.
    ///
    /// # Errors
    /// Returns [`DomainError::InfrastructureError`] when the count or page
    /// query fails or a stored person id is not a valid UUID.
    async fn search_people(&self, query: &SearchQuery) -> Result<Paginated<PersonSearchHit>, DomainError> {
        let Some(text) = &query.text else {
            return Ok(Paginated {
                items: vec![],
                total_count: 0,
                limit: query.page.limit,
                offset: query.page.offset,
            });
        };
        let limit = query.page.limit as i64;
        let offset = query.page.offset as i64;

        let count = sqlx::query_scalar::<_, i64>(
            "SELECT COUNT(*) FROM people_search WHERE fts @@ plainto_tsquery('english', $1)",
        )
        .bind(text)
        .fetch_one(&self.pool)
        .await
        .map_err(map_err)?;
        let total = u64::try_from(count).unwrap_or_default();

        #[derive(sqlx::FromRow)]
        struct Row {
            person_id: String,
            name: String,
            known_for_department: Option<String>,
            profile_path: Option<String>,
            tmdb_person_id: Option<i64>,
        }

        // `ps.person_id` breaks rank ties so pagination is stable.
        let rows = sqlx::query_as::<_, Row>(
            "SELECT ps.person_id, p.name, p.known_for_department, p.profile_path, p.tmdb_person_id
             FROM people_search ps
             JOIN persons p ON p.id = ps.person_id
             WHERE ps.fts @@ plainto_tsquery('english', $1)
             ORDER BY ts_rank(ps.fts, plainto_tsquery('english', $1)) DESC, ps.person_id
             LIMIT $2 OFFSET $3",
        )
        .bind(text)
        .bind(limit)
        .bind(offset)
        .fetch_all(&self.pool)
        .await
        .map_err(map_err)?;

        let mut items = Vec::with_capacity(rows.len());
        for row in rows {
            let known_for_titles = match row.tmdb_person_id {
                // A failure here only loses the decorative title list, so it
                // degrades to an empty list instead of failing the search.
                Some(tid) => sqlx::query_scalar::<_, String>(
                    "SELECT m.title FROM movie_cast mc
                     JOIN movies m ON m.id = mc.movie_id
                     WHERE mc.tmdb_person_id = $1
                     ORDER BY mc.billing_order
                     LIMIT 3",
                )
                .bind(tid)
                .fetch_all(&self.pool)
                .await
                .unwrap_or_default(),
                None => Vec::new(),
            };
            items.push(PersonSearchHit {
                person_id: PersonId::from_uuid(Self::parse_id(&row.person_id, "person")?),
                name: row.name,
                known_for_department: row.known_for_department,
                profile_path: row.profile_path,
                known_for_titles,
            });
        }

        Ok(Paginated { items, total_count: total, limit: query.page.limit, offset: query.page.offset })
    }
}

View File

@@ -0,0 +1,25 @@
-- People referenced by the person and search adapters.
-- id holds the PersonId UUID in text form; tmdb_person_id is the key the
-- adapters use to join against movie_cast / movie_crew.
CREATE TABLE IF NOT EXISTS persons (
id TEXT PRIMARY KEY,
external_id TEXT NOT NULL UNIQUE,
tmdb_person_id BIGINT UNIQUE,
name TEXT NOT NULL,
known_for_department TEXT,
profile_path TEXT
);
-- NOTE(review): the UNIQUE constraints on external_id and tmdb_person_id
-- already give PostgreSQL an index on each column, so the two indexes below
-- look redundant — confirm before keeping them.
CREATE INDEX IF NOT EXISTS idx_persons_external ON persons (external_id);
CREATE INDEX IF NOT EXISTS idx_persons_tmdb_id ON persons (tmdb_person_id);
-- tsvector-based search for movies (equivalent of SQLite FTS5)
-- One row per movie; the row disappears with its movie (ON DELETE CASCADE).
-- fts is written by the search adapter via to_tsvector('english', ...).
CREATE TABLE IF NOT EXISTS movies_search (
movie_id TEXT PRIMARY KEY REFERENCES movies(id) ON DELETE CASCADE,
fts TSVECTOR NOT NULL DEFAULT ''
);
-- GIN index backing the `fts @@ plainto_tsquery(...)` lookups.
CREATE INDEX IF NOT EXISTS idx_movies_search_fts ON movies_search USING GIN(fts);
-- tsvector-based search for people
-- One row per person; the row disappears with its person (ON DELETE CASCADE).
-- fts is written by the search adapter via to_tsvector('english', ...).
CREATE TABLE IF NOT EXISTS people_search (
person_id TEXT PRIMARY KEY REFERENCES persons(id) ON DELETE CASCADE,
fts TSVECTOR NOT NULL DEFAULT ''
);
-- GIN index backing the `fts @@ plainto_tsquery(...)` lookups.
CREATE INDEX IF NOT EXISTS idx_people_search_fts ON people_search USING GIN(fts);

View File

@@ -16,6 +16,7 @@ mod image_ref;
mod import_profile;
mod import_session;
mod models;
mod persons;
mod profile;
mod users;
@@ -27,6 +28,7 @@ use models::{
pub use image_ref::{PostgresImageRefAdapter, create_image_ref};
pub use import_profile::PostgresImportProfileRepository;
pub use import_session::PostgresImportSessionRepository;
pub use persons::{PostgresPersonAdapter, create_person_adapter};
pub use profile::PostgresMovieProfileRepository;
pub use users::PostgresUserRepository;

View File

@@ -0,0 +1,198 @@
use async_trait::async_trait;
use domain::{
errors::DomainError,
models::{CastCredit, CrewCredit, ExternalPersonId, Person, PersonCredits, PersonId},
ports::{PersonCommand, PersonQuery},
value_objects::MovieId,
};
use sqlx::PgPool;
use std::sync::Arc;
/// PostgreSQL-backed store for people, implementing both [`PersonCommand`]
/// and [`PersonQuery`].
pub struct PostgresPersonAdapter {
    /// Connection pool every statement of this adapter is executed on.
    pool: PgPool,
}

impl PostgresPersonAdapter {
    /// Wraps `pool` in a new adapter; no connection is touched here.
    pub fn new(pool: PgPool) -> Self {
        PostgresPersonAdapter { pool }
    }
}
/// Builds a single [`PostgresPersonAdapter`] and hands it out behind both
/// port traits; the two handles share one adapter instance.
pub fn create_person_adapter(pool: PgPool) -> (Arc<dyn PersonCommand>, Arc<dyn PersonQuery>) {
    let shared = Arc::new(PostgresPersonAdapter::new(pool));
    let command: Arc<dyn PersonCommand> = shared.clone();
    let query: Arc<dyn PersonQuery> = shared;
    (command, query)
}
/// Converts a `sqlx` failure into [`DomainError::InfrastructureError`],
/// keeping only the error's display text.
fn map_err(e: sqlx::Error) -> DomainError {
    let message = e.to_string();
    DomainError::InfrastructureError(message)
}
#[async_trait]
impl PersonCommand for PostgresPersonAdapter {
    /// Inserts every person in `persons`, updating the row when its `id`
    /// already exists.
    ///
    /// The whole batch runs in one transaction: either every person is
    /// written or, on the first failure, none is (the transaction is rolled
    /// back when it is dropped uncommitted). An empty slice is a no-op that
    /// does not touch the database.
    ///
    /// # Errors
    /// Returns [`DomainError::InfrastructureError`] when opening the
    /// transaction, any upsert, or the commit fails.
    async fn upsert_batch(&self, persons: &[Person]) -> Result<(), DomainError> {
        if persons.is_empty() {
            return Ok(());
        }

        let mut tx = self.pool.begin().await.map_err(map_err)?;
        for person in persons {
            let tmdb_id = person.external_id().tmdb_id();
            sqlx::query(
                "INSERT INTO persons (id, external_id, tmdb_person_id, name, known_for_department, profile_path)
                 VALUES ($1, $2, $3, $4, $5, $6)
                 ON CONFLICT(id) DO UPDATE SET
                 external_id = EXCLUDED.external_id,
                 tmdb_person_id = EXCLUDED.tmdb_person_id,
                 name = EXCLUDED.name,
                 known_for_department = EXCLUDED.known_for_department,
                 profile_path = EXCLUDED.profile_path",
            )
            .bind(person.id().value().to_string())
            .bind(person.external_id().value())
            .bind(tmdb_id)
            .bind(person.name())
            .bind(person.known_for_department())
            .bind(person.profile_path())
            .execute(&mut *tx)
            .await
            .map_err(map_err)?;
        }
        tx.commit().await.map_err(map_err)
    }
}
/// Parses an id column into a UUID, reporting a corrupt value as an
/// [`DomainError::InfrastructureError`] instead of substituting the nil UUID.
fn parse_uuid(raw: &str, entity: &str) -> Result<uuid::Uuid, DomainError> {
    uuid::Uuid::parse_str(raw).map_err(|e| {
        DomainError::InfrastructureError(format!("invalid {entity} id {raw:?}: {e}"))
    })
}

/// Columns read from `persons` by every single-person lookup.
#[derive(sqlx::FromRow)]
struct PersonRow {
    id: String,
    external_id: String,
    tmdb_person_id: Option<i64>,
    name: String,
    known_for_department: Option<String>,
    profile_path: Option<String>,
}

impl PersonRow {
    /// Builds the domain [`Person`]; fails when the stored id is not a UUID.
    fn into_person(self) -> Result<Person, DomainError> {
        let id = PersonId::from_uuid(parse_uuid(&self.id, "person")?);
        Ok(Person::new(
            id,
            ExternalPersonId::new(self.external_id),
            self.name,
            self.known_for_department,
            self.profile_path,
        ))
    }
}

impl PostgresPersonAdapter {
    /// Loads the raw `persons` row for `id`, if any.
    async fn find_row_by_id(&self, id: &PersonId) -> Result<Option<PersonRow>, DomainError> {
        sqlx::query_as::<_, PersonRow>(
            "SELECT id, external_id, tmdb_person_id, name, known_for_department, profile_path FROM persons WHERE id = $1",
        )
        .bind(id.value().to_string())
        .fetch_optional(&self.pool)
        .await
        .map_err(map_err)
    }
}

#[async_trait]
impl PersonQuery for PostgresPersonAdapter {
    /// Looks a person up by internal id; `Ok(None)` when no row exists.
    ///
    /// # Errors
    /// Returns [`DomainError::InfrastructureError`] when the query fails or
    /// the stored id is not a valid UUID.
    async fn get_by_id(&self, id: &PersonId) -> Result<Option<Person>, DomainError> {
        self.find_row_by_id(id)
            .await?
            .map(PersonRow::into_person)
            .transpose()
    }

    /// Looks a person up by external id; `Ok(None)` when no row exists.
    ///
    /// # Errors
    /// Returns [`DomainError::InfrastructureError`] when the query fails or
    /// the stored id is not a valid UUID.
    async fn get_by_external_id(&self, id: &ExternalPersonId) -> Result<Option<Person>, DomainError> {
        sqlx::query_as::<_, PersonRow>(
            "SELECT id, external_id, tmdb_person_id, name, known_for_department, profile_path FROM persons WHERE external_id = $1",
        )
        .bind(id.value())
        .fetch_optional(&self.pool)
        .await
        .map_err(map_err)?
        .map(PersonRow::into_person)
        .transpose()
    }

    /// Returns the person together with their cast credits (by billing
    /// order) and crew credits (by movie title).
    ///
    /// A person without a `tmdb_person_id` gets empty credit lists, because
    /// that id is the only link to `movie_cast` / `movie_crew`.
    ///
    /// # Errors
    /// Returns [`DomainError::NotFound`] when the person does not exist and
    /// [`DomainError::InfrastructureError`] when a query fails or a stored id
    /// is not a valid UUID.
    async fn get_credits(&self, id: &PersonId) -> Result<PersonCredits, DomainError> {
        // One read yields both the person and the join key, so the two
        // cannot disagree and no second lookup is needed.
        let row = self.find_row_by_id(id).await?.ok_or_else(|| {
            DomainError::NotFound(format!("Person {} not found", id.value()))
        })?;
        let tmdb_id = row.tmdb_person_id;
        let person = row.into_person()?;

        let Some(tmdb_id) = tmdb_id else {
            return Ok(PersonCredits { person, cast: vec![], crew: vec![] });
        };

        #[derive(sqlx::FromRow)]
        struct CastRow {
            id: String,
            title: String,
            release_year: Option<i32>,
            character: String,
            poster_path: Option<String>,
        }

        #[derive(sqlx::FromRow)]
        struct CrewRow {
            id: String,
            title: String,
            release_year: Option<i32>,
            job: String,
            department: String,
            poster_path: Option<String>,
        }

        let cast = sqlx::query_as::<_, CastRow>(
            "SELECT m.id, m.title, m.release_year, mc.character, m.poster_path
             FROM movie_cast mc JOIN movies m ON m.id = mc.movie_id
             WHERE mc.tmdb_person_id = $1 ORDER BY mc.billing_order",
        )
        .bind(tmdb_id)
        .fetch_all(&self.pool)
        .await
        .map_err(map_err)?
        .into_iter()
        .map(|r| {
            Ok(CastCredit {
                movie_id: MovieId::from_uuid(parse_uuid(&r.id, "movie")?),
                title: r.title,
                // A year outside the u16 range is treated as unknown rather than wrapped.
                release_year: r.release_year.and_then(|y| u16::try_from(y).ok()),
                character: r.character,
                poster_path: r.poster_path,
            })
        })
        .collect::<Result<Vec<_>, DomainError>>()?;

        let crew = sqlx::query_as::<_, CrewRow>(
            "SELECT m.id, m.title, m.release_year, mc.job, mc.department, m.poster_path
             FROM movie_crew mc JOIN movies m ON m.id = mc.movie_id
             WHERE mc.tmdb_person_id = $1 ORDER BY m.title",
        )
        .bind(tmdb_id)
        .fetch_all(&self.pool)
        .await
        .map_err(map_err)?
        .into_iter()
        .map(|r| {
            Ok(CrewCredit {
                movie_id: MovieId::from_uuid(parse_uuid(&r.id, "movie")?),
                title: r.title,
                release_year: r.release_year.and_then(|y| u16::try_from(y).ok()),
                job: r.job,
                department: r.department,
                poster_path: r.poster_path,
            })
        })
        .collect::<Result<Vec<_>, DomainError>>()?;

        Ok(PersonCredits { person, cast, crew })
    }
}

View File

@@ -0,0 +1,13 @@
[package]
name = "sqlite-search"
version = "0.1.0"
edition = "2021"
[dependencies]
domain = { workspace = true }
async-trait = { workspace = true }
sqlx = { workspace = true }
uuid = { workspace = true }
[dev-dependencies]
tokio = { workspace = true }

View File

@@ -0,0 +1,356 @@
use std::sync::Arc;
use async_trait::async_trait;
use domain::{
errors::DomainError,
models::{
EntityType, IndexableDocument, MovieSearchHit, PersonSearchHit,
SearchQuery, SearchResults,
collections::Paginated,
},
models::PersonId,
value_objects::MovieId,
ports::{SearchCommand, SearchPort},
};
use sqlx::SqlitePool;
/// Search adapter that stores and queries documents in SQLite FTS5 tables.
///
/// The same value implements both [`SearchCommand`] (index maintenance) and
/// [`SearchPort`] (querying).
pub struct SqliteSearchAdapter {
    /// Connection pool every statement of this adapter is executed on.
    pool: SqlitePool,
}

impl SqliteSearchAdapter {
    /// Wraps `pool` in a new adapter; no connection is touched here.
    pub fn new(pool: SqlitePool) -> Self {
        SqliteSearchAdapter { pool }
    }
}
/// Builds a single [`SqliteSearchAdapter`] and hands it out behind both port
/// traits.
///
/// The two returned handles point at the same adapter instance (and therefore
/// the same pool): the first is the write side, the second the read side.
pub fn create_search_adapter(pool: SqlitePool) -> (Arc<dyn SearchCommand>, Arc<dyn SearchPort>) {
    let shared = Arc::new(SqliteSearchAdapter::new(pool));
    let command: Arc<dyn SearchCommand> = shared.clone();
    let port: Arc<dyn SearchPort> = shared;
    (command, port)
}
/// Converts a `sqlx` failure into [`DomainError::InfrastructureError`],
/// keeping only the error's display text.
fn map_err(e: sqlx::Error) -> DomainError {
    let message = e.to_string();
    DomainError::InfrastructureError(message)
}
#[async_trait]
impl SearchCommand for SqliteSearchAdapter {
    /// Replaces the FTS entry for `doc`.
    ///
    /// The FTS tables carry no uniqueness constraint on the id column, so
    /// re-indexing is "delete every row for this id, then insert". Both
    /// statements run in one transaction: if the insert fails the previous
    /// entry is kept (the transaction rolls back when dropped uncommitted)
    /// instead of the document vanishing from the index.
    ///
    /// # Errors
    /// Returns [`DomainError::InfrastructureError`] when opening the
    /// transaction, either statement, or the commit fails.
    async fn index(&self, doc: IndexableDocument) -> Result<(), DomainError> {
        let mut tx = self.pool.begin().await.map_err(map_err)?;

        match doc {
            IndexableDocument::Movie { id, movie, profile } => {
                let movie_id = id.value().to_string();
                let title = movie.title().value().to_string();
                let director = movie.director().unwrap_or("").to_string();
                let release_year = movie.release_year().value() as i64;
                // Without a profile every profile-derived column is stored empty.
                let (overview, genres, keywords, cast_names, crew_names, language) =
                    match profile.as_deref() {
                        Some(p) => (
                            p.overview.clone().unwrap_or_default(),
                            p.genres.iter().map(|g| g.name.as_str()).collect::<Vec<_>>().join(" "),
                            p.keywords.iter().map(|k| k.name.as_str()).collect::<Vec<_>>().join(" "),
                            p.cast.iter().map(|c| c.name.as_str()).collect::<Vec<_>>().join(" "),
                            p.crew.iter().map(|c| c.name.as_str()).collect::<Vec<_>>().join(" "),
                            p.original_language.clone().unwrap_or_default(),
                        ),
                        None => (
                            String::new(),
                            String::new(),
                            String::new(),
                            String::new(),
                            String::new(),
                            String::new(),
                        ),
                    };

                // Matching on movie_id directly also clears any duplicate rows.
                sqlx::query("DELETE FROM movies_fts WHERE movie_id = ?")
                    .bind(&movie_id)
                    .execute(&mut *tx)
                    .await
                    .map_err(map_err)?;
                sqlx::query(
                    "INSERT INTO movies_fts(movie_id, title, director, overview, genres, keywords, cast_names, crew_names, release_year, language)
                     VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                )
                .bind(&movie_id)
                .bind(&title)
                .bind(&director)
                .bind(&overview)
                .bind(&genres)
                .bind(&keywords)
                .bind(&cast_names)
                .bind(&crew_names)
                .bind(release_year)
                .bind(&language)
                .execute(&mut *tx)
                .await
                .map_err(map_err)?;
            }
            IndexableDocument::Person { id, person } => {
                let person_id = id.value().to_string();
                sqlx::query("DELETE FROM people_fts WHERE person_id = ?")
                    .bind(&person_id)
                    .execute(&mut *tx)
                    .await
                    .map_err(map_err)?;
                sqlx::query(
                    "INSERT INTO people_fts(person_id, name, known_for_department) VALUES (?, ?, ?)",
                )
                .bind(&person_id)
                .bind(person.name())
                .bind(person.known_for_department())
                .execute(&mut *tx)
                .await
                .map_err(map_err)?;
            }
        }

        tx.commit().await.map_err(map_err)
    }

    /// Deletes every FTS row with the given `id` from the table that belongs
    /// to `entity_type`. Removing an id that is not indexed succeeds.
    ///
    /// # Errors
    /// Returns [`DomainError::InfrastructureError`] when the statement fails.
    async fn remove(&self, entity_type: EntityType, id: &str) -> Result<(), DomainError> {
        let statement = match entity_type {
            EntityType::Movie => "DELETE FROM movies_fts WHERE movie_id = ?",
            EntityType::Person => "DELETE FROM people_fts WHERE person_id = ?",
        };
        sqlx::query(statement)
            .bind(id)
            .execute(&self.pool)
            .await
            .map_err(map_err)?;
        Ok(())
    }
}
#[async_trait]
impl SearchPort for SqliteSearchAdapter {
    /// Runs the movie search and then the people search for `query`,
    /// returning both result pages. The first failing sub-search aborts the
    /// call with its error.
    async fn search(&self, query: &SearchQuery) -> Result<SearchResults, DomainError> {
        Ok(SearchResults {
            movies: self.search_movies(query).await?,
            people: self.search_people(query).await?,
        })
    }
}
impl SqliteSearchAdapter {
async fn search_movies(&self, query: &SearchQuery) -> Result<Paginated<MovieSearchHit>, DomainError> {
let limit = query.page.limit as i64;
let offset = query.page.offset as i64;
#[derive(sqlx::FromRow)]
struct Row {
id: String,
title: String,
release_year: Option<i64>,
director: Option<String>,
poster_path: Option<String>,
genres: Option<String>,
}
let total: u64 = if let Some(text) = &query.text {
let fts_query = format!("{}*", text.replace('"', "").replace('*', ""));
let count: i64 = sqlx::query_scalar(
"SELECT COUNT(DISTINCT m.id)
FROM movies_fts fts
JOIN movies m ON m.id = fts.movie_id
LEFT JOIN movie_genres mg ON mg.movie_id = m.id
WHERE movies_fts MATCH ?
AND (? IS NULL OR EXISTS (SELECT 1 FROM movie_genres WHERE movie_id = m.id AND name = ?))
AND (? IS NULL OR m.release_year = ?)",
)
.bind(&fts_query)
.bind(&query.filters.genre)
.bind(&query.filters.genre)
.bind(query.filters.year.map(|y| y as i64))
.bind(query.filters.year.map(|y| y as i64))
.fetch_one(&self.pool)
.await
.map_err(map_err)?;
count as u64
} else {
let count: i64 = sqlx::query_scalar(
"SELECT COUNT(DISTINCT m.id)
FROM movies m
LEFT JOIN movie_genres mg ON mg.movie_id = m.id
WHERE (? IS NULL OR EXISTS (SELECT 1 FROM movie_genres WHERE movie_id = m.id AND name = ?))
AND (? IS NULL OR m.release_year = ?)",
)
.bind(&query.filters.genre)
.bind(&query.filters.genre)
.bind(query.filters.year.map(|y| y as i64))
.bind(query.filters.year.map(|y| y as i64))
.fetch_one(&self.pool)
.await
.map_err(map_err)?;
count as u64
};
let rows: Vec<Row> = if let Some(text) = &query.text {
let fts_query = format!("{}*", text.replace('"', "").replace('*', ""));
sqlx::query_as::<_, Row>(
"SELECT m.id, m.title, m.release_year, m.director, m.poster_path,
GROUP_CONCAT(DISTINCT mg.name) AS genres
FROM movies_fts fts
JOIN movies m ON m.id = fts.movie_id
LEFT JOIN movie_genres mg ON mg.movie_id = m.id
WHERE movies_fts MATCH ?
AND (? IS NULL OR EXISTS (SELECT 1 FROM movie_genres WHERE movie_id = m.id AND name = ?))
AND (? IS NULL OR m.release_year = ?)
GROUP BY m.id
ORDER BY rank
LIMIT ? OFFSET ?",
)
.bind(&fts_query)
.bind(&query.filters.genre)
.bind(&query.filters.genre)
.bind(query.filters.year.map(|y| y as i64))
.bind(query.filters.year.map(|y| y as i64))
.bind(limit)
.bind(offset)
.fetch_all(&self.pool)
.await
.map_err(map_err)?
} else {
sqlx::query_as::<_, Row>(
"SELECT m.id, m.title, m.release_year, m.director, m.poster_path,
GROUP_CONCAT(DISTINCT mg.name) AS genres
FROM movies m
LEFT JOIN movie_genres mg ON mg.movie_id = m.id
WHERE (? IS NULL OR EXISTS (SELECT 1 FROM movie_genres WHERE movie_id = m.id AND name = ?))
AND (? IS NULL OR m.release_year = ?)
GROUP BY m.id
ORDER BY m.title
LIMIT ? OFFSET ?",
)
.bind(&query.filters.genre)
.bind(&query.filters.genre)
.bind(query.filters.year.map(|y| y as i64))
.bind(query.filters.year.map(|y| y as i64))
.bind(limit)
.bind(offset)
.fetch_all(&self.pool)
.await
.map_err(map_err)?
};
let items = rows.into_iter().map(|r| MovieSearchHit {
movie_id: MovieId::from_uuid(uuid::Uuid::parse_str(&r.id).unwrap_or_default()),
title: r.title,
release_year: r.release_year.map(|y| y as u16),
director: r.director,
poster_path: r.poster_path,
genres: r.genres
.unwrap_or_default()
.split(',')
.filter(|s| !s.is_empty())
.map(str::to_string)
.collect(),
}).collect::<Vec<_>>();
Ok(Paginated { items, total_count: total, limit: query.page.limit, offset: query.page.offset })
}
async fn search_people(&self, query: &SearchQuery) -> Result<Paginated<PersonSearchHit>, DomainError> {
let Some(text) = &query.text else {
return Ok(Paginated {
items: vec![],
total_count: 0,
limit: query.page.limit,
offset: query.page.offset,
});
};
let limit = query.page.limit as i64;
let offset = query.page.offset as i64;
let fts_query = format!("{}*", text.replace('"', "").replace('*', ""));
let total: u64 = {
let count: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM people_fts WHERE people_fts MATCH ?",
)
.bind(&fts_query)
.fetch_one(&self.pool)
.await
.map_err(map_err)?;
count as u64
};
#[derive(sqlx::FromRow)]
struct Row {
person_id: String,
name: String,
known_for_department: Option<String>,
profile_path: Option<String>,
}
let rows = sqlx::query_as::<_, Row>(
"SELECT fts.person_id, p.name, p.known_for_department, p.profile_path
FROM people_fts fts
JOIN persons p ON p.id = fts.person_id
WHERE people_fts MATCH ?
ORDER BY rank
LIMIT ? OFFSET ?",
)
.bind(&fts_query)
.bind(limit)
.bind(offset)
.fetch_all(&self.pool)
.await
.map_err(map_err)?;
let mut items = Vec::with_capacity(rows.len());
for row in rows {
let tmdb_id: Option<i64> = sqlx::query_scalar(
"SELECT tmdb_person_id FROM persons WHERE id = ?",
)
.bind(&row.person_id)
.fetch_optional(&self.pool)
.await
.map_err(map_err)?
.flatten();
let known_for_titles = if let Some(tid) = tmdb_id {
sqlx::query_scalar::<_, String>(
"SELECT m.title FROM movie_cast mc
JOIN movies m ON m.id = mc.movie_id
WHERE mc.tmdb_person_id = ?
ORDER BY mc.billing_order
LIMIT 3",
)
.bind(tid)
.fetch_all(&self.pool)
.await
.unwrap_or_default()
} else {
vec![]
};
items.push(PersonSearchHit {
person_id: PersonId::from_uuid(
uuid::Uuid::parse_str(&row.person_id).unwrap_or_default()
),
name: row.name,
known_for_department: row.known_for_department,
profile_path: row.profile_path,
known_for_titles,
});
}
Ok(Paginated { items, total_count: total, limit: query.page.limit, offset: query.page.offset })
}
}
#[cfg(test)]
#[path = "tests/lib.rs"]
mod tests;

View File

@@ -0,0 +1,157 @@
use super::{SqliteSearchAdapter, create_search_adapter};
use domain::{
models::{
EntityType, IndexableDocument, Movie,
Person, PersonId, SearchFilters, SearchQuery,
ExternalPersonId,
collections::PageParams,
},
value_objects::{MovieId, MovieTitle, ReleaseYear},
ports::{SearchCommand, SearchPort},
};
use sqlx::SqlitePool;
/// Opens an in-memory SQLite pool and creates the tables the adapter touches:
/// `movies`, `persons`, `movie_cast`, `movie_genres` and the two FTS5 tables.
///
/// Panics if the connection or any DDL statement fails.
async fn pool_with_schema() -> SqlitePool {
    // Executed in this order, one statement at a time.
    const SCHEMA: [&str; 6] = [
        "CREATE TABLE movies (id TEXT PRIMARY KEY, title TEXT NOT NULL,
         release_year INTEGER, director TEXT, poster_path TEXT, external_metadata_id TEXT)",
        "CREATE TABLE persons (id TEXT PRIMARY KEY, external_id TEXT UNIQUE,
         tmdb_person_id INTEGER UNIQUE, name TEXT NOT NULL,
         known_for_department TEXT, profile_path TEXT)",
        "CREATE TABLE movie_cast (movie_id TEXT, tmdb_person_id INTEGER,
         name TEXT, character TEXT, billing_order INTEGER, profile_path TEXT)",
        "CREATE TABLE movie_genres (movie_id TEXT, tmdb_id INTEGER, name TEXT)",
        "CREATE VIRTUAL TABLE movies_fts USING fts5(
         movie_id UNINDEXED, title, director, overview, genres, keywords,
         cast_names, crew_names, release_year UNINDEXED, language UNINDEXED)",
        "CREATE VIRTUAL TABLE people_fts USING fts5(
         person_id UNINDEXED, name, known_for_department UNINDEXED)",
    ];

    let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
    for ddl in SCHEMA {
        sqlx::query(ddl).execute(&pool).await.unwrap();
    }
    pool
}
/// Builds a [`Movie`] fixture with the given id, title and year, a fixed
/// director of "Test Director" and `None` for the two remaining arguments.
///
/// Panics if `id` is not a UUID or the title/year are rejected by their
/// value objects.
fn test_movie(id: &str, title: &str, year: u16) -> Movie {
    let movie_id = MovieId::from_uuid(uuid::Uuid::parse_str(id).unwrap());
    let movie_title = MovieTitle::new(title.into()).unwrap();
    let release_year = ReleaseYear::new(year).unwrap();
    let director = Some("Test Director".to_string());
    Movie::from_persistence(movie_id, None, movie_title, release_year, director, None)
}
/// Page parameters used by every test: limit 10, offset 0.
fn default_page() -> PageParams {
    let (limit, offset) = (Some(10), Some(0));
    PageParams::new(limit, offset).unwrap()
}
/// A movie that was indexed is found again by searching for its title.
#[tokio::test]
async fn index_and_search_movie_by_title() {
    const ID: &str = "00000000-0000-0000-0000-000000000001";

    let pool = pool_with_schema().await;
    let (indexer, searcher) = create_search_adapter(pool.clone());

    let film = test_movie(ID, "Interstellar", 2014);
    let film_id = film.id().clone();

    // The search joins against `movies`, so the row must exist there too.
    sqlx::query("INSERT INTO movies VALUES (?, ?, ?, ?, ?, ?)")
        .bind(ID)
        .bind("Interstellar")
        .bind(2014i32)
        .bind("Christopher Nolan")
        .bind::<Option<String>>(None)
        .bind::<Option<String>>(None)
        .execute(&pool)
        .await
        .unwrap();

    let document = IndexableDocument::Movie { id: film_id, movie: Box::new(film), profile: None };
    indexer.index(document).await.unwrap();

    let request = SearchQuery {
        text: Some("Interstellar".to_string()),
        filters: SearchFilters::default(),
        page: default_page(),
    };
    let found = searcher.search(&request).await.unwrap();

    assert_eq!(found.movies.items.len(), 1);
    assert_eq!(found.movies.items[0].title, "Interstellar");
}
/// After `remove`, a previously indexed movie no longer shows up in a text
/// search for its title.
#[tokio::test]
async fn remove_movie_clears_from_index() {
    const ID: &str = "00000000-0000-0000-0000-000000000002";

    let pool = pool_with_schema().await;
    let (indexer, searcher) = create_search_adapter(pool.clone());

    let film = test_movie(ID, "Inception", 2010);
    let film_id = film.id().clone();

    sqlx::query("INSERT INTO movies VALUES (?, ?, ?, ?, ?, ?)")
        .bind(ID)
        .bind("Inception")
        .bind(2010i32)
        .bind("Christopher Nolan")
        .bind::<Option<String>>(None)
        .bind::<Option<String>>(None)
        .execute(&pool)
        .await
        .unwrap();

    let document = IndexableDocument::Movie { id: film_id, movie: Box::new(film), profile: None };
    indexer.index(document).await.unwrap();
    indexer.remove(EntityType::Movie, ID).await.unwrap();

    let request = SearchQuery {
        text: Some("Inception".to_string()),
        filters: SearchFilters::default(),
        page: default_page(),
    };
    let found = searcher.search(&request).await.unwrap();

    assert!(found.movies.items.is_empty());
}
/// Without search text, the genre filter alone decides which movies are
/// listed: a matching genre returns the movie, a different genre returns
/// nothing.
#[tokio::test]
async fn search_with_genre_filter() {
    const ID: &str = "00000000-0000-0000-0000-000000000003";

    let pool = pool_with_schema().await;
    let (indexer, searcher) = create_search_adapter(pool.clone());

    let film = test_movie(ID, "The Dark Knight", 2008);
    let film_id = film.id().clone();

    sqlx::query("INSERT INTO movies VALUES (?, ?, ?, ?, ?, ?)")
        .bind(ID)
        .bind("The Dark Knight")
        .bind(2008i32)
        .bind("Christopher Nolan")
        .bind::<Option<String>>(None)
        .bind::<Option<String>>(None)
        .execute(&pool)
        .await
        .unwrap();
    sqlx::query("INSERT INTO movie_genres VALUES (?, 1, 'Action')")
        .bind(ID)
        .execute(&pool)
        .await
        .unwrap();

    let document = IndexableDocument::Movie { id: film_id, movie: Box::new(film), profile: None };
    indexer.index(document).await.unwrap();

    // Matching genre — no text filter
    let action_only = SearchQuery {
        text: None,
        filters: SearchFilters { genre: Some("Action".to_string()), ..Default::default() },
        page: default_page(),
    };
    let found = searcher.search(&action_only).await.unwrap();
    assert_eq!(found.movies.items.len(), 1);

    // Non-matching genre
    let comedy_only = SearchQuery {
        text: None,
        filters: SearchFilters { genre: Some("Comedy".to_string()), ..Default::default() },
        page: default_page(),
    };
    let found = searcher.search(&comedy_only).await.unwrap();
    assert!(found.movies.items.is_empty());
}

View File

@@ -0,0 +1,36 @@
-- Persons table. tmdb_person_id is stored for efficient joins with existing
-- movie_cast and movie_crew tables (which use tmdb_person_id as their person key).
--
-- No explicit indexes are created on external_id or tmdb_person_id: SQLite
-- already backs each UNIQUE constraint with an automatic unique index, so a
-- separate CREATE INDEX on the same single column would only duplicate it
-- (extra storage and extra work on every insert/update) without speeding up
-- any lookup.
CREATE TABLE IF NOT EXISTS persons (
    id                   TEXT PRIMARY KEY,      -- UUID (PersonId)
    external_id          TEXT NOT NULL UNIQUE,  -- "tmdb:12345"
    tmdb_person_id       INTEGER UNIQUE,        -- parsed from external_id for fast joins
    name                 TEXT NOT NULL,
    known_for_department TEXT,
    profile_path         TEXT
);
-- FTS5 full-text search table for movies.
-- movie_id is UNINDEXED (stored but not tokenised for text search).
-- release_year and language are UNINDEXED (used only for structured filters).
-- The remaining columns (title, director, overview, genres, keywords,
-- cast_names, crew_names) are ordinary FTS5 columns and are therefore the
-- ones matched by full-text queries.
CREATE VIRTUAL TABLE IF NOT EXISTS movies_fts USING fts5(
movie_id UNINDEXED,
title,
director,
overview,
genres,
keywords,
cast_names,
crew_names,
release_year UNINDEXED,
language UNINDEXED
);
-- FTS5 full-text search table for people.
-- Only `name` is tokenised for text search; person_id and
-- known_for_department are UNINDEXED (stored alongside the row but not
-- matched by full-text queries).
CREATE VIRTUAL TABLE IF NOT EXISTS people_fts USING fts5(
person_id UNINDEXED,
name,
known_for_department UNINDEXED
);

View File

@@ -17,6 +17,7 @@ mod import_profile;
mod import_session;
mod migrations;
mod models;
mod persons;
mod profile;
mod users;
@@ -28,6 +29,7 @@ use models::{
pub use image_ref::{SqliteImageRefAdapter, create_image_ref};
pub use import_profile::SqliteImportProfileRepository;
pub use import_session::SqliteImportSessionRepository;
pub use persons::{SqlitePersonAdapter, create_person_adapter};
pub use profile::SqliteMovieProfileRepository;
pub use users::SqliteUserRepository;

View File

@@ -0,0 +1,195 @@
use async_trait::async_trait;
use domain::{
errors::DomainError,
models::{CastCredit, CrewCredit, ExternalPersonId, Person, PersonCredits, PersonId},
ports::{PersonCommand, PersonQuery},
value_objects::MovieId,
};
use sqlx::SqlitePool;
use std::sync::Arc;
/// SQLite-backed adapter for person storage.
///
/// The same type implements both [`PersonCommand`] (writes) and
/// [`PersonQuery`] (reads) on top of a shared connection pool.
pub struct SqlitePersonAdapter {
    /// Pool every statement issued by this adapter is executed against.
    pool: SqlitePool,
}

impl SqlitePersonAdapter {
    /// Wraps an existing pool. Nothing is executed against the database here.
    pub fn new(pool: SqlitePool) -> Self {
        SqlitePersonAdapter { pool }
    }
}
/// Builds one [`SqlitePersonAdapter`] and returns it twice, once behind each
/// port trait.
///
/// Both returned handles point at the same adapter instance (and therefore the
/// same pool); only the trait they expose differs.
pub fn create_person_adapter(pool: SqlitePool) -> (Arc<dyn PersonCommand>, Arc<dyn PersonQuery>) {
    let shared = Arc::new(SqlitePersonAdapter::new(pool));
    let command: Arc<dyn PersonCommand> = shared.clone();
    let query: Arc<dyn PersonQuery> = shared;
    (command, query)
}
fn map_err(e: sqlx::Error) -> DomainError {
DomainError::InfrastructureError(e.to_string())
}
#[async_trait]
impl PersonCommand for SqlitePersonAdapter {
    /// Inserts every person in `persons`, updating the row in place when a
    /// person with the same `id` already exists.
    ///
    /// The whole batch runs inside a single transaction: either every person
    /// is stored or, if any statement fails, none of them are (the transaction
    /// is rolled back when it is dropped without being committed). An empty
    /// slice is a no-op and does not touch the database.
    ///
    /// # Errors
    ///
    /// Returns `DomainError::InfrastructureError` if the transaction cannot be
    /// started or committed, or if any upsert statement fails.
    ///
    /// NOTE(review): only a conflict on `id` is handled. A row that collides on
    /// `external_id` or `tmdb_person_id` under a *different* `id` would fail the
    /// batch — presumably ids are always derived from the external id; confirm
    /// against the callers.
    async fn upsert_batch(&self, persons: &[Person]) -> Result<(), DomainError> {
        if persons.is_empty() {
            return Ok(());
        }

        let mut tx = self.pool.begin().await.map_err(map_err)?;

        for person in persons {
            // Numeric TMDb id extracted from the external id; stored in its own
            // column so credits can be joined on it.
            let tmdb_id = person.external_id().tmdb_id();
            sqlx::query(
                "INSERT INTO persons (id, external_id, tmdb_person_id, name, known_for_department, profile_path)
                 VALUES (?, ?, ?, ?, ?, ?)
                 ON CONFLICT(id) DO UPDATE SET
                 external_id = excluded.external_id,
                 tmdb_person_id = excluded.tmdb_person_id,
                 name = excluded.name,
                 known_for_department = excluded.known_for_department,
                 profile_path = excluded.profile_path",
            )
            .bind(person.id().value().to_string())
            .bind(person.external_id().value())
            .bind(tmdb_id)
            .bind(person.name())
            .bind(person.known_for_department())
            .bind(person.profile_path())
            .execute(&mut *tx)
            .await
            .map_err(map_err)?;
        }

        tx.commit().await.map_err(map_err)
    }
}
#[async_trait]
impl PersonQuery for SqlitePersonAdapter {
async fn get_by_id(&self, id: &PersonId) -> Result<Option<Person>, DomainError> {
let row = sqlx::query_as::<_, PersonRow>(
"SELECT id, external_id, name, known_for_department, profile_path FROM persons WHERE id = ?",
)
.bind(id.value().to_string())
.fetch_optional(&self.pool)
.await
.map_err(map_err)?;
Ok(row.map(PersonRow::into_person))
}
async fn get_by_external_id(&self, id: &ExternalPersonId) -> Result<Option<Person>, DomainError> {
let row = sqlx::query_as::<_, PersonRow>(
"SELECT id, external_id, name, known_for_department, profile_path FROM persons WHERE external_id = ?",
)
.bind(id.value())
.fetch_optional(&self.pool)
.await
.map_err(map_err)?;
Ok(row.map(PersonRow::into_person))
}
async fn get_credits(&self, id: &PersonId) -> Result<PersonCredits, DomainError> {
let person = self.get_by_id(id).await?.ok_or_else(|| {
DomainError::NotFound(format!("Person {} not found", id.value()))
})?;
let tmdb_id: Option<i64> = sqlx::query_scalar(
"SELECT tmdb_person_id FROM persons WHERE id = ?",
)
.bind(id.value().to_string())
.fetch_optional(&self.pool)
.await
.map_err(map_err)?
.flatten();
let Some(tmdb_id) = tmdb_id else {
return Ok(PersonCredits { person, cast: vec![], crew: vec![] });
};
let cast = sqlx::query_as::<_, CastRow>(
"SELECT m.id, m.title, m.release_year, mc.character, m.poster_path
FROM movie_cast mc
JOIN movies m ON m.id = mc.movie_id
WHERE mc.tmdb_person_id = ?
ORDER BY mc.billing_order",
)
.bind(tmdb_id)
.fetch_all(&self.pool)
.await
.map_err(map_err)?
.into_iter()
.map(|r| CastCredit {
movie_id: MovieId::from_uuid(uuid::Uuid::parse_str(&r.id).unwrap_or_default()),
title: r.title,
release_year: r.release_year.map(|y| y as u16),
character: r.character,
poster_path: r.poster_path,
})
.collect();
let crew = sqlx::query_as::<_, CrewRow>(
"SELECT m.id, m.title, m.release_year, mc.job, mc.department, m.poster_path
FROM movie_crew mc
JOIN movies m ON m.id = mc.movie_id
WHERE mc.tmdb_person_id = ?
ORDER BY m.title",
)
.bind(tmdb_id)
.fetch_all(&self.pool)
.await
.map_err(map_err)?
.into_iter()
.map(|r| CrewCredit {
movie_id: MovieId::from_uuid(uuid::Uuid::parse_str(&r.id).unwrap_or_default()),
title: r.title,
release_year: r.release_year.map(|y| y as u16),
job: r.job,
department: r.department,
poster_path: r.poster_path,
})
.collect();
Ok(PersonCredits { person, cast, crew })
}
}
// ── Row types ────────────────────────────────────────────────────────────────
/// Raw `persons` row as selected by the lookup queries, before conversion into
/// the domain [`Person`].
#[derive(sqlx::FromRow)]
struct PersonRow {
    id: String,
    external_id: String,
    name: String,
    known_for_department: Option<String>,
    profile_path: Option<String>,
}

impl PersonRow {
    /// Consumes the row and builds the domain [`Person`].
    ///
    /// An `id` that does not parse as a UUID falls back to the default UUID
    /// instead of producing an error.
    fn into_person(self) -> Person {
        let PersonRow { id, external_id, name, known_for_department, profile_path } = self;
        let external = ExternalPersonId::new(external_id);
        let person_id = PersonId::from_uuid(uuid::Uuid::parse_str(&id).unwrap_or_default());
        Person::new(person_id, external, name, known_for_department, profile_path)
    }
}
/// One row of the cast-credits query in `get_credits`: movie columns joined
/// with the matching `movie_cast` entry.
#[derive(sqlx::FromRow)]
struct CastRow {
/// `movies.id` as stored text; parsed into a UUID by the caller.
id: String,
/// `movies.title`.
title: String,
/// `movies.release_year`; `None` when the column is NULL.
release_year: Option<i64>,
/// `movie_cast.character`.
character: String,
/// `movies.poster_path`; `None` when the column is NULL.
poster_path: Option<String>,
}
/// One row of the crew-credits query in `get_credits`: movie columns joined
/// with the matching `movie_crew` entry.
#[derive(sqlx::FromRow)]
struct CrewRow {
/// `movies.id` as stored text; parsed into a UUID by the caller.
id: String,
/// `movies.title`.
title: String,
/// `movies.release_year`; `None` when the column is NULL.
release_year: Option<i64>,
/// `movie_crew.job`.
job: String,
/// `movie_crew.department`.
department: String,
/// `movies.poster_path`; `None` when the column is NULL.
poster_path: Option<String>,
}
#[cfg(test)]
#[path = "tests/persons.rs"]
mod tests;

View File

@@ -0,0 +1,126 @@
use super::super::persons::SqlitePersonAdapter;
use domain::{
errors::DomainError,
models::{ExternalPersonId, Person, PersonId},
ports::{PersonCommand, PersonQuery},
};
use sqlx::SqlitePool;
/// Opens an in-memory SQLite pool and creates the four tables the person
/// adapter reads or writes: `persons`, `movies`, `movie_cast`, `movie_crew`.
async fn pool_with_schema() -> SqlitePool {
    // Executed in order; each entry is one CREATE TABLE statement.
    const SCHEMA: [&str; 4] = [
        "CREATE TABLE persons (
            id TEXT PRIMARY KEY, external_id TEXT NOT NULL UNIQUE,
            tmdb_person_id INTEGER UNIQUE, name TEXT NOT NULL,
            known_for_department TEXT, profile_path TEXT
        )",
        "CREATE TABLE movies (id TEXT PRIMARY KEY, title TEXT NOT NULL,
            release_year INTEGER, director TEXT, poster_path TEXT,
            external_metadata_id TEXT)",
        "CREATE TABLE movie_cast (movie_id TEXT, tmdb_person_id INTEGER,
            name TEXT, character TEXT, billing_order INTEGER, profile_path TEXT,
            PRIMARY KEY (movie_id, tmdb_person_id))",
        "CREATE TABLE movie_crew (movie_id TEXT, tmdb_person_id INTEGER,
            name TEXT, job TEXT, department TEXT, profile_path TEXT,
            PRIMARY KEY (movie_id, tmdb_person_id, job))",
    ];

    let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
    for ddl in SCHEMA {
        sqlx::query(ddl).execute(&pool).await.unwrap();
    }
    pool
}
/// Builds a test [`Person`] whose external id is `tmdb:<tmdb_id>`, whose
/// internal id is derived from that external id, and which has no profile path.
fn make_person(tmdb_id: i64, name: &str, dept: Option<&str>) -> Person {
    let external_id = ExternalPersonId::new(format!("tmdb:{tmdb_id}"));
    let id = PersonId::from_external(&external_id);
    let department = dept.map(String::from);
    Person::new(id, external_id, String::from(name), department, None)
}
#[tokio::test]
async fn upsert_batch_inserts_persons() {
    let pool = pool_with_schema().await;
    let adapter = SqlitePersonAdapter::new(pool.clone());

    // Two distinct people must end up as two rows.
    let batch = [
        make_person(1, "Alice", Some("Acting")),
        make_person(2, "Bob", Some("Directing")),
    ];
    adapter.upsert_batch(&batch).await.unwrap();

    let (stored,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM persons")
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(stored, 2);
}
#[tokio::test]
async fn upsert_batch_is_idempotent() {
    let pool = pool_with_schema().await;
    let adapter = SqlitePersonAdapter::new(pool.clone());

    // Writing the same batch twice must not create a second row.
    let batch = [make_person(1, "Alice", Some("Acting"))];
    for _ in 0..2 {
        adapter.upsert_batch(&batch).await.unwrap();
    }

    let (stored,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM persons")
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(stored, 1);
}
#[tokio::test]
async fn get_by_id_returns_person() {
    let pool = pool_with_schema().await;
    let adapter = SqlitePersonAdapter::new(pool.clone());

    let charlie = make_person(42, "Charlie", Some("Acting"));
    adapter.upsert_batch(std::slice::from_ref(&charlie)).await.unwrap();

    // The stored person must round-trip with its name and external id intact.
    let loaded = adapter
        .get_by_id(charlie.id())
        .await
        .unwrap()
        .unwrap();
    assert_eq!(loaded.name(), "Charlie");
    assert_eq!(loaded.external_id().value(), "tmdb:42");
}
#[tokio::test]
async fn get_by_id_returns_none_for_unknown() {
    let adapter = SqlitePersonAdapter::new(pool_with_schema().await);

    // Nothing was inserted, so an id derived from any external id is unknown.
    let external = ExternalPersonId::new("tmdb:999");
    let unknown_id = PersonId::from_external(&external);

    let lookup = adapter.get_by_id(&unknown_id).await.unwrap();
    assert!(lookup.is_none());
}
#[tokio::test]
async fn get_credits_returns_cast_and_crew() {
    let pool = pool_with_schema().await;
    let adapter = SqlitePersonAdapter::new(pool.clone());

    let diana = make_person(7, "Diana", Some("Acting"));
    adapter.upsert_batch(std::slice::from_ref(&diana)).await.unwrap();

    // One movie with a single cast entry for TMDb person 7 and no crew rows.
    for seed in [
        "INSERT INTO movies VALUES ('m1', 'The Film', 2020, 'Dir', NULL, NULL)",
        "INSERT INTO movie_cast VALUES ('m1', 7, 'Diana', 'Hero', 1, NULL)",
    ] {
        sqlx::query(seed).execute(&pool).await.unwrap();
    }

    let credits = adapter.get_credits(diana.id()).await.unwrap();
    assert_eq!(credits.person.name(), "Diana");

    let (cast, crew) = (&credits.cast, &credits.crew);
    assert_eq!(cast.len(), 1);
    assert_eq!(cast[0].character, "Hero");
    assert!(crew.is_empty());
}

View File

@@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2024"
[dependencies]
application = { workspace = true }
domain = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }

View File

@@ -2,11 +2,12 @@ use std::sync::Arc;
use async_trait::async_trait;
use chrono::Utc;
use application::{commands::EnrichMovieCommand, use_cases::enrich_movie};
use domain::{
errors::DomainError,
events::DomainEvent,
models::{CastMember, CrewMember, Genre, Keyword, MovieProfile},
ports::{EventHandler, MovieEnrichmentClient, MovieProfileRepository},
ports::{EventHandler, MovieEnrichmentClient, MovieProfileRepository, MovieRepository, PersonCommand, SearchCommand},
value_objects::MovieId,
};
use serde::Deserialize;
@@ -163,7 +164,10 @@ impl MovieEnrichmentClient for TmdbEnrichmentClient {
pub struct EnrichmentHandler {
pub enrichment_client: Arc<dyn MovieEnrichmentClient>,
pub profile_repo: Arc<dyn MovieProfileRepository>,
pub movie_repository: Arc<dyn MovieRepository>,
pub profile_repo: Arc<dyn MovieProfileRepository>,
pub person_command: Arc<dyn PersonCommand>,
pub search_command: Arc<dyn SearchCommand>,
}
#[async_trait]
@@ -176,7 +180,7 @@ impl EventHandler for EnrichmentHandler {
_ => return Ok(()),
};
// Skip if profile is fresh (checked by the repo's list_stale, but guard here too)
// Skip if profile is fresh (< 30 days old)
if let Ok(Some(existing)) = self.profile_repo.get_by_movie_id(&movie_id).await {
let age = Utc::now() - existing.enriched_at;
if age.num_days() < 30 {
@@ -190,16 +194,17 @@ impl EventHandler for EnrichmentHandler {
}
tracing::info!(movie_id = %movie_id.value(), external_id = %external_metadata_id, "enriching movie");
match self.enrichment_client.fetch_profile(movie_id.clone(), &external_metadata_id).await {
Ok(profile) => {
self.profile_repo.upsert(&profile).await?;
tracing::info!(
movie_id = %movie_id.value(),
genres = profile.genres.len(),
cast = profile.cast.len(),
crew = profile.crew.len(),
"enrichment stored"
);
enrich_movie::execute(
&self.movie_repository,
&self.profile_repo,
&self.person_command,
&self.search_command,
EnrichMovieCommand { movie_id, profile },
)
.await?;
}
Err(DomainError::NotFound(msg)) => {
tracing::warn!(movie_id = %movie_id.value(), "TMDb lookup found nothing: {msg}");