From 9c44330f14f013d445f22f204be4e46352413e4b Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Fri, 12 Jun 2026 01:10:21 +0200 Subject: [PATCH] feat(adapters): stream_user_history in SQLite and Postgres diary adapters --- crates/adapters/postgres/src/diary.rs | 30 +++++++++++++++++++++++++++ crates/adapters/sqlite/src/diary.rs | 27 ++++++++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/crates/adapters/postgres/src/diary.rs b/crates/adapters/postgres/src/diary.rs index 2324672..44e65b2 100644 --- a/crates/adapters/postgres/src/diary.rs +++ b/crates/adapters/postgres/src/diary.rs @@ -1,4 +1,5 @@ use async_trait::async_trait; +use futures::stream::BoxStream; use domain::{ errors::DomainError, models::{ @@ -427,6 +428,35 @@ impl DiaryRepository for PostgresDiaryRepository { rows.into_iter().map(DiaryRow::into_domain).collect() } + fn stream_user_history( + &self, + user_id: UserId, + ) -> BoxStream<'static, Result> { + let pool = self.pool.clone(); + let uid = user_id.value().to_string(); + Box::pin(async_stream::stream! { + let mut rows = sqlx::query_as::<_, DiaryRow>( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, + to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, + to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, + r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = $1 + ORDER BY r.watched_at DESC", + ) + .bind(&uid) + .fetch(&pool); + while let Some(row) = futures::StreamExt::next(&mut rows).await { + yield match row { + Ok(r) => r.into_domain(), + Err(e) => Err(Self::map_err(e)), + }; + } + }) + } + async fn get_movie_stats(&self, movie_id: &MovieId) -> Result { let id_str = movie_id.value().to_string(); sqlx::query_as::<_, MovieStatsRow>( diff --git a/crates/adapters/sqlite/src/diary.rs b/crates/adapters/sqlite/src/diary.rs index 959eea6..745e2ac 100644 --- a/crates/adapters/sqlite/src/diary.rs +++ b/crates/adapters/sqlite/src/diary.rs @@ -1,4 +1,5 @@ use async_trait::async_trait; +use futures::stream::BoxStream; use domain::{ errors::DomainError, models::{ @@ -389,6 +390,32 @@ impl DiaryRepository for SqliteDiaryRepository { rows.into_iter().map(DiaryRow::into_domain).collect() } + fn stream_user_history( + &self, + user_id: UserId, + ) -> BoxStream<'static, Result> { + let pool = self.pool.clone(); + let uid = user_id.value().to_string(); + Box::pin(async_stream::stream! { + let mut rows = sqlx::query_as::<_, DiaryRow>( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = ? + ORDER BY r.watched_at DESC", + ) + .bind(&uid) + .fetch(&pool); + while let Some(row) = futures::StreamExt::next(&mut rows).await { + yield match row { + Ok(r) => r.into_domain(), + Err(e) => Err(Self::map_err(e)), + }; + } + }) + } + async fn get_movie_stats(&self, movie_id: &MovieId) -> Result { let id_str = movie_id.value().to_string(); sqlx::query_as::<_, MovieStatsRow>(