diff --git a/crates/application/src/context.rs b/crates/application/src/context.rs
index 670924f..4008ce3 100644
--- a/crates/application/src/context.rs
+++ b/crates/application/src/context.rs
@@ -6,7 +6,7 @@ use domain::ports::{
     MovieRepository, PasswordHasher, PersonCommand, PersonQuery, PosterFetcherClient,
     RemoteWatchlistRepository, ReviewRepository, SearchCommand, SearchPort, SocialQueryPort,
     StatsRepository, UserProfileFieldsRepository, UserRepository, WatchEventRepository,
-    WatchlistRepository, WebhookTokenRepository,
+    WatchlistRepository, WebhookTokenRepository, WrapUpStatsQuery,
 };
 
 use crate::config::AppConfig;
@@ -31,6 +31,7 @@ pub struct Repositories {
     pub profile_fields: Arc<dyn UserProfileFieldsRepository>,
     pub remote_watchlist: Arc<dyn RemoteWatchlistRepository>,
     pub social_query: Arc<dyn SocialQueryPort>,
+    pub wrapup_stats: Arc<dyn WrapUpStatsQuery>,
 }
 
 #[derive(Clone)]
diff --git a/crates/application/src/lib.rs b/crates/application/src/lib.rs
index 674058e..c25f4c8 100644
--- a/crates/application/src/lib.rs
+++ b/crates/application/src/lib.rs
@@ -13,6 +13,7 @@ pub mod person;
 pub mod search;
 pub mod users;
 pub mod watchlist;
+pub mod wrapup;
 
 #[cfg(test)]
 pub mod test_helpers;
diff --git a/crates/application/src/test_helpers.rs b/crates/application/src/test_helpers.rs
index ac46aa1..f45e8f0 100644
--- a/crates/application/src/test_helpers.rs
+++ b/crates/application/src/test_helpers.rs
@@ -1,6 +1,6 @@
 use std::sync::Arc;
 
-use domain::testing::{NoopRemoteWatchlistRepository, NoopSocialQueryPort};
+use domain::testing::{InMemoryWrapUpStatsQuery, NoopRemoteWatchlistRepository, NoopSocialQueryPort};
 use domain::{
     ports::{
         AuthService, DiaryExporter, DiaryRepository, DocumentParser, EventPublisher, ImageStorage,
@@ -8,6 +8,7 @@ use domain::{
         MovieRepository, PasswordHasher, PersonCommand, PersonQuery, PosterFetcherClient,
         ReviewRepository, SearchCommand, SearchPort, StatsRepository, UserProfileFieldsRepository,
         UserRepository, WatchEventRepository, WatchlistRepository, WebhookTokenRepository,
+        WrapUpStatsQuery,
     },
     testing::{
         FakeAuthService, FakeMetadataClient, FakePasswordHasher, InMemoryMovieRepository,
@@ -50,6 +51,7 @@ pub struct TestContextBuilder {
     pub person_query: Arc<dyn PersonQuery>,
     pub search_port: Arc<dyn SearchPort>,
     pub search_command: Arc<dyn SearchCommand>,
+    pub wrapup_stats: Arc<dyn WrapUpStatsQuery>,
     pub config: AppConfig,
 }
 
@@ -80,6 +82,7 @@ impl TestContextBuilder {
             person_query: Arc::new(PanicPersonQuery),
             search_port: Arc::new(PanicSearchPort),
             search_command: Arc::new(PanicSearchCommand),
+            wrapup_stats: InMemoryWrapUpStatsQuery::new(),
             config: AppConfig {
                 allow_registration: true,
                 base_url: "http://localhost:3000".into(),
@@ -118,6 +121,11 @@ impl TestContextBuilder {
         self
     }
 
+    pub fn wrapup_stats(mut self, r: Arc<dyn WrapUpStatsQuery>) -> Self {
+        self.wrapup_stats = r;
+        self
+    }
+
     pub fn with_config(mut self, config: AppConfig) -> Self {
         self.config = config;
         self
@@ -144,6 +152,7 @@ impl TestContextBuilder {
                 search_command: self.search_command,
                 remote_watchlist: Arc::new(NoopRemoteWatchlistRepository),
                 social_query: Arc::new(NoopSocialQueryPort),
+                wrapup_stats: self.wrapup_stats,
             },
             services: Services {
                 auth: self.auth_service,
diff --git a/crates/application/src/wrapup/compute.rs b/crates/application/src/wrapup/compute.rs
new file mode 100644
index 0000000..9a8b5ff
--- /dev/null
+++ b/crates/application/src/wrapup/compute.rs
@@ -0,0 +1,479 @@
+//! Aggregates review rows into a [`WrapUpReport`].
+
+use std::collections::HashMap;
+
+use chrono::Datelike;
+use uuid::Uuid;
+
+use crate::context::AppContext;
+use crate::wrapup::queries::ComputeWrapUpQuery;
+use domain::errors::DomainError;
+use domain::models::wrapup::*;
+use domain::ports::WrapUpMovieRow;
+
+/// Loads the rows matching `query` through `ctx.repos.wrapup_stats` and aggregates them.
+///
+/// # Errors
+/// Returns the error produced by the repository call, if any.
+pub async fn execute(
+    ctx: &AppContext,
+    query: ComputeWrapUpQuery,
+) -> Result<WrapUpReport, DomainError> {
+    let rows = ctx
+        .repos
+        .wrapup_stats
+        .get_reviews_with_profiles(&query.scope, &query.date_range)
+        .await?;
+
+    Ok(build_report(query.scope, query.date_range, &rows))
+}
+
+/// Builds the full report from `rows`.
+///
+/// `generated_at` is stamped with the current UTC time. The three global-only fields stay
+/// `None` unless `scope` is [`WrapUpScope::Global`].
+fn build_report(
+    scope: WrapUpScope,
+    date_range: DateRange,
+    rows: &[WrapUpMovieRow],
+) -> WrapUpReport {
+    let total_movies = rows.len() as u32;
+
+    // Rows without a known runtime do not contribute to the total.
+    let total_watch_time_minutes: u32 = rows.iter().filter_map(|r| r.runtime_minutes).sum();
+
+    let avg_rating = if rows.is_empty() {
+        None
+    } else {
+        Some(rows.iter().map(|r| r.rating as f64).sum::<f64>() / rows.len() as f64)
+    };
+
+    let rating_distribution = {
+        let mut dist = [0u32; 5];
+        for r in rows {
+            // Bucket index is `rating - 1`, clamped into 0..=4.
+            let idx = (r.rating as usize).saturating_sub(1).min(4);
+            dist[idx] += 1;
+        }
+        dist
+    };
+
+    let movies_per_month = compute_movies_per_month(rows);
+    // The list is in chronological order and `max_by_key` keeps the last maximum, so a tie
+    // goes to the latest month.
+    let busiest_month = movies_per_month
+        .iter()
+        .max_by_key(|m| m.count)
+        .map(|m| m.label.clone());
+
+    let busiest_day_of_week = compute_busiest_day(rows);
+
+    let (longest_movie, shortest_movie) = compute_runtime_extremes(rows);
+    let (highest_rated_movie, lowest_rated_movie) = compute_rating_extremes(rows);
+    let (first_movie_of_period, last_movie_of_period) = compute_chronological_extremes(rows);
+    let (oldest_movie, newest_movie) = compute_year_extremes(rows);
+
+    let (top_directors, director_diversity) = compute_director_stats(rows);
+    let (top_actors, actor_diversity, top_cast_profile_paths) = compute_actor_stats(rows);
+
+    let (top_genres, genre_diversity, highest_rated_genre, lowest_rated_genre) =
+        compute_genre_stats(rows);
+    let top_keywords = compute_keyword_stats(rows);
+
+    let (total_budget_watched, avg_budget) = compute_budget_stats(rows);
+    let language_distribution = compute_language_stats(rows);
+
+    let (total_rewatches, most_rewatched_movie, avg_rating_change_on_rewatch) =
+        compute_rewatch_stats(rows);
+
+    let (most_active_user, most_watched_movie_global, total_users_active) =
+        if matches!(scope, WrapUpScope::Global) {
+            compute_global_stats(rows)
+        } else {
+            (None, None, None)
+        };
+
+    let poster_paths: Vec<String> = rows.iter().filter_map(|r| r.poster_path.clone()).collect();
+
+    WrapUpReport {
+        scope,
+        date_range,
+        generated_at: chrono::Utc::now(),
+        total_movies,
+        total_watch_time_minutes,
+        movies_per_month,
+        busiest_month,
+        busiest_day_of_week,
+        avg_rating,
+        rating_distribution,
+        longest_movie,
+        shortest_movie,
+        top_directors,
+        top_actors,
+        director_diversity,
+        actor_diversity,
+        top_genres,
+        genre_diversity,
+        highest_rated_genre,
+        lowest_rated_genre,
+        top_keywords,
+        total_budget_watched,
+        avg_budget,
+        language_distribution,
+        oldest_movie,
+        newest_movie,
+        total_rewatches,
+        most_rewatched_movie,
+        avg_rating_change_on_rewatch,
+        highest_rated_movie,
+        lowest_rated_movie,
+        first_movie_of_period,
+        last_movie_of_period,
+        most_active_user,
+        most_watched_movie_global,
+        total_users_active,
+        poster_paths,
+        top_cast_profile_paths,
+    }
+}
+
+/// Projects a row onto the lightweight [`MovieRef`] embedded in the report.
+fn movie_ref(r: &WrapUpMovieRow) -> MovieRef {
+    MovieRef {
+        title: r.title.clone(),
+        year: r.release_year,
+        runtime_minutes: r.runtime_minutes,
+        poster_path: r.poster_path.clone(),
+    }
+}
+
+/// Counts rows per calendar month of `watched_at`, returned in chronological order.
+fn compute_movies_per_month(rows: &[WrapUpMovieRow]) -> Vec<MonthCount> {
+    let mut counts: HashMap<String, u32> = HashMap::new();
+    for r in rows {
+        let ym = r.watched_at.format("%Y-%m").to_string();
+        *counts.entry(ym).or_default() += 1;
+    }
+    let mut result: Vec<MonthCount> = counts
+        .into_iter()
+        .map(|(ym, count)| {
+            // Label like "March 2024"; falls back to the raw `YYYY-MM` key if parsing fails.
+            let label = chrono::NaiveDate::parse_from_str(&format!("{ym}-01"), "%Y-%m-%d")
+                .map(|d| d.format("%B %Y").to_string())
+                .unwrap_or_else(|_| ym.clone());
+            MonthCount {
+                year_month: ym,
+                label,
+                count,
+            }
+        })
+        .collect();
+    result.sort_by(|a, b| a.year_month.cmp(&b.year_month));
+    result
+}
+
+/// Returns the English name of the weekday with the most rows, or `None` when `rows` is
+/// empty. On a tie the later weekday (counting from Monday) wins.
+fn compute_busiest_day(rows: &[WrapUpMovieRow]) -> Option<String> {
+    const NAMES: [&str; 7] = [
+        "Monday",
+        "Tuesday",
+        "Wednesday",
+        "Thursday",
+        "Friday",
+        "Saturday",
+        "Sunday",
+    ];
+    if rows.is_empty() {
+        return None;
+    }
+    let mut day_counts = [0u32; 7];
+    for r in rows {
+        let weekday = r.watched_at.date().weekday().num_days_from_monday() as usize;
+        day_counts[weekday] += 1;
+    }
+    let max_idx = day_counts
+        .iter()
+        .enumerate()
+        .max_by_key(|(_, c)| *c)
+        .map(|(i, _)| i)
+        .unwrap_or(0);
+    Some(NAMES[max_idx].to_string())
+}
+
+/// Longest and shortest movie among the rows that have a runtime.
+fn compute_runtime_extremes(rows: &[WrapUpMovieRow]) -> (Option<MovieRef>, Option<MovieRef>) {
+    let with_runtime = || rows.iter().filter_map(|r| r.runtime_minutes.map(|m| (m, r)));
+    let longest = with_runtime().max_by_key(|&(m, _)| m).map(|(_, r)| movie_ref(r));
+    let shortest = with_runtime().min_by_key(|&(m, _)| m).map(|(_, r)| movie_ref(r));
+    (longest, shortest)
+}
+
+/// Highest- and lowest-rated rows. Ties: the last maximum and the first minimum win.
+fn compute_rating_extremes(rows: &[WrapUpMovieRow]) -> (Option<MovieRef>, Option<MovieRef>) {
+    let highest = rows.iter().max_by_key(|r| r.rating).map(movie_ref);
+    let lowest = rows.iter().min_by_key(|r| r.rating).map(movie_ref);
+    (highest, lowest)
+}
+
+/// Earliest and latest row by `watched_at`.
+fn compute_chronological_extremes(
+    rows: &[WrapUpMovieRow],
+) -> (Option<MovieRef>, Option<MovieRef>) {
+    let first = rows.iter().min_by_key(|r| r.watched_at).map(movie_ref);
+    let last = rows.iter().max_by_key(|r| r.watched_at).map(movie_ref);
+    (first, last)
+}
+
+/// Rows with the smallest and largest `release_year`.
+// NOTE(review): if `release_year` is an `Option`, `None` orders before every `Some`, so a row
+// without a year would be reported as the oldest movie — confirm against the row type.
+fn compute_year_extremes(rows: &[WrapUpMovieRow]) -> (Option<MovieRef>, Option<MovieRef>) {
+    let oldest = rows.iter().min_by_key(|r| r.release_year).map(movie_ref);
+    let newest = rows.iter().max_by_key(|r| r.release_year).map(movie_ref);
+    (oldest, newest)
+}
+
+/// Running `(count, sum of ratings)` per key.
+type Tally = (u32, f64);
+
+/// Ranks people by number of rows, then by average rating, then by name, and keeps the top
+/// five. The final name comparison keeps the order independent of `HashMap` iteration order.
+fn top_people(tallies: HashMap<String, Tally>) -> Vec<PersonStat> {
+    let mut stats: Vec<PersonStat> = tallies
+        .into_iter()
+        .map(|(name, (count, sum))| PersonStat {
+            name,
+            count,
+            avg_rating: sum / f64::from(count),
+        })
+        .collect();
+    stats.sort_by(|a, b| {
+        b.count
+            .cmp(&a.count)
+            .then(b.avg_rating.total_cmp(&a.avg_rating))
+            .then_with(|| a.name.cmp(&b.name))
+    });
+    stats.truncate(5);
+    stats
+}
+
+/// Top five directors and the number of distinct directors seen.
+fn compute_director_stats(rows: &[WrapUpMovieRow]) -> (Vec<PersonStat>, u32) {
+    let mut tallies: HashMap<String, Tally> = HashMap::new();
+    for r in rows {
+        if let Some(dir) = &r.director {
+            let tally = tallies.entry(dir.clone()).or_default();
+            tally.0 += 1;
+            tally.1 += r.rating as f64;
+        }
+    }
+    let diversity = tallies.len() as u32;
+    (top_people(tallies), diversity)
+}
+
+/// Top five actors whose billing value is at most 3, the number of distinct such actors, and
+/// the profile image paths of the top five that have one.
+///
+/// The returned paths skip actors without an image, so they are not index-aligned with the
+/// returned stats.
+fn compute_actor_stats(rows: &[WrapUpMovieRow]) -> (Vec<PersonStat>, u32, Vec<String>) {
+    let mut tallies: HashMap<String, Tally> = HashMap::new();
+    let mut actor_profiles: HashMap<String, Option<String>> = HashMap::new();
+    for r in rows {
+        for (i, (name, billing)) in r.cast_names.iter().enumerate() {
+            if *billing > 3 {
+                continue;
+            }
+            let tally = tallies.entry(name.clone()).or_default();
+            tally.0 += 1;
+            tally.1 += r.rating as f64;
+            // Keep the first path that is actually present: an earlier row without an image
+            // must not hide one supplied by a later row.
+            if let Some(path) = r.cast_profile_paths.get(i) {
+                let known = actor_profiles.entry(name.clone()).or_default();
+                if known.is_none() {
+                    known.clone_from(path);
+                }
+            }
+        }
+    }
+    let diversity = tallies.len() as u32;
+    let stats = top_people(tallies);
+    let profile_paths: Vec<String> = stats
+        .iter()
+        .filter_map(|s| actor_profiles.get(&s.name)?.clone())
+        .collect();
+    (stats, diversity, profile_paths)
+}
+
+/// Top five genres by number of rows, the number of distinct genres, and the genres with the
+/// highest and lowest average rating.
+///
+/// The lowest-rated genre is only picked among genres with at least three rows; the
+/// highest-rated one has no such minimum (NOTE(review): confirm the asymmetry is intended).
+fn compute_genre_stats(
+    rows: &[WrapUpMovieRow],
+) -> (Vec<GenreStat>, u32, Option<String>, Option<String>) {
+    let mut tallies: HashMap<String, Tally> = HashMap::new();
+    for r in rows {
+        for genre in &r.genres {
+            let tally = tallies.entry(genre.clone()).or_default();
+            tally.0 += 1;
+            tally.1 += r.rating as f64;
+        }
+    }
+    let diversity = tallies.len() as u32;
+    let mut stats: Vec<GenreStat> = tallies
+        .into_iter()
+        .map(|(genre, (count, sum))| GenreStat {
+            genre,
+            count,
+            avg_rating: sum / f64::from(count),
+        })
+        .collect();
+    // Name as tie-breaker so equal counts never fall back to `HashMap` iteration order.
+    stats.sort_by(|a, b| b.count.cmp(&a.count).then_with(|| a.genre.cmp(&b.genre)));
+    let highest = stats
+        .iter()
+        .max_by(|a, b| a.avg_rating.total_cmp(&b.avg_rating))
+        .map(|g| g.genre.clone());
+    let lowest = stats
+        .iter()
+        .filter(|g| g.count >= 3)
+        .min_by(|a, b| a.avg_rating.total_cmp(&b.avg_rating))
+        .map(|g| g.genre.clone());
+    stats.truncate(5);
+    (stats, diversity, highest, lowest)
+}
+
+/// Twenty most frequent keywords; equal counts are ordered by keyword.
+fn compute_keyword_stats(rows: &[WrapUpMovieRow]) -> Vec<KeywordStat> {
+    let mut kw_counts: HashMap<String, u32> = HashMap::new();
+    for r in rows {
+        for kw in &r.keywords {
+            *kw_counts.entry(kw.clone()).or_default() += 1;
+        }
+    }
+    let mut stats: Vec<KeywordStat> = kw_counts
+        .into_iter()
+        .map(|(keyword, count)| KeywordStat { keyword, count })
+        .collect();
+    stats.sort_by(|a, b| b.count.cmp(&a.count).then_with(|| a.keyword.cmp(&b.keyword)));
+    stats.truncate(20);
+    stats
+}
+
+/// Sum and integer average of all strictly positive budgets, or `(None, None)` if there are
+/// none.
+fn compute_budget_stats(rows: &[WrapUpMovieRow]) -> (Option<i64>, Option<i64>) {
+    let budgets: Vec<i64> = rows
+        .iter()
+        .filter_map(|r| r.budget_usd)
+        .filter(|&b| b > 0)
+        .collect();
+    if budgets.is_empty() {
+        return (None, None);
+    }
+    let total: i64 = budgets.iter().sum();
+    let avg = total / budgets.len() as i64;
+    (Some(total), Some(avg))
+}
+
+/// Number of rows per original language, most common first; equal counts are ordered by
+/// language.
+fn compute_language_stats(rows: &[WrapUpMovieRow]) -> Vec<LangStat> {
+    let mut lang_counts: HashMap<String, u32> = HashMap::new();
+    for r in rows {
+        if let Some(lang) = &r.original_language {
+            *lang_counts.entry(lang.clone()).or_default() += 1;
+        }
+    }
+    let mut stats: Vec<LangStat> = lang_counts
+        .into_iter()
+        .map(|(language, count)| LangStat { language, count })
+        .collect();
+    stats.sort_by(|a, b| b.count.cmp(&a.count).then_with(|| a.language.cmp(&b.language)));
+    stats
+}
+
+/// Rewatch figures: number of repeat viewings, the most rewatched movie, and the average
+/// rating change between a user's first and last viewing of a movie they rewatched.
+fn compute_rewatch_stats(rows: &[WrapUpMovieRow]) -> (u32, Option<MovieRef>, Option<f64>) {
+    // Group per (user, movie). Grouping by movie alone would count two different users
+    // logging the same movie as a rewatch whenever `rows` spans several users.
+    let mut viewings: HashMap<(Uuid, Uuid), Vec<&WrapUpMovieRow>> = HashMap::new();
+    for r in rows {
+        viewings.entry((r.user_id, r.movie_id)).or_default().push(r);
+    }
+
+    let mut rewatched: Vec<_> = viewings
+        .into_iter()
+        .filter(|(_, reviews)| reviews.len() > 1)
+        .collect();
+    // Fixed order so that ties below do not depend on `HashMap` iteration order.
+    rewatched.sort_by_key(|(key, _)| *key);
+
+    let total_rewatches: u32 = rewatched.iter().map(|(_, rs)| rs.len() as u32 - 1).sum();
+
+    let most_rewatched = rewatched
+        .iter()
+        .max_by_key(|(_, rs)| rs.len())
+        .map(|(_, rs)| movie_ref(rs[0]));
+
+    let changes: Vec<f64> = rewatched
+        .iter()
+        .filter_map(|(_, rs)| {
+            // First minimum / last maximum, i.e. what a stable sort by `watched_at` yields.
+            let first = rs.iter().min_by_key(|r| r.watched_at)?;
+            let last = rs.iter().max_by_key(|r| r.watched_at)?;
+            Some(last.rating as f64 - first.rating as f64)
+        })
+        .collect();
+    let avg_change = if changes.is_empty() {
+        None
+    } else {
+        Some(changes.iter().sum::<f64>() / changes.len() as f64)
+    };
+
+    (total_rewatches, most_rewatched, avg_change)
+}
+
+/// Cross-user figures: most active user, most watched movie, and number of distinct users.
+/// Equal counts are resolved in favour of the smaller id, independent of hash order.
+fn compute_global_stats(
+    rows: &[WrapUpMovieRow],
+) -> (Option<UserRef>, Option<MovieRef>, Option<u32>) {
+    if rows.is_empty() {
+        return (None, None, None);
+    }
+
+    let mut user_counts: HashMap<Uuid, u32> = HashMap::new();
+    for r in rows {
+        *user_counts.entry(r.user_id).or_default() += 1;
+    }
+    let total_users_active = Some(user_counts.len() as u32);
+
+    let most_active_user = user_counts
+        .iter()
+        .max_by(|a, b| a.1.cmp(b.1).then_with(|| b.0.cmp(a.0)))
+        .map(|(&uid, _)| UserRef {
+            user_id: uid,
+            // NOTE(review): left empty here; presumably resolved by a later layer — confirm.
+            display_name: String::new(),
+        });
+
+    let mut movie_counts: HashMap<Uuid, (u32, &WrapUpMovieRow)> = HashMap::new();
+    for r in rows {
+        movie_counts
+            .entry(r.movie_id)
+            .and_modify(|(c, _)| *c += 1)
+            .or_insert((1, r));
+    }
+    let most_watched = movie_counts
+        .into_iter()
+        .max_by(|(id_a, (a, _)), (id_b, (b, _))| a.cmp(b).then_with(|| id_b.cmp(id_a)))
+        .map(|(_, (_, r))| movie_ref(r));
+
+    (most_active_user, most_watched, total_users_active)
+}
diff --git a/crates/application/src/wrapup/mod.rs b/crates/application/src/wrapup/mod.rs
new file mode 100644
index 0000000..6196e90
--- /dev/null
+++ b/crates/application/src/wrapup/mod.rs
@@ -0,0 +1,2 @@
+pub mod compute;
+pub mod queries;
diff --git a/crates/application/src/wrapup/queries.rs b/crates/application/src/wrapup/queries.rs
new file mode 100644
index 0000000..aeb7d3c
--- /dev/null
+++ b/crates/application/src/wrapup/queries.rs
@@ -0,0 +1,7 @@
+use domain::models::wrapup::{DateRange, WrapUpScope};
+
+/// Parameters of a wrap-up computation: the scope to report on and the date range to cover.
+pub struct ComputeWrapUpQuery {
+    pub scope: WrapUpScope,
+    pub date_range: DateRange,
+}