From 40cb15e7cb55e27d264db9952bffc5486cd56390 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Fri, 12 Jun 2026 00:00:15 +0200 Subject: [PATCH] refactor(postgres): split fat PostgresRepository into per-port structs --- crates/adapters/postgres/src/diary.rs | 510 +++++++++++++ crates/adapters/postgres/src/lib.rs | 974 +------------------------ crates/adapters/postgres/src/movie.rs | 242 ++++++ crates/adapters/postgres/src/review.rs | 112 +++ crates/adapters/postgres/src/stats.rs | 153 ++++ crates/adapters/sqlite/src/movie.rs | 8 +- 6 files changed, 1042 insertions(+), 957 deletions(-) create mode 100644 crates/adapters/postgres/src/diary.rs create mode 100644 crates/adapters/postgres/src/movie.rs create mode 100644 crates/adapters/postgres/src/review.rs create mode 100644 crates/adapters/postgres/src/stats.rs diff --git a/crates/adapters/postgres/src/diary.rs b/crates/adapters/postgres/src/diary.rs new file mode 100644 index 0000000..2324672 --- /dev/null +++ b/crates/adapters/postgres/src/diary.rs @@ -0,0 +1,510 @@ +use async_trait::async_trait; +use domain::{ + errors::DomainError, + models::{ + DiaryEntry, DiaryFilter, FeedEntry, MovieStats, ReviewHistory, SortDirection, + collections::{PageParams, Paginated}, + }, + ports::DiaryRepository, + value_objects::{MovieId, UserId}, +}; +use sqlx::PgPool; + +use crate::models::{DiaryRow, FeedRow, MovieRow, MovieStatsRow, ReviewRow}; + +pub struct PostgresDiaryRepository { + pool: PgPool, +} + +impl PostgresDiaryRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } + + fn map_err(e: sqlx::Error) -> DomainError { + tracing::error!("Database error: {:?}", e); + DomainError::InfrastructureError("Database operation failed".into()) + } + + async fn count_diary_entries(&self, movie_id: Option<&str>) -> Result { + match movie_id { + None => sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM reviews") + .fetch_one(&self.pool) + .await + .map_err(Self::map_err), + Some(id) => { + sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM reviews WHERE movie_id = $1") + .bind(id) + .fetch_one(&self.pool) + .await + .map_err(Self::map_err) + } + } + } + + async fn fetch_all_diary_rows( + &self, + sort: &SortDirection, + limit: i64, + offset: i64, + ) -> Result, DomainError> { + let order = match sort { + SortDirection::ByRatingDesc => "r.rating DESC, r.watched_at DESC", + SortDirection::ByRatingAsc => "r.rating ASC, r.watched_at ASC", + SortDirection::Ascending => "r.watched_at ASC", + SortDirection::Descending => "r.watched_at DESC", + }; + let sql = format!( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, + to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, + to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, + r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + ORDER BY {} + LIMIT $1 OFFSET $2", + order + ); + sqlx::query_as::<_, DiaryRow>(&sql) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err) + } + + async fn fetch_movie_diary_rows( + &self, + movie_id: &str, + sort: &SortDirection, + limit: i64, + offset: i64, + ) -> Result, DomainError> { + let order = match sort { + SortDirection::ByRatingDesc => "r.rating DESC, r.watched_at DESC", + SortDirection::ByRatingAsc => "r.rating ASC, r.watched_at ASC", + SortDirection::Ascending => "r.watched_at ASC", + SortDirection::Descending => "r.watched_at DESC", + }; + let sql = format!( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, + to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, + to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, + r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.movie_id = $1 + ORDER BY {} + LIMIT $2 OFFSET $3", + order + ); + sqlx::query_as::<_, DiaryRow>(&sql) + .bind(movie_id) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err) + } + + async fn count_user_diary_entries( + &self, + user_id: &str, + search: Option<&str>, + ) -> Result { + let has_search = search.map(|s| !s.is_empty()).unwrap_or(false); + let sql = if has_search { + "SELECT COUNT(*) FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = $1 AND m.title ILIKE '%' || $2 || '%'" + .to_string() + } else { + "SELECT COUNT(*) FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = $1" + .to_string() + }; + let mut q = sqlx::query_scalar::<_, i64>(&sql).bind(user_id); + if has_search { + q = q.bind(search.unwrap()); + } + q.fetch_one(&self.pool).await.map_err(Self::map_err) + } + + async fn fetch_user_diary_rows( + &self, + user_id: &str, + sort: &SortDirection, + search: Option<&str>, + limit: i64, + offset: i64, + ) -> Result, DomainError> { + let has_search = search.map(|s| !s.is_empty()).unwrap_or(false); + let order_clause = match sort { + SortDirection::ByRatingDesc => "r.rating DESC, r.watched_at DESC", + SortDirection::ByRatingAsc => "r.rating ASC, r.watched_at ASC", + SortDirection::Ascending => "r.watched_at ASC", + SortDirection::Descending => "r.watched_at DESC", + }; + + // Build param counter: user_id=$1, optional search=$2, limit=$N-1, offset=$N + let mut p: i32 = 1; // $1 is user_id + let search_clause = if has_search { + p += 1; + format!(" AND m.title ILIKE '%' || ${} || '%'", p) + } else { + String::new() + }; + p += 1; + let limit_param = format!("${}", p); + p += 1; + let offset_param = format!("${}", p); + + let sql = format!( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, + to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, + to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, + r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = $1{} + ORDER BY {} + LIMIT {} OFFSET {}", + search_clause, order_clause, limit_param, offset_param + ); + + let mut q = sqlx::query_as::<_, DiaryRow>(&sql).bind(user_id); + if has_search { + q = q.bind(search.unwrap()); + } + q.bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err) + } +} + +#[async_trait] +impl DiaryRepository for PostgresDiaryRepository { + async fn query_diary( + &self, + filter: &DiaryFilter, + ) -> Result, DomainError> { + let limit = filter.page.limit as i64; + let offset = filter.page.offset as i64; + + let (total, rows) = match (&filter.movie_id, &filter.user_id) { + (None, None) => tokio::try_join!( + self.count_diary_entries(None), + self.fetch_all_diary_rows(&filter.sort_by, limit, offset) + )?, + (Some(id), None) => { + let id_str = id.value().to_string(); + tokio::try_join!( + self.count_diary_entries(Some(id_str.as_str())), + self.fetch_movie_diary_rows(&id_str, &filter.sort_by, limit, offset) + )? + } + (None, Some(uid)) => { + let uid_str = uid.value().to_string(); + let search = filter.search.as_deref(); + tokio::try_join!( + self.count_user_diary_entries(&uid_str, search), + self.fetch_user_diary_rows(&uid_str, &filter.sort_by, search, limit, offset) + )? + } + (Some(_), Some(_)) => { + return Err(DomainError::ValidationError( + "Combined movie_id + user_id filter not supported".into(), + )); + } + }; + + let items = rows + .into_iter() + .map(DiaryRow::into_domain) + .collect::, _>>()?; + + Ok(Paginated { + items, + total_count: total as u64, + limit: filter.page.limit, + offset: filter.page.offset, + }) + } + + async fn query_activity_feed( + &self, + page: &PageParams, + ) -> Result, DomainError> { + self.query_activity_feed_filtered(page, &domain::ports::FeedSortBy::Date, None, None) + .await + } + + async fn query_activity_feed_filtered( + &self, + page: &PageParams, + sort_by: &domain::ports::FeedSortBy, + search: Option<&str>, + following: Option<&domain::ports::FollowingFilter>, + ) -> Result, DomainError> { + use domain::ports::FeedSortBy; + + let limit = page.limit as i64; + let offset = page.offset as i64; + let has_search = search.map(|s| !s.is_empty()).unwrap_or(false); + + // Dynamic param counter + let mut p: i32 = 0; + let mut next_param = || { + p += 1; + format!("${}", p) + }; + + let mut where_parts = vec!["1=1".to_string()]; + + if has_search { + let pn = next_param(); + where_parts.push(format!("m.title ILIKE '%' || {} || '%'", pn)); + } + + if let Some(f) = following { + let local_params: Vec = f.local_user_ids.iter().map(|_| next_param()).collect(); + let remote_params: Vec = + f.remote_actor_urls.iter().map(|_| next_param()).collect(); + + let local_in = if local_params.is_empty() { + "(SELECT NULL::text WHERE false)".to_string() + } else { + local_params.join(", ") + }; + let remote_in = if remote_params.is_empty() { + "(SELECT NULL::text WHERE false)".to_string() + } else { + remote_params.join(", ") + }; + where_parts.push(format!( + "(r.user_id IN ({}) OR r.remote_actor_url IN ({}))", + local_in, remote_in + )); + } + + let limit_param = next_param(); + let offset_param = next_param(); + + let order_clause = match sort_by { + FeedSortBy::Date => "r.watched_at DESC", + FeedSortBy::DateAsc => "r.watched_at ASC", + FeedSortBy::Rating => "r.rating DESC, r.watched_at DESC", + FeedSortBy::RatingAsc => "r.rating ASC, r.watched_at ASC", + }; + + let where_clause = where_parts.join(" AND "); + + // Reset counter for count query (reuse same where_clause string but re-bind) + // We need a separate counter for count SQL — but since where_clause is already built + // with the right $N references, both queries share it. + let count_sql = format!( + "SELECT COUNT(*) FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE {}", + where_clause + ); + + let select_sql = format!( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, + to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, + to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, + r.remote_actor_url, + COALESCE(u.email, r.remote_actor_url) AS user_email + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + LEFT JOIN users u ON u.id = r.user_id + WHERE {} + ORDER BY {} + LIMIT {} OFFSET {}", + where_clause, order_clause, limit_param, offset_param + ); + + // Bind helper closure — binds search + following params in order + macro_rules! bind_filter_params { + ($q:expr) => {{ + let mut q = $q; + if has_search { + q = q.bind(search.unwrap()); + } + if let Some(f) = following { + for uid in &f.local_user_ids { + q = q.bind(uid.to_string()); + } + for url in &f.remote_actor_urls { + q = q.bind(url.as_str()); + } + } + q + }}; + } + + let count_q = bind_filter_params!(sqlx::query_scalar::<_, i64>(&count_sql)); + let total = count_q.fetch_one(&self.pool).await.map_err(Self::map_err)?; + + let rows_q = bind_filter_params!(sqlx::query_as::<_, FeedRow>(&select_sql)); + let rows = rows_q + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + + let items = rows + .into_iter() + .map(FeedRow::into_domain) + .collect::, _>>()?; + + Ok(Paginated { + items, + total_count: total as u64, + limit: page.limit, + offset: page.offset, + }) + } + + async fn get_review_history(&self, movie_id: &MovieId) -> Result { + let id_str = movie_id.value().to_string(); + + let movie = sqlx::query_as::<_, MovieRow>( + "SELECT id, external_metadata_id, title, release_year, director, poster_path + FROM movies WHERE id = $1", + ) + .bind(&id_str) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)? + .ok_or_else(|| DomainError::NotFound(format!("Movie {}", id_str)))? + .into_domain()?; + + let viewings = sqlx::query_as::<_, ReviewRow>( + "SELECT id, movie_id, user_id, rating, comment, + to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, + to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, + remote_actor_url + FROM reviews WHERE movie_id = $1 ORDER BY watched_at ASC", + ) + .bind(&id_str) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)? + .into_iter() + .map(ReviewRow::into_domain) + .collect::, _>>()?; + + Ok(ReviewHistory::new(movie, viewings)) + } + + async fn get_user_history(&self, user_id: &UserId) -> Result, DomainError> { + let uid = user_id.value().to_string(); + let rows = sqlx::query_as::<_, DiaryRow>( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, + to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, + to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, + r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = $1 + ORDER BY r.watched_at DESC", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + + rows.into_iter().map(DiaryRow::into_domain).collect() + } + + async fn get_movie_stats(&self, movie_id: &MovieId) -> Result { + let id_str = movie_id.value().to_string(); + sqlx::query_as::<_, MovieStatsRow>( + "SELECT + COUNT(*) AS total_count, + AVG(CAST(rating AS FLOAT)) AS avg_rating, + COUNT(CASE WHEN remote_actor_url IS NOT NULL THEN 1 END) AS federated_count, + COUNT(CASE WHEN rating = 1 THEN 1 END) AS rating_1, + COUNT(CASE WHEN rating = 2 THEN 1 END) AS rating_2, + COUNT(CASE WHEN rating = 3 THEN 1 END) AS rating_3, + COUNT(CASE WHEN rating = 4 THEN 1 END) AS rating_4, + COUNT(CASE WHEN rating = 5 THEN 1 END) AS rating_5 + FROM reviews WHERE movie_id = $1", + ) + .bind(id_str) + .fetch_one(&self.pool) + .await + .map_err(Self::map_err) + .map(MovieStatsRow::into_domain) + } + + async fn get_movie_social_feed( + &self, + movie_id: &MovieId, + page: &PageParams, + ) -> Result, DomainError> { + let id_str = movie_id.value().to_string(); + let limit = page.limit as i64; + let offset = page.offset as i64; + + let total: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM reviews WHERE movie_id = $1") + .bind(&id_str) + .fetch_one(&self.pool) + .await + .map_err(Self::map_err)?; + + let rows = sqlx::query_as::<_, FeedRow>( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, + to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, + to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, + r.remote_actor_url, + CASE WHEN r.remote_actor_url IS NOT NULL THEN r.remote_actor_url + WHEN u.email IS NOT NULL THEN u.email + ELSE r.user_id END AS user_email + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + LEFT JOIN users u ON u.id = r.user_id + WHERE r.movie_id = $1 + ORDER BY r.watched_at DESC + LIMIT $2 OFFSET $3", + ) + .bind(&id_str) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + + let items = rows + .into_iter() + .map(FeedRow::into_domain) + .collect::, _>>()?; + + Ok(Paginated { + items, + total_count: total as u64, + limit: page.limit, + offset: page.offset, + }) + } + + async fn count_local_posts(&self) -> Result { + let count: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM reviews WHERE remote_actor_url IS NULL") + .fetch_one(&self.pool) + .await + .map_err(Self::map_err)?; + Ok(count as u64) + } +} diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs index 1c8bf7d..9248978 100644 --- a/crates/adapters/postgres/src/lib.rs +++ b/crates/adapters/postgres/src/lib.rs @@ -1,53 +1,45 @@ -use async_trait::async_trait; -use domain::{ - errors::DomainError, - events::DomainEvent, - models::{ - DiaryEntry, DiaryFilter, DirectorStat, FeedEntry, MonthlyRating, Movie, MovieStats, Review, - ReviewHistory, ReviewSource, SortDirection, UserStats, UserTrends, - collections::{PageParams, Paginated}, - }, - ports::{DiaryRepository, MovieRepository, ReviewRepository, StatsRepository}, - value_objects::{ExternalMetadataId, MovieId, MovieTitle, ReleaseYear, ReviewId, UserId}, -}; +use domain::errors::DomainError; use sqlx::PgPool; mod ap_content; +mod diary; mod goals; mod image_ref; mod import_profile; mod import_session; mod models; +mod movie; mod persons; mod profile; mod profile_fields; mod refresh_sessions; mod remote_goals; +mod review; +mod stats; mod user_settings; mod users; mod watch_event; mod watchlist; mod wrapup; -use models::{ - DiaryRow, DirectorCountRow, FeedRow, MonthlyRatingRow, MovieRow, MovieStatsRow, - MovieSummaryRow, ReviewRow, UserTotalsRow, datetime_to_str, -}; - pub use ap_content::PostgresApContentQuery; +pub use diary::PostgresDiaryRepository; pub use image_ref::{PostgresImageRefAdapter, create_image_ref}; pub use import_profile::PostgresImportProfileRepository; pub use import_session::PostgresImportSessionRepository; +pub use movie::PostgresMovieRepository; pub use persons::{PostgresPersonAdapter, create_person_adapter}; pub use profile::PostgresMovieProfileRepository; pub use profile_fields::PostgresProfileFieldsRepository; pub use refresh_sessions::PostgresRefreshSessionAdapter; +pub use review::PostgresReviewRepository; +pub use stats::PostgresStatsRepository; pub use users::PostgresUserRepository; pub use watch_event::{PostgresWatchEventRepository, PostgresWebhookTokenRepository}; pub use watchlist::PostgresWatchlistRepository; pub use wrapup::{PostgresWrapUpRepository, PostgresWrapUpStatsQuery}; -fn format_year_month(ym: &str) -> String { +pub(crate) fn format_year_month(ym: &str) -> String { let parts: Vec<&str> = ym.splitn(2, '-').collect(); if parts.len() != 2 { return ym.to_string(); @@ -71,937 +63,12 @@ fn format_year_month(ym: &str) -> String { format!("{} '{}", month, year) } -pub struct PostgresRepository { - pool: PgPool, -} - -impl PostgresRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } - - pub async fn migrate(&self) -> Result<(), DomainError> { - sqlx::migrate!("./migrations") - .set_ignore_missing(true) - .run(&self.pool) - .await - .map_err(|e| DomainError::InfrastructureError(format!("Migration failed: {}", e))) - } - - fn map_err(e: sqlx::Error) -> DomainError { - tracing::error!("Database error: {:?}", e); - DomainError::InfrastructureError("Database operation failed".into()) - } - - async fn count_diary_entries(&self, movie_id: Option<&str>) -> Result { - match movie_id { - None => sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM reviews") - .fetch_one(&self.pool) - .await - .map_err(Self::map_err), - Some(id) => { - sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM reviews WHERE movie_id = $1") - .bind(id) - .fetch_one(&self.pool) - .await - .map_err(Self::map_err) - } - } - } - - async fn fetch_all_diary_rows( - &self, - sort: &SortDirection, - limit: i64, - offset: i64, - ) -> Result, DomainError> { - let order = match sort { - SortDirection::ByRatingDesc => "r.rating DESC, r.watched_at DESC", - SortDirection::ByRatingAsc => "r.rating ASC, r.watched_at ASC", - SortDirection::Ascending => "r.watched_at ASC", - SortDirection::Descending => "r.watched_at DESC", - }; - let sql = format!( - "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, - r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, - to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, - to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, - r.remote_actor_url - FROM reviews r - INNER JOIN movies m ON m.id = r.movie_id - ORDER BY {} - LIMIT $1 OFFSET $2", - order - ); - sqlx::query_as::<_, DiaryRow>(&sql) - .bind(limit) - .bind(offset) - .fetch_all(&self.pool) - .await - .map_err(Self::map_err) - } - - async fn fetch_movie_diary_rows( - &self, - movie_id: &str, - sort: &SortDirection, - limit: i64, - offset: i64, - ) -> Result, DomainError> { - let order = match sort { - SortDirection::ByRatingDesc => "r.rating DESC, r.watched_at DESC", - SortDirection::ByRatingAsc => "r.rating ASC, r.watched_at ASC", - SortDirection::Ascending => "r.watched_at ASC", - SortDirection::Descending => "r.watched_at DESC", - }; - let sql = format!( - "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, - r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, - to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, - to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, - r.remote_actor_url - FROM reviews r - INNER JOIN movies m ON m.id = r.movie_id - WHERE r.movie_id = $1 - ORDER BY {} - LIMIT $2 OFFSET $3", - order - ); - sqlx::query_as::<_, DiaryRow>(&sql) - .bind(movie_id) - .bind(limit) - .bind(offset) - .fetch_all(&self.pool) - .await - .map_err(Self::map_err) - } - - async fn count_user_diary_entries( - &self, - user_id: &str, - search: Option<&str>, - ) -> Result { - let has_search = search.map(|s| !s.is_empty()).unwrap_or(false); - let sql = if has_search { - "SELECT COUNT(*) FROM reviews r - INNER JOIN movies m ON m.id = r.movie_id - WHERE r.user_id = $1 AND m.title ILIKE '%' || $2 || '%'" - .to_string() - } else { - "SELECT COUNT(*) FROM reviews r - INNER JOIN movies m ON m.id = r.movie_id - WHERE r.user_id = $1" - .to_string() - }; - let mut q = sqlx::query_scalar::<_, i64>(&sql).bind(user_id); - if has_search { - q = q.bind(search.unwrap()); - } - q.fetch_one(&self.pool).await.map_err(Self::map_err) - } - - async fn fetch_user_diary_rows( - &self, - user_id: &str, - sort: &SortDirection, - search: Option<&str>, - limit: i64, - offset: i64, - ) -> Result, DomainError> { - let has_search = search.map(|s| !s.is_empty()).unwrap_or(false); - let order_clause = match sort { - SortDirection::ByRatingDesc => "r.rating DESC, r.watched_at DESC", - SortDirection::ByRatingAsc => "r.rating ASC, r.watched_at ASC", - SortDirection::Ascending => "r.watched_at ASC", - SortDirection::Descending => "r.watched_at DESC", - }; - - // Build param counter: user_id=$1, optional search=$2, limit=$N-1, offset=$N - let mut p: i32 = 1; // $1 is user_id - let search_clause = if has_search { - p += 1; - format!(" AND m.title ILIKE '%' || ${} || '%'", p) - } else { - String::new() - }; - p += 1; - let limit_param = format!("${}", p); - p += 1; - let offset_param = format!("${}", p); - - let sql = format!( - "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, - r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, - to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, - to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, - r.remote_actor_url - FROM reviews r - INNER JOIN movies m ON m.id = r.movie_id - WHERE r.user_id = $1{} - ORDER BY {} - LIMIT {} OFFSET {}", - search_clause, order_clause, limit_param, offset_param - ); - - let mut q = sqlx::query_as::<_, DiaryRow>(&sql).bind(user_id); - if has_search { - q = q.bind(search.unwrap()); - } - q.bind(limit) - .bind(offset) - .fetch_all(&self.pool) - .await - .map_err(Self::map_err) - } - - async fn fetch_user_totals(&self, user_id: &str) -> Result { - sqlx::query_as::<_, UserTotalsRow>( - r#"SELECT COUNT(DISTINCT movie_id) AS total, - AVG(rating::float) AS avg_rating - FROM reviews WHERE user_id = $1"#, - ) - .bind(user_id) - .fetch_one(&self.pool) +pub async fn migrate(pool: &PgPool) -> Result<(), DomainError> { + sqlx::migrate!("./migrations") + .set_ignore_missing(true) + .run(pool) .await - .map_err(Self::map_err) - } - - async fn fetch_user_favorite_director( - &self, - user_id: &str, - ) -> Result, DomainError> { - sqlx::query_scalar::<_, String>( - "SELECT m.director - FROM reviews r - INNER JOIN movies m ON m.id = r.movie_id - WHERE r.user_id = $1 AND m.director IS NOT NULL - GROUP BY m.director - ORDER BY COUNT(*) DESC - LIMIT 1", - ) - .bind(user_id) - .fetch_optional(&self.pool) - .await - .map_err(Self::map_err) - } - - async fn fetch_user_most_active_month( - &self, - user_id: &str, - ) -> Result, DomainError> { - sqlx::query_scalar::<_, String>( - "SELECT to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM') AS month - FROM reviews - WHERE user_id = $1 - GROUP BY to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM') - ORDER BY COUNT(*) DESC - LIMIT 1", - ) - .bind(user_id) - .fetch_optional(&self.pool) - .await - .map_err(Self::map_err) - } -} - -#[async_trait] -impl MovieRepository for PostgresRepository { - async fn get_movie_by_external_id( - &self, - external_metadata_id: &ExternalMetadataId, - ) -> Result, DomainError> { - let id = external_metadata_id.value(); - sqlx::query_as::<_, MovieRow>( - "SELECT id, external_metadata_id, title, release_year, director, poster_path - FROM movies WHERE external_metadata_id = $1", - ) - .bind(id) - .fetch_optional(&self.pool) - .await - .map_err(Self::map_err)? - .map(MovieRow::into_domain) - .transpose() - } - - async fn get_movie_by_id(&self, movie_id: &MovieId) -> Result, DomainError> { - let id = movie_id.value().to_string(); - sqlx::query_as::<_, MovieRow>( - "SELECT id, external_metadata_id, title, release_year, director, poster_path - FROM movies WHERE id = $1", - ) - .bind(&id) - .fetch_optional(&self.pool) - .await - .map_err(Self::map_err)? - .map(MovieRow::into_domain) - .transpose() - } - - async fn get_movies_by_title_and_year( - &self, - title: &MovieTitle, - year: &ReleaseYear, - ) -> Result, DomainError> { - let title = title.value(); - let year = year.value() as i64; - sqlx::query_as::<_, MovieRow>( - "SELECT id, external_metadata_id, title, release_year, director, poster_path - FROM movies WHERE title = $1 AND release_year = $2", - ) - .bind(title) - .bind(year) - .fetch_all(&self.pool) - .await - .map_err(Self::map_err)? - .into_iter() - .map(MovieRow::into_domain) - .collect() - } - - async fn upsert_movie(&self, movie: &Movie) -> Result<(), DomainError> { - let id = movie.id().value().to_string(); - let external_metadata_id = movie.external_metadata_id().map(|e| e.value().to_string()); - let title = movie.title().value(); - let release_year = movie.release_year().value() as i64; - let director = movie.director(); - let poster_path = movie.poster_path().map(|p| p.value().to_string()); - - sqlx::query( - "INSERT INTO movies (id, external_metadata_id, title, release_year, director, poster_path) - VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT(id) DO UPDATE SET - external_metadata_id = excluded.external_metadata_id, - title = excluded.title, - release_year = excluded.release_year, - director = excluded.director, - poster_path = excluded.poster_path", - ) - .bind(&id) - .bind(&external_metadata_id) - .bind(title) - .bind(release_year) - .bind(director) - .bind(&poster_path) - .execute(&self.pool) - .await - .map_err(Self::map_err)?; - - Ok(()) - } - - async fn delete_movie(&self, movie_id: &MovieId) -> Result<(), DomainError> { - let id = movie_id.value().to_string(); - sqlx::query("DELETE FROM movies WHERE id = $1") - .bind(&id) - .execute(&self.pool) - .await - .map_err(Self::map_err)?; - Ok(()) - } - - async fn existing_external_ids( - &self, - ids: &[ExternalMetadataId], - ) -> Result, DomainError> { - if ids.is_empty() { - return Ok(Default::default()); - } - let vals: Vec = ids.iter().map(|id| id.value().to_string()).collect(); - let rows: Vec<(String,)> = sqlx::query_as( - "SELECT external_metadata_id FROM movies WHERE external_metadata_id = ANY($1)", - ) - .bind(&vals) - .fetch_all(&self.pool) - .await - .map_err(Self::map_err)?; - Ok(rows.into_iter().map(|(id,)| id).collect()) - } - - async fn existing_title_year_pairs( - &self, - pairs: &[(MovieTitle, ReleaseYear)], - ) -> Result, DomainError> { - if pairs.is_empty() { - return Ok(Default::default()); - } - let titles: Vec<&str> = pairs.iter().map(|(t, _)| t.value()).collect(); - let years: Vec = pairs.iter().map(|(_, y)| y.value() as i64).collect(); - use sqlx::Row; - let rows = sqlx::query( - "SELECT DISTINCT m.title, m.release_year FROM movies m \ - INNER JOIN unnest($1::text[], $2::bigint[]) AS p(title, release_year) \ - ON m.title = p.title AND m.release_year = p.release_year", - ) - .bind(&titles) - .bind(&years) - .fetch_all(&self.pool) - .await - .map_err(Self::map_err)?; - Ok(rows - .into_iter() - .map(|r| { - let t: String = r.get("title"); - let y: i64 = r.get("release_year"); - (t, y as u16) - }) - .collect()) - } - - async fn list_movies( - &self, - page: &domain::models::collections::PageParams, - filter: &domain::models::MovieFilter, - ) -> Result, DomainError> - { - use sqlx::Row; - let limit = page.limit as i64; - let offset = page.offset as i64; - let pattern = filter - .search - .as_deref() - .map(|s| format!("%{}%", s.to_lowercase())); - let genre = filter.genre.as_deref(); - let language = filter.language.as_deref(); - - let rows: Vec = sqlx::query_as( - "SELECT \ - m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, \ - p.overview, p.runtime_minutes, p.original_language, p.collection_name, \ - array_agg(g.name) FILTER (WHERE g.name IS NOT NULL) AS genres \ - FROM movies m \ - LEFT JOIN movie_profiles p ON p.movie_id = m.id \ - LEFT JOIN movie_genres g ON g.movie_id = m.id \ - WHERE ($1::text IS NULL OR LOWER(m.title) LIKE $1) \ - AND ($2::text IS NULL OR p.original_language = $2) \ - AND ($3::text IS NULL OR m.id IN (SELECT movie_id FROM movie_genres WHERE LOWER(name) = LOWER($3))) \ - GROUP BY m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, \ - p.overview, p.runtime_minutes, p.original_language, p.collection_name \ - ORDER BY m.title ASC \ - LIMIT $4 OFFSET $5", - ) - .bind(&pattern) - .bind(language) - .bind(genre) - .bind(limit) - .bind(offset) - .fetch_all(&self.pool) - .await - .map_err(Self::map_err)?; - - let total: i64 = sqlx::query( - "SELECT COUNT(DISTINCT m.id) \ - FROM movies m \ - LEFT JOIN movie_profiles p ON p.movie_id = m.id \ - WHERE ($1::text IS NULL OR LOWER(m.title) LIKE $1) \ - AND ($2::text IS NULL OR p.original_language = $2) \ - AND ($3::text IS NULL OR m.id IN (SELECT movie_id FROM movie_genres WHERE LOWER(name) = LOWER($3)))", - ) - .bind(&pattern) - .bind(language) - .bind(genre) - .fetch_one(&self.pool) - .await - .map_err(Self::map_err)? - .try_get(0) - .unwrap_or(0); - - let items = rows - .into_iter() - .map(|r| r.into_domain()) - .collect::, _>>()?; - - Ok(domain::models::collections::Paginated { - items, - total_count: total as u64, - limit: page.limit, - offset: page.offset, - }) - } -} - -#[async_trait] -impl ReviewRepository for PostgresRepository { - async fn save_review(&self, review: &Review) -> Result { - let id = review.id().value().to_string(); - let movie_id = review.movie_id().value().to_string(); - let user_id = review.user_id().value().to_string(); - let rating = review.rating().value() as i64; - let comment = review.comment().map(|c| c.value().to_string()); - let watched_at = datetime_to_str(review.watched_at()); - let created_at = datetime_to_str(review.created_at()); - let remote_actor_url = match review.source() { - ReviewSource::Local => None, - ReviewSource::Remote { actor_url } => Some(actor_url.clone()), - }; - - sqlx::query( - "INSERT INTO reviews (id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url) - VALUES ($1, $2, $3, $4, $5, $6::timestamptz, $7::timestamptz, $8)", - ) - .bind(&id) - .bind(&movie_id) - .bind(&user_id) - .bind(rating) - .bind(&comment) - .bind(&watched_at) - .bind(&created_at) - .bind(&remote_actor_url) - .execute(&self.pool) - .await - .map_err(Self::map_err)?; - - Ok(DomainEvent::ReviewLogged { - review_id: review.id().clone(), - movie_id: review.movie_id().clone(), - user_id: review.user_id().clone(), - rating: review.rating().clone(), - watched_at: *review.watched_at(), - }) - } - - async fn get_review_by_id(&self, review_id: &ReviewId) -> Result, DomainError> { - let id = review_id.value().to_string(); - sqlx::query_as::<_, ReviewRow>( - "SELECT id, movie_id, user_id, rating, comment, - to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, - to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, - remote_actor_url - FROM reviews WHERE id = $1", - ) - .bind(&id) - .fetch_optional(&self.pool) - .await - .map_err(Self::map_err)? - .map(ReviewRow::into_domain) - .transpose() - } - - async fn delete_review(&self, review_id: &ReviewId) -> Result<(), DomainError> { - let id = review_id.value().to_string(); - sqlx::query("DELETE FROM reviews WHERE id = $1") - .bind(&id) - .execute(&self.pool) - .await - .map_err(Self::map_err)?; - Ok(()) - } - - async fn get_all_reviews_for_user(&self, user_id: &UserId) -> Result, DomainError> { - let uid = user_id.value().to_string(); - sqlx::query_as::<_, ReviewRow>( - "SELECT id, movie_id, user_id, rating, comment, - to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, - to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, - remote_actor_url - FROM reviews WHERE user_id = $1 ORDER BY watched_at DESC", - ) - .bind(&uid) - .fetch_all(&self.pool) - .await - .map_err(Self::map_err)? - .into_iter() - .map(ReviewRow::into_domain) - .collect() - } -} - -#[async_trait] -impl DiaryRepository for PostgresRepository { - async fn query_diary( - &self, - filter: &DiaryFilter, - ) -> Result, DomainError> { - let limit = filter.page.limit as i64; - let offset = filter.page.offset as i64; - - let (total, rows) = match (&filter.movie_id, &filter.user_id) { - (None, None) => tokio::try_join!( - self.count_diary_entries(None), - self.fetch_all_diary_rows(&filter.sort_by, limit, offset) - )?, - (Some(id), None) => { - let id_str = id.value().to_string(); - tokio::try_join!( - self.count_diary_entries(Some(id_str.as_str())), - self.fetch_movie_diary_rows(&id_str, &filter.sort_by, limit, offset) - )? - } - (None, Some(uid)) => { - let uid_str = uid.value().to_string(); - let search = filter.search.as_deref(); - tokio::try_join!( - self.count_user_diary_entries(&uid_str, search), - self.fetch_user_diary_rows(&uid_str, &filter.sort_by, search, limit, offset) - )? - } - (Some(_), Some(_)) => { - return Err(DomainError::ValidationError( - "Combined movie_id + user_id filter not supported".into(), - )); - } - }; - - let items = rows - .into_iter() - .map(DiaryRow::into_domain) - .collect::, _>>()?; - - Ok(Paginated { - items, - total_count: total as u64, - limit: filter.page.limit, - offset: filter.page.offset, - }) - } - - async fn query_activity_feed( - &self, - page: &PageParams, - ) -> Result, DomainError> { - self.query_activity_feed_filtered(page, &domain::ports::FeedSortBy::Date, None, None) - .await - } - - async fn query_activity_feed_filtered( - &self, - page: &PageParams, - sort_by: &domain::ports::FeedSortBy, - search: Option<&str>, - following: Option<&domain::ports::FollowingFilter>, - ) -> Result, DomainError> { - use domain::ports::FeedSortBy; - - let limit = page.limit as i64; - let offset = page.offset as i64; - let has_search = search.map(|s| !s.is_empty()).unwrap_or(false); - - // Dynamic param counter - let mut p: i32 = 0; - let mut next_param = || { - p += 1; - format!("${}", p) - }; - - let mut where_parts = vec!["1=1".to_string()]; - - if has_search { - let pn = next_param(); - where_parts.push(format!("m.title ILIKE '%' || {} || '%'", pn)); - } - - if let Some(f) = following { - let local_params: Vec = f.local_user_ids.iter().map(|_| next_param()).collect(); - let remote_params: Vec = - f.remote_actor_urls.iter().map(|_| next_param()).collect(); - - let local_in = if local_params.is_empty() { - "(SELECT NULL::text WHERE false)".to_string() - } else { - local_params.join(", ") - }; - let remote_in = if remote_params.is_empty() { - "(SELECT NULL::text WHERE false)".to_string() - } else { - remote_params.join(", ") - }; - where_parts.push(format!( - "(r.user_id IN ({}) OR r.remote_actor_url IN ({}))", - local_in, remote_in - )); - } - - let limit_param = next_param(); - let offset_param = next_param(); - - let order_clause = match sort_by { - FeedSortBy::Date => "r.watched_at DESC", - FeedSortBy::DateAsc => "r.watched_at ASC", - FeedSortBy::Rating => "r.rating DESC, r.watched_at DESC", - FeedSortBy::RatingAsc => "r.rating ASC, r.watched_at ASC", - }; - - let where_clause = where_parts.join(" AND "); - - // Reset counter for count query (reuse same where_clause string but re-bind) - // We need a separate counter for count SQL — but since where_clause is already built - // with the right $N references, both queries share it. - let count_sql = format!( - "SELECT COUNT(*) FROM reviews r - INNER JOIN movies m ON m.id = r.movie_id - WHERE {}", - where_clause - ); - - let select_sql = format!( - "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, - r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, - to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, - to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, - r.remote_actor_url, - COALESCE(u.email, r.remote_actor_url) AS user_email - FROM reviews r - INNER JOIN movies m ON m.id = r.movie_id - LEFT JOIN users u ON u.id = r.user_id - WHERE {} - ORDER BY {} - LIMIT {} OFFSET {}", - where_clause, order_clause, limit_param, offset_param - ); - - // Bind helper closure — binds search + following params in order - macro_rules! bind_filter_params { - ($q:expr) => {{ - let mut q = $q; - if has_search { - q = q.bind(search.unwrap()); - } - if let Some(f) = following { - for uid in &f.local_user_ids { - q = q.bind(uid.to_string()); - } - for url in &f.remote_actor_urls { - q = q.bind(url.as_str()); - } - } - q - }}; - } - - let count_q = bind_filter_params!(sqlx::query_scalar::<_, i64>(&count_sql)); - let total = count_q.fetch_one(&self.pool).await.map_err(Self::map_err)?; - - let rows_q = bind_filter_params!(sqlx::query_as::<_, FeedRow>(&select_sql)); - let rows = rows_q - .bind(limit) - .bind(offset) - .fetch_all(&self.pool) - .await - .map_err(Self::map_err)?; - - let items = rows - .into_iter() - .map(FeedRow::into_domain) - .collect::, _>>()?; - - Ok(Paginated { - items, - total_count: total as u64, - limit: page.limit, - offset: page.offset, - }) - } - - async fn get_review_history(&self, movie_id: &MovieId) -> Result { - let id_str = movie_id.value().to_string(); - - let movie = sqlx::query_as::<_, MovieRow>( - "SELECT id, external_metadata_id, title, release_year, director, poster_path - FROM movies WHERE id = $1", - ) - .bind(&id_str) - .fetch_optional(&self.pool) - .await - .map_err(Self::map_err)? - .ok_or_else(|| DomainError::NotFound(format!("Movie {}", id_str)))? - .into_domain()?; - - let viewings = sqlx::query_as::<_, ReviewRow>( - "SELECT id, movie_id, user_id, rating, comment, - to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, - to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, - remote_actor_url - FROM reviews WHERE movie_id = $1 ORDER BY watched_at ASC", - ) - .bind(&id_str) - .fetch_all(&self.pool) - .await - .map_err(Self::map_err)? - .into_iter() - .map(ReviewRow::into_domain) - .collect::, _>>()?; - - Ok(ReviewHistory::new(movie, viewings)) - } - - async fn get_user_history(&self, user_id: &UserId) -> Result, DomainError> { - let uid = user_id.value().to_string(); - let rows = sqlx::query_as::<_, DiaryRow>( - "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, - r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, - to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, - to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, - r.remote_actor_url - FROM reviews r - INNER JOIN movies m ON m.id = r.movie_id - WHERE r.user_id = $1 - ORDER BY r.watched_at DESC", - ) - .bind(&uid) - .fetch_all(&self.pool) - .await - .map_err(Self::map_err)?; - - rows.into_iter().map(DiaryRow::into_domain).collect() - } - - async fn get_movie_stats(&self, movie_id: &MovieId) -> Result { - let id_str = movie_id.value().to_string(); - sqlx::query_as::<_, MovieStatsRow>( - "SELECT - COUNT(*) AS total_count, - AVG(CAST(rating AS FLOAT)) AS avg_rating, - COUNT(CASE WHEN remote_actor_url IS NOT NULL THEN 1 END) AS federated_count, - COUNT(CASE WHEN rating = 1 THEN 1 END) AS rating_1, - COUNT(CASE WHEN rating = 2 THEN 1 END) AS rating_2, - COUNT(CASE WHEN rating = 3 THEN 1 END) AS rating_3, - COUNT(CASE WHEN rating = 4 THEN 1 END) AS rating_4, - COUNT(CASE WHEN rating = 5 THEN 1 END) AS rating_5 - FROM reviews WHERE movie_id = $1", - ) - .bind(id_str) - .fetch_one(&self.pool) - .await - .map_err(Self::map_err) - .map(MovieStatsRow::into_domain) - } - - async fn get_movie_social_feed( - &self, - movie_id: &MovieId, - page: &PageParams, - ) -> Result, DomainError> { - let id_str = movie_id.value().to_string(); - let limit = page.limit as i64; - let offset = page.offset as i64; - - let total: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM reviews WHERE movie_id = $1") - .bind(&id_str) - .fetch_one(&self.pool) - .await - .map_err(Self::map_err)?; - - let rows = sqlx::query_as::<_, FeedRow>( - "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, - r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, - to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, - to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, - r.remote_actor_url, - CASE WHEN r.remote_actor_url IS NOT NULL THEN r.remote_actor_url - WHEN u.email IS NOT NULL THEN u.email - ELSE r.user_id END AS user_email - FROM reviews r - INNER JOIN movies m ON m.id = r.movie_id - LEFT JOIN users u ON u.id = r.user_id - WHERE r.movie_id = $1 - ORDER BY r.watched_at DESC - LIMIT $2 OFFSET $3", - ) - .bind(&id_str) - .bind(limit) - .bind(offset) - .fetch_all(&self.pool) - .await - .map_err(Self::map_err)?; - - let items = rows - .into_iter() - .map(FeedRow::into_domain) - .collect::, _>>()?; - - Ok(Paginated { - items, - total_count: total as u64, - limit: page.limit, - offset: page.offset, - }) - } - - async fn count_local_posts(&self) -> Result { - let count: i64 = - sqlx::query_scalar("SELECT COUNT(*) FROM reviews WHERE remote_actor_url IS NULL") - .fetch_one(&self.pool) - .await - .map_err(Self::map_err)?; - Ok(count as u64) - } -} - -#[async_trait] -impl StatsRepository for PostgresRepository { - async fn get_user_stats(&self, user_id: &UserId) -> Result { - let uid = user_id.value().to_string(); - - let (totals, fav_director, most_active) = tokio::try_join!( - self.fetch_user_totals(&uid), - self.fetch_user_favorite_director(&uid), - self.fetch_user_most_active_month(&uid) - )?; - - let most_active_month = most_active.map(|ym| format_year_month(&ym)); - - Ok(UserStats { - total_movies: totals.total, - avg_rating: totals.avg_rating, - favorite_director: fav_director, - most_active_month, - }) - } - - async fn get_user_trends(&self, user_id: &UserId) -> Result { - let uid = user_id.value().to_string(); - - let (rating_rows, director_rows) = tokio::try_join!( - sqlx::query_as::<_, MonthlyRatingRow>( - "SELECT to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM') AS month, - AVG(rating::float) AS avg_rating, - COUNT(*) AS count - FROM reviews - WHERE user_id = $1 AND watched_at >= NOW() - INTERVAL '12 months' - GROUP BY to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM') - ORDER BY to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM') ASC" - ) - .bind(&uid) - .fetch_all(&self.pool), - sqlx::query_as::<_, DirectorCountRow>( - "SELECT m.director AS director, COUNT(*) AS count - FROM reviews r - INNER JOIN movies m ON m.id = r.movie_id - WHERE r.user_id = $1 AND m.director IS NOT NULL - GROUP BY m.director - ORDER BY COUNT(*) DESC - LIMIT 5" - ) - .bind(&uid) - .fetch_all(&self.pool) - ) - .map_err(Self::map_err)?; - - let max_director_count = director_rows.iter().map(|d| d.count).max().unwrap_or(1); - - let monthly_ratings = rating_rows - .into_iter() - .map(|r| MonthlyRating { - month_label: format_year_month(&r.month), - year_month: r.month, - avg_rating: r.avg_rating, - count: r.count, - }) - .collect(); - - let top_directors = director_rows - .into_iter() - .map(|d| DirectorStat { - director: d.director, - count: d.count, - }) - .collect(); - - Ok(UserTrends { - monthly_ratings, - top_directors, - max_director_count, - }) - } + .map_err(|e| DomainError::InfrastructureError(e.to_string())) } pub fn create_profile_fields_repo( @@ -1036,18 +103,17 @@ pub async fn wire(database_url: &str) -> anyhow::Result { .await .context("Failed to connect to PostgreSQL database")?; - let repo = std::sync::Arc::new(PostgresRepository::new(pool.clone())); - repo.migrate() + migrate(&pool) .await .map_err(|e| anyhow::anyhow!("{e}")) .context("Database migration failed")?; Ok(PostgresWireOutput { pool: pool.clone(), - movie: std::sync::Arc::clone(&repo) as _, - review: std::sync::Arc::clone(&repo) as _, - diary: std::sync::Arc::clone(&repo) as _, - stats: std::sync::Arc::clone(&repo) as _, + movie: std::sync::Arc::new(PostgresMovieRepository::new(pool.clone())) as _, + review: std::sync::Arc::new(PostgresReviewRepository::new(pool.clone())) as _, + diary: std::sync::Arc::new(PostgresDiaryRepository::new(pool.clone())) as _, + stats: std::sync::Arc::new(PostgresStatsRepository::new(pool.clone())) as _, user: std::sync::Arc::new(PostgresUserRepository::new(pool.clone())) as _, import_session: std::sync::Arc::new(PostgresImportSessionRepository::new(pool.clone())) as _, diff --git a/crates/adapters/postgres/src/movie.rs b/crates/adapters/postgres/src/movie.rs new file mode 100644 index 0000000..584b18a --- /dev/null +++ b/crates/adapters/postgres/src/movie.rs @@ -0,0 +1,242 @@ +use async_trait::async_trait; +use domain::{ + errors::DomainError, + models::{ + Movie, MovieFilter, MovieSummary, + collections::{PageParams, Paginated}, + }, + ports::MovieRepository, + value_objects::{ExternalMetadataId, MovieId, MovieTitle, ReleaseYear}, +}; +use sqlx::PgPool; + +use crate::models::{MovieRow, MovieSummaryRow}; + +pub struct PostgresMovieRepository { + pool: PgPool, +} + +impl PostgresMovieRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } + + fn map_err(e: sqlx::Error) -> DomainError { + tracing::error!("Database error: {:?}", e); + DomainError::InfrastructureError("Database operation failed".into()) + } +} + +#[async_trait] +impl MovieRepository for PostgresMovieRepository { + async fn get_movie_by_external_id( + &self, + external_metadata_id: &ExternalMetadataId, + ) -> Result, DomainError> { + let id = external_metadata_id.value(); + sqlx::query_as::<_, MovieRow>( + "SELECT id, external_metadata_id, title, release_year, director, poster_path + FROM movies WHERE external_metadata_id = $1", + ) + .bind(id) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)? + .map(MovieRow::into_domain) + .transpose() + } + + async fn get_movie_by_id(&self, movie_id: &MovieId) -> Result, DomainError> { + let id = movie_id.value().to_string(); + sqlx::query_as::<_, MovieRow>( + "SELECT id, external_metadata_id, title, release_year, director, poster_path + FROM movies WHERE id = $1", + ) + .bind(&id) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)? + .map(MovieRow::into_domain) + .transpose() + } + + async fn get_movies_by_title_and_year( + &self, + title: &MovieTitle, + year: &ReleaseYear, + ) -> Result, DomainError> { + let title = title.value(); + let year = year.value() as i64; + sqlx::query_as::<_, MovieRow>( + "SELECT id, external_metadata_id, title, release_year, director, poster_path + FROM movies WHERE title = $1 AND release_year = $2", + ) + .bind(title) + .bind(year) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)? + .into_iter() + .map(MovieRow::into_domain) + .collect() + } + + async fn upsert_movie(&self, movie: &Movie) -> Result<(), DomainError> { + let id = movie.id().value().to_string(); + let external_metadata_id = movie.external_metadata_id().map(|e| e.value().to_string()); + let title = movie.title().value(); + let release_year = movie.release_year().value() as i64; + let director = movie.director(); + let poster_path = movie.poster_path().map(|p| p.value().to_string()); + + sqlx::query( + "INSERT INTO movies (id, external_metadata_id, title, release_year, director, poster_path) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT(id) DO UPDATE SET + external_metadata_id = excluded.external_metadata_id, + title = excluded.title, + release_year = excluded.release_year, + director = excluded.director, + poster_path = excluded.poster_path", + ) + .bind(&id) + .bind(&external_metadata_id) + .bind(title) + .bind(release_year) + .bind(director) + .bind(&poster_path) + .execute(&self.pool) + .await + .map_err(Self::map_err)?; + + Ok(()) + } + + async fn delete_movie(&self, movie_id: &MovieId) -> Result<(), DomainError> { + let id = movie_id.value().to_string(); + sqlx::query("DELETE FROM movies WHERE id = $1") + .bind(&id) + .execute(&self.pool) + .await + .map_err(Self::map_err)?; + Ok(()) + } + + async fn existing_external_ids( + &self, + ids: &[ExternalMetadataId], + ) -> Result, DomainError> { + if ids.is_empty() { + return Ok(Default::default()); + } + let vals: Vec = ids.iter().map(|id| id.value().to_string()).collect(); + let rows: Vec<(String,)> = sqlx::query_as( + "SELECT external_metadata_id FROM movies WHERE external_metadata_id = ANY($1)", + ) + .bind(&vals) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + Ok(rows.into_iter().map(|(id,)| id).collect()) + } + + async fn existing_title_year_pairs( + &self, + pairs: &[(MovieTitle, ReleaseYear)], + ) -> Result, DomainError> { + if pairs.is_empty() { + return Ok(Default::default()); + } + let titles: Vec<&str> = pairs.iter().map(|(t, _)| t.value()).collect(); + let years: Vec = pairs.iter().map(|(_, y)| y.value() as i64).collect(); + use sqlx::Row; + let rows = sqlx::query( + "SELECT DISTINCT m.title, m.release_year FROM movies m \ + INNER JOIN unnest($1::text[], $2::bigint[]) AS p(title, release_year) \ + ON m.title = p.title AND m.release_year = p.release_year", + ) + .bind(&titles) + .bind(&years) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + Ok(rows + .into_iter() + .map(|r| { + let t: String = r.get("title"); + let y: i64 = r.get("release_year"); + (t, y as u16) + }) + .collect()) + } + + async fn list_movies( + &self, + page: &PageParams, + filter: &MovieFilter, + ) -> Result, DomainError> { + use sqlx::Row; + let limit = page.limit as i64; + let offset = page.offset as i64; + let pattern = filter + .search + .as_deref() + .map(|s| format!("%{}%", s.to_lowercase())); + let genre = filter.genre.as_deref(); + let language = filter.language.as_deref(); + + let rows: Vec = sqlx::query_as( + "SELECT \ + m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, \ + p.overview, p.runtime_minutes, p.original_language, p.collection_name, \ + array_agg(g.name) FILTER (WHERE g.name IS NOT NULL) AS genres \ + FROM movies m \ + LEFT JOIN movie_profiles p ON p.movie_id = m.id \ + LEFT JOIN movie_genres g ON g.movie_id = m.id \ + WHERE ($1::text IS NULL OR LOWER(m.title) LIKE $1) \ + AND ($2::text IS NULL OR p.original_language = $2) \ + AND ($3::text IS NULL OR m.id IN (SELECT movie_id FROM movie_genres WHERE LOWER(name) = LOWER($3))) \ + GROUP BY m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, \ + p.overview, p.runtime_minutes, p.original_language, p.collection_name \ + ORDER BY m.title ASC \ + LIMIT $4 OFFSET $5", + ) + .bind(&pattern) + .bind(language) + .bind(genre) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + + let total: i64 = sqlx::query( + "SELECT COUNT(DISTINCT m.id) \ + FROM movies m \ + LEFT JOIN movie_profiles p ON p.movie_id = m.id \ + WHERE ($1::text IS NULL OR LOWER(m.title) LIKE $1) \ + AND ($2::text IS NULL OR p.original_language = $2) \ + AND ($3::text IS NULL OR m.id IN (SELECT movie_id FROM movie_genres WHERE LOWER(name) = LOWER($3)))", + ) + .bind(&pattern) + .bind(language) + .bind(genre) + .fetch_one(&self.pool) + .await + .map_err(Self::map_err)? + .try_get(0) + .unwrap_or(0); + + let items = rows + .into_iter() + .map(|r| r.into_domain()) + .collect::, _>>()?; + + Ok(Paginated { + items, + total_count: total as u64, + limit: page.limit, + offset: page.offset, + }) + } +} diff --git a/crates/adapters/postgres/src/review.rs b/crates/adapters/postgres/src/review.rs new file mode 100644 index 0000000..1a299a6 --- /dev/null +++ b/crates/adapters/postgres/src/review.rs @@ -0,0 +1,112 @@ +use async_trait::async_trait; +use domain::{ + errors::DomainError, + events::DomainEvent, + models::{Review, ReviewSource}, + ports::ReviewRepository, + value_objects::{ReviewId, UserId}, +}; +use sqlx::PgPool; + +use crate::models::{ReviewRow, datetime_to_str}; + +pub struct PostgresReviewRepository { + pool: PgPool, +} + +impl PostgresReviewRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } + + fn map_err(e: sqlx::Error) -> DomainError { + tracing::error!("Database error: {:?}", e); + DomainError::InfrastructureError("Database operation failed".into()) + } +} + +#[async_trait] +impl ReviewRepository for PostgresReviewRepository { + async fn save_review(&self, review: &Review) -> Result { + let id = review.id().value().to_string(); + let movie_id = review.movie_id().value().to_string(); + let user_id = review.user_id().value().to_string(); + let rating = review.rating().value() as i64; + let comment = review.comment().map(|c| c.value().to_string()); + let watched_at = datetime_to_str(review.watched_at()); + let created_at = datetime_to_str(review.created_at()); + let remote_actor_url = match review.source() { + ReviewSource::Local => None, + ReviewSource::Remote { actor_url } => Some(actor_url.clone()), + }; + + sqlx::query( + "INSERT INTO reviews (id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url) + VALUES ($1, $2, $3, $4, $5, $6::timestamptz, $7::timestamptz, $8)", + ) + .bind(&id) + .bind(&movie_id) + .bind(&user_id) + .bind(rating) + .bind(&comment) + .bind(&watched_at) + .bind(&created_at) + .bind(&remote_actor_url) + .execute(&self.pool) + .await + .map_err(Self::map_err)?; + + Ok(DomainEvent::ReviewLogged { + review_id: review.id().clone(), + movie_id: review.movie_id().clone(), + user_id: review.user_id().clone(), + rating: review.rating().clone(), + watched_at: *review.watched_at(), + }) + } + + async fn get_review_by_id(&self, review_id: &ReviewId) -> Result, DomainError> { + let id = review_id.value().to_string(); + sqlx::query_as::<_, ReviewRow>( + "SELECT id, movie_id, user_id, rating, comment, + to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, + to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, + remote_actor_url + FROM reviews WHERE id = $1", + ) + .bind(&id) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)? + .map(ReviewRow::into_domain) + .transpose() + } + + async fn delete_review(&self, review_id: &ReviewId) -> Result<(), DomainError> { + let id = review_id.value().to_string(); + sqlx::query("DELETE FROM reviews WHERE id = $1") + .bind(&id) + .execute(&self.pool) + .await + .map_err(Self::map_err)?; + Ok(()) + } + + async fn get_all_reviews_for_user(&self, user_id: &UserId) -> Result, DomainError> { + let uid = user_id.value().to_string(); + sqlx::query_as::<_, ReviewRow>( + "SELECT id, movie_id, user_id, rating, comment, + to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, + to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, + remote_actor_url + FROM reviews WHERE user_id = $1 ORDER BY watched_at DESC", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)? + .into_iter() + .map(ReviewRow::into_domain) + .collect() + } +} diff --git a/crates/adapters/postgres/src/stats.rs b/crates/adapters/postgres/src/stats.rs new file mode 100644 index 0000000..721780f --- /dev/null +++ b/crates/adapters/postgres/src/stats.rs @@ -0,0 +1,153 @@ +use async_trait::async_trait; +use domain::{ + errors::DomainError, + models::{DirectorStat, MonthlyRating, UserStats, UserTrends}, + ports::StatsRepository, + value_objects::UserId, +}; +use sqlx::PgPool; + +use crate::format_year_month; +use crate::models::{DirectorCountRow, MonthlyRatingRow, UserTotalsRow}; + +pub struct PostgresStatsRepository { + pool: PgPool, +} + +impl PostgresStatsRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } + + fn map_err(e: sqlx::Error) -> DomainError { + tracing::error!("Database error: {:?}", e); + DomainError::InfrastructureError("Database operation failed".into()) + } + + async fn fetch_user_totals(&self, user_id: &str) -> Result { + sqlx::query_as::<_, UserTotalsRow>( + r#"SELECT COUNT(DISTINCT movie_id) AS total, + AVG(rating::float) AS avg_rating + FROM reviews WHERE user_id = $1"#, + ) + .bind(user_id) + .fetch_one(&self.pool) + .await + .map_err(Self::map_err) + } + + async fn fetch_user_favorite_director( + &self, + user_id: &str, + ) -> Result, DomainError> { + sqlx::query_scalar::<_, String>( + "SELECT m.director + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = $1 AND m.director IS NOT NULL + GROUP BY m.director + ORDER BY COUNT(*) DESC + LIMIT 1", + ) + .bind(user_id) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err) + } + + async fn fetch_user_most_active_month( + &self, + user_id: &str, + ) -> Result, DomainError> { + sqlx::query_scalar::<_, String>( + "SELECT to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM') AS month + FROM reviews + WHERE user_id = $1 + GROUP BY to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM') + ORDER BY COUNT(*) DESC + LIMIT 1", + ) + .bind(user_id) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err) + } +} + +#[async_trait] +impl StatsRepository for PostgresStatsRepository { + async fn get_user_stats(&self, user_id: &UserId) -> Result { + let uid = user_id.value().to_string(); + + let (totals, fav_director, most_active) = tokio::try_join!( + self.fetch_user_totals(&uid), + self.fetch_user_favorite_director(&uid), + self.fetch_user_most_active_month(&uid) + )?; + + let most_active_month = most_active.map(|ym| format_year_month(&ym)); + + Ok(UserStats { + total_movies: totals.total, + avg_rating: totals.avg_rating, + favorite_director: fav_director, + most_active_month, + }) + } + + async fn get_user_trends(&self, user_id: &UserId) -> Result { + let uid = user_id.value().to_string(); + + let (rating_rows, director_rows) = tokio::try_join!( + sqlx::query_as::<_, MonthlyRatingRow>( + "SELECT to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM') AS month, + AVG(rating::float) AS avg_rating, + COUNT(*) AS count + FROM reviews + WHERE user_id = $1 AND watched_at >= NOW() - INTERVAL '12 months' + GROUP BY to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM') + ORDER BY to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM') ASC" + ) + .bind(&uid) + .fetch_all(&self.pool), + sqlx::query_as::<_, DirectorCountRow>( + "SELECT m.director AS director, COUNT(*) AS count + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = $1 AND m.director IS NOT NULL + GROUP BY m.director + ORDER BY COUNT(*) DESC + LIMIT 5" + ) + .bind(&uid) + .fetch_all(&self.pool) + ) + .map_err(Self::map_err)?; + + let max_director_count = director_rows.iter().map(|d| d.count).max().unwrap_or(1); + + let monthly_ratings = rating_rows + .into_iter() + .map(|r| MonthlyRating { + month_label: format_year_month(&r.month), + year_month: r.month, + avg_rating: r.avg_rating, + count: r.count, + }) + .collect(); + + let top_directors = director_rows + .into_iter() + .map(|d| DirectorStat { + director: d.director, + count: d.count, + }) + .collect(); + + Ok(UserTrends { + monthly_ratings, + top_directors, + max_director_count, + }) + } +} diff --git a/crates/adapters/sqlite/src/movie.rs b/crates/adapters/sqlite/src/movie.rs index 742b6b6..9cd87d3 100644 --- a/crates/adapters/sqlite/src/movie.rs +++ b/crates/adapters/sqlite/src/movie.rs @@ -1,7 +1,10 @@ use async_trait::async_trait; use domain::{ errors::DomainError, - models::{Movie, MovieFilter, MovieSummary, collections::{PageParams, Paginated}}, + models::{ + Movie, MovieFilter, MovieSummary, + collections::{PageParams, Paginated}, + }, ports::MovieRepository, value_objects::{ExternalMetadataId, MovieId, MovieTitle, ReleaseYear}, }; @@ -174,8 +177,7 @@ impl MovieRepository for SqliteMovieRepository { &self, page: &PageParams, filter: &MovieFilter, - ) -> Result, DomainError> - { + ) -> Result, DomainError> { use sqlx::Row; let limit = page.limit as i64; let offset = page.offset as i64;