use async_trait::async_trait; use domain::{ errors::DomainError, events::DomainEvent, models::{ DiaryEntry, DiaryFilter, DirectorStat, FeedEntry, MonthlyRating, Movie, MovieStats, Review, ReviewHistory, ReviewSource, SortDirection, UserStats, UserTrends, collections::{PageParams, Paginated}, }, ports::{DiaryRepository, MovieRepository, ReviewRepository, StatsRepository}, value_objects::{ExternalMetadataId, MovieId, MovieTitle, ReleaseYear, ReviewId, UserId}, }; use sqlx::SqlitePool; mod ap_content; mod image_ref; mod import_profile; mod import_session; mod migrations; mod models; mod persons; mod profile; mod profile_fields; mod users; mod watch_event; mod watchlist; mod wrapup; use models::{ DiaryRow, DirectorCountRow, FeedRow, MonthlyRatingRow, MovieRow, MovieStatsRow, MovieSummaryRow, ReviewRow, UserTotalsRow, datetime_to_str, }; pub use ap_content::SqliteApContentQuery; pub use image_ref::{SqliteImageRefAdapter, create_image_ref}; pub use import_profile::SqliteImportProfileRepository; pub use import_session::SqliteImportSessionRepository; pub use persons::{SqlitePersonAdapter, create_person_adapter}; pub use profile::SqliteMovieProfileRepository; pub use profile_fields::SqliteProfileFieldsRepository; pub use users::SqliteUserRepository; pub use watch_event::{SqliteWatchEventRepository, SqliteWebhookTokenRepository}; pub use watchlist::SqliteWatchlistRepository; pub use wrapup::{SqliteWrapUpRepository, SqliteWrapUpStatsQuery}; pub fn create_profile_fields_repo( pool: sqlx::SqlitePool, ) -> std::sync::Arc { std::sync::Arc::new(SqliteProfileFieldsRepository::new(pool)) } fn format_year_month(ym: &str) -> String { let parts: Vec<&str> = ym.splitn(2, '-').collect(); if parts.len() != 2 { return ym.to_string(); } let year = parts[0].get(2..).unwrap_or(parts[0]); let month = match parts[1] { "01" => "Jan", "02" => "Feb", "03" => "Mar", "04" => "Apr", "05" => "May", "06" => "Jun", "07" => "Jul", "08" => "Aug", "09" => "Sep", "10" => "Oct", "11" => "Nov", "12" => "Dec", _ => parts[1], }; format!("{} '{}", month, year) } pub struct SqliteMovieRepository { pool: SqlitePool, } impl SqliteMovieRepository { pub fn new(pool: SqlitePool) -> Self { Self { pool } } pub async fn migrate(&self) -> Result<(), DomainError> { migrations::run(&self.pool).await } fn map_err(e: sqlx::Error) -> DomainError { tracing::error!("Database error: {:?}", e); DomainError::InfrastructureError("Database operation failed".into()) } async fn count_diary_entries(&self, movie_id: Option<&str>) -> Result { match movie_id { None => sqlx::query_scalar!("SELECT COUNT(*) FROM reviews") .fetch_one(&self.pool) .await .map_err(Self::map_err), Some(id) => sqlx::query_scalar!("SELECT COUNT(*) FROM reviews WHERE movie_id = ?", id) .fetch_one(&self.pool) .await .map_err(Self::map_err), } } async fn fetch_all_diary_rows( &self, sort: &SortDirection, limit: i64, offset: i64, ) -> Result, DomainError> { let order_clause = match sort { SortDirection::ByRatingDesc => "r.rating DESC, r.watched_at DESC", SortDirection::ByRatingAsc => "r.rating ASC, r.watched_at ASC", SortDirection::Ascending => "r.watched_at ASC", SortDirection::Descending => "r.watched_at DESC", }; let sql = format!( "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url FROM reviews r INNER JOIN movies m ON m.id = r.movie_id ORDER BY {} LIMIT ? OFFSET ?", order_clause ); sqlx::query_as::<_, DiaryRow>(&sql) .bind(limit) .bind(offset) .fetch_all(&self.pool) .await .map_err(Self::map_err) } async fn fetch_movie_diary_rows( &self, movie_id: &str, sort: &SortDirection, limit: i64, offset: i64, ) -> Result, DomainError> { let order_clause = match sort { SortDirection::ByRatingDesc => "r.rating DESC, r.watched_at DESC", SortDirection::ByRatingAsc => "r.rating ASC, r.watched_at ASC", SortDirection::Ascending => "r.watched_at ASC", SortDirection::Descending => "r.watched_at DESC", }; let sql = format!( "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url FROM reviews r INNER JOIN movies m ON m.id = r.movie_id WHERE r.movie_id = ? ORDER BY {} LIMIT ? OFFSET ?", order_clause ); sqlx::query_as::<_, DiaryRow>(&sql) .bind(movie_id) .bind(limit) .bind(offset) .fetch_all(&self.pool) .await .map_err(Self::map_err) } async fn count_user_diary_entries( &self, user_id: &str, search: Option<&str>, ) -> Result { let has_search = search.map(|s| !s.is_empty()).unwrap_or(false); let sql = if has_search { "SELECT COUNT(*) FROM reviews r INNER JOIN movies m ON m.id = r.movie_id WHERE r.user_id = ? AND m.title LIKE '%' || ? || '%'" .to_string() } else { "SELECT COUNT(*) FROM reviews r INNER JOIN movies m ON m.id = r.movie_id WHERE r.user_id = ?" .to_string() }; let mut q = sqlx::query_scalar::<_, i64>(&sql).bind(user_id); if has_search { q = q.bind(search.unwrap()); } q.fetch_one(&self.pool).await.map_err(Self::map_err) } async fn fetch_user_diary_rows( &self, user_id: &str, sort: &SortDirection, search: Option<&str>, limit: i64, offset: i64, ) -> Result, DomainError> { let has_search = search.map(|s| !s.is_empty()).unwrap_or(false); let search_clause = if has_search { " AND m.title LIKE '%' || ? || '%'" } else { "" }; let order_clause = match sort { SortDirection::ByRatingDesc => "r.rating DESC, r.watched_at DESC", SortDirection::ByRatingAsc => "r.rating ASC, r.watched_at ASC", SortDirection::Ascending => "r.watched_at ASC", SortDirection::Descending => "r.watched_at DESC", }; let sql = format!( "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url FROM reviews r INNER JOIN movies m ON m.id = r.movie_id WHERE r.user_id = ?{} ORDER BY {} LIMIT ? OFFSET ?", search_clause, order_clause ); let mut q = sqlx::query_as::<_, DiaryRow>(&sql).bind(user_id); if has_search { q = q.bind(search.unwrap()); } q.bind(limit) .bind(offset) .fetch_all(&self.pool) .await .map_err(Self::map_err) } async fn fetch_user_totals(&self, user_id: &str) -> Result { sqlx::query_as!( UserTotalsRow, r#"SELECT COUNT(DISTINCT movie_id) AS "total!: i64", AVG(CAST(rating AS REAL)) AS avg_rating FROM reviews WHERE user_id = ?"#, user_id ) .fetch_one(&self.pool) .await .map_err(Self::map_err) } async fn fetch_user_favorite_director( &self, user_id: &str, ) -> Result, DomainError> { let row = sqlx::query_scalar!( "SELECT m.director FROM reviews r INNER JOIN movies m ON m.id = r.movie_id WHERE r.user_id = ? AND m.director IS NOT NULL GROUP BY m.director ORDER BY COUNT(*) DESC LIMIT 1", user_id ) .fetch_optional(&self.pool) .await .map_err(Self::map_err)?; Ok(row.flatten()) } async fn fetch_user_most_active_month( &self, user_id: &str, ) -> Result, DomainError> { let result: Option> = sqlx::query_scalar!( "SELECT strftime('%Y-%m', watched_at) AS month FROM reviews WHERE user_id = ? GROUP BY month ORDER BY COUNT(*) DESC LIMIT 1", user_id ) .fetch_optional(&self.pool) .await .map_err(Self::map_err)?; Ok(result.flatten()) } } #[async_trait] impl MovieRepository for SqliteMovieRepository { async fn get_movie_by_external_id( &self, external_metadata_id: &ExternalMetadataId, ) -> Result, DomainError> { let id = external_metadata_id.value(); sqlx::query_as!( MovieRow, "SELECT id, external_metadata_id, title, release_year, director, poster_path FROM movies WHERE external_metadata_id = ?", id ) .fetch_optional(&self.pool) .await .map_err(Self::map_err)? .map(MovieRow::into_domain) .transpose() } async fn get_movie_by_id(&self, movie_id: &MovieId) -> Result, DomainError> { let id = movie_id.value().to_string(); sqlx::query_as!( MovieRow, "SELECT id, external_metadata_id, title, release_year, director, poster_path FROM movies WHERE id = ?", id ) .fetch_optional(&self.pool) .await .map_err(Self::map_err)? .map(MovieRow::into_domain) .transpose() } async fn get_movies_by_title_and_year( &self, title: &MovieTitle, year: &ReleaseYear, ) -> Result, DomainError> { let title = title.value(); let year = year.value() as i64; sqlx::query_as!( MovieRow, "SELECT id, external_metadata_id, title, release_year, director, poster_path FROM movies WHERE title = ? AND release_year = ?", title, year ) .fetch_all(&self.pool) .await .map_err(Self::map_err)? .into_iter() .map(MovieRow::into_domain) .collect() } async fn upsert_movie(&self, movie: &Movie) -> Result<(), DomainError> { let id = movie.id().value().to_string(); let external_metadata_id = movie.external_metadata_id().map(|e| e.value().to_string()); let title = movie.title().value(); let release_year = movie.release_year().value() as i64; let director = movie.director(); let poster_path = movie.poster_path().map(|p| p.value().to_string()); sqlx::query!( "INSERT INTO movies (id, external_metadata_id, title, release_year, director, poster_path) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET external_metadata_id = excluded.external_metadata_id, title = excluded.title, release_year = excluded.release_year, director = excluded.director, poster_path = excluded.poster_path", id, external_metadata_id, title, release_year, director, poster_path ) .execute(&self.pool) .await .map_err(Self::map_err)?; Ok(()) } async fn delete_movie(&self, movie_id: &MovieId) -> Result<(), DomainError> { let id = movie_id.value().to_string(); sqlx::query!("DELETE FROM movies WHERE id = ?", id) .execute(&self.pool) .await .map_err(Self::map_err)?; Ok(()) } async fn existing_external_ids( &self, ids: &[ExternalMetadataId], ) -> Result, DomainError> { if ids.is_empty() { return Ok(Default::default()); } let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect(); let sql = format!( "SELECT external_metadata_id FROM movies WHERE external_metadata_id IN ({})", placeholders.join(",") ); let mut q = sqlx::query_scalar::<_, String>(&sql); for id in ids { q = q.bind(id.value().to_string()); } let rows = q.fetch_all(&self.pool).await.map_err(Self::map_err)?; Ok(rows.into_iter().collect()) } async fn existing_title_year_pairs( &self, pairs: &[(MovieTitle, ReleaseYear)], ) -> Result, DomainError> { if pairs.is_empty() { return Ok(Default::default()); } let conditions: Vec = pairs .iter() .map(|_| "(title = ? AND release_year = ?)".to_string()) .collect(); let sql = format!( "SELECT DISTINCT title, release_year FROM movies WHERE {}", conditions.join(" OR ") ); use sqlx::Row; let mut q = sqlx::query(&sql); for (t, y) in pairs { q = q.bind(t.value().to_string()).bind(y.value() as i64); } let rows = q.fetch_all(&self.pool).await.map_err(Self::map_err)?; Ok(rows .into_iter() .map(|r| { let t: String = r.get("title"); let y: i64 = r.get("release_year"); (t, y as u16) }) .collect()) } async fn list_movies( &self, page: &domain::models::collections::PageParams, filter: &domain::models::MovieFilter, ) -> Result, DomainError> { use sqlx::Row; let limit = page.limit as i64; let offset = page.offset as i64; let pattern = filter .search .as_deref() .map(|s| format!("%{}%", s.to_lowercase())); let genre = filter.genre.as_deref(); let language = filter.language.as_deref(); let rows: Vec = sqlx::query_as( "SELECT \ m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, \ p.overview, p.runtime_minutes, p.original_language, p.collection_name, \ GROUP_CONCAT(g.name) AS genres \ FROM movies m \ LEFT JOIN movie_profiles p ON p.movie_id = m.id \ LEFT JOIN movie_genres g ON g.movie_id = m.id \ WHERE (? IS NULL OR LOWER(m.title) LIKE ?) \ AND (? IS NULL OR p.original_language = ?) \ AND (? IS NULL OR m.id IN (SELECT movie_id FROM movie_genres WHERE LOWER(name) = LOWER(?))) \ GROUP BY m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, \ p.overview, p.runtime_minutes, p.original_language, p.collection_name \ ORDER BY m.title ASC \ LIMIT ? OFFSET ?", ) .bind(&pattern) .bind(&pattern) .bind(language) .bind(language) .bind(genre) .bind(genre) .bind(limit) .bind(offset) .fetch_all(&self.pool) .await .map_err(Self::map_err)?; let total: i64 = sqlx::query( "SELECT COUNT(DISTINCT m.id) \ FROM movies m \ LEFT JOIN movie_profiles p ON p.movie_id = m.id \ WHERE (? IS NULL OR LOWER(m.title) LIKE ?) \ AND (? IS NULL OR p.original_language = ?) \ AND (? IS NULL OR m.id IN (SELECT movie_id FROM movie_genres WHERE LOWER(name) = LOWER(?)))", ) .bind(&pattern) .bind(&pattern) .bind(language) .bind(language) .bind(genre) .bind(genre) .fetch_one(&self.pool) .await .map_err(Self::map_err)? .try_get(0) .unwrap_or(0); let items = rows .into_iter() .map(|r| r.into_domain()) .collect::, _>>()?; Ok(domain::models::collections::Paginated { items, total_count: total as u64, limit: page.limit, offset: page.offset, }) } } #[async_trait] impl ReviewRepository for SqliteMovieRepository { async fn save_review(&self, review: &Review) -> Result { let id = review.id().value().to_string(); let movie_id = review.movie_id().value().to_string(); let user_id = review.user_id().value().to_string(); let rating = review.rating().value() as i64; let comment = review.comment().map(|c| c.value().to_string()); let watched_at = datetime_to_str(review.watched_at()); let created_at = datetime_to_str(review.created_at()); let remote_actor_url = match review.source() { ReviewSource::Local => None, ReviewSource::Remote { actor_url } => Some(actor_url.clone()), }; sqlx::query!( "INSERT INTO reviews (id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url ) .execute(&self.pool) .await .map_err(Self::map_err)?; Ok(DomainEvent::ReviewLogged { review_id: review.id().clone(), movie_id: review.movie_id().clone(), user_id: review.user_id().clone(), rating: review.rating().clone(), watched_at: *review.watched_at(), }) } async fn get_review_by_id(&self, review_id: &ReviewId) -> Result, DomainError> { let id = review_id.value().to_string(); sqlx::query_as!( ReviewRow, "SELECT id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url FROM reviews WHERE id = ?", id ) .fetch_optional(&self.pool) .await .map_err(Self::map_err)? .map(ReviewRow::into_domain) .transpose() } async fn delete_review(&self, review_id: &ReviewId) -> Result<(), DomainError> { let id = review_id.value().to_string(); sqlx::query!("DELETE FROM reviews WHERE id = ?", id) .execute(&self.pool) .await .map_err(Self::map_err)?; Ok(()) } async fn get_all_reviews_for_user( &self, _user_id: &UserId, ) -> Result, DomainError> { todo!() } } #[async_trait] impl DiaryRepository for SqliteMovieRepository { async fn query_diary( &self, filter: &DiaryFilter, ) -> Result, DomainError> { let limit = filter.page.limit as i64; let offset = filter.page.offset as i64; let (total, rows) = match (&filter.movie_id, &filter.user_id) { (None, None) => tokio::try_join!( self.count_diary_entries(None), self.fetch_all_diary_rows(&filter.sort_by, limit, offset) )?, (Some(id), None) => { let id_str = id.value().to_string(); tokio::try_join!( self.count_diary_entries(Some(id_str.as_str())), self.fetch_movie_diary_rows(&id_str, &filter.sort_by, limit, offset) )? } (None, Some(uid)) => { let uid_str = uid.value().to_string(); let search = filter.search.as_deref(); tokio::try_join!( self.count_user_diary_entries(&uid_str, search), self.fetch_user_diary_rows(&uid_str, &filter.sort_by, search, limit, offset) )? } (Some(_), Some(_)) => { return Err(DomainError::ValidationError( "Combined movie_id + user_id filter not supported".into(), )); } }; let items = rows .into_iter() .map(DiaryRow::into_domain) .collect::, _>>()?; Ok(Paginated { items, total_count: total as u64, limit: filter.page.limit, offset: filter.page.offset, }) } async fn query_activity_feed( &self, page: &PageParams, ) -> Result, DomainError> { self.query_activity_feed_filtered(page, &domain::ports::FeedSortBy::Date, None, None) .await } async fn query_activity_feed_filtered( &self, page: &PageParams, sort_by: &domain::ports::FeedSortBy, search: Option<&str>, following: Option<&domain::ports::FollowingFilter>, ) -> Result, DomainError> { use domain::ports::FeedSortBy; let limit = page.limit as i64; let offset = page.offset as i64; let has_search = search.map(|s| !s.is_empty()).unwrap_or(false); let mut where_parts = vec!["1=1".to_string()]; if has_search { where_parts.push("m.title LIKE '%' || ? || '%'".to_string()); } if let Some(f) = following { let local_in = if f.local_user_ids.is_empty() { "SELECT NULL WHERE 0".to_string() } else { f.local_user_ids .iter() .map(|_| "?") .collect::>() .join(",") }; let remote_in = if f.remote_actor_urls.is_empty() { "SELECT NULL WHERE 0".to_string() } else { f.remote_actor_urls .iter() .map(|_| "?") .collect::>() .join(",") }; where_parts.push(format!( "(r.user_id IN ({}) OR r.remote_actor_url IN ({}))", local_in, remote_in )); } let order_clause = match sort_by { FeedSortBy::Date => "r.watched_at DESC", FeedSortBy::DateAsc => "r.watched_at ASC", FeedSortBy::Rating => "r.rating DESC, r.watched_at DESC", FeedSortBy::RatingAsc => "r.rating ASC, r.watched_at ASC", }; let where_clause = where_parts.join(" AND "); let count_sql = format!( "SELECT COUNT(*) FROM reviews r INNER JOIN movies m ON m.id = r.movie_id WHERE {}", where_clause ); let select_sql = format!( "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url, COALESCE(u.email, r.remote_actor_url) AS user_email FROM reviews r INNER JOIN movies m ON m.id = r.movie_id LEFT JOIN users u ON u.id = r.user_id WHERE {} ORDER BY {} LIMIT ? OFFSET ?", where_clause, order_clause ); macro_rules! bind_filter_params { ($q:expr) => {{ let mut q = $q; if has_search { q = q.bind(search.unwrap()); } if let Some(f) = following { for uid in &f.local_user_ids { q = q.bind(uid.to_string()); } for url in &f.remote_actor_urls { q = q.bind(url.as_str()); } } q }}; } let count_q = bind_filter_params!(sqlx::query_scalar::<_, i64>(&count_sql)); let total = count_q.fetch_one(&self.pool).await.map_err(Self::map_err)?; let rows_q = bind_filter_params!(sqlx::query_as::<_, FeedRow>(&select_sql)); let rows = rows_q .bind(limit) .bind(offset) .fetch_all(&self.pool) .await .map_err(Self::map_err)?; let items = rows .into_iter() .map(FeedRow::into_domain) .collect::, _>>()?; Ok(Paginated { items, total_count: total as u64, limit: page.limit, offset: page.offset, }) } async fn get_review_history(&self, movie_id: &MovieId) -> Result { let id_str = movie_id.value().to_string(); let movie = sqlx::query_as!( MovieRow, "SELECT id, external_metadata_id, title, release_year, director, poster_path FROM movies WHERE id = ?", id_str ) .fetch_optional(&self.pool) .await .map_err(Self::map_err)? .ok_or_else(|| DomainError::NotFound(format!("Movie {}", id_str)))? .into_domain()?; let viewings = sqlx::query_as!( ReviewRow, "SELECT id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url FROM reviews WHERE movie_id = ? ORDER BY watched_at ASC", id_str ) .fetch_all(&self.pool) .await .map_err(Self::map_err)? .into_iter() .map(ReviewRow::into_domain) .collect::, _>>()?; Ok(ReviewHistory::new(movie, viewings)) } async fn get_user_history(&self, user_id: &UserId) -> Result, DomainError> { let uid = user_id.value().to_string(); let rows = sqlx::query_as!( DiaryRow, "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url FROM reviews r INNER JOIN movies m ON m.id = r.movie_id WHERE r.user_id = ? ORDER BY r.watched_at DESC", uid ) .fetch_all(&self.pool) .await .map_err(Self::map_err)?; rows.into_iter().map(DiaryRow::into_domain).collect() } async fn get_movie_stats(&self, movie_id: &MovieId) -> Result { let id_str = movie_id.value().to_string(); sqlx::query_as::<_, MovieStatsRow>( "SELECT COUNT(*) AS total_count, AVG(CAST(rating AS REAL)) AS avg_rating, COUNT(CASE WHEN remote_actor_url IS NOT NULL THEN 1 END) AS federated_count, COUNT(CASE WHEN rating = 1 THEN 1 END) AS rating_1, COUNT(CASE WHEN rating = 2 THEN 1 END) AS rating_2, COUNT(CASE WHEN rating = 3 THEN 1 END) AS rating_3, COUNT(CASE WHEN rating = 4 THEN 1 END) AS rating_4, COUNT(CASE WHEN rating = 5 THEN 1 END) AS rating_5 FROM reviews WHERE movie_id = ?", ) .bind(id_str) .fetch_one(&self.pool) .await .map_err(Self::map_err) .map(MovieStatsRow::into_domain) } async fn get_movie_social_feed( &self, movie_id: &MovieId, page: &PageParams, ) -> Result, DomainError> { let id_str = movie_id.value().to_string(); let limit = page.limit as i64; let offset = page.offset as i64; let total = sqlx::query_scalar!("SELECT COUNT(*) FROM reviews WHERE movie_id = ?", id_str) .fetch_one(&self.pool) .await .map_err(Self::map_err)?; let rows = sqlx::query_as::<_, FeedRow>( "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url, CASE WHEN r.remote_actor_url IS NOT NULL THEN r.remote_actor_url WHEN u.email IS NOT NULL THEN u.email ELSE r.user_id END AS user_email FROM reviews r INNER JOIN movies m ON m.id = r.movie_id LEFT JOIN users u ON u.id = r.user_id WHERE r.movie_id = ? ORDER BY r.watched_at DESC LIMIT ? OFFSET ?", ) .bind(&id_str) .bind(limit) .bind(offset) .fetch_all(&self.pool) .await .map_err(Self::map_err)?; let items = rows .into_iter() .map(FeedRow::into_domain) .collect::, _>>()?; Ok(Paginated { items, total_count: total as u64, limit: page.limit, offset: page.offset, }) } async fn count_local_posts(&self) -> Result { let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM reviews WHERE remote_actor_url IS NULL") .fetch_one(&self.pool) .await .map_err(Self::map_err)?; Ok(count as u64) } } #[async_trait] impl StatsRepository for SqliteMovieRepository { async fn get_user_stats(&self, user_id: &UserId) -> Result { let uid = user_id.value().to_string(); let (totals, fav_director, most_active) = tokio::try_join!( self.fetch_user_totals(&uid), self.fetch_user_favorite_director(&uid), self.fetch_user_most_active_month(&uid) )?; let most_active_month = most_active.map(|ym| format_year_month(&ym)); Ok(UserStats { total_movies: totals.total, avg_rating: totals.avg_rating, favorite_director: fav_director, most_active_month, }) } async fn get_user_trends(&self, user_id: &UserId) -> Result { let uid = user_id.value().to_string(); let (rating_rows, director_rows) = tokio::try_join!( sqlx::query_as!( MonthlyRatingRow, r#"SELECT strftime('%Y-%m', watched_at) AS "month!", AVG(CAST(rating AS REAL)) AS "avg_rating!: f64", COUNT(*) AS "count!: i64" FROM reviews WHERE user_id = ? AND watched_at >= datetime('now', '-12 months') GROUP BY "month!" ORDER BY "month!" ASC"#, uid ) .fetch_all(&self.pool), sqlx::query_as!( DirectorCountRow, r#"SELECT m.director AS "director!", COUNT(*) AS "count!: i64" FROM reviews r INNER JOIN movies m ON m.id = r.movie_id WHERE r.user_id = ? AND m.director IS NOT NULL GROUP BY m.director ORDER BY COUNT(*) DESC LIMIT 5"#, uid ) .fetch_all(&self.pool) ) .map_err(Self::map_err)?; let max_director_count = director_rows.iter().map(|d| d.count).max().unwrap_or(1); let monthly_ratings = rating_rows .into_iter() .map(|r| MonthlyRating { month_label: format_year_month(&r.month), year_month: r.month, avg_rating: r.avg_rating, count: r.count, }) .collect(); let top_directors = director_rows .into_iter() .map(|d| DirectorStat { director: d.director, count: d.count, }) .collect(); Ok(UserTrends { monthly_ratings, top_directors, max_director_count, }) } } pub struct SqliteWireOutput { pub pool: SqlitePool, pub movie: std::sync::Arc, pub review: std::sync::Arc, pub diary: std::sync::Arc, pub stats: std::sync::Arc, pub user: std::sync::Arc, pub import_session: std::sync::Arc, pub import_profile: std::sync::Arc, pub movie_profile: std::sync::Arc, pub watchlist: std::sync::Arc, pub ap_content: std::sync::Arc, pub wrapup_repo: std::sync::Arc, pub wrapup_stats: std::sync::Arc, } pub async fn wire(database_url: &str) -> anyhow::Result { use anyhow::Context; use sqlx::sqlite::SqliteConnectOptions; use std::str::FromStr; let opts = SqliteConnectOptions::from_str(database_url) .context("Invalid DATABASE_URL")? .create_if_missing(true) .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal) .busy_timeout(std::time::Duration::from_secs(5)); let pool = sqlx::pool::PoolOptions::new() .max_connections(4) .connect_with(opts) .await .context("Failed to connect to SQLite database")?; let repo = std::sync::Arc::new(SqliteMovieRepository::new(pool.clone())); repo.migrate() .await .map_err(|e| anyhow::anyhow!("{e}")) .context("Database migration failed")?; Ok(SqliteWireOutput { pool: pool.clone(), movie: std::sync::Arc::clone(&repo) as _, review: std::sync::Arc::clone(&repo) as _, diary: std::sync::Arc::clone(&repo) as _, stats: std::sync::Arc::clone(&repo) as _, user: std::sync::Arc::new(SqliteUserRepository::new(pool.clone())) as _, import_session: std::sync::Arc::new(SqliteImportSessionRepository::new(pool.clone())) as _, import_profile: std::sync::Arc::new(SqliteImportProfileRepository::new(pool.clone())) as _, movie_profile: std::sync::Arc::new(SqliteMovieProfileRepository::new(pool.clone())) as _, watchlist: std::sync::Arc::new(SqliteWatchlistRepository::new(pool.clone())) as _, ap_content: std::sync::Arc::new(SqliteApContentQuery::new(pool.clone())) as _, wrapup_repo: std::sync::Arc::new(SqliteWrapUpRepository::new(pool.clone())) as _, wrapup_stats: std::sync::Arc::new(SqliteWrapUpStatsQuery::new(pool)) as _, }) } #[cfg(test)] mod feed_filter_tests { use super::*; use domain::{ models::collections::PageParams, ports::{DiaryRepository, FeedSortBy, FollowingFilter}, }; use sqlx::SqlitePool; async fn setup(pool: &SqlitePool) { sqlx::migrate!("./migrations").run(pool).await.unwrap(); // carol is a remote actor; we still need a non-null user_id for the schema, // so we create a local "ghost" user and link the remote review via remote_actor_url. sqlx::query( "INSERT INTO users (id, email, username, password_hash, created_at) VALUES ('11111111-1111-1111-1111-111111111111', 'alice@example.com', 'alice', 'hash', '2024-01-01 00:00:00'), ('22222222-2222-2222-2222-222222222222', 'bob@example.com', 'bob', 'hash', '2024-01-01 00:00:00'), ('33333333-3333-3333-3333-333333333333', 'carol@remote.social', 'carol', 'hash', '2024-01-01 00:00:00')", ) .execute(pool) .await .unwrap(); sqlx::query( "INSERT INTO movies (id, title, release_year) VALUES ('aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', 'Inception', 2010), ('bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb', 'Interstellar', 2014), ('cccccccc-cccc-cccc-cccc-cccccccccccc', 'Dune', 2021)", ) .execute(pool) .await .unwrap(); // carol's review: local user_id=33333333, remote_actor_url set → remote review sqlx::query( "INSERT INTO reviews (id, movie_id, user_id, rating, watched_at, created_at, remote_actor_url) VALUES ('a1a1a1a1-a1a1-a1a1-a1a1-a1a1a1a1a1a1', 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', '11111111-1111-1111-1111-111111111111', 5, '2024-01-01 00:00:00', '2024-01-01 00:00:00', NULL), ('b2b2b2b2-b2b2-b2b2-b2b2-b2b2b2b2b2b2', 'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb', '22222222-2222-2222-2222-222222222222', 3, '2024-01-02 00:00:00', '2024-01-02 00:00:00', NULL), ('c3c3c3c3-c3c3-c3c3-c3c3-c3c3c3c3c3c3', 'cccccccc-cccc-cccc-cccc-cccccccccccc', '33333333-3333-3333-3333-333333333333', 4, '2024-01-03 00:00:00', '2024-01-03 00:00:00', 'https://remote.social/users/carol')", ) .execute(pool) .await .unwrap(); } #[tokio::test] async fn test_sort_by_rating_descending() { let pool = SqlitePool::connect(":memory:").await.unwrap(); setup(&pool).await; let repo = SqliteMovieRepository::new(pool); let page = PageParams::new(Some(10), Some(0)).unwrap(); let result = repo .query_activity_feed_filtered(&page, &FeedSortBy::Rating, None, None) .await .unwrap(); let ratings: Vec = result .items .iter() .map(|e| e.review().rating().value()) .collect(); assert_eq!(ratings, vec![5, 4, 3]); } #[tokio::test] async fn test_search_by_title() { let pool = SqlitePool::connect(":memory:").await.unwrap(); setup(&pool).await; let repo = SqliteMovieRepository::new(pool); let page = PageParams::new(Some(10), Some(0)).unwrap(); let result = repo .query_activity_feed_filtered(&page, &FeedSortBy::Date, Some("Dune"), None) .await .unwrap(); assert_eq!(result.items.len(), 1); assert_eq!(result.items[0].movie().title().value(), "Dune"); } #[tokio::test] async fn test_following_filter() { let pool = SqlitePool::connect(":memory:").await.unwrap(); setup(&pool).await; let repo = SqliteMovieRepository::new(pool); let filter = FollowingFilter { local_user_ids: vec![ uuid::Uuid::parse_str("11111111-1111-1111-1111-111111111111").unwrap(), ], remote_actor_urls: vec!["https://remote.social/users/carol".to_string()], }; let page = PageParams::new(Some(10), Some(0)).unwrap(); let result = repo .query_activity_feed_filtered(&page, &FeedSortBy::Date, None, Some(&filter)) .await .unwrap(); assert_eq!(result.items.len(), 2); // alice + carol, NOT bob let titles: Vec = result .items .iter() .map(|e| e.movie().title().value().to_string()) .collect(); assert!(titles.contains(&"Inception".to_string())); assert!(titles.contains(&"Dune".to_string())); } #[tokio::test] async fn test_get_movie_stats_local() { let pool = SqlitePool::connect(":memory:").await.unwrap(); setup(&pool).await; let repo = SqliteMovieRepository::new(pool); // Inception: 1 local review, rating=5, no federated let movie_id = domain::value_objects::MovieId::from_uuid( uuid::Uuid::parse_str("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa").unwrap(), ); let stats = repo.get_movie_stats(&movie_id).await.unwrap(); assert_eq!(stats.total_count, 1); assert_eq!(stats.federated_count, 0); assert!((stats.avg_rating.unwrap() - 5.0).abs() < 0.001); assert_eq!(stats.rating_histogram[4], 1); // 5★ bucket assert_eq!(stats.rating_histogram[0], 0); // 1★ bucket } #[tokio::test] async fn test_get_movie_social_feed_returns_reviews_for_movie() { let pool = SqlitePool::connect(":memory:").await.unwrap(); setup(&pool).await; let repo = SqliteMovieRepository::new(pool); let movie_id = domain::value_objects::MovieId::from_uuid( uuid::Uuid::parse_str("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa").unwrap(), ); let page = PageParams::new(Some(10), Some(0)).unwrap(); let result = repo.get_movie_social_feed(&movie_id, &page).await.unwrap(); assert_eq!(result.total_count, 1); assert_eq!(result.items.len(), 1); assert_eq!(result.items[0].movie().title().value(), "Inception"); assert_eq!(result.items[0].review().rating().value(), 5); assert_eq!(result.items[0].user_display_name(), "alice"); assert!(!result.items[0].review().is_remote()); } #[tokio::test] async fn test_get_movie_social_feed_federated_review() { let pool = SqlitePool::connect(":memory:").await.unwrap(); setup(&pool).await; let repo = SqliteMovieRepository::new(pool); let movie_id = domain::value_objects::MovieId::from_uuid( uuid::Uuid::parse_str("cccccccc-cccc-cccc-cccc-cccccccccccc").unwrap(), ); let page = PageParams::new(Some(10), Some(0)).unwrap(); let result = repo.get_movie_social_feed(&movie_id, &page).await.unwrap(); assert_eq!(result.total_count, 1); assert_eq!(result.items.len(), 1); assert!(result.items[0].review().is_remote()); assert_eq!( result.items[0].user_email(), "https://remote.social/users/carol" ); } #[tokio::test] async fn test_get_movie_social_feed_pagination() { let pool = SqlitePool::connect(":memory:").await.unwrap(); setup(&pool).await; let repo = SqliteMovieRepository::new(pool); let movie_id = domain::value_objects::MovieId::from_uuid( uuid::Uuid::parse_str("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa").unwrap(), ); // offset beyond results: total_count still correct, items empty let page = PageParams::new(Some(10), Some(5)).unwrap(); let result = repo.get_movie_social_feed(&movie_id, &page).await.unwrap(); assert_eq!(result.total_count, 1); assert_eq!(result.items.len(), 0); } #[tokio::test] async fn test_get_movie_stats_federated() { let pool = SqlitePool::connect(":memory:").await.unwrap(); setup(&pool).await; let repo = SqliteMovieRepository::new(pool); // Dune: 1 federated review, rating=4 let movie_id = domain::value_objects::MovieId::from_uuid( uuid::Uuid::parse_str("cccccccc-cccc-cccc-cccc-cccccccccccc").unwrap(), ); let stats = repo.get_movie_stats(&movie_id).await.unwrap(); assert_eq!(stats.total_count, 1); assert_eq!(stats.federated_count, 1); assert_eq!(stats.rating_histogram[3], 1); // 4★ bucket assert_eq!(stats.rating_histogram[4], 0); // 5★ bucket } } #[cfg(test)] mod diary_count_tests { use super::*; use sqlx::SqlitePool; async fn test_pool() -> SqlitePool { let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); pool } #[tokio::test] async fn count_local_posts_excludes_remote_reviews() { use domain::ports::DiaryRepository; let pool = test_pool().await; let repo = SqliteMovieRepository::new(pool.clone()); let user_id = uuid::Uuid::new_v4().to_string(); let movie_id = uuid::Uuid::new_v4().to_string(); sqlx::query("INSERT INTO users (id, email, password_hash, created_at, username) VALUES (?, ?, ?, ?, ?)") .bind(&user_id).bind("a@b.com").bind("hash").bind("2024-01-01 00:00:00").bind("alice") .execute(&pool).await.unwrap(); sqlx::query("INSERT INTO movies (id, title, release_year) VALUES (?, ?, ?)") .bind(&movie_id) .bind("Test Movie") .bind(2024i32) .execute(&pool) .await .unwrap(); // Local review (remote_actor_url IS NULL) let r1 = uuid::Uuid::new_v4().to_string(); sqlx::query("INSERT INTO reviews (id, movie_id, user_id, rating, watched_at, created_at) VALUES (?, ?, ?, ?, ?, ?)") .bind(&r1).bind(&movie_id).bind(&user_id).bind(4i32) .bind("2024-01-01 00:00:00").bind("2024-01-01 00:00:00") .execute(&pool).await.unwrap(); // Remote review (remote_actor_url IS NOT NULL) let r2 = uuid::Uuid::new_v4().to_string(); sqlx::query("INSERT INTO reviews (id, movie_id, user_id, rating, watched_at, created_at, remote_actor_url) VALUES (?, ?, ?, ?, ?, ?, ?)") .bind(&r2).bind(&movie_id).bind(&user_id).bind(3i32) .bind("2024-01-01 00:00:00").bind("2024-01-01 00:00:00").bind("https://remote/user") .execute(&pool).await.unwrap(); let count = repo.count_local_posts().await.unwrap(); assert_eq!(count, 1); } }