diff --git a/crates/adapters/postgres/migrations/0024_wrap_up.sql b/crates/adapters/postgres/migrations/0024_wrap_up.sql new file mode 100644 index 0000000..fbc0b19 --- /dev/null +++ b/crates/adapters/postgres/migrations/0024_wrap_up.sql @@ -0,0 +1,12 @@ +CREATE TABLE IF NOT EXISTS wrap_up_records ( + id TEXT PRIMARY KEY NOT NULL, + user_id TEXT, + start_date DATE NOT NULL, + end_date DATE NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + report_json TEXT, + error_message TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + completed_at TIMESTAMPTZ +); +CREATE INDEX IF NOT EXISTS idx_wrap_up_user ON wrap_up_records (user_id, start_date, end_date); diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs index d9d109d..bb52fc5 100644 --- a/crates/adapters/postgres/src/lib.rs +++ b/crates/adapters/postgres/src/lib.rs @@ -23,6 +23,7 @@ mod profile_fields; mod users; mod watch_event; mod watchlist; +mod wrapup; use models::{ DiaryRow, DirectorCountRow, FeedRow, MonthlyRatingRow, MovieRow, MovieStatsRow, @@ -39,6 +40,7 @@ pub use profile_fields::PostgresProfileFieldsRepository; pub use users::PostgresUserRepository; pub use watch_event::{PostgresWatchEventRepository, PostgresWebhookTokenRepository}; pub use watchlist::PostgresWatchlistRepository; +pub use wrapup::{PostgresWrapUpRepository, PostgresWrapUpStatsQuery}; fn format_year_month(ym: &str) -> String { let parts: Vec<&str> = ym.splitn(2, '-').collect(); @@ -1000,6 +1002,8 @@ pub struct PostgresWireOutput { pub movie_profile: std::sync::Arc, pub watchlist: std::sync::Arc, pub ap_content: std::sync::Arc, + pub wrapup_repo: std::sync::Arc, + pub wrapup_stats: std::sync::Arc, } pub async fn wire(database_url: &str) -> anyhow::Result { @@ -1028,6 +1032,8 @@ pub async fn wire(database_url: &str) -> anyhow::Result { as _, movie_profile: std::sync::Arc::new(PostgresMovieProfileRepository::new(pool.clone())) as _, watchlist: std::sync::Arc::new(PostgresWatchlistRepository::new(pool.clone())) as _, - ap_content: std::sync::Arc::new(PostgresApContentQuery::new(pool)) as _, + ap_content: std::sync::Arc::new(PostgresApContentQuery::new(pool.clone())) as _, + wrapup_repo: std::sync::Arc::new(PostgresWrapUpRepository::new(pool.clone())) as _, + wrapup_stats: std::sync::Arc::new(PostgresWrapUpStatsQuery::new(pool)) as _, }) } diff --git a/crates/adapters/postgres/src/wrapup.rs b/crates/adapters/postgres/src/wrapup.rs new file mode 100644 index 0000000..4674f99 --- /dev/null +++ b/crates/adapters/postgres/src/wrapup.rs @@ -0,0 +1,430 @@ +use std::collections::HashMap; + +use async_trait::async_trait; +use chrono::NaiveDate; +use domain::{ + errors::DomainError, + models::wrapup::{DateRange, WrapUpRecord, WrapUpScope, WrapUpStatus}, + ports::{WrapUpMovieRow, WrapUpRepository, WrapUpStatsQuery}, + value_objects::WrapUpId, +}; +use sqlx::{PgPool, Row}; +use uuid::Uuid; + +use crate::models::{parse_datetime, parse_uuid}; + +fn map_err(e: sqlx::Error) -> DomainError { + tracing::error!("Database error: {:?}", e); + DomainError::InfrastructureError("Database operation failed".into()) +} + +fn status_to_str(s: &WrapUpStatus) -> &'static str { + match s { + WrapUpStatus::Pending => "pending", + WrapUpStatus::Generating => "generating", + WrapUpStatus::Ready => "ready", + WrapUpStatus::Failed => "failed", + } +} + +fn parse_status(s: &str) -> Result { + match s { + "pending" => Ok(WrapUpStatus::Pending), + "generating" => Ok(WrapUpStatus::Generating), + "ready" => Ok(WrapUpStatus::Ready), + "failed" => Ok(WrapUpStatus::Failed), + other => Err(DomainError::InfrastructureError(format!( + "Unknown wrap-up status: {other}" + ))), + } +} + +// ── WrapUpRepository ───────────────────────────────────────────────────────── + +pub struct PostgresWrapUpRepository { + pool: PgPool, +} + +impl PostgresWrapUpRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl WrapUpRepository for PostgresWrapUpRepository { + async fn create(&self, record: &WrapUpRecord) -> Result<(), DomainError> { + let id = record.id.value().to_string(); + let user_id = record.user_id.map(|u| u.to_string()); + let status = status_to_str(&record.status); + + sqlx::query( + "INSERT INTO wrap_up_records \ + (id, user_id, start_date, end_date, status, report_json, error_message, created_at, completed_at) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", + ) + .bind(&id) + .bind(&user_id) + .bind(record.start_date) + .bind(record.end_date) + .bind(status) + .bind(&record.report_json) + .bind(&record.error_message) + .bind(record.created_at) + .bind(record.completed_at) + .execute(&self.pool) + .await + .map_err(map_err)?; + + Ok(()) + } + + async fn update_status( + &self, + id: &WrapUpId, + status: &WrapUpStatus, + error: Option<&str>, + ) -> Result<(), DomainError> { + let id_str = id.value().to_string(); + let status_str = status_to_str(status); + + sqlx::query( + "UPDATE wrap_up_records SET status = $1, error_message = $2 WHERE id = $3", + ) + .bind(status_str) + .bind(error) + .bind(&id_str) + .execute(&self.pool) + .await + .map_err(map_err)?; + + Ok(()) + } + + async fn set_complete(&self, id: &WrapUpId, report_json: &str) -> Result<(), DomainError> { + let id_str = id.value().to_string(); + + sqlx::query( + "UPDATE wrap_up_records \ + SET status = 'ready', report_json = $1, completed_at = NOW() \ + WHERE id = $2", + ) + .bind(report_json) + .bind(&id_str) + .execute(&self.pool) + .await + .map_err(map_err)?; + + Ok(()) + } + + async fn get_by_id(&self, id: &WrapUpId) -> Result, DomainError> { + let id_str = id.value().to_string(); + + let row = sqlx::query( + "SELECT id, user_id, start_date, end_date, status, report_json, error_message, \ + to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, \ + to_char(completed_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS completed_at \ + FROM wrap_up_records WHERE id = $1", + ) + .bind(&id_str) + .fetch_optional(&self.pool) + .await + .map_err(map_err)?; + + row.as_ref().map(row_to_record).transpose() + } + + async fn list_for_user(&self, user_id: Uuid) -> Result, DomainError> { + let uid = user_id.to_string(); + + let rows = sqlx::query( + "SELECT id, user_id, start_date, end_date, status, report_json, error_message, \ + to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, \ + to_char(completed_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS completed_at \ + FROM wrap_up_records WHERE user_id = $1 ORDER BY created_at DESC", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await + .map_err(map_err)?; + + rows.iter().map(row_to_record).collect() + } + + async fn list_global(&self) -> Result, DomainError> { + let rows = sqlx::query( + "SELECT id, user_id, start_date, end_date, status, report_json, error_message, \ + to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, \ + to_char(completed_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS completed_at \ + FROM wrap_up_records WHERE user_id IS NULL ORDER BY created_at DESC", + ) + .fetch_all(&self.pool) + .await + .map_err(map_err)?; + + rows.iter().map(row_to_record).collect() + } + + async fn find_existing( + &self, + user_id: Option, + start: NaiveDate, + end: NaiveDate, + ) -> Result, DomainError> { + let uid = user_id.map(|u| u.to_string()); + + let row = sqlx::query( + "SELECT id, user_id, start_date, end_date, status, report_json, error_message, \ + to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, \ + to_char(completed_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS completed_at \ + FROM wrap_up_records \ + WHERE (($1::text IS NULL AND user_id IS NULL) OR user_id = $1) \ + AND start_date = $2 AND end_date = $3 \ + LIMIT 1", + ) + .bind(&uid) + .bind(start) + .bind(end) + .fetch_optional(&self.pool) + .await + .map_err(map_err)?; + + row.as_ref().map(row_to_record).transpose() + } +} + +fn row_to_record(row: &sqlx::postgres::PgRow) -> Result { + let id_str: String = row.try_get("id").map_err(map_err)?; + let user_id_str: Option = row.try_get("user_id").map_err(map_err)?; + let start_date: NaiveDate = row.try_get("start_date").map_err(map_err)?; + let end_date: NaiveDate = row.try_get("end_date").map_err(map_err)?; + let status_str: String = row.try_get("status").map_err(map_err)?; + let report_json: Option = row.try_get("report_json").map_err(map_err)?; + let error_message: Option = row.try_get("error_message").map_err(map_err)?; + let created_at_str: String = row.try_get("created_at").map_err(map_err)?; + let completed_at_str: Option = row.try_get("completed_at").map_err(map_err)?; + + let user_id = user_id_str + .as_deref() + .map(parse_uuid) + .transpose()?; + + Ok(WrapUpRecord { + id: WrapUpId::from_uuid(parse_uuid(&id_str)?), + user_id, + start_date, + end_date, + status: parse_status(&status_str)?, + report_json, + error_message, + created_at: parse_datetime(&created_at_str)?, + completed_at: completed_at_str.as_deref().map(parse_datetime).transpose()?, + }) +} + +// ── WrapUpStatsQuery ───────────────────────────────────────────────────────── + +pub struct PostgresWrapUpStatsQuery { + pool: PgPool, +} + +impl PostgresWrapUpStatsQuery { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl WrapUpStatsQuery for PostgresWrapUpStatsQuery { + async fn get_reviews_with_profiles( + &self, + scope: &WrapUpScope, + range: &DateRange, + ) -> Result, DomainError> { + // 1) Main query: reviews + movies + movie_profiles + let (scope_clause, scope_bind) = match scope { + WrapUpScope::User(uid) => ("AND r.user_id = $3", Some(uid.to_string())), + WrapUpScope::Global => ("", None), + }; + + let sql = format!( + "SELECT r.movie_id, m.title, m.release_year, m.director, m.poster_path, \ + r.rating, \ + to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, \ + r.user_id, \ + p.runtime_minutes, p.budget_usd, p.original_language \ + FROM reviews r \ + INNER JOIN movies m ON m.id = r.movie_id \ + LEFT JOIN movie_profiles p ON p.movie_id = m.id \ + WHERE r.watched_at >= $1 AND r.watched_at < $2 {scope_clause} \ + ORDER BY r.watched_at ASC" + ); + + let mut q = sqlx::query(&sql) + .bind(range.start) + .bind(range.end); + if let Some(ref uid) = scope_bind { + q = q.bind(uid); + } + + let rows = q.fetch_all(&self.pool).await.map_err(map_err)?; + + if rows.is_empty() { + return Ok(vec![]); + } + + // Collect unique movie IDs + let mut movie_ids: Vec = Vec::new(); + let mut seen = std::collections::HashSet::new(); + for row in &rows { + let mid: String = row.try_get("movie_id").map_err(map_err)?; + if seen.insert(mid.clone()) { + movie_ids.push(mid); + } + } + + // 2) Batch-fetch genres, keywords, cast + let (genres_map, keywords_map, cast_map) = tokio::try_join!( + fetch_genres_pg(&self.pool, &movie_ids), + fetch_keywords_pg(&self.pool, &movie_ids), + fetch_cast_pg(&self.pool, &movie_ids), + )?; + + // 3) Build result + let mut result = Vec::with_capacity(rows.len()); + for row in &rows { + let movie_id_str: String = row.try_get("movie_id").map_err(map_err)?; + let title: String = row.try_get("title").map_err(map_err)?; + let release_year: i64 = row.try_get("release_year").map_err(map_err)?; + let director: Option = row.try_get("director").map_err(map_err)?; + let poster_path: Option = row.try_get("poster_path").map_err(map_err)?; + let rating: i64 = row.try_get("rating").map_err(map_err)?; + let watched_at_str: String = row.try_get("watched_at").map_err(map_err)?; + let user_id_str: String = row.try_get("user_id").map_err(map_err)?; + let runtime_minutes: Option = row.try_get("runtime_minutes").map_err(map_err)?; + let budget_usd: Option = row.try_get("budget_usd").map_err(map_err)?; + let original_language: Option = + row.try_get("original_language").map_err(map_err)?; + + let genres = genres_map + .get(&movie_id_str) + .cloned() + .unwrap_or_default(); + let keywords = keywords_map + .get(&movie_id_str) + .cloned() + .unwrap_or_default(); + let cast = cast_map + .get(&movie_id_str) + .cloned() + .unwrap_or_default(); + + let cast_names: Vec<(String, u32)> = cast + .iter() + .map(|c| (c.name.clone(), c.billing_order)) + .collect(); + let cast_profile_paths: Vec> = + cast.iter().map(|c| c.profile_path.clone()).collect(); + + result.push(WrapUpMovieRow { + movie_id: parse_uuid(&movie_id_str)?, + title, + release_year: release_year as u16, + director, + poster_path, + rating: rating as u8, + watched_at: parse_datetime(&watched_at_str)?, + user_id: parse_uuid(&user_id_str)?, + runtime_minutes: runtime_minutes.map(|v| v as u32), + budget_usd, + original_language, + genres, + keywords, + cast_names, + cast_profile_paths, + }); + } + + Ok(result) + } +} + +#[derive(Clone)] +struct CastEntry { + name: String, + billing_order: u32, + profile_path: Option, +} + +async fn fetch_genres_pg( + pool: &PgPool, + movie_ids: &[String], +) -> Result>, DomainError> { + let rows = sqlx::query( + "SELECT movie_id, name FROM movie_genres WHERE movie_id = ANY($1) ORDER BY name", + ) + .bind(movie_ids) + .fetch_all(pool) + .await + .map_err(map_err)?; + + let mut map: HashMap> = HashMap::new(); + for row in rows { + let mid: String = row.try_get("movie_id").map_err(map_err)?; + let name: String = row.try_get("name").map_err(map_err)?; + map.entry(mid).or_default().push(name); + } + Ok(map) +} + +async fn fetch_keywords_pg( + pool: &PgPool, + movie_ids: &[String], +) -> Result>, DomainError> { + let rows = sqlx::query( + "SELECT movie_id, name FROM movie_keywords WHERE movie_id = ANY($1) ORDER BY name", + ) + .bind(movie_ids) + .fetch_all(pool) + .await + .map_err(map_err)?; + + let mut map: HashMap> = HashMap::new(); + for row in rows { + let mid: String = row.try_get("movie_id").map_err(map_err)?; + let name: String = row.try_get("name").map_err(map_err)?; + map.entry(mid).or_default().push(name); + } + Ok(map) +} + +async fn fetch_cast_pg( + pool: &PgPool, + movie_ids: &[String], +) -> Result>, DomainError> { + let rows = sqlx::query( + "SELECT movie_id, name, billing_order, profile_path \ + FROM movie_cast \ + WHERE movie_id = ANY($1) AND billing_order <= 3 \ + ORDER BY billing_order ASC", + ) + .bind(movie_ids) + .fetch_all(pool) + .await + .map_err(map_err)?; + + let mut map: HashMap> = HashMap::new(); + for row in rows { + let mid: String = row.try_get("movie_id").map_err(map_err)?; + let name: String = row.try_get("name").map_err(map_err)?; + let billing_order: i32 = row.try_get("billing_order").map_err(map_err)?; + let profile_path: Option = row.try_get("profile_path").map_err(map_err)?; + map.entry(mid).or_default().push(CastEntry { + name, + billing_order: billing_order as u32, + profile_path, + }); + } + Ok(map) +} diff --git a/crates/adapters/sqlite/migrations/0024_wrap_up.sql b/crates/adapters/sqlite/migrations/0024_wrap_up.sql new file mode 100644 index 0000000..f3f36c7 --- /dev/null +++ b/crates/adapters/sqlite/migrations/0024_wrap_up.sql @@ -0,0 +1,12 @@ +CREATE TABLE IF NOT EXISTS wrap_up_records ( + id TEXT PRIMARY KEY NOT NULL, + user_id TEXT, + start_date TEXT NOT NULL, + end_date TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + report_json TEXT, + error_message TEXT, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%d %H:%M:%S', 'now')), + completed_at TEXT +); +CREATE INDEX IF NOT EXISTS idx_wrap_up_user ON wrap_up_records (user_id, start_date, end_date); diff --git a/crates/adapters/sqlite/src/lib.rs b/crates/adapters/sqlite/src/lib.rs index d5b5e2f..ce90603 100644 --- a/crates/adapters/sqlite/src/lib.rs +++ b/crates/adapters/sqlite/src/lib.rs @@ -24,6 +24,7 @@ mod profile_fields; mod users; mod watch_event; mod watchlist; +mod wrapup; use models::{ DiaryRow, DirectorCountRow, FeedRow, MonthlyRatingRow, MovieRow, MovieStatsRow, @@ -40,6 +41,7 @@ pub use profile_fields::SqliteProfileFieldsRepository; pub use users::SqliteUserRepository; pub use watch_event::{SqliteWatchEventRepository, SqliteWebhookTokenRepository}; pub use watchlist::SqliteWatchlistRepository; +pub use wrapup::{SqliteWrapUpRepository, SqliteWrapUpStatsQuery}; pub fn create_profile_fields_repo( pool: sqlx::SqlitePool, @@ -998,6 +1000,8 @@ pub struct SqliteWireOutput { pub movie_profile: std::sync::Arc, pub watchlist: std::sync::Arc, pub ap_content: std::sync::Arc, + pub wrapup_repo: std::sync::Arc, + pub wrapup_stats: std::sync::Arc, } pub async fn wire(database_url: &str) -> anyhow::Result { @@ -1031,7 +1035,9 @@ pub async fn wire(database_url: &str) -> anyhow::Result { import_profile: std::sync::Arc::new(SqliteImportProfileRepository::new(pool.clone())) as _, movie_profile: std::sync::Arc::new(SqliteMovieProfileRepository::new(pool.clone())) as _, watchlist: std::sync::Arc::new(SqliteWatchlistRepository::new(pool.clone())) as _, - ap_content: std::sync::Arc::new(SqliteApContentQuery::new(pool)) as _, + ap_content: std::sync::Arc::new(SqliteApContentQuery::new(pool.clone())) as _, + wrapup_repo: std::sync::Arc::new(SqliteWrapUpRepository::new(pool.clone())) as _, + wrapup_stats: std::sync::Arc::new(SqliteWrapUpStatsQuery::new(pool)) as _, }) } diff --git a/crates/adapters/sqlite/src/wrapup.rs b/crates/adapters/sqlite/src/wrapup.rs new file mode 100644 index 0000000..960e07b --- /dev/null +++ b/crates/adapters/sqlite/src/wrapup.rs @@ -0,0 +1,465 @@ +use std::collections::HashMap; + +use async_trait::async_trait; +use chrono::NaiveDate; +use domain::{ + errors::DomainError, + models::wrapup::{DateRange, WrapUpRecord, WrapUpScope, WrapUpStatus}, + ports::{WrapUpMovieRow, WrapUpRepository, WrapUpStatsQuery}, + value_objects::WrapUpId, +}; +use sqlx::{Row, SqlitePool}; +use uuid::Uuid; + +use crate::models::{parse_datetime, parse_uuid}; + +fn map_err(e: sqlx::Error) -> DomainError { + tracing::error!("Database error: {:?}", e); + DomainError::InfrastructureError("Database operation failed".into()) +} + +fn status_to_str(s: &WrapUpStatus) -> &'static str { + match s { + WrapUpStatus::Pending => "pending", + WrapUpStatus::Generating => "generating", + WrapUpStatus::Ready => "ready", + WrapUpStatus::Failed => "failed", + } +} + +fn parse_status(s: &str) -> Result { + match s { + "pending" => Ok(WrapUpStatus::Pending), + "generating" => Ok(WrapUpStatus::Generating), + "ready" => Ok(WrapUpStatus::Ready), + "failed" => Ok(WrapUpStatus::Failed), + other => Err(DomainError::InfrastructureError(format!( + "Unknown wrap-up status: {other}" + ))), + } +} + +fn parse_date(s: &str) -> Result { + NaiveDate::parse_from_str(s, "%Y-%m-%d") + .map_err(|e| DomainError::InfrastructureError(format!("Invalid date '{s}': {e}"))) +} + +// ── WrapUpRepository ───────────────────────────────────────────────────────── + +pub struct SqliteWrapUpRepository { + pool: SqlitePool, +} + +impl SqliteWrapUpRepository { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl WrapUpRepository for SqliteWrapUpRepository { + async fn create(&self, record: &WrapUpRecord) -> Result<(), DomainError> { + let id = record.id.value().to_string(); + let user_id = record.user_id.map(|u| u.to_string()); + let status = status_to_str(&record.status); + let start = record.start_date.format("%Y-%m-%d").to_string(); + let end = record.end_date.format("%Y-%m-%d").to_string(); + let created = record.created_at.format("%Y-%m-%d %H:%M:%S").to_string(); + let completed = record + .completed_at + .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string()); + + sqlx::query( + "INSERT INTO wrap_up_records \ + (id, user_id, start_date, end_date, status, report_json, error_message, created_at, completed_at) \ + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(&id) + .bind(&user_id) + .bind(&start) + .bind(&end) + .bind(status) + .bind(&record.report_json) + .bind(&record.error_message) + .bind(&created) + .bind(&completed) + .execute(&self.pool) + .await + .map_err(map_err)?; + + Ok(()) + } + + async fn update_status( + &self, + id: &WrapUpId, + status: &WrapUpStatus, + error: Option<&str>, + ) -> Result<(), DomainError> { + let id_str = id.value().to_string(); + let status_str = status_to_str(status); + + sqlx::query("UPDATE wrap_up_records SET status = ?, error_message = ? WHERE id = ?") + .bind(status_str) + .bind(error) + .bind(&id_str) + .execute(&self.pool) + .await + .map_err(map_err)?; + + Ok(()) + } + + async fn set_complete(&self, id: &WrapUpId, report_json: &str) -> Result<(), DomainError> { + let id_str = id.value().to_string(); + + sqlx::query( + "UPDATE wrap_up_records \ + SET status = 'ready', report_json = ?, completed_at = strftime('%Y-%m-%d %H:%M:%S', 'now') \ + WHERE id = ?", + ) + .bind(report_json) + .bind(&id_str) + .execute(&self.pool) + .await + .map_err(map_err)?; + + Ok(()) + } + + async fn get_by_id(&self, id: &WrapUpId) -> Result, DomainError> { + let id_str = id.value().to_string(); + + let row = sqlx::query( + "SELECT id, user_id, start_date, end_date, status, report_json, error_message, \ + created_at, completed_at \ + FROM wrap_up_records WHERE id = ?", + ) + .bind(&id_str) + .fetch_optional(&self.pool) + .await + .map_err(map_err)?; + + row.as_ref().map(row_to_record).transpose() + } + + async fn list_for_user(&self, user_id: Uuid) -> Result, DomainError> { + let uid = user_id.to_string(); + + let rows = sqlx::query( + "SELECT id, user_id, start_date, end_date, status, report_json, error_message, \ + created_at, completed_at \ + FROM wrap_up_records WHERE user_id = ? ORDER BY created_at DESC", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await + .map_err(map_err)?; + + rows.iter().map(row_to_record).collect() + } + + async fn list_global(&self) -> Result, DomainError> { + let rows = sqlx::query( + "SELECT id, user_id, start_date, end_date, status, report_json, error_message, \ + created_at, completed_at \ + FROM wrap_up_records WHERE user_id IS NULL ORDER BY created_at DESC", + ) + .fetch_all(&self.pool) + .await + .map_err(map_err)?; + + rows.iter().map(row_to_record).collect() + } + + async fn find_existing( + &self, + user_id: Option, + start: NaiveDate, + end: NaiveDate, + ) -> Result, DomainError> { + let uid = user_id.map(|u| u.to_string()); + let start_str = start.format("%Y-%m-%d").to_string(); + let end_str = end.format("%Y-%m-%d").to_string(); + + let row = sqlx::query( + "SELECT id, user_id, start_date, end_date, status, report_json, error_message, \ + created_at, completed_at \ + FROM wrap_up_records \ + WHERE ((? IS NULL AND user_id IS NULL) OR user_id = ?) \ + AND start_date = ? AND end_date = ? \ + LIMIT 1", + ) + .bind(&uid) + .bind(&uid) + .bind(&start_str) + .bind(&end_str) + .fetch_optional(&self.pool) + .await + .map_err(map_err)?; + + row.as_ref().map(row_to_record).transpose() + } +} + +fn row_to_record(row: &sqlx::sqlite::SqliteRow) -> Result { + let id_str: String = row.try_get("id").map_err(map_err)?; + let user_id_str: Option = row.try_get("user_id").map_err(map_err)?; + let start_date_str: String = row.try_get("start_date").map_err(map_err)?; + let end_date_str: String = row.try_get("end_date").map_err(map_err)?; + let status_str: String = row.try_get("status").map_err(map_err)?; + let report_json: Option = row.try_get("report_json").map_err(map_err)?; + let error_message: Option = row.try_get("error_message").map_err(map_err)?; + let created_at_str: String = row.try_get("created_at").map_err(map_err)?; + let completed_at_str: Option = row.try_get("completed_at").map_err(map_err)?; + + let user_id = user_id_str + .as_deref() + .map(parse_uuid) + .transpose()?; + + Ok(WrapUpRecord { + id: WrapUpId::from_uuid(parse_uuid(&id_str)?), + user_id, + start_date: parse_date(&start_date_str)?, + end_date: parse_date(&end_date_str)?, + status: parse_status(&status_str)?, + report_json, + error_message, + created_at: parse_datetime(&created_at_str)?, + completed_at: completed_at_str.as_deref().map(parse_datetime).transpose()?, + }) +} + +// ── WrapUpStatsQuery ───────────────────────────────────────────────────────── + +pub struct SqliteWrapUpStatsQuery { + pool: SqlitePool, +} + +impl SqliteWrapUpStatsQuery { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl WrapUpStatsQuery for SqliteWrapUpStatsQuery { + async fn get_reviews_with_profiles( + &self, + scope: &WrapUpScope, + range: &DateRange, + ) -> Result, DomainError> { + let start_str = range.start.format("%Y-%m-%d").to_string(); + let end_str = range.end.format("%Y-%m-%d").to_string(); + + // 1) Main query + let (scope_clause, scope_bind) = match scope { + WrapUpScope::User(uid) => ("AND r.user_id = ?", Some(uid.to_string())), + WrapUpScope::Global => ("", None), + }; + + let sql = format!( + "SELECT r.movie_id, m.title, m.release_year, m.director, m.poster_path, \ + r.rating, r.watched_at, r.user_id, \ + p.runtime_minutes, p.budget_usd, p.original_language \ + FROM reviews r \ + INNER JOIN movies m ON m.id = r.movie_id \ + LEFT JOIN movie_profiles p ON p.movie_id = m.id \ + WHERE r.watched_at >= ? AND r.watched_at < ? {scope_clause} \ + ORDER BY r.watched_at ASC" + ); + + let mut q = sqlx::query(&sql) + .bind(&start_str) + .bind(&end_str); + if let Some(ref uid) = scope_bind { + q = q.bind(uid); + } + + let rows = q.fetch_all(&self.pool).await.map_err(map_err)?; + + if rows.is_empty() { + return Ok(vec![]); + } + + // Collect unique movie IDs + let mut movie_ids: Vec = Vec::new(); + let mut seen = std::collections::HashSet::new(); + for row in &rows { + let mid: String = row.try_get("movie_id").map_err(map_err)?; + if seen.insert(mid.clone()) { + movie_ids.push(mid); + } + } + + // 2) Batch-fetch genres, keywords, cast + let (genres_map, keywords_map, cast_map) = tokio::try_join!( + fetch_genres_sqlite(&self.pool, &movie_ids), + fetch_keywords_sqlite(&self.pool, &movie_ids), + fetch_cast_sqlite(&self.pool, &movie_ids), + )?; + + // 3) Build result + let mut result = Vec::with_capacity(rows.len()); + for row in &rows { + let movie_id_str: String = row.try_get("movie_id").map_err(map_err)?; + let title: String = row.try_get("title").map_err(map_err)?; + let release_year: i64 = row.try_get("release_year").map_err(map_err)?; + let director: Option = row.try_get("director").map_err(map_err)?; + let poster_path: Option = row.try_get("poster_path").map_err(map_err)?; + let rating: i64 = row.try_get("rating").map_err(map_err)?; + let watched_at_str: String = row.try_get("watched_at").map_err(map_err)?; + let user_id_str: String = row.try_get("user_id").map_err(map_err)?; + let runtime_minutes: Option = row.try_get("runtime_minutes").map_err(map_err)?; + let budget_usd: Option = row.try_get("budget_usd").map_err(map_err)?; + let original_language: Option = + row.try_get("original_language").map_err(map_err)?; + + let genres = genres_map + .get(&movie_id_str) + .cloned() + .unwrap_or_default(); + let keywords = keywords_map + .get(&movie_id_str) + .cloned() + .unwrap_or_default(); + let cast = cast_map + .get(&movie_id_str) + .cloned() + .unwrap_or_default(); + + let cast_names: Vec<(String, u32)> = cast + .iter() + .map(|c| (c.name.clone(), c.billing_order)) + .collect(); + let cast_profile_paths: Vec> = + cast.iter().map(|c| c.profile_path.clone()).collect(); + + result.push(WrapUpMovieRow { + movie_id: parse_uuid(&movie_id_str)?, + title, + release_year: release_year as u16, + director, + poster_path, + rating: rating as u8, + watched_at: parse_datetime(&watched_at_str)?, + user_id: parse_uuid(&user_id_str)?, + runtime_minutes: runtime_minutes.map(|v| v as u32), + budget_usd, + original_language, + genres, + keywords, + cast_names, + cast_profile_paths, + }); + } + + Ok(result) + } +} + +#[derive(Clone)] +struct CastEntry { + name: String, + billing_order: u32, + profile_path: Option, +} + +fn in_placeholders(n: usize) -> String { + let mut s = String::new(); + for i in 0..n { + if i > 0 { + s.push(','); + } + s.push('?'); + } + s +} + +async fn fetch_genres_sqlite( + pool: &SqlitePool, + movie_ids: &[String], +) -> Result>, DomainError> { + if movie_ids.is_empty() { + return Ok(HashMap::new()); + } + let sql = format!( + "SELECT movie_id, name FROM movie_genres WHERE movie_id IN ({}) ORDER BY name", + in_placeholders(movie_ids.len()) + ); + let mut q = sqlx::query(&sql); + for id in movie_ids { + q = q.bind(id); + } + let rows = q.fetch_all(pool).await.map_err(map_err)?; + + let mut map: HashMap> = HashMap::new(); + for row in rows { + let mid: String = row.try_get("movie_id").map_err(map_err)?; + let name: String = row.try_get("name").map_err(map_err)?; + map.entry(mid).or_default().push(name); + } + Ok(map) +} + +async fn fetch_keywords_sqlite( + pool: &SqlitePool, + movie_ids: &[String], +) -> Result>, DomainError> { + if movie_ids.is_empty() { + return Ok(HashMap::new()); + } + let sql = format!( + "SELECT movie_id, name FROM movie_keywords WHERE movie_id IN ({}) ORDER BY name", + in_placeholders(movie_ids.len()) + ); + let mut q = sqlx::query(&sql); + for id in movie_ids { + q = q.bind(id); + } + let rows = q.fetch_all(pool).await.map_err(map_err)?; + + let mut map: HashMap> = HashMap::new(); + for row in rows { + let mid: String = row.try_get("movie_id").map_err(map_err)?; + let name: String = row.try_get("name").map_err(map_err)?; + map.entry(mid).or_default().push(name); + } + Ok(map) +} + +async fn fetch_cast_sqlite( + pool: &SqlitePool, + movie_ids: &[String], +) -> Result>, DomainError> { + if movie_ids.is_empty() { + return Ok(HashMap::new()); + } + let sql = format!( + "SELECT movie_id, name, billing_order, profile_path \ + FROM movie_cast \ + WHERE movie_id IN ({}) AND billing_order <= 3 \ + ORDER BY billing_order ASC", + in_placeholders(movie_ids.len()) + ); + let mut q = sqlx::query(&sql); + for id in movie_ids { + q = q.bind(id); + } + let rows = q.fetch_all(pool).await.map_err(map_err)?; + + let mut map: HashMap> = HashMap::new(); + for row in rows { + let mid: String = row.try_get("movie_id").map_err(map_err)?; + let name: String = row.try_get("name").map_err(map_err)?; + let billing_order: i32 = row.try_get("billing_order").map_err(map_err)?; + let profile_path: Option = row.try_get("profile_path").map_err(map_err)?; + map.entry(mid).or_default().push(CastEntry { + name, + billing_order: billing_order as u32, + profile_path, + }); + } + Ok(map) +}