feat: implement feed/stats/history/trends SQLite queries
This commit is contained in:
@@ -3,11 +3,12 @@ use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
models::{
|
||||
DiaryEntry, DiaryFilter, Movie, Review, ReviewHistory, SortDirection,
|
||||
collections::Paginated,
|
||||
DiaryEntry, DiaryFilter, DirectorStat, FeedEntry, Movie, MonthlyRating,
|
||||
Review, ReviewHistory, SortDirection, UserStats, UserTrends,
|
||||
collections::{PageParams, Paginated},
|
||||
},
|
||||
ports::MovieRepository,
|
||||
value_objects::{ExternalMetadataId, MovieId, MovieTitle, ReleaseYear, ReviewId},
|
||||
value_objects::{ExternalMetadataId, MovieId, MovieTitle, ReleaseYear, ReviewId, UserId},
|
||||
};
|
||||
use sqlx::SqlitePool;
|
||||
|
||||
@@ -15,10 +16,26 @@ mod migrations;
|
||||
mod models;
|
||||
mod users;
|
||||
|
||||
use models::{DiaryRow, MovieRow, ReviewRow, datetime_to_str};
|
||||
use models::{
|
||||
DiaryRow, DirectorCountRow, FeedRow, MonthlyRatingRow, MovieRow, ReviewRow,
|
||||
UserTotalsRow, datetime_to_str,
|
||||
};
|
||||
|
||||
pub use users::SqliteUserRepository;
|
||||
|
||||
fn format_year_month(ym: &str) -> String {
|
||||
let parts: Vec<&str> = ym.splitn(2, '-').collect();
|
||||
if parts.len() != 2 { return ym.to_string(); }
|
||||
let year = parts[0].get(2..).unwrap_or(parts[0]);
|
||||
let month = match parts[1] {
|
||||
"01" => "Jan", "02" => "Feb", "03" => "Mar", "04" => "Apr",
|
||||
"05" => "May", "06" => "Jun", "07" => "Jul", "08" => "Aug",
|
||||
"09" => "Sep", "10" => "Oct", "11" => "Nov", "12" => "Dec",
|
||||
_ => parts[1],
|
||||
};
|
||||
format!("{} '{}", month, year)
|
||||
}
|
||||
|
||||
pub struct SqliteMovieRepository {
|
||||
pool: SqlitePool,
|
||||
}
|
||||
@@ -59,7 +76,7 @@ impl SqliteMovieRepository {
|
||||
offset: i64,
|
||||
) -> Result<Vec<DiaryRow>, DomainError> {
|
||||
match sort {
|
||||
SortDirection::Descending => sqlx::query_as!(
|
||||
SortDirection::Descending | SortDirection::ByRatingDesc => sqlx::query_as!(
|
||||
DiaryRow,
|
||||
"SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,
|
||||
r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at
|
||||
@@ -99,7 +116,7 @@ impl SqliteMovieRepository {
|
||||
offset: i64,
|
||||
) -> Result<Vec<DiaryRow>, DomainError> {
|
||||
match sort {
|
||||
SortDirection::Descending => sqlx::query_as!(
|
||||
SortDirection::Descending | SortDirection::ByRatingDesc => sqlx::query_as!(
|
||||
DiaryRow,
|
||||
"SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,
|
||||
r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at
|
||||
@@ -134,6 +151,141 @@ impl SqliteMovieRepository {
|
||||
.map_err(Self::map_err),
|
||||
}
|
||||
}
|
||||
|
||||
async fn count_user_diary_entries(&self, user_id: &str) -> Result<i64, DomainError> {
|
||||
sqlx::query_scalar!(
|
||||
"SELECT COUNT(*) FROM reviews WHERE user_id = ?",
|
||||
user_id
|
||||
)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
.map_err(Self::map_err)
|
||||
}
|
||||
|
||||
async fn fetch_user_diary_rows_by_watched(
|
||||
&self,
|
||||
user_id: &str,
|
||||
limit: i64,
|
||||
offset: i64,
|
||||
) -> Result<Vec<DiaryRow>, DomainError> {
|
||||
sqlx::query_as!(
|
||||
DiaryRow,
|
||||
"SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,
|
||||
r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at
|
||||
FROM reviews r
|
||||
INNER JOIN movies m ON m.id = r.movie_id
|
||||
WHERE r.user_id = ?
|
||||
ORDER BY r.watched_at DESC
|
||||
LIMIT ? OFFSET ?",
|
||||
user_id, limit, offset
|
||||
)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(Self::map_err)
|
||||
}
|
||||
|
||||
async fn fetch_user_diary_rows_by_rating(
|
||||
&self,
|
||||
user_id: &str,
|
||||
limit: i64,
|
||||
offset: i64,
|
||||
) -> Result<Vec<DiaryRow>, DomainError> {
|
||||
sqlx::query_as!(
|
||||
DiaryRow,
|
||||
"SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,
|
||||
r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at
|
||||
FROM reviews r
|
||||
INNER JOIN movies m ON m.id = r.movie_id
|
||||
WHERE r.user_id = ?
|
||||
ORDER BY r.rating DESC, r.watched_at DESC
|
||||
LIMIT ? OFFSET ?",
|
||||
user_id, limit, offset
|
||||
)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(Self::map_err)
|
||||
}
|
||||
|
||||
async fn count_feed_entries(&self) -> Result<i64, DomainError> {
|
||||
sqlx::query_scalar!("SELECT COUNT(*) FROM reviews")
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
.map_err(Self::map_err)
|
||||
}
|
||||
|
||||
async fn fetch_feed_rows(
|
||||
&self,
|
||||
limit: i64,
|
||||
offset: i64,
|
||||
) -> Result<Vec<FeedRow>, DomainError> {
|
||||
sqlx::query_as!(
|
||||
FeedRow,
|
||||
"SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,
|
||||
r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at,
|
||||
u.email AS user_email
|
||||
FROM reviews r
|
||||
INNER JOIN movies m ON m.id = r.movie_id
|
||||
INNER JOIN users u ON u.id = r.user_id
|
||||
ORDER BY r.watched_at DESC
|
||||
LIMIT ? OFFSET ?",
|
||||
limit, offset
|
||||
)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(Self::map_err)
|
||||
}
|
||||
|
||||
async fn fetch_user_totals(&self, user_id: &str) -> Result<UserTotalsRow, DomainError> {
|
||||
sqlx::query_as!(
|
||||
UserTotalsRow,
|
||||
r#"SELECT COUNT(*) AS "total!: i64",
|
||||
AVG(CAST(rating AS REAL)) AS avg_rating
|
||||
FROM reviews WHERE user_id = ?"#,
|
||||
user_id
|
||||
)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
.map_err(Self::map_err)
|
||||
}
|
||||
|
||||
async fn fetch_user_favorite_director(
|
||||
&self,
|
||||
user_id: &str,
|
||||
) -> Result<Option<String>, DomainError> {
|
||||
let row = sqlx::query_scalar!(
|
||||
"SELECT m.director
|
||||
FROM reviews r
|
||||
INNER JOIN movies m ON m.id = r.movie_id
|
||||
WHERE r.user_id = ? AND m.director IS NOT NULL
|
||||
GROUP BY m.director
|
||||
ORDER BY COUNT(*) DESC
|
||||
LIMIT 1",
|
||||
user_id
|
||||
)
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(Self::map_err)?;
|
||||
Ok(row.flatten())
|
||||
}
|
||||
|
||||
async fn fetch_user_most_active_month(
|
||||
&self,
|
||||
user_id: &str,
|
||||
) -> Result<Option<String>, DomainError> {
|
||||
let result: Option<Option<String>> = sqlx::query_scalar!(
|
||||
"SELECT strftime('%Y-%m', watched_at) AS month
|
||||
FROM reviews
|
||||
WHERE user_id = ?
|
||||
GROUP BY month
|
||||
ORDER BY COUNT(*) DESC
|
||||
LIMIT 1",
|
||||
user_id
|
||||
)
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(Self::map_err)?;
|
||||
Ok(result.flatten())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -261,18 +413,39 @@ impl MovieRepository for SqliteMovieRepository {
|
||||
let limit = filter.page.limit as i64;
|
||||
let offset = filter.page.offset as i64;
|
||||
|
||||
let (total, rows) = match &filter.movie_id {
|
||||
None => tokio::try_join!(
|
||||
let (total, rows) = match (&filter.movie_id, &filter.user_id) {
|
||||
(None, None) => tokio::try_join!(
|
||||
self.count_diary_entries(None),
|
||||
self.fetch_all_diary_rows(&filter.sort_by, limit, offset)
|
||||
)?,
|
||||
Some(id) => {
|
||||
(Some(id), None) => {
|
||||
let id_str = id.value().to_string();
|
||||
tokio::try_join!(
|
||||
self.count_diary_entries(Some(id_str.as_str())),
|
||||
self.fetch_movie_diary_rows(&id_str, &filter.sort_by, limit, offset)
|
||||
)?
|
||||
}
|
||||
(None, Some(uid)) => {
|
||||
let uid_str = uid.value().to_string();
|
||||
match &filter.sort_by {
|
||||
SortDirection::ByRatingDesc => tokio::try_join!(
|
||||
self.count_user_diary_entries(&uid_str),
|
||||
self.fetch_user_diary_rows_by_rating(&uid_str, limit, offset)
|
||||
)?,
|
||||
_ => tokio::try_join!(
|
||||
self.count_user_diary_entries(&uid_str),
|
||||
self.fetch_user_diary_rows_by_watched(&uid_str, limit, offset)
|
||||
)?,
|
||||
}
|
||||
}
|
||||
(Some(mid), Some(uid)) => {
|
||||
let mid_str = mid.value().to_string();
|
||||
let uid_str = uid.value().to_string();
|
||||
tokio::try_join!(
|
||||
self.count_user_diary_entries(&uid_str),
|
||||
self.fetch_movie_diary_rows(&mid_str, &filter.sort_by, limit, offset)
|
||||
)?
|
||||
}
|
||||
};
|
||||
|
||||
let items = rows
|
||||
@@ -351,4 +524,119 @@ impl MovieRepository for SqliteMovieRepository {
|
||||
|
||||
Ok(ReviewHistory::new(movie, viewings))
|
||||
}
|
||||
|
||||
async fn query_activity_feed(
|
||||
&self,
|
||||
page: &PageParams,
|
||||
) -> Result<Paginated<FeedEntry>, DomainError> {
|
||||
let limit = page.limit as i64;
|
||||
let offset = page.offset as i64;
|
||||
|
||||
let (total, rows) = tokio::try_join!(
|
||||
self.count_feed_entries(),
|
||||
self.fetch_feed_rows(limit, offset)
|
||||
)?;
|
||||
|
||||
let items = rows
|
||||
.into_iter()
|
||||
.map(FeedRow::to_domain)
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
Ok(Paginated {
|
||||
items,
|
||||
total_count: total as u64,
|
||||
limit: page.limit,
|
||||
offset: page.offset,
|
||||
})
|
||||
}
|
||||
|
||||
async fn get_user_stats(&self, user_id: &UserId) -> Result<UserStats, DomainError> {
|
||||
let uid = user_id.value().to_string();
|
||||
|
||||
let (totals, fav_director, most_active) = tokio::try_join!(
|
||||
self.fetch_user_totals(&uid),
|
||||
self.fetch_user_favorite_director(&uid),
|
||||
self.fetch_user_most_active_month(&uid)
|
||||
)?;
|
||||
|
||||
let most_active_month = most_active.map(|ym| format_year_month(&ym));
|
||||
|
||||
Ok(UserStats {
|
||||
total_movies: totals.total,
|
||||
avg_rating: totals.avg_rating,
|
||||
favorite_director: fav_director,
|
||||
most_active_month,
|
||||
})
|
||||
}
|
||||
|
||||
async fn get_user_history(&self, user_id: &UserId) -> Result<Vec<DiaryEntry>, DomainError> {
|
||||
let uid = user_id.value().to_string();
|
||||
let rows = sqlx::query_as!(
|
||||
DiaryRow,
|
||||
"SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,
|
||||
r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at
|
||||
FROM reviews r
|
||||
INNER JOIN movies m ON m.id = r.movie_id
|
||||
WHERE r.user_id = ?
|
||||
ORDER BY r.watched_at DESC",
|
||||
uid
|
||||
)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(Self::map_err)?;
|
||||
|
||||
rows.into_iter().map(DiaryRow::to_domain).collect()
|
||||
}
|
||||
|
||||
async fn get_user_trends(&self, user_id: &UserId) -> Result<UserTrends, DomainError> {
|
||||
let uid = user_id.value().to_string();
|
||||
|
||||
let (rating_rows, director_rows) = tokio::try_join!(
|
||||
sqlx::query_as!(
|
||||
MonthlyRatingRow,
|
||||
r#"SELECT strftime('%Y-%m', watched_at) AS "month!",
|
||||
AVG(CAST(rating AS REAL)) AS "avg_rating!: f64",
|
||||
COUNT(*) AS "count!: i64"
|
||||
FROM reviews
|
||||
WHERE user_id = ? AND watched_at >= datetime('now', '-12 months')
|
||||
GROUP BY "month!"
|
||||
ORDER BY "month!" ASC"#,
|
||||
uid
|
||||
)
|
||||
.fetch_all(&self.pool),
|
||||
sqlx::query_as!(
|
||||
DirectorCountRow,
|
||||
r#"SELECT m.director AS "director!",
|
||||
COUNT(*) AS "count!: i64"
|
||||
FROM reviews r
|
||||
INNER JOIN movies m ON m.id = r.movie_id
|
||||
WHERE r.user_id = ? AND m.director IS NOT NULL
|
||||
GROUP BY m.director
|
||||
ORDER BY COUNT(*) DESC
|
||||
LIMIT 5"#,
|
||||
uid
|
||||
)
|
||||
.fetch_all(&self.pool)
|
||||
)
|
||||
.map_err(Self::map_err)?;
|
||||
|
||||
let max_director_count = director_rows.iter().map(|d| d.count).max().unwrap_or(1);
|
||||
|
||||
let monthly_ratings = rating_rows
|
||||
.into_iter()
|
||||
.map(|r| MonthlyRating {
|
||||
month_label: format_year_month(&r.month),
|
||||
year_month: r.month,
|
||||
avg_rating: r.avg_rating,
|
||||
count: r.count,
|
||||
})
|
||||
.collect();
|
||||
|
||||
let top_directors = director_rows
|
||||
.into_iter()
|
||||
.map(|d| DirectorStat { director: d.director, count: d.count })
|
||||
.collect();
|
||||
|
||||
Ok(UserTrends { monthly_ratings, top_directors, max_director_count })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -105,7 +105,7 @@ impl UserRepository for SqliteUserRepository {
|
||||
r#"SELECT u.id,
|
||||
u.email,
|
||||
COUNT(r.id) AS "total_movies!: i64",
|
||||
AVG(CAST(r.rating AS REAL)) AS "avg_rating: Option<f64>"
|
||||
AVG(CAST(r.rating AS REAL)) AS avg_rating
|
||||
FROM users u
|
||||
LEFT JOIN reviews r ON r.user_id = u.id
|
||||
GROUP BY u.id, u.email
|
||||
|
||||
Reference in New Issue
Block a user