diff --git a/crates/adapters/sqlite/src/diary.rs b/crates/adapters/sqlite/src/diary.rs new file mode 100644 index 0000000..959eea6 --- /dev/null +++ b/crates/adapters/sqlite/src/diary.rs @@ -0,0 +1,475 @@ +use async_trait::async_trait; +use domain::{ + errors::DomainError, + models::{ + DiaryEntry, DiaryFilter, FeedEntry, MovieStats, ReviewHistory, SortDirection, + collections::{PageParams, Paginated}, + }, + ports::DiaryRepository, + value_objects::{MovieId, UserId}, +}; +use sqlx::SqlitePool; + +use crate::models::{DiaryRow, FeedRow, MovieRow, MovieStatsRow, ReviewRow}; + +pub struct SqliteDiaryRepository { + pool: SqlitePool, +} + +impl SqliteDiaryRepository { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } + + fn map_err(e: sqlx::Error) -> DomainError { + tracing::error!("Database error: {:?}", e); + DomainError::InfrastructureError("Database operation failed".into()) + } + + async fn count_diary_entries(&self, movie_id: Option<&str>) -> Result { + match movie_id { + None => sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM reviews") + .fetch_one(&self.pool) + .await + .map_err(Self::map_err), + Some(id) => { + sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM reviews WHERE movie_id = ?") + .bind(id) + .fetch_one(&self.pool) + .await + .map_err(Self::map_err) + } + } + } + + async fn fetch_all_diary_rows( + &self, + sort: &SortDirection, + limit: i64, + offset: i64, + ) -> Result, DomainError> { + let order_clause = match sort { + SortDirection::ByRatingDesc => "r.rating DESC, r.watched_at DESC", + SortDirection::ByRatingAsc => "r.rating ASC, r.watched_at ASC", + SortDirection::Ascending => "r.watched_at ASC", + SortDirection::Descending => "r.watched_at DESC", + }; + let sql = format!( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + ORDER BY {} + LIMIT ? OFFSET ?", + order_clause + ); + sqlx::query_as::<_, DiaryRow>(&sql) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err) + } + + async fn fetch_movie_diary_rows( + &self, + movie_id: &str, + sort: &SortDirection, + limit: i64, + offset: i64, + ) -> Result, DomainError> { + let order_clause = match sort { + SortDirection::ByRatingDesc => "r.rating DESC, r.watched_at DESC", + SortDirection::ByRatingAsc => "r.rating ASC, r.watched_at ASC", + SortDirection::Ascending => "r.watched_at ASC", + SortDirection::Descending => "r.watched_at DESC", + }; + let sql = format!( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.movie_id = ? + ORDER BY {} + LIMIT ? OFFSET ?", + order_clause + ); + sqlx::query_as::<_, DiaryRow>(&sql) + .bind(movie_id) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err) + } + + async fn count_user_diary_entries( + &self, + user_id: &str, + search: Option<&str>, + ) -> Result { + let has_search = search.map(|s| !s.is_empty()).unwrap_or(false); + let sql = if has_search { + "SELECT COUNT(*) FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = ? AND m.title LIKE '%' || ? || '%'" + .to_string() + } else { + "SELECT COUNT(*) FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = ?" + .to_string() + }; + let mut q = sqlx::query_scalar::<_, i64>(&sql).bind(user_id); + if has_search { + q = q.bind(search.unwrap()); + } + q.fetch_one(&self.pool).await.map_err(Self::map_err) + } + + async fn fetch_user_diary_rows( + &self, + user_id: &str, + sort: &SortDirection, + search: Option<&str>, + limit: i64, + offset: i64, + ) -> Result, DomainError> { + let has_search = search.map(|s| !s.is_empty()).unwrap_or(false); + let search_clause = if has_search { + " AND m.title LIKE '%' || ? || '%'" + } else { + "" + }; + let order_clause = match sort { + SortDirection::ByRatingDesc => "r.rating DESC, r.watched_at DESC", + SortDirection::ByRatingAsc => "r.rating ASC, r.watched_at ASC", + SortDirection::Ascending => "r.watched_at ASC", + SortDirection::Descending => "r.watched_at DESC", + }; + let sql = format!( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = ?{} + ORDER BY {} + LIMIT ? OFFSET ?", + search_clause, order_clause + ); + let mut q = sqlx::query_as::<_, DiaryRow>(&sql).bind(user_id); + if has_search { + q = q.bind(search.unwrap()); + } + q.bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err) + } +} + +#[async_trait] +impl DiaryRepository for SqliteDiaryRepository { + async fn query_diary( + &self, + filter: &DiaryFilter, + ) -> Result, DomainError> { + let limit = filter.page.limit as i64; + let offset = filter.page.offset as i64; + + let (total, rows) = match (&filter.movie_id, &filter.user_id) { + (None, None) => tokio::try_join!( + self.count_diary_entries(None), + self.fetch_all_diary_rows(&filter.sort_by, limit, offset) + )?, + (Some(id), None) => { + let id_str = id.value().to_string(); + tokio::try_join!( + self.count_diary_entries(Some(id_str.as_str())), + self.fetch_movie_diary_rows(&id_str, &filter.sort_by, limit, offset) + )? + } + (None, Some(uid)) => { + let uid_str = uid.value().to_string(); + let search = filter.search.as_deref(); + tokio::try_join!( + self.count_user_diary_entries(&uid_str, search), + self.fetch_user_diary_rows(&uid_str, &filter.sort_by, search, limit, offset) + )? + } + (Some(_), Some(_)) => { + return Err(DomainError::ValidationError( + "Combined movie_id + user_id filter not supported".into(), + )); + } + }; + + let items = rows + .into_iter() + .map(DiaryRow::into_domain) + .collect::, _>>()?; + + Ok(Paginated { + items, + total_count: total as u64, + limit: filter.page.limit, + offset: filter.page.offset, + }) + } + + async fn query_activity_feed( + &self, + page: &PageParams, + ) -> Result, DomainError> { + self.query_activity_feed_filtered(page, &domain::ports::FeedSortBy::Date, None, None) + .await + } + + async fn query_activity_feed_filtered( + &self, + page: &PageParams, + sort_by: &domain::ports::FeedSortBy, + search: Option<&str>, + following: Option<&domain::ports::FollowingFilter>, + ) -> Result, DomainError> { + use domain::ports::FeedSortBy; + + let limit = page.limit as i64; + let offset = page.offset as i64; + let has_search = search.map(|s| !s.is_empty()).unwrap_or(false); + + let mut where_parts = vec!["1=1".to_string()]; + + if has_search { + where_parts.push("m.title LIKE '%' || ? || '%'".to_string()); + } + + if let Some(f) = following { + let local_in = if f.local_user_ids.is_empty() { + "SELECT NULL WHERE 0".to_string() + } else { + f.local_user_ids + .iter() + .map(|_| "?") + .collect::>() + .join(",") + }; + let remote_in = if f.remote_actor_urls.is_empty() { + "SELECT NULL WHERE 0".to_string() + } else { + f.remote_actor_urls + .iter() + .map(|_| "?") + .collect::>() + .join(",") + }; + where_parts.push(format!( + "(r.user_id IN ({}) OR r.remote_actor_url IN ({}))", + local_in, remote_in + )); + } + + let order_clause = match sort_by { + FeedSortBy::Date => "r.watched_at DESC", + FeedSortBy::DateAsc => "r.watched_at ASC", + FeedSortBy::Rating => "r.rating DESC, r.watched_at DESC", + FeedSortBy::RatingAsc => "r.rating ASC, r.watched_at ASC", + }; + + let where_clause = where_parts.join(" AND "); + + let count_sql = format!( + "SELECT COUNT(*) FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE {}", + where_clause + ); + + let select_sql = format!( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, + r.watched_at, r.created_at, r.remote_actor_url, + COALESCE(u.email, r.remote_actor_url) AS user_email + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + LEFT JOIN users u ON u.id = r.user_id + WHERE {} + ORDER BY {} + LIMIT ? OFFSET ?", + where_clause, order_clause + ); + + macro_rules! bind_filter_params { + ($q:expr) => {{ + let mut q = $q; + if has_search { + q = q.bind(search.unwrap()); + } + if let Some(f) = following { + for uid in &f.local_user_ids { + q = q.bind(uid.to_string()); + } + for url in &f.remote_actor_urls { + q = q.bind(url.as_str()); + } + } + q + }}; + } + + let count_q = bind_filter_params!(sqlx::query_scalar::<_, i64>(&count_sql)); + let total = count_q.fetch_one(&self.pool).await.map_err(Self::map_err)?; + + let rows_q = bind_filter_params!(sqlx::query_as::<_, FeedRow>(&select_sql)); + let rows = rows_q + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + + let items = rows + .into_iter() + .map(FeedRow::into_domain) + .collect::, _>>()?; + + Ok(Paginated { + items, + total_count: total as u64, + limit: page.limit, + offset: page.offset, + }) + } + + async fn get_review_history(&self, movie_id: &MovieId) -> Result { + let id_str = movie_id.value().to_string(); + + let movie = sqlx::query_as::<_, MovieRow>( + "SELECT id, external_metadata_id, title, release_year, director, poster_path + FROM movies WHERE id = ?", + ) + .bind(&id_str) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)? + .ok_or_else(|| DomainError::NotFound(format!("Movie {}", id_str)))? + .into_domain()?; + + let viewings = sqlx::query_as::<_, ReviewRow>( + "SELECT id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url + FROM reviews WHERE movie_id = ? ORDER BY watched_at ASC", + ) + .bind(&id_str) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)? + .into_iter() + .map(ReviewRow::into_domain) + .collect::, _>>()?; + + Ok(ReviewHistory::new(movie, viewings)) + } + + async fn get_user_history(&self, user_id: &UserId) -> Result, DomainError> { + let uid = user_id.value().to_string(); + let rows = sqlx::query_as::<_, DiaryRow>( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = ? + ORDER BY r.watched_at DESC", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + + rows.into_iter().map(DiaryRow::into_domain).collect() + } + + async fn get_movie_stats(&self, movie_id: &MovieId) -> Result { + let id_str = movie_id.value().to_string(); + sqlx::query_as::<_, MovieStatsRow>( + "SELECT + COUNT(*) AS total_count, + AVG(CAST(rating AS REAL)) AS avg_rating, + COUNT(CASE WHEN remote_actor_url IS NOT NULL THEN 1 END) AS federated_count, + COUNT(CASE WHEN rating = 1 THEN 1 END) AS rating_1, + COUNT(CASE WHEN rating = 2 THEN 1 END) AS rating_2, + COUNT(CASE WHEN rating = 3 THEN 1 END) AS rating_3, + COUNT(CASE WHEN rating = 4 THEN 1 END) AS rating_4, + COUNT(CASE WHEN rating = 5 THEN 1 END) AS rating_5 + FROM reviews WHERE movie_id = ?", + ) + .bind(id_str) + .fetch_one(&self.pool) + .await + .map_err(Self::map_err) + .map(MovieStatsRow::into_domain) + } + + async fn get_movie_social_feed( + &self, + movie_id: &MovieId, + page: &PageParams, + ) -> Result, DomainError> { + let id_str = movie_id.value().to_string(); + let limit = page.limit as i64; + let offset = page.offset as i64; + + let total: i64 = + sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM reviews WHERE movie_id = ?") + .bind(&id_str) + .fetch_one(&self.pool) + .await + .map_err(Self::map_err)?; + + let rows = sqlx::query_as::<_, FeedRow>( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, + r.watched_at, r.created_at, r.remote_actor_url, + CASE WHEN r.remote_actor_url IS NOT NULL THEN r.remote_actor_url + WHEN u.email IS NOT NULL THEN u.email + ELSE r.user_id END AS user_email + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + LEFT JOIN users u ON u.id = r.user_id + WHERE r.movie_id = ? + ORDER BY r.watched_at DESC + LIMIT ? OFFSET ?", + ) + .bind(&id_str) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + + let items = rows + .into_iter() + .map(FeedRow::into_domain) + .collect::, _>>()?; + + Ok(Paginated { + items, + total_count: total as u64, + limit: page.limit, + offset: page.offset, + }) + } + + async fn count_local_posts(&self) -> Result { + let count: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM reviews WHERE remote_actor_url IS NULL") + .fetch_one(&self.pool) + .await + .map_err(Self::map_err)?; + Ok(count as u64) + } +} + +#[cfg(test)] +#[path = "tests/diary.rs"] +mod tests; diff --git a/crates/adapters/sqlite/src/lib.rs b/crates/adapters/sqlite/src/lib.rs index 89f65ea..237048d 100644 --- a/crates/adapters/sqlite/src/lib.rs +++ b/crates/adapters/sqlite/src/lib.rs @@ -1,48 +1,39 @@ -use async_trait::async_trait; -use domain::{ - errors::DomainError, - events::DomainEvent, - models::{ - DiaryEntry, DiaryFilter, DirectorStat, FeedEntry, MonthlyRating, Movie, MovieStats, Review, - ReviewHistory, ReviewSource, SortDirection, UserStats, UserTrends, - collections::{PageParams, Paginated}, - }, - ports::{DiaryRepository, MovieRepository, ReviewRepository, StatsRepository}, - value_objects::{ExternalMetadataId, MovieId, MovieTitle, ReleaseYear, ReviewId, UserId}, -}; use sqlx::SqlitePool; mod ap_content; +mod diary; mod goals; mod image_ref; mod import_profile; mod import_session; mod migrations; mod models; +mod movie; mod persons; mod profile; mod profile_fields; mod refresh_sessions; mod remote_goals; +mod review; +mod stats; mod user_settings; mod users; mod watch_event; mod watchlist; mod wrapup; -use models::{ - DiaryRow, DirectorCountRow, FeedRow, MonthlyRatingRow, MovieRow, MovieStatsRow, - MovieSummaryRow, ReviewRow, UserTotalsRow, datetime_to_str, -}; - pub use ap_content::SqliteApContentQuery; +pub use diary::SqliteDiaryRepository; pub use image_ref::{SqliteImageRefAdapter, create_image_ref}; pub use import_profile::SqliteImportProfileRepository; pub use import_session::SqliteImportSessionRepository; +pub use movie::SqliteMovieRepository; pub use persons::{SqlitePersonAdapter, create_person_adapter}; pub use profile::SqliteMovieProfileRepository; pub use profile_fields::SqliteProfileFieldsRepository; pub use refresh_sessions::SqliteRefreshSessionAdapter; +pub use review::SqliteReviewRepository; +pub use stats::SqliteStatsRepository; pub use users::SqliteUserRepository; pub use watch_event::{SqliteWatchEventRepository, SqliteWebhookTokenRepository}; pub use watchlist::SqliteWatchlistRepository; @@ -54,7 +45,7 @@ pub fn create_profile_fields_repo( std::sync::Arc::new(SqliteProfileFieldsRepository::new(pool)) } -fn format_year_month(ym: &str) -> String { +pub(crate) fn format_year_month(ym: &str) -> String { let parts: Vec<&str> = ym.splitn(2, '-').collect(); if parts.len() != 2 { return ym.to_string(); @@ -78,900 +69,8 @@ fn format_year_month(ym: &str) -> String { format!("{} '{}", month, year) } -pub struct SqliteMovieRepository { - pool: SqlitePool, -} - -impl SqliteMovieRepository { - pub fn new(pool: SqlitePool) -> Self { - Self { pool } - } - - pub async fn migrate(&self) -> Result<(), DomainError> { - migrations::run(&self.pool).await - } - - fn map_err(e: sqlx::Error) -> DomainError { - tracing::error!("Database error: {:?}", e); - DomainError::InfrastructureError("Database operation failed".into()) - } - - async fn count_diary_entries(&self, movie_id: Option<&str>) -> Result { - match movie_id { - None => sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM reviews") - .fetch_one(&self.pool) - .await - .map_err(Self::map_err), - Some(id) => { - sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM reviews WHERE movie_id = ?") - .bind(id) - .fetch_one(&self.pool) - .await - .map_err(Self::map_err) - } - } - } - - async fn fetch_all_diary_rows( - &self, - sort: &SortDirection, - limit: i64, - offset: i64, - ) -> Result, DomainError> { - let order_clause = match sort { - SortDirection::ByRatingDesc => "r.rating DESC, r.watched_at DESC", - SortDirection::ByRatingAsc => "r.rating ASC, r.watched_at ASC", - SortDirection::Ascending => "r.watched_at ASC", - SortDirection::Descending => "r.watched_at DESC", - }; - let sql = format!( - "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, - r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url - FROM reviews r - INNER JOIN movies m ON m.id = r.movie_id - ORDER BY {} - LIMIT ? OFFSET ?", - order_clause - ); - sqlx::query_as::<_, DiaryRow>(&sql) - .bind(limit) - .bind(offset) - .fetch_all(&self.pool) - .await - .map_err(Self::map_err) - } - - async fn fetch_movie_diary_rows( - &self, - movie_id: &str, - sort: &SortDirection, - limit: i64, - offset: i64, - ) -> Result, DomainError> { - let order_clause = match sort { - SortDirection::ByRatingDesc => "r.rating DESC, r.watched_at DESC", - SortDirection::ByRatingAsc => "r.rating ASC, r.watched_at ASC", - SortDirection::Ascending => "r.watched_at ASC", - SortDirection::Descending => "r.watched_at DESC", - }; - let sql = format!( - "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, - r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url - FROM reviews r - INNER JOIN movies m ON m.id = r.movie_id - WHERE r.movie_id = ? - ORDER BY {} - LIMIT ? OFFSET ?", - order_clause - ); - sqlx::query_as::<_, DiaryRow>(&sql) - .bind(movie_id) - .bind(limit) - .bind(offset) - .fetch_all(&self.pool) - .await - .map_err(Self::map_err) - } - - async fn count_user_diary_entries( - &self, - user_id: &str, - search: Option<&str>, - ) -> Result { - let has_search = search.map(|s| !s.is_empty()).unwrap_or(false); - let sql = if has_search { - "SELECT COUNT(*) FROM reviews r - INNER JOIN movies m ON m.id = r.movie_id - WHERE r.user_id = ? AND m.title LIKE '%' || ? || '%'" - .to_string() - } else { - "SELECT COUNT(*) FROM reviews r - INNER JOIN movies m ON m.id = r.movie_id - WHERE r.user_id = ?" - .to_string() - }; - let mut q = sqlx::query_scalar::<_, i64>(&sql).bind(user_id); - if has_search { - q = q.bind(search.unwrap()); - } - q.fetch_one(&self.pool).await.map_err(Self::map_err) - } - - async fn fetch_user_diary_rows( - &self, - user_id: &str, - sort: &SortDirection, - search: Option<&str>, - limit: i64, - offset: i64, - ) -> Result, DomainError> { - let has_search = search.map(|s| !s.is_empty()).unwrap_or(false); - let search_clause = if has_search { - " AND m.title LIKE '%' || ? || '%'" - } else { - "" - }; - let order_clause = match sort { - SortDirection::ByRatingDesc => "r.rating DESC, r.watched_at DESC", - SortDirection::ByRatingAsc => "r.rating ASC, r.watched_at ASC", - SortDirection::Ascending => "r.watched_at ASC", - SortDirection::Descending => "r.watched_at DESC", - }; - let sql = format!( - "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, - r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url - FROM reviews r - INNER JOIN movies m ON m.id = r.movie_id - WHERE r.user_id = ?{} - ORDER BY {} - LIMIT ? OFFSET ?", - search_clause, order_clause - ); - let mut q = sqlx::query_as::<_, DiaryRow>(&sql).bind(user_id); - if has_search { - q = q.bind(search.unwrap()); - } - q.bind(limit) - .bind(offset) - .fetch_all(&self.pool) - .await - .map_err(Self::map_err) - } - - async fn fetch_user_totals(&self, user_id: &str) -> Result { - sqlx::query_as::<_, UserTotalsRow>( - "SELECT COUNT(DISTINCT movie_id) AS total, - AVG(CAST(rating AS REAL)) AS avg_rating - FROM reviews WHERE user_id = ?", - ) - .bind(user_id) - .fetch_one(&self.pool) - .await - .map_err(Self::map_err) - } - - async fn fetch_user_favorite_director( - &self, - user_id: &str, - ) -> Result, DomainError> { - let row: Option = sqlx::query_scalar( - "SELECT m.director - FROM reviews r - INNER JOIN movies m ON m.id = r.movie_id - WHERE r.user_id = ? AND m.director IS NOT NULL - GROUP BY m.director - ORDER BY COUNT(*) DESC - LIMIT 1", - ) - .bind(user_id) - .fetch_optional(&self.pool) - .await - .map_err(Self::map_err)?; - Ok(row) - } - - async fn fetch_user_most_active_month( - &self, - user_id: &str, - ) -> Result, DomainError> { - let row: Option = sqlx::query_scalar( - "SELECT strftime('%Y-%m', watched_at) - FROM reviews - WHERE user_id = ? - GROUP BY strftime('%Y-%m', watched_at) - ORDER BY COUNT(*) DESC - LIMIT 1", - ) - .bind(user_id) - .fetch_optional(&self.pool) - .await - .map_err(Self::map_err)?; - Ok(row) - } -} - -#[async_trait] -impl MovieRepository for SqliteMovieRepository { - async fn get_movie_by_external_id( - &self, - external_metadata_id: &ExternalMetadataId, - ) -> Result, DomainError> { - let id = external_metadata_id.value(); - sqlx::query_as::<_, MovieRow>( - "SELECT id, external_metadata_id, title, release_year, director, poster_path - FROM movies WHERE external_metadata_id = ?", - ) - .bind(id) - .fetch_optional(&self.pool) - .await - .map_err(Self::map_err)? - .map(MovieRow::into_domain) - .transpose() - } - - async fn get_movie_by_id(&self, movie_id: &MovieId) -> Result, DomainError> { - let id = movie_id.value().to_string(); - sqlx::query_as::<_, MovieRow>( - "SELECT id, external_metadata_id, title, release_year, director, poster_path - FROM movies WHERE id = ?", - ) - .bind(&id) - .fetch_optional(&self.pool) - .await - .map_err(Self::map_err)? - .map(MovieRow::into_domain) - .transpose() - } - - async fn get_movies_by_title_and_year( - &self, - title: &MovieTitle, - year: &ReleaseYear, - ) -> Result, DomainError> { - let t = title.value(); - let y = year.value() as i64; - sqlx::query_as::<_, MovieRow>( - "SELECT id, external_metadata_id, title, release_year, director, poster_path - FROM movies WHERE title = ? AND release_year = ?", - ) - .bind(t) - .bind(y) - .fetch_all(&self.pool) - .await - .map_err(Self::map_err)? - .into_iter() - .map(MovieRow::into_domain) - .collect() - } - - async fn upsert_movie(&self, movie: &Movie) -> Result<(), DomainError> { - let id = movie.id().value().to_string(); - let external_metadata_id = movie.external_metadata_id().map(|e| e.value().to_string()); - let title = movie.title().value(); - let release_year = movie.release_year().value() as i64; - let director = movie.director(); - let poster_path = movie.poster_path().map(|p| p.value().to_string()); - - sqlx::query( - "INSERT INTO movies (id, external_metadata_id, title, release_year, director, poster_path) - VALUES (?, ?, ?, ?, ?, ?) - ON CONFLICT(id) DO UPDATE SET - external_metadata_id = excluded.external_metadata_id, - title = excluded.title, - release_year = excluded.release_year, - director = excluded.director, - poster_path = excluded.poster_path", - ) - .bind(&id) - .bind(&external_metadata_id) - .bind(title) - .bind(release_year) - .bind(director) - .bind(&poster_path) - .execute(&self.pool) - .await - .map_err(Self::map_err)?; - - Ok(()) - } - - async fn delete_movie(&self, movie_id: &MovieId) -> Result<(), DomainError> { - let id = movie_id.value().to_string(); - sqlx::query("DELETE FROM movies WHERE id = ?") - .bind(&id) - .execute(&self.pool) - .await - .map_err(Self::map_err)?; - Ok(()) - } - - async fn existing_external_ids( - &self, - ids: &[ExternalMetadataId], - ) -> Result, DomainError> { - if ids.is_empty() { - return Ok(Default::default()); - } - let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect(); - let sql = format!( - "SELECT external_metadata_id FROM movies WHERE external_metadata_id IN ({})", - placeholders.join(",") - ); - let mut q = sqlx::query_scalar::<_, String>(&sql); - for id in ids { - q = q.bind(id.value().to_string()); - } - let rows = q.fetch_all(&self.pool).await.map_err(Self::map_err)?; - Ok(rows.into_iter().collect()) - } - - async fn existing_title_year_pairs( - &self, - pairs: &[(MovieTitle, ReleaseYear)], - ) -> Result, DomainError> { - if pairs.is_empty() { - return Ok(Default::default()); - } - let conditions: Vec = pairs - .iter() - .map(|_| "(title = ? AND release_year = ?)".to_string()) - .collect(); - let sql = format!( - "SELECT DISTINCT title, release_year FROM movies WHERE {}", - conditions.join(" OR ") - ); - use sqlx::Row; - let mut q = sqlx::query(&sql); - for (t, y) in pairs { - q = q.bind(t.value().to_string()).bind(y.value() as i64); - } - let rows = q.fetch_all(&self.pool).await.map_err(Self::map_err)?; - Ok(rows - .into_iter() - .map(|r| { - let t: String = r.get("title"); - let y: i64 = r.get("release_year"); - (t, y as u16) - }) - .collect()) - } - - async fn list_movies( - &self, - page: &domain::models::collections::PageParams, - filter: &domain::models::MovieFilter, - ) -> Result, DomainError> - { - use sqlx::Row; - let limit = page.limit as i64; - let offset = page.offset as i64; - let pattern = filter - .search - .as_deref() - .map(|s| format!("%{}%", s.to_lowercase())); - let genre = filter.genre.as_deref(); - let language = filter.language.as_deref(); - - let rows: Vec = sqlx::query_as( - "SELECT \ - m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, \ - p.overview, p.runtime_minutes, p.original_language, p.collection_name, \ - GROUP_CONCAT(g.name) AS genres \ - FROM movies m \ - LEFT JOIN movie_profiles p ON p.movie_id = m.id \ - LEFT JOIN movie_genres g ON g.movie_id = m.id \ - WHERE (? IS NULL OR LOWER(m.title) LIKE ?) \ - AND (? IS NULL OR p.original_language = ?) \ - AND (? IS NULL OR m.id IN (SELECT movie_id FROM movie_genres WHERE LOWER(name) = LOWER(?))) \ - GROUP BY m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, \ - p.overview, p.runtime_minutes, p.original_language, p.collection_name \ - ORDER BY m.title ASC \ - LIMIT ? OFFSET ?", - ) - .bind(&pattern) - .bind(&pattern) - .bind(language) - .bind(language) - .bind(genre) - .bind(genre) - .bind(limit) - .bind(offset) - .fetch_all(&self.pool) - .await - .map_err(Self::map_err)?; - - let total: i64 = sqlx::query( - "SELECT COUNT(DISTINCT m.id) \ - FROM movies m \ - LEFT JOIN movie_profiles p ON p.movie_id = m.id \ - WHERE (? IS NULL OR LOWER(m.title) LIKE ?) \ - AND (? IS NULL OR p.original_language = ?) \ - AND (? IS NULL OR m.id IN (SELECT movie_id FROM movie_genres WHERE LOWER(name) = LOWER(?)))", - ) - .bind(&pattern) - .bind(&pattern) - .bind(language) - .bind(language) - .bind(genre) - .bind(genre) - .fetch_one(&self.pool) - .await - .map_err(Self::map_err)? - .try_get(0) - .unwrap_or(0); - - let items = rows - .into_iter() - .map(|r| r.into_domain()) - .collect::, _>>()?; - - Ok(domain::models::collections::Paginated { - items, - total_count: total as u64, - limit: page.limit, - offset: page.offset, - }) - } -} - -#[async_trait] -impl ReviewRepository for SqliteMovieRepository { - async fn save_review(&self, review: &Review) -> Result { - let id = review.id().value().to_string(); - let movie_id = review.movie_id().value().to_string(); - let user_id = review.user_id().value().to_string(); - let rating = review.rating().value() as i64; - let comment = review.comment().map(|c| c.value().to_string()); - let watched_at = datetime_to_str(review.watched_at()); - let created_at = datetime_to_str(review.created_at()); - let remote_actor_url = match review.source() { - ReviewSource::Local => None, - ReviewSource::Remote { actor_url } => Some(actor_url.clone()), - }; - - sqlx::query( - "INSERT INTO reviews (id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url) - VALUES (?, ?, ?, ?, ?, ?, ?, ?)", - ) - .bind(&id) - .bind(&movie_id) - .bind(&user_id) - .bind(rating) - .bind(&comment) - .bind(&watched_at) - .bind(&created_at) - .bind(&remote_actor_url) - .execute(&self.pool) - .await - .map_err(Self::map_err)?; - - Ok(DomainEvent::ReviewLogged { - review_id: review.id().clone(), - movie_id: review.movie_id().clone(), - user_id: review.user_id().clone(), - rating: review.rating().clone(), - watched_at: *review.watched_at(), - }) - } - - async fn get_review_by_id(&self, review_id: &ReviewId) -> Result, DomainError> { - let id = review_id.value().to_string(); - sqlx::query_as::<_, ReviewRow>( - "SELECT id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url - FROM reviews WHERE id = ?", - ) - .bind(&id) - .fetch_optional(&self.pool) - .await - .map_err(Self::map_err)? - .map(ReviewRow::into_domain) - .transpose() - } - - async fn delete_review(&self, review_id: &ReviewId) -> Result<(), DomainError> { - let id = review_id.value().to_string(); - sqlx::query("DELETE FROM reviews WHERE id = ?") - .bind(&id) - .execute(&self.pool) - .await - .map_err(Self::map_err)?; - Ok(()) - } - - async fn get_all_reviews_for_user(&self, user_id: &UserId) -> Result, DomainError> { - let uid = user_id.value().to_string(); - sqlx::query_as::<_, ReviewRow>( - "SELECT id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url - FROM reviews WHERE user_id = ? ORDER BY watched_at DESC", - ) - .bind(&uid) - .fetch_all(&self.pool) - .await - .map_err(Self::map_err)? - .into_iter() - .map(ReviewRow::into_domain) - .collect() - } -} - -#[async_trait] -impl DiaryRepository for SqliteMovieRepository { - async fn query_diary( - &self, - filter: &DiaryFilter, - ) -> Result, DomainError> { - let limit = filter.page.limit as i64; - let offset = filter.page.offset as i64; - - let (total, rows) = match (&filter.movie_id, &filter.user_id) { - (None, None) => tokio::try_join!( - self.count_diary_entries(None), - self.fetch_all_diary_rows(&filter.sort_by, limit, offset) - )?, - (Some(id), None) => { - let id_str = id.value().to_string(); - tokio::try_join!( - self.count_diary_entries(Some(id_str.as_str())), - self.fetch_movie_diary_rows(&id_str, &filter.sort_by, limit, offset) - )? - } - (None, Some(uid)) => { - let uid_str = uid.value().to_string(); - let search = filter.search.as_deref(); - tokio::try_join!( - self.count_user_diary_entries(&uid_str, search), - self.fetch_user_diary_rows(&uid_str, &filter.sort_by, search, limit, offset) - )? - } - (Some(_), Some(_)) => { - return Err(DomainError::ValidationError( - "Combined movie_id + user_id filter not supported".into(), - )); - } - }; - - let items = rows - .into_iter() - .map(DiaryRow::into_domain) - .collect::, _>>()?; - - Ok(Paginated { - items, - total_count: total as u64, - limit: filter.page.limit, - offset: filter.page.offset, - }) - } - - async fn query_activity_feed( - &self, - page: &PageParams, - ) -> Result, DomainError> { - self.query_activity_feed_filtered(page, &domain::ports::FeedSortBy::Date, None, None) - .await - } - - async fn query_activity_feed_filtered( - &self, - page: &PageParams, - sort_by: &domain::ports::FeedSortBy, - search: Option<&str>, - following: Option<&domain::ports::FollowingFilter>, - ) -> Result, DomainError> { - use domain::ports::FeedSortBy; - - let limit = page.limit as i64; - let offset = page.offset as i64; - let has_search = search.map(|s| !s.is_empty()).unwrap_or(false); - - let mut where_parts = vec!["1=1".to_string()]; - - if has_search { - where_parts.push("m.title LIKE '%' || ? || '%'".to_string()); - } - - if let Some(f) = following { - let local_in = if f.local_user_ids.is_empty() { - "SELECT NULL WHERE 0".to_string() - } else { - f.local_user_ids - .iter() - .map(|_| "?") - .collect::>() - .join(",") - }; - let remote_in = if f.remote_actor_urls.is_empty() { - "SELECT NULL WHERE 0".to_string() - } else { - f.remote_actor_urls - .iter() - .map(|_| "?") - .collect::>() - .join(",") - }; - where_parts.push(format!( - "(r.user_id IN ({}) OR r.remote_actor_url IN ({}))", - local_in, remote_in - )); - } - - let order_clause = match sort_by { - FeedSortBy::Date => "r.watched_at DESC", - FeedSortBy::DateAsc => "r.watched_at ASC", - FeedSortBy::Rating => "r.rating DESC, r.watched_at DESC", - FeedSortBy::RatingAsc => "r.rating ASC, r.watched_at ASC", - }; - - let where_clause = where_parts.join(" AND "); - - let count_sql = format!( - "SELECT COUNT(*) FROM reviews r - INNER JOIN movies m ON m.id = r.movie_id - WHERE {}", - where_clause - ); - - let select_sql = format!( - "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, - r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, - r.watched_at, r.created_at, r.remote_actor_url, - COALESCE(u.email, r.remote_actor_url) AS user_email - FROM reviews r - INNER JOIN movies m ON m.id = r.movie_id - LEFT JOIN users u ON u.id = r.user_id - WHERE {} - ORDER BY {} - LIMIT ? OFFSET ?", - where_clause, order_clause - ); - - macro_rules! bind_filter_params { - ($q:expr) => {{ - let mut q = $q; - if has_search { - q = q.bind(search.unwrap()); - } - if let Some(f) = following { - for uid in &f.local_user_ids { - q = q.bind(uid.to_string()); - } - for url in &f.remote_actor_urls { - q = q.bind(url.as_str()); - } - } - q - }}; - } - - let count_q = bind_filter_params!(sqlx::query_scalar::<_, i64>(&count_sql)); - let total = count_q.fetch_one(&self.pool).await.map_err(Self::map_err)?; - - let rows_q = bind_filter_params!(sqlx::query_as::<_, FeedRow>(&select_sql)); - let rows = rows_q - .bind(limit) - .bind(offset) - .fetch_all(&self.pool) - .await - .map_err(Self::map_err)?; - - let items = rows - .into_iter() - .map(FeedRow::into_domain) - .collect::, _>>()?; - - Ok(Paginated { - items, - total_count: total as u64, - limit: page.limit, - offset: page.offset, - }) - } - - async fn get_review_history(&self, movie_id: &MovieId) -> Result { - let id_str = movie_id.value().to_string(); - - let movie = sqlx::query_as::<_, MovieRow>( - "SELECT id, external_metadata_id, title, release_year, director, poster_path - FROM movies WHERE id = ?", - ) - .bind(&id_str) - .fetch_optional(&self.pool) - .await - .map_err(Self::map_err)? - .ok_or_else(|| DomainError::NotFound(format!("Movie {}", id_str)))? - .into_domain()?; - - let viewings = sqlx::query_as::<_, ReviewRow>( - "SELECT id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url - FROM reviews WHERE movie_id = ? ORDER BY watched_at ASC", - ) - .bind(&id_str) - .fetch_all(&self.pool) - .await - .map_err(Self::map_err)? - .into_iter() - .map(ReviewRow::into_domain) - .collect::, _>>()?; - - Ok(ReviewHistory::new(movie, viewings)) - } - - async fn get_user_history(&self, user_id: &UserId) -> Result, DomainError> { - let uid = user_id.value().to_string(); - let rows = sqlx::query_as::<_, DiaryRow>( - "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, - r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url - FROM reviews r - INNER JOIN movies m ON m.id = r.movie_id - WHERE r.user_id = ? - ORDER BY r.watched_at DESC", - ) - .bind(&uid) - .fetch_all(&self.pool) - .await - .map_err(Self::map_err)?; - - rows.into_iter().map(DiaryRow::into_domain).collect() - } - - async fn get_movie_stats(&self, movie_id: &MovieId) -> Result { - let id_str = movie_id.value().to_string(); - sqlx::query_as::<_, MovieStatsRow>( - "SELECT - COUNT(*) AS total_count, - AVG(CAST(rating AS REAL)) AS avg_rating, - COUNT(CASE WHEN remote_actor_url IS NOT NULL THEN 1 END) AS federated_count, - COUNT(CASE WHEN rating = 1 THEN 1 END) AS rating_1, - COUNT(CASE WHEN rating = 2 THEN 1 END) AS rating_2, - COUNT(CASE WHEN rating = 3 THEN 1 END) AS rating_3, - COUNT(CASE WHEN rating = 4 THEN 1 END) AS rating_4, - COUNT(CASE WHEN rating = 5 THEN 1 END) AS rating_5 - FROM reviews WHERE movie_id = ?", - ) - .bind(id_str) - .fetch_one(&self.pool) - .await - .map_err(Self::map_err) - .map(MovieStatsRow::into_domain) - } - - async fn get_movie_social_feed( - &self, - movie_id: &MovieId, - page: &PageParams, - ) -> Result, DomainError> { - let id_str = movie_id.value().to_string(); - let limit = page.limit as i64; - let offset = page.offset as i64; - - let total: i64 = - sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM reviews WHERE movie_id = ?") - .bind(&id_str) - .fetch_one(&self.pool) - .await - .map_err(Self::map_err)?; - - let rows = sqlx::query_as::<_, FeedRow>( - "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, - r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, - r.watched_at, r.created_at, r.remote_actor_url, - CASE WHEN r.remote_actor_url IS NOT NULL THEN r.remote_actor_url - WHEN u.email IS NOT NULL THEN u.email - ELSE r.user_id END AS user_email - FROM reviews r - INNER JOIN movies m ON m.id = r.movie_id - LEFT JOIN users u ON u.id = r.user_id - WHERE r.movie_id = ? - ORDER BY r.watched_at DESC - LIMIT ? OFFSET ?", - ) - .bind(&id_str) - .bind(limit) - .bind(offset) - .fetch_all(&self.pool) - .await - .map_err(Self::map_err)?; - - let items = rows - .into_iter() - .map(FeedRow::into_domain) - .collect::, _>>()?; - - Ok(Paginated { - items, - total_count: total as u64, - limit: page.limit, - offset: page.offset, - }) - } - - async fn count_local_posts(&self) -> Result { - let count: i64 = - sqlx::query_scalar("SELECT COUNT(*) FROM reviews WHERE remote_actor_url IS NULL") - .fetch_one(&self.pool) - .await - .map_err(Self::map_err)?; - Ok(count as u64) - } -} - -#[async_trait] -impl StatsRepository for SqliteMovieRepository { - async fn get_user_stats(&self, user_id: &UserId) -> Result { - let uid = user_id.value().to_string(); - - let (totals, fav_director, most_active) = tokio::try_join!( - self.fetch_user_totals(&uid), - self.fetch_user_favorite_director(&uid), - self.fetch_user_most_active_month(&uid) - )?; - - let most_active_month = most_active.map(|ym| format_year_month(&ym)); - - Ok(UserStats { - total_movies: totals.total, - avg_rating: totals.avg_rating, - favorite_director: fav_director, - most_active_month, - }) - } - - async fn get_user_trends(&self, user_id: &UserId) -> Result { - let uid = user_id.value().to_string(); - - let (rating_rows, director_rows) = tokio::try_join!( - sqlx::query_as::<_, MonthlyRatingRow>( - "SELECT strftime('%Y-%m', watched_at) AS month, - AVG(CAST(rating AS REAL)) AS avg_rating, - COUNT(*) AS count - FROM reviews - WHERE user_id = ? AND watched_at >= datetime('now', '-12 months') - GROUP BY month - ORDER BY month ASC", - ) - .bind(&uid) - .fetch_all(&self.pool), - sqlx::query_as::<_, DirectorCountRow>( - "SELECT m.director, - COUNT(*) AS count - FROM reviews r - INNER JOIN movies m ON m.id = r.movie_id - WHERE r.user_id = ? AND m.director IS NOT NULL - GROUP BY m.director - ORDER BY COUNT(*) DESC - LIMIT 5", - ) - .bind(&uid) - .fetch_all(&self.pool) - ) - .map_err(Self::map_err)?; - - let max_director_count = director_rows.iter().map(|d| d.count).max().unwrap_or(1); - - let monthly_ratings = rating_rows - .into_iter() - .map(|r| MonthlyRating { - month_label: format_year_month(&r.month), - year_month: r.month, - avg_rating: r.avg_rating, - count: r.count, - }) - .collect(); - - let top_directors = director_rows - .into_iter() - .map(|d| DirectorStat { - director: d.director, - count: d.count, - }) - .collect(); - - Ok(UserTrends { - monthly_ratings, - top_directors, - max_director_count, - }) - } +pub async fn migrate(pool: &SqlitePool) -> Result<(), domain::errors::DomainError> { + migrations::run(pool).await } pub struct SqliteWireOutput { @@ -1009,18 +108,17 @@ pub async fn wire(database_url: &str) -> anyhow::Result { .await .context("Failed to connect to SQLite database")?; - let repo = std::sync::Arc::new(SqliteMovieRepository::new(pool.clone())); - repo.migrate() + migrate(&pool) .await .map_err(|e| anyhow::anyhow!("{e}")) .context("Database migration failed")?; Ok(SqliteWireOutput { pool: pool.clone(), - movie: std::sync::Arc::clone(&repo) as _, - review: std::sync::Arc::clone(&repo) as _, - diary: std::sync::Arc::clone(&repo) as _, - stats: std::sync::Arc::clone(&repo) as _, + movie: std::sync::Arc::new(SqliteMovieRepository::new(pool.clone())) as _, + review: std::sync::Arc::new(SqliteReviewRepository::new(pool.clone())) as _, + diary: std::sync::Arc::new(SqliteDiaryRepository::new(pool.clone())) as _, + stats: std::sync::Arc::new(SqliteStatsRepository::new(pool.clone())) as _, user: std::sync::Arc::new(SqliteUserRepository::new(pool.clone())) as _, import_session: std::sync::Arc::new(SqliteImportSessionRepository::new(pool.clone())) as _, import_profile: std::sync::Arc::new(SqliteImportProfileRepository::new(pool.clone())) as _, @@ -1036,258 +134,3 @@ pub async fn wire(database_url: &str) -> anyhow::Result { remote_goal: std::sync::Arc::new(remote_goals::SqliteRemoteGoalRepository::new(pool)) as _, }) } - -#[cfg(test)] -mod feed_filter_tests { - use super::*; - use domain::{ - models::collections::PageParams, - ports::{DiaryRepository, FeedSortBy, FollowingFilter}, - }; - use sqlx::SqlitePool; - - async fn setup(pool: &SqlitePool) { - sqlx::migrate!("./migrations").run(pool).await.unwrap(); - - // carol is a remote actor; we still need a non-null user_id for the schema, - // so we create a local "ghost" user and link the remote review via remote_actor_url. - sqlx::query( - "INSERT INTO users (id, email, username, password_hash, created_at) VALUES - ('11111111-1111-1111-1111-111111111111', 'alice@example.com', 'alice', 'hash', '2024-01-01 00:00:00'), - ('22222222-2222-2222-2222-222222222222', 'bob@example.com', 'bob', 'hash', '2024-01-01 00:00:00'), - ('33333333-3333-3333-3333-333333333333', 'carol@remote.social', 'carol', 'hash', '2024-01-01 00:00:00')", - ) - .execute(pool) - .await - .unwrap(); - - sqlx::query( - "INSERT INTO movies (id, title, release_year) VALUES - ('aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', 'Inception', 2010), - ('bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb', 'Interstellar', 2014), - ('cccccccc-cccc-cccc-cccc-cccccccccccc', 'Dune', 2021)", - ) - .execute(pool) - .await - .unwrap(); - - // carol's review: local user_id=33333333, remote_actor_url set → remote review - sqlx::query( - "INSERT INTO reviews (id, movie_id, user_id, rating, watched_at, created_at, remote_actor_url) VALUES - ('a1a1a1a1-a1a1-a1a1-a1a1-a1a1a1a1a1a1', 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', '11111111-1111-1111-1111-111111111111', 5, '2024-01-01 00:00:00', '2024-01-01 00:00:00', NULL), - ('b2b2b2b2-b2b2-b2b2-b2b2-b2b2b2b2b2b2', 'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb', '22222222-2222-2222-2222-222222222222', 3, '2024-01-02 00:00:00', '2024-01-02 00:00:00', NULL), - ('c3c3c3c3-c3c3-c3c3-c3c3-c3c3c3c3c3c3', 'cccccccc-cccc-cccc-cccc-cccccccccccc', '33333333-3333-3333-3333-333333333333', 4, '2024-01-03 00:00:00', '2024-01-03 00:00:00', 'https://remote.social/users/carol')", - ) - .execute(pool) - .await - .unwrap(); - } - - #[tokio::test] - async fn test_sort_by_rating_descending() { - let pool = SqlitePool::connect(":memory:").await.unwrap(); - setup(&pool).await; - let repo = SqliteMovieRepository::new(pool); - - let page = PageParams::new(Some(10), Some(0)).unwrap(); - let result = repo - .query_activity_feed_filtered(&page, &FeedSortBy::Rating, None, None) - .await - .unwrap(); - - let ratings: Vec = result - .items - .iter() - .map(|e| e.review().rating().value()) - .collect(); - assert_eq!(ratings, vec![5, 4, 3]); - } - - #[tokio::test] - async fn test_search_by_title() { - let pool = SqlitePool::connect(":memory:").await.unwrap(); - setup(&pool).await; - let repo = SqliteMovieRepository::new(pool); - - let page = PageParams::new(Some(10), Some(0)).unwrap(); - let result = repo - .query_activity_feed_filtered(&page, &FeedSortBy::Date, Some("Dune"), None) - .await - .unwrap(); - - assert_eq!(result.items.len(), 1); - assert_eq!(result.items[0].movie().title().value(), "Dune"); - } - - #[tokio::test] - async fn test_following_filter() { - let pool = SqlitePool::connect(":memory:").await.unwrap(); - setup(&pool).await; - let repo = SqliteMovieRepository::new(pool); - - let filter = FollowingFilter { - local_user_ids: vec![ - uuid::Uuid::parse_str("11111111-1111-1111-1111-111111111111").unwrap(), - ], - remote_actor_urls: vec!["https://remote.social/users/carol".to_string()], - }; - let page = PageParams::new(Some(10), Some(0)).unwrap(); - let result = repo - .query_activity_feed_filtered(&page, &FeedSortBy::Date, None, Some(&filter)) - .await - .unwrap(); - - assert_eq!(result.items.len(), 2); // alice + carol, NOT bob - let titles: Vec = result - .items - .iter() - .map(|e| e.movie().title().value().to_string()) - .collect(); - assert!(titles.contains(&"Inception".to_string())); - assert!(titles.contains(&"Dune".to_string())); - } - - #[tokio::test] - async fn test_get_movie_stats_local() { - let pool = SqlitePool::connect(":memory:").await.unwrap(); - setup(&pool).await; - let repo = SqliteMovieRepository::new(pool); - - // Inception: 1 local review, rating=5, no federated - let movie_id = domain::value_objects::MovieId::from_uuid( - uuid::Uuid::parse_str("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa").unwrap(), - ); - let stats = repo.get_movie_stats(&movie_id).await.unwrap(); - - assert_eq!(stats.total_count, 1); - assert_eq!(stats.federated_count, 0); - assert!((stats.avg_rating.unwrap() - 5.0).abs() < 0.001); - assert_eq!(stats.rating_histogram[4], 1); // 5★ bucket - assert_eq!(stats.rating_histogram[0], 0); // 1★ bucket - } - - #[tokio::test] - async fn test_get_movie_social_feed_returns_reviews_for_movie() { - let pool = SqlitePool::connect(":memory:").await.unwrap(); - setup(&pool).await; - let repo = SqliteMovieRepository::new(pool); - - let movie_id = domain::value_objects::MovieId::from_uuid( - uuid::Uuid::parse_str("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa").unwrap(), - ); - let page = PageParams::new(Some(10), Some(0)).unwrap(); - let result = repo.get_movie_social_feed(&movie_id, &page).await.unwrap(); - - assert_eq!(result.total_count, 1); - assert_eq!(result.items.len(), 1); - assert_eq!(result.items[0].movie().title().value(), "Inception"); - assert_eq!(result.items[0].review().rating().value(), 5); - assert_eq!(result.items[0].user_display_name(), "alice"); - assert!(!result.items[0].review().is_remote()); - } - - #[tokio::test] - async fn test_get_movie_social_feed_federated_review() { - let pool = SqlitePool::connect(":memory:").await.unwrap(); - setup(&pool).await; - let repo = SqliteMovieRepository::new(pool); - - let movie_id = domain::value_objects::MovieId::from_uuid( - uuid::Uuid::parse_str("cccccccc-cccc-cccc-cccc-cccccccccccc").unwrap(), - ); - let page = PageParams::new(Some(10), Some(0)).unwrap(); - let result = repo.get_movie_social_feed(&movie_id, &page).await.unwrap(); - - assert_eq!(result.total_count, 1); - assert_eq!(result.items.len(), 1); - assert!(result.items[0].review().is_remote()); - assert_eq!( - result.items[0].user_email(), - "https://remote.social/users/carol" - ); - } - - #[tokio::test] - async fn test_get_movie_social_feed_pagination() { - let pool = SqlitePool::connect(":memory:").await.unwrap(); - setup(&pool).await; - let repo = SqliteMovieRepository::new(pool); - - let movie_id = domain::value_objects::MovieId::from_uuid( - uuid::Uuid::parse_str("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa").unwrap(), - ); - // offset beyond results: total_count still correct, items empty - let page = PageParams::new(Some(10), Some(5)).unwrap(); - let result = repo.get_movie_social_feed(&movie_id, &page).await.unwrap(); - - assert_eq!(result.total_count, 1); - assert_eq!(result.items.len(), 0); - } - - #[tokio::test] - async fn test_get_movie_stats_federated() { - let pool = SqlitePool::connect(":memory:").await.unwrap(); - setup(&pool).await; - let repo = SqliteMovieRepository::new(pool); - - // Dune: 1 federated review, rating=4 - let movie_id = domain::value_objects::MovieId::from_uuid( - uuid::Uuid::parse_str("cccccccc-cccc-cccc-cccc-cccccccccccc").unwrap(), - ); - let stats = repo.get_movie_stats(&movie_id).await.unwrap(); - - assert_eq!(stats.total_count, 1); - assert_eq!(stats.federated_count, 1); - assert_eq!(stats.rating_histogram[3], 1); // 4★ bucket - assert_eq!(stats.rating_histogram[4], 0); // 5★ bucket - } -} - -#[cfg(test)] -mod diary_count_tests { - use super::*; - use sqlx::SqlitePool; - - async fn test_pool() -> SqlitePool { - let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); - sqlx::migrate!("./migrations").run(&pool).await.unwrap(); - pool - } - - #[tokio::test] - async fn count_local_posts_excludes_remote_reviews() { - use domain::ports::DiaryRepository; - let pool = test_pool().await; - let repo = SqliteMovieRepository::new(pool.clone()); - - let user_id = uuid::Uuid::new_v4().to_string(); - let movie_id = uuid::Uuid::new_v4().to_string(); - sqlx::query("INSERT INTO users (id, email, password_hash, created_at, username) VALUES (?, ?, ?, ?, ?)") - .bind(&user_id).bind("a@b.com").bind("hash").bind("2024-01-01 00:00:00").bind("alice") - .execute(&pool).await.unwrap(); - sqlx::query("INSERT INTO movies (id, title, release_year) VALUES (?, ?, ?)") - .bind(&movie_id) - .bind("Test Movie") - .bind(2024i32) - .execute(&pool) - .await - .unwrap(); - - // Local review (remote_actor_url IS NULL) - let r1 = uuid::Uuid::new_v4().to_string(); - sqlx::query("INSERT INTO reviews (id, movie_id, user_id, rating, watched_at, created_at) VALUES (?, ?, ?, ?, ?, ?)") - .bind(&r1).bind(&movie_id).bind(&user_id).bind(4i32) - .bind("2024-01-01 00:00:00").bind("2024-01-01 00:00:00") - .execute(&pool).await.unwrap(); - - // Remote review (remote_actor_url IS NOT NULL) - let r2 = uuid::Uuid::new_v4().to_string(); - sqlx::query("INSERT INTO reviews (id, movie_id, user_id, rating, watched_at, created_at, remote_actor_url) VALUES (?, ?, ?, ?, ?, ?, ?)") - .bind(&r2).bind(&movie_id).bind(&user_id).bind(3i32) - .bind("2024-01-01 00:00:00").bind("2024-01-01 00:00:00").bind("https://remote/user") - .execute(&pool).await.unwrap(); - - let count = repo.count_local_posts().await.unwrap(); - assert_eq!(count, 1); - } -} diff --git a/crates/adapters/sqlite/src/movie.rs b/crates/adapters/sqlite/src/movie.rs new file mode 100644 index 0000000..742b6b6 --- /dev/null +++ b/crates/adapters/sqlite/src/movie.rs @@ -0,0 +1,249 @@ +use async_trait::async_trait; +use domain::{ + errors::DomainError, + models::{Movie, MovieFilter, MovieSummary, collections::{PageParams, Paginated}}, + ports::MovieRepository, + value_objects::{ExternalMetadataId, MovieId, MovieTitle, ReleaseYear}, +}; +use sqlx::SqlitePool; + +use crate::models::{MovieRow, MovieSummaryRow}; + +pub struct SqliteMovieRepository { + pool: SqlitePool, +} + +impl SqliteMovieRepository { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } + + fn map_err(e: sqlx::Error) -> DomainError { + tracing::error!("Database error: {:?}", e); + DomainError::InfrastructureError("Database operation failed".into()) + } +} + +#[async_trait] +impl MovieRepository for SqliteMovieRepository { + async fn get_movie_by_external_id( + &self, + external_metadata_id: &ExternalMetadataId, + ) -> Result, DomainError> { + let id = external_metadata_id.value(); + sqlx::query_as::<_, MovieRow>( + "SELECT id, external_metadata_id, title, release_year, director, poster_path + FROM movies WHERE external_metadata_id = ?", + ) + .bind(id) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)? + .map(MovieRow::into_domain) + .transpose() + } + + async fn get_movie_by_id(&self, movie_id: &MovieId) -> Result, DomainError> { + let id = movie_id.value().to_string(); + sqlx::query_as::<_, MovieRow>( + "SELECT id, external_metadata_id, title, release_year, director, poster_path + FROM movies WHERE id = ?", + ) + .bind(&id) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)? + .map(MovieRow::into_domain) + .transpose() + } + + async fn get_movies_by_title_and_year( + &self, + title: &MovieTitle, + year: &ReleaseYear, + ) -> Result, DomainError> { + let t = title.value(); + let y = year.value() as i64; + sqlx::query_as::<_, MovieRow>( + "SELECT id, external_metadata_id, title, release_year, director, poster_path + FROM movies WHERE title = ? AND release_year = ?", + ) + .bind(t) + .bind(y) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)? + .into_iter() + .map(MovieRow::into_domain) + .collect() + } + + async fn upsert_movie(&self, movie: &Movie) -> Result<(), DomainError> { + let id = movie.id().value().to_string(); + let external_metadata_id = movie.external_metadata_id().map(|e| e.value().to_string()); + let title = movie.title().value(); + let release_year = movie.release_year().value() as i64; + let director = movie.director(); + let poster_path = movie.poster_path().map(|p| p.value().to_string()); + + sqlx::query( + "INSERT INTO movies (id, external_metadata_id, title, release_year, director, poster_path) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + external_metadata_id = excluded.external_metadata_id, + title = excluded.title, + release_year = excluded.release_year, + director = excluded.director, + poster_path = excluded.poster_path", + ) + .bind(&id) + .bind(&external_metadata_id) + .bind(title) + .bind(release_year) + .bind(director) + .bind(&poster_path) + .execute(&self.pool) + .await + .map_err(Self::map_err)?; + + Ok(()) + } + + async fn delete_movie(&self, movie_id: &MovieId) -> Result<(), DomainError> { + let id = movie_id.value().to_string(); + sqlx::query("DELETE FROM movies WHERE id = ?") + .bind(&id) + .execute(&self.pool) + .await + .map_err(Self::map_err)?; + Ok(()) + } + + async fn existing_external_ids( + &self, + ids: &[ExternalMetadataId], + ) -> Result, DomainError> { + if ids.is_empty() { + return Ok(Default::default()); + } + let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect(); + let sql = format!( + "SELECT external_metadata_id FROM movies WHERE external_metadata_id IN ({})", + placeholders.join(",") + ); + let mut q = sqlx::query_scalar::<_, String>(&sql); + for id in ids { + q = q.bind(id.value().to_string()); + } + let rows = q.fetch_all(&self.pool).await.map_err(Self::map_err)?; + Ok(rows.into_iter().collect()) + } + + async fn existing_title_year_pairs( + &self, + pairs: &[(MovieTitle, ReleaseYear)], + ) -> Result, DomainError> { + if pairs.is_empty() { + return Ok(Default::default()); + } + let conditions: Vec = pairs + .iter() + .map(|_| "(title = ? AND release_year = ?)".to_string()) + .collect(); + let sql = format!( + "SELECT DISTINCT title, release_year FROM movies WHERE {}", + conditions.join(" OR ") + ); + use sqlx::Row; + let mut q = sqlx::query(&sql); + for (t, y) in pairs { + q = q.bind(t.value().to_string()).bind(y.value() as i64); + } + let rows = q.fetch_all(&self.pool).await.map_err(Self::map_err)?; + Ok(rows + .into_iter() + .map(|r| { + let t: String = r.get("title"); + let y: i64 = r.get("release_year"); + (t, y as u16) + }) + .collect()) + } + + async fn list_movies( + &self, + page: &PageParams, + filter: &MovieFilter, + ) -> Result, DomainError> + { + use sqlx::Row; + let limit = page.limit as i64; + let offset = page.offset as i64; + let pattern = filter + .search + .as_deref() + .map(|s| format!("%{}%", s.to_lowercase())); + let genre = filter.genre.as_deref(); + let language = filter.language.as_deref(); + + let rows: Vec = sqlx::query_as( + "SELECT \ + m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, \ + p.overview, p.runtime_minutes, p.original_language, p.collection_name, \ + GROUP_CONCAT(g.name) AS genres \ + FROM movies m \ + LEFT JOIN movie_profiles p ON p.movie_id = m.id \ + LEFT JOIN movie_genres g ON g.movie_id = m.id \ + WHERE (? IS NULL OR LOWER(m.title) LIKE ?) \ + AND (? IS NULL OR p.original_language = ?) \ + AND (? IS NULL OR m.id IN (SELECT movie_id FROM movie_genres WHERE LOWER(name) = LOWER(?))) \ + GROUP BY m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, \ + p.overview, p.runtime_minutes, p.original_language, p.collection_name \ + ORDER BY m.title ASC \ + LIMIT ? OFFSET ?", + ) + .bind(&pattern) + .bind(&pattern) + .bind(language) + .bind(language) + .bind(genre) + .bind(genre) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + + let total: i64 = sqlx::query( + "SELECT COUNT(DISTINCT m.id) \ + FROM movies m \ + LEFT JOIN movie_profiles p ON p.movie_id = m.id \ + WHERE (? IS NULL OR LOWER(m.title) LIKE ?) \ + AND (? IS NULL OR p.original_language = ?) \ + AND (? IS NULL OR m.id IN (SELECT movie_id FROM movie_genres WHERE LOWER(name) = LOWER(?)))", + ) + .bind(&pattern) + .bind(&pattern) + .bind(language) + .bind(language) + .bind(genre) + .bind(genre) + .fetch_one(&self.pool) + .await + .map_err(Self::map_err)? + .try_get(0) + .unwrap_or(0); + + let items = rows + .into_iter() + .map(|r| r.into_domain()) + .collect::, _>>()?; + + Ok(Paginated { + items, + total_count: total as u64, + limit: page.limit, + offset: page.offset, + }) + } +} diff --git a/crates/adapters/sqlite/src/review.rs b/crates/adapters/sqlite/src/review.rs new file mode 100644 index 0000000..d29bbd4 --- /dev/null +++ b/crates/adapters/sqlite/src/review.rs @@ -0,0 +1,106 @@ +use async_trait::async_trait; +use domain::{ + errors::DomainError, + events::DomainEvent, + models::{Review, ReviewSource}, + ports::ReviewRepository, + value_objects::{ReviewId, UserId}, +}; +use sqlx::SqlitePool; + +use crate::models::{ReviewRow, datetime_to_str}; + +pub struct SqliteReviewRepository { + pool: SqlitePool, +} + +impl SqliteReviewRepository { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } + + fn map_err(e: sqlx::Error) -> DomainError { + tracing::error!("Database error: {:?}", e); + DomainError::InfrastructureError("Database operation failed".into()) + } +} + +#[async_trait] +impl ReviewRepository for SqliteReviewRepository { + async fn save_review(&self, review: &Review) -> Result { + let id = review.id().value().to_string(); + let movie_id = review.movie_id().value().to_string(); + let user_id = review.user_id().value().to_string(); + let rating = review.rating().value() as i64; + let comment = review.comment().map(|c| c.value().to_string()); + let watched_at = datetime_to_str(review.watched_at()); + let created_at = datetime_to_str(review.created_at()); + let remote_actor_url = match review.source() { + ReviewSource::Local => None, + ReviewSource::Remote { actor_url } => Some(actor_url.clone()), + }; + + sqlx::query( + "INSERT INTO reviews (id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(&id) + .bind(&movie_id) + .bind(&user_id) + .bind(rating) + .bind(&comment) + .bind(&watched_at) + .bind(&created_at) + .bind(&remote_actor_url) + .execute(&self.pool) + .await + .map_err(Self::map_err)?; + + Ok(DomainEvent::ReviewLogged { + review_id: review.id().clone(), + movie_id: review.movie_id().clone(), + user_id: review.user_id().clone(), + rating: review.rating().clone(), + watched_at: *review.watched_at(), + }) + } + + async fn get_review_by_id(&self, review_id: &ReviewId) -> Result, DomainError> { + let id = review_id.value().to_string(); + sqlx::query_as::<_, ReviewRow>( + "SELECT id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url + FROM reviews WHERE id = ?", + ) + .bind(&id) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)? + .map(ReviewRow::into_domain) + .transpose() + } + + async fn delete_review(&self, review_id: &ReviewId) -> Result<(), DomainError> { + let id = review_id.value().to_string(); + sqlx::query("DELETE FROM reviews WHERE id = ?") + .bind(&id) + .execute(&self.pool) + .await + .map_err(Self::map_err)?; + Ok(()) + } + + async fn get_all_reviews_for_user(&self, user_id: &UserId) -> Result, DomainError> { + let uid = user_id.value().to_string(); + sqlx::query_as::<_, ReviewRow>( + "SELECT id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url + FROM reviews WHERE user_id = ? ORDER BY watched_at DESC", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)? + .into_iter() + .map(ReviewRow::into_domain) + .collect() + } +} diff --git a/crates/adapters/sqlite/src/stats.rs b/crates/adapters/sqlite/src/stats.rs new file mode 100644 index 0000000..0c1b97f --- /dev/null +++ b/crates/adapters/sqlite/src/stats.rs @@ -0,0 +1,155 @@ +use async_trait::async_trait; +use domain::{ + errors::DomainError, + models::{DirectorStat, MonthlyRating, UserStats, UserTrends}, + ports::StatsRepository, + value_objects::UserId, +}; +use sqlx::SqlitePool; + +use crate::models::{DirectorCountRow, MonthlyRatingRow, UserTotalsRow}; + +pub struct SqliteStatsRepository { + pool: SqlitePool, +} + +impl SqliteStatsRepository { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } + + fn map_err(e: sqlx::Error) -> DomainError { + tracing::error!("Database error: {:?}", e); + DomainError::InfrastructureError("Database operation failed".into()) + } + + async fn fetch_user_totals(&self, user_id: &str) -> Result { + sqlx::query_as::<_, UserTotalsRow>( + "SELECT COUNT(DISTINCT movie_id) AS total, + AVG(CAST(rating AS REAL)) AS avg_rating + FROM reviews WHERE user_id = ?", + ) + .bind(user_id) + .fetch_one(&self.pool) + .await + .map_err(Self::map_err) + } + + async fn fetch_user_favorite_director( + &self, + user_id: &str, + ) -> Result, DomainError> { + let row: Option = sqlx::query_scalar( + "SELECT m.director + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = ? AND m.director IS NOT NULL + GROUP BY m.director + ORDER BY COUNT(*) DESC + LIMIT 1", + ) + .bind(user_id) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)?; + Ok(row) + } + + async fn fetch_user_most_active_month( + &self, + user_id: &str, + ) -> Result, DomainError> { + let row: Option = sqlx::query_scalar( + "SELECT strftime('%Y-%m', watched_at) + FROM reviews + WHERE user_id = ? + GROUP BY strftime('%Y-%m', watched_at) + ORDER BY COUNT(*) DESC + LIMIT 1", + ) + .bind(user_id) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)?; + Ok(row) + } +} + +#[async_trait] +impl StatsRepository for SqliteStatsRepository { + async fn get_user_stats(&self, user_id: &UserId) -> Result { + let uid = user_id.value().to_string(); + + let (totals, fav_director, most_active) = tokio::try_join!( + self.fetch_user_totals(&uid), + self.fetch_user_favorite_director(&uid), + self.fetch_user_most_active_month(&uid) + )?; + + let most_active_month = most_active.map(|ym| crate::format_year_month(&ym)); + + Ok(UserStats { + total_movies: totals.total, + avg_rating: totals.avg_rating, + favorite_director: fav_director, + most_active_month, + }) + } + + async fn get_user_trends(&self, user_id: &UserId) -> Result { + let uid = user_id.value().to_string(); + + let (rating_rows, director_rows) = tokio::try_join!( + sqlx::query_as::<_, MonthlyRatingRow>( + "SELECT strftime('%Y-%m', watched_at) AS month, + AVG(CAST(rating AS REAL)) AS avg_rating, + COUNT(*) AS count + FROM reviews + WHERE user_id = ? AND watched_at >= datetime('now', '-12 months') + GROUP BY month + ORDER BY month ASC", + ) + .bind(&uid) + .fetch_all(&self.pool), + sqlx::query_as::<_, DirectorCountRow>( + "SELECT m.director, + COUNT(*) AS count + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = ? AND m.director IS NOT NULL + GROUP BY m.director + ORDER BY COUNT(*) DESC + LIMIT 5", + ) + .bind(&uid) + .fetch_all(&self.pool) + ) + .map_err(Self::map_err)?; + + let max_director_count = director_rows.iter().map(|d| d.count).max().unwrap_or(1); + + let monthly_ratings = rating_rows + .into_iter() + .map(|r| MonthlyRating { + month_label: crate::format_year_month(&r.month), + year_month: r.month, + avg_rating: r.avg_rating, + count: r.count, + }) + .collect(); + + let top_directors = director_rows + .into_iter() + .map(|d| DirectorStat { + director: d.director, + count: d.count, + }) + .collect(); + + Ok(UserTrends { + monthly_ratings, + top_directors, + max_director_count, + }) + } +} diff --git a/crates/adapters/sqlite/src/tests/diary.rs b/crates/adapters/sqlite/src/tests/diary.rs new file mode 100644 index 0000000..a4f2b89 --- /dev/null +++ b/crates/adapters/sqlite/src/tests/diary.rs @@ -0,0 +1,213 @@ +use super::*; +use domain::{ + models::collections::PageParams, + ports::{DiaryRepository, FeedSortBy, FollowingFilter}, +}; +use sqlx::SqlitePool; + +async fn setup(pool: &SqlitePool) { + sqlx::migrate!("./migrations").run(pool).await.unwrap(); + + // carol is a remote actor; we still need a non-null user_id for the schema, + // so we create a local "ghost" user and link the remote review via remote_actor_url. + sqlx::query( + "INSERT INTO users (id, email, username, password_hash, created_at) VALUES + ('11111111-1111-1111-1111-111111111111', 'alice@example.com', 'alice', 'hash', '2024-01-01 00:00:00'), + ('22222222-2222-2222-2222-222222222222', 'bob@example.com', 'bob', 'hash', '2024-01-01 00:00:00'), + ('33333333-3333-3333-3333-333333333333', 'carol@remote.social', 'carol', 'hash', '2024-01-01 00:00:00')", + ) + .execute(pool) + .await + .unwrap(); + + sqlx::query( + "INSERT INTO movies (id, title, release_year) VALUES + ('aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', 'Inception', 2010), + ('bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb', 'Interstellar', 2014), + ('cccccccc-cccc-cccc-cccc-cccccccccccc', 'Dune', 2021)", + ) + .execute(pool) + .await + .unwrap(); + + // carol's review: local user_id=33333333, remote_actor_url set → remote review + sqlx::query( + "INSERT INTO reviews (id, movie_id, user_id, rating, watched_at, created_at, remote_actor_url) VALUES + ('a1a1a1a1-a1a1-a1a1-a1a1-a1a1a1a1a1a1', 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', '11111111-1111-1111-1111-111111111111', 5, '2024-01-01 00:00:00', '2024-01-01 00:00:00', NULL), + ('b2b2b2b2-b2b2-b2b2-b2b2-b2b2b2b2b2b2', 'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb', '22222222-2222-2222-2222-222222222222', 3, '2024-01-02 00:00:00', '2024-01-02 00:00:00', NULL), + ('c3c3c3c3-c3c3-c3c3-c3c3-c3c3c3c3c3c3', 'cccccccc-cccc-cccc-cccc-cccccccccccc', '33333333-3333-3333-3333-333333333333', 4, '2024-01-03 00:00:00', '2024-01-03 00:00:00', 'https://remote.social/users/carol')", + ) + .execute(pool) + .await + .unwrap(); +} + +#[tokio::test] +async fn test_sort_by_rating_descending() { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + setup(&pool).await; + let repo = SqliteDiaryRepository::new(pool); + + let page = PageParams::new(Some(10), Some(0)).unwrap(); + let result = repo + .query_activity_feed_filtered(&page, &FeedSortBy::Rating, None, None) + .await + .unwrap(); + + let ratings: Vec = result + .items + .iter() + .map(|e| e.review().rating().value()) + .collect(); + assert_eq!(ratings, vec![5, 4, 3]); +} + +#[tokio::test] +async fn test_search_by_title() { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + setup(&pool).await; + let repo = SqliteDiaryRepository::new(pool); + + let page = PageParams::new(Some(10), Some(0)).unwrap(); + let result = repo + .query_activity_feed_filtered(&page, &FeedSortBy::Date, Some("Dune"), None) + .await + .unwrap(); + + assert_eq!(result.items.len(), 1); + assert_eq!(result.items[0].movie().title().value(), "Dune"); +} + +#[tokio::test] +async fn test_following_filter() { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + setup(&pool).await; + let repo = SqliteDiaryRepository::new(pool); + + let filter = FollowingFilter { + local_user_ids: vec![ + uuid::Uuid::parse_str("11111111-1111-1111-1111-111111111111").unwrap(), + ], + remote_actor_urls: vec!["https://remote.social/users/carol".to_string()], + }; + let page = PageParams::new(Some(10), Some(0)).unwrap(); + let result = repo + .query_activity_feed_filtered(&page, &FeedSortBy::Date, None, Some(&filter)) + .await + .unwrap(); + + assert_eq!(result.items.len(), 2); // alice + carol, NOT bob + let titles: Vec = result + .items + .iter() + .map(|e| e.movie().title().value().to_string()) + .collect(); + assert!(titles.contains(&"Inception".to_string())); + assert!(titles.contains(&"Dune".to_string())); +} + +#[tokio::test] +async fn test_get_movie_stats_local() { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + setup(&pool).await; + let repo = SqliteDiaryRepository::new(pool); + + // Inception: 1 local review, rating=5, no federated + let movie_id = domain::value_objects::MovieId::from_uuid( + uuid::Uuid::parse_str("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa").unwrap(), + ); + let stats = repo.get_movie_stats(&movie_id).await.unwrap(); + + assert_eq!(stats.total_count, 1); + assert_eq!(stats.federated_count, 0); + assert!((stats.avg_rating.unwrap() - 5.0).abs() < 0.001); + assert_eq!(stats.rating_histogram[4], 1); // 5★ bucket + assert_eq!(stats.rating_histogram[0], 0); // 1★ bucket +} + +#[tokio::test] +async fn test_get_movie_social_feed_returns_reviews_for_movie() { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + setup(&pool).await; + let repo = SqliteDiaryRepository::new(pool); + + let movie_id = domain::value_objects::MovieId::from_uuid( + uuid::Uuid::parse_str("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa").unwrap(), + ); + let page = PageParams::new(Some(10), Some(0)).unwrap(); + let result = repo.get_movie_social_feed(&movie_id, &page).await.unwrap(); + + assert_eq!(result.total_count, 1); + assert_eq!(result.items.len(), 1); + assert_eq!(result.items[0].movie().title().value(), "Inception"); + assert_eq!(result.items[0].review().rating().value(), 5); + assert_eq!(result.items[0].user_display_name(), "alice"); + assert!(!result.items[0].review().is_remote()); +} + +#[tokio::test] +async fn test_get_movie_social_feed_federated_review() { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + setup(&pool).await; + let repo = SqliteDiaryRepository::new(pool); + + let movie_id = domain::value_objects::MovieId::from_uuid( + uuid::Uuid::parse_str("cccccccc-cccc-cccc-cccc-cccccccccccc").unwrap(), + ); + let page = PageParams::new(Some(10), Some(0)).unwrap(); + let result = repo.get_movie_social_feed(&movie_id, &page).await.unwrap(); + + assert_eq!(result.total_count, 1); + assert_eq!(result.items.len(), 1); + assert!(result.items[0].review().is_remote()); + assert_eq!( + result.items[0].user_email(), + "https://remote.social/users/carol" + ); +} + +#[tokio::test] +async fn test_get_movie_social_feed_pagination() { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + setup(&pool).await; + let repo = SqliteDiaryRepository::new(pool); + + let movie_id = domain::value_objects::MovieId::from_uuid( + uuid::Uuid::parse_str("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa").unwrap(), + ); + // offset beyond results: total_count still correct, items empty + let page = PageParams::new(Some(10), Some(5)).unwrap(); + let result = repo.get_movie_social_feed(&movie_id, &page).await.unwrap(); + + assert_eq!(result.total_count, 1); + assert_eq!(result.items.len(), 0); +} + +#[tokio::test] +async fn test_get_movie_stats_federated() { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + setup(&pool).await; + let repo = SqliteDiaryRepository::new(pool); + + // Dune: 1 federated review, rating=4 + let movie_id = domain::value_objects::MovieId::from_uuid( + uuid::Uuid::parse_str("cccccccc-cccc-cccc-cccc-cccccccccccc").unwrap(), + ); + let stats = repo.get_movie_stats(&movie_id).await.unwrap(); + + assert_eq!(stats.total_count, 1); + assert_eq!(stats.federated_count, 1); + assert_eq!(stats.rating_histogram[3], 1); // 4★ bucket + assert_eq!(stats.rating_histogram[4], 0); // 5★ bucket +} + +#[tokio::test] +async fn count_local_posts_excludes_remote_reviews() { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + setup(&pool).await; + let repo = SqliteDiaryRepository::new(pool); + + // setup() seeds 3 reviews: 2 local (alice, bob) + 1 remote (carol) + let count = repo.count_local_posts().await.unwrap(); + assert_eq!(count, 2); +}