diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs index 01dc951..d9d109d 100644 --- a/crates/adapters/postgres/src/lib.rs +++ b/crates/adapters/postgres/src/lib.rs @@ -388,6 +388,54 @@ impl MovieRepository for PostgresRepository { Ok(()) } + async fn existing_external_ids( + &self, + ids: &[ExternalMetadataId], + ) -> Result, DomainError> { + if ids.is_empty() { + return Ok(Default::default()); + } + let vals: Vec = ids.iter().map(|id| id.value().to_string()).collect(); + let rows: Vec<(String,)> = sqlx::query_as( + "SELECT external_metadata_id FROM movies WHERE external_metadata_id = ANY($1)", + ) + .bind(&vals) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + Ok(rows.into_iter().map(|(id,)| id).collect()) + } + + async fn existing_title_year_pairs( + &self, + pairs: &[(MovieTitle, ReleaseYear)], + ) -> Result, DomainError> { + if pairs.is_empty() { + return Ok(Default::default()); + } + let titles: Vec<&str> = pairs.iter().map(|(t, _)| t.value()).collect(); + let years: Vec = pairs.iter().map(|(_, y)| y.value() as i64).collect(); + use sqlx::Row; + let rows = sqlx::query( + "SELECT DISTINCT m.title, m.release_year FROM movies m \ + INNER JOIN unnest($1::text[], $2::bigint[]) AS p(title, release_year) \ + ON m.title = p.title AND m.release_year = p.release_year", + ) + .bind(&titles) + .bind(&years) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + Ok(rows + .into_iter() + .map(|r| { + let t: String = r.get("title"); + let y: i64 = r.get("release_year"); + (t, y as u16) + }) + .collect()) + } + async fn list_movies( &self, page: &domain::models::collections::PageParams, diff --git a/crates/adapters/postgres/src/watch_event.rs b/crates/adapters/postgres/src/watch_event.rs index 2802630..2f121e3 100644 --- a/crates/adapters/postgres/src/watch_event.rs +++ b/crates/adapters/postgres/src/watch_event.rs @@ -115,6 +115,45 @@ impl WatchEventRepository for PostgresWatchEventRepository { row.as_ref().map(row_to_watch_event).transpose() } + async fn get_by_ids(&self, ids: &[WatchEventId]) -> Result, DomainError> { + if ids.is_empty() { + return Ok(vec![]); + } + let id_strs: Vec = ids.iter().map(|id| id.value().to_string()).collect(); + let rows = sqlx::query( + "SELECT id, user_id, movie_id, title, year, external_metadata_id, \ + source, \ + to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, \ + status, \ + to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at \ + FROM watch_events WHERE id = ANY($1)", + ) + .bind(&id_strs) + .fetch_all(&self.pool) + .await + .map_err(map_err)?; + rows.iter().map(row_to_watch_event).collect() + } + + async fn update_status_batch( + &self, + ids: &[WatchEventId], + status: WatchEventStatus, + ) -> Result { + if ids.is_empty() { + return Ok(0); + } + let id_strs: Vec = ids.iter().map(|id| id.value().to_string()).collect(); + let status_str = status.to_string(); + let result = sqlx::query("UPDATE watch_events SET status = $1 WHERE id = ANY($2)") + .bind(&status_str) + .bind(&id_strs) + .execute(&self.pool) + .await + .map_err(map_err)?; + Ok(result.rows_affected()) + } + async fn find_duplicate( &self, user_id: &UserId, diff --git a/crates/adapters/sqlite/src/lib.rs b/crates/adapters/sqlite/src/lib.rs index de7f104..d5b5e2f 100644 --- a/crates/adapters/sqlite/src/lib.rs +++ b/crates/adapters/sqlite/src/lib.rs @@ -402,6 +402,57 @@ impl MovieRepository for SqliteMovieRepository { Ok(()) } + async fn existing_external_ids( + &self, + ids: &[ExternalMetadataId], + ) -> Result, DomainError> { + if ids.is_empty() { + return Ok(Default::default()); + } + let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect(); + let sql = format!( + "SELECT external_metadata_id FROM movies WHERE external_metadata_id IN ({})", + placeholders.join(",") + ); + let mut q = sqlx::query_scalar::<_, String>(&sql); + for id in ids { + q = q.bind(id.value().to_string()); + } + let rows = q.fetch_all(&self.pool).await.map_err(Self::map_err)?; + Ok(rows.into_iter().collect()) + } + + async fn existing_title_year_pairs( + &self, + pairs: &[(MovieTitle, ReleaseYear)], + ) -> Result, DomainError> { + if pairs.is_empty() { + return Ok(Default::default()); + } + let conditions: Vec = pairs + .iter() + .map(|_| "(title = ? AND release_year = ?)".to_string()) + .collect(); + let sql = format!( + "SELECT DISTINCT title, release_year FROM movies WHERE {}", + conditions.join(" OR ") + ); + use sqlx::Row; + let mut q = sqlx::query(&sql); + for (t, y) in pairs { + q = q.bind(t.value().to_string()).bind(y.value() as i64); + } + let rows = q.fetch_all(&self.pool).await.map_err(Self::map_err)?; + Ok(rows + .into_iter() + .map(|r| { + let t: String = r.get("title"); + let y: i64 = r.get("release_year"); + (t, y as u16) + }) + .collect()) + } + async fn list_movies( &self, page: &domain::models::collections::PageParams, diff --git a/crates/adapters/sqlite/src/watch_event.rs b/crates/adapters/sqlite/src/watch_event.rs index c8f1ab5..00af73a 100644 --- a/crates/adapters/sqlite/src/watch_event.rs +++ b/crates/adapters/sqlite/src/watch_event.rs @@ -122,6 +122,46 @@ impl WatchEventRepository for SqliteWatchEventRepository { row.as_ref().map(row_to_watch_event).transpose() } + async fn get_by_ids(&self, ids: &[WatchEventId]) -> Result, DomainError> { + if ids.is_empty() { + return Ok(vec![]); + } + let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect(); + let sql = format!( + "SELECT id, user_id, movie_id, title, year, external_metadata_id, \ + source, watched_at, status, created_at \ + FROM watch_events WHERE id IN ({})", + placeholders.join(",") + ); + let mut q = sqlx::query(&sql); + for id in ids { + q = q.bind(id.value().to_string()); + } + let rows = q.fetch_all(&self.pool).await.map_err(map_err)?; + rows.iter().map(row_to_watch_event).collect() + } + + async fn update_status_batch( + &self, + ids: &[WatchEventId], + status: WatchEventStatus, + ) -> Result { + if ids.is_empty() { + return Ok(0); + } + let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect(); + let sql = format!( + "UPDATE watch_events SET status = ? WHERE id IN ({})", + placeholders.join(",") + ); + let mut q = sqlx::query(&sql).bind(status.to_string()); + for id in ids { + q = q.bind(id.value().to_string()); + } + let result = q.execute(&self.pool).await.map_err(map_err)?; + Ok(result.rows_affected()) + } + async fn find_duplicate( &self, user_id: &UserId, diff --git a/crates/application/src/diary/tests/movie_resolver.rs b/crates/application/src/diary/tests/movie_resolver.rs index eb576a3..140bed1 100644 --- a/crates/application/src/diary/tests/movie_resolver.rs +++ b/crates/application/src/diary/tests/movie_resolver.rs @@ -55,6 +55,18 @@ impl MovieRepository for RepoWithExternalMovie { async fn delete_movie(&self, _: &MovieId) -> Result<(), DomainError> { panic!("unexpected") } + async fn existing_external_ids( + &self, + _: &[ExternalMetadataId], + ) -> Result, DomainError> { + panic!("unexpected") + } + async fn existing_title_year_pairs( + &self, + _: &[(MovieTitle, ReleaseYear)], + ) -> Result, DomainError> { + panic!("unexpected") + } async fn list_movies( &self, _: &domain::models::collections::PageParams, @@ -89,6 +101,18 @@ impl MovieRepository for RepoEmpty { async fn delete_movie(&self, _: &MovieId) -> Result<(), DomainError> { panic!("unexpected") } + async fn existing_external_ids( + &self, + _: &[ExternalMetadataId], + ) -> Result, DomainError> { + panic!("unexpected") + } + async fn existing_title_year_pairs( + &self, + _: &[(MovieTitle, ReleaseYear)], + ) -> Result, DomainError> { + panic!("unexpected") + } async fn list_movies( &self, _: &domain::models::collections::PageParams, @@ -123,6 +147,18 @@ impl MovieRepository for RepoWithTitleMatch { async fn delete_movie(&self, _: &MovieId) -> Result<(), DomainError> { panic!("unexpected") } + async fn existing_external_ids( + &self, + _: &[ExternalMetadataId], + ) -> Result, DomainError> { + panic!("unexpected") + } + async fn existing_title_year_pairs( + &self, + _: &[(MovieTitle, ReleaseYear)], + ) -> Result, DomainError> { + panic!("unexpected") + } async fn list_movies( &self, _: &domain::models::collections::PageParams, diff --git a/crates/application/src/import/apply_mapping.rs b/crates/application/src/import/apply_mapping.rs index 0972c5a..0cd9762 100644 --- a/crates/application/src/import/apply_mapping.rs +++ b/crates/application/src/import/apply_mapping.rs @@ -20,7 +20,6 @@ pub async fn execute( .await? .ok_or_else(|| DomainError::NotFound("import session".into()))?; - // clone to avoid borrow conflict when mutating session fields below let parsed = session .parsed_file .clone() @@ -31,11 +30,7 @@ pub async fn execute( .document_parser .apply_mapping(&parsed, &mappings); - for row in annotated.iter_mut() { - if let RowResult::Valid(ref import_row) = row.result { - row.is_duplicate = check_duplicate(ctx, import_row).await?; - } - } + mark_duplicates(ctx, &mut annotated).await?; session.field_mappings = Some(mappings); session.row_results = Some(annotated.clone()); @@ -45,33 +40,52 @@ pub async fn execute( Ok(annotated) } -async fn check_duplicate( - ctx: &AppContext, - row: &domain::models::ImportRow, -) -> Result { - if let Some(ext_id) = &row.external_metadata_id - && let Ok(eid) = ExternalMetadataId::new(ext_id.clone()) - && ctx - .repos - .movie - .get_movie_by_external_id(&eid) - .await? - .is_some() - { - return Ok(true); - } - if let (Some(title), Some(year_str)) = (&row.title, &row.release_year) { - let title_vo = MovieTitle::new(title.clone()); - let year_vo = year_str - .parse::() - .ok() - .and_then(|y| ReleaseYear::new(y).ok()); - if let (Ok(t), Some(y)) = (title_vo, year_vo) { - let matches = ctx.repos.movie.get_movies_by_title_and_year(&t, &y).await?; - if !matches.is_empty() { - return Ok(true); +async fn mark_duplicates(ctx: &AppContext, rows: &mut [AnnotatedRow]) -> Result<(), DomainError> { + let mut ext_ids = Vec::new(); + let mut title_year_pairs = Vec::new(); + + for row in rows.iter() { + if let RowResult::Valid(ref r) = row.result { + if let Some(ext_id) = &r.external_metadata_id + && let Ok(eid) = ExternalMetadataId::new(ext_id.clone()) + { + ext_ids.push(eid); + } + if let (Some(title), Some(year_str)) = (&r.title, &r.release_year) + && let Ok(t) = MovieTitle::new(title.clone()) + && let Some(y) = year_str + .parse::() + .ok() + .and_then(|y| ReleaseYear::new(y).ok()) + { + title_year_pairs.push((t, y)); } } } - Ok(false) + + let known_ext = ctx.repos.movie.existing_external_ids(&ext_ids).await?; + let known_ty = ctx + .repos + .movie + .existing_title_year_pairs(&title_year_pairs) + .await?; + + for row in rows.iter_mut() { + if let RowResult::Valid(ref r) = row.result { + if let Some(ext_id) = &r.external_metadata_id + && known_ext.contains(ext_id) + { + row.is_duplicate = true; + continue; + } + if let (Some(title), Some(year_str)) = (&r.title, &r.release_year) + && let Ok(y) = year_str.parse::() + && known_ty.contains(&(title.clone(), y)) + { + row.is_duplicate = true; + } + } + } + + Ok(()) } diff --git a/crates/application/src/integrations/dismiss.rs b/crates/application/src/integrations/dismiss.rs index f587d84..e13c611 100644 --- a/crates/application/src/integrations/dismiss.rs +++ b/crates/application/src/integrations/dismiss.rs @@ -8,28 +8,34 @@ use crate::{context::AppContext, integrations::commands::DismissWatchEventsComma pub async fn execute(ctx: &AppContext, cmd: DismissWatchEventsCommand) -> Result { let user_id = UserId::from_uuid(cmd.user_id); - let mut dismissed = 0u32; + if cmd.event_ids.is_empty() { + return Ok(0); + } - for id in cmd.event_ids { - let event_id = WatchEventId::from_uuid(id); - let event = ctx - .repos - .watch_event - .get_by_id(&event_id) - .await? - .ok_or_else(|| DomainError::NotFound(format!("WatchEvent {id}")))?; + let ids: Vec = cmd + .event_ids + .iter() + .map(|id| WatchEventId::from_uuid(*id)) + .collect(); + let events = ctx.repos.watch_event.get_by_ids(&ids).await?; + + if events.len() != ids.len() { + return Err(DomainError::NotFound( + "one or more WatchEvents not found".into(), + )); + } + for event in &events { if event.user_id() != &user_id { return Err(DomainError::Unauthorized("not your watch event".into())); } - - ctx.repos - .watch_event - .update_status(&event_id, WatchEventStatus::Dismissed) - .await?; - - dismissed += 1; } - Ok(dismissed) + let count = ctx + .repos + .watch_event + .update_status_batch(&ids, WatchEventStatus::Dismissed) + .await?; + + Ok(count as u32) } diff --git a/crates/domain/src/ports.rs b/crates/domain/src/ports.rs index c10def7..bb5e74d 100644 --- a/crates/domain/src/ports.rs +++ b/crates/domain/src/ports.rs @@ -100,6 +100,14 @@ pub trait MovieRepository: Send + Sync { ) -> Result, DomainError>; async fn upsert_movie(&self, movie: &Movie) -> Result<(), DomainError>; async fn delete_movie(&self, movie_id: &MovieId) -> Result<(), DomainError>; + async fn existing_external_ids( + &self, + ids: &[ExternalMetadataId], + ) -> Result, DomainError>; + async fn existing_title_year_pairs( + &self, + pairs: &[(MovieTitle, ReleaseYear)], + ) -> Result, DomainError>; async fn list_movies( &self, page: &collections::PageParams, @@ -434,6 +442,12 @@ pub trait WatchEventRepository: Send + Sync { ) -> Result<(), DomainError>; async fn list_pending(&self, user_id: &UserId) -> Result, DomainError>; async fn get_by_id(&self, id: &WatchEventId) -> Result, DomainError>; + async fn get_by_ids(&self, ids: &[WatchEventId]) -> Result, DomainError>; + async fn update_status_batch( + &self, + ids: &[WatchEventId], + status: WatchEventStatus, + ) -> Result; async fn find_duplicate( &self, user_id: &UserId, diff --git a/crates/domain/src/testing.rs b/crates/domain/src/testing.rs index d9ad879..7137723 100644 --- a/crates/domain/src/testing.rs +++ b/crates/domain/src/testing.rs @@ -97,6 +97,38 @@ impl MovieRepository for InMemoryMovieRepository { Ok(()) } + async fn existing_external_ids( + &self, + ids: &[ExternalMetadataId], + ) -> Result, DomainError> { + let store = self.store.lock().unwrap(); + let known: std::collections::HashSet = store + .values() + .filter_map(|m| m.external_metadata_id().map(|e| e.value().to_string())) + .collect(); + Ok(ids + .iter() + .map(|id| id.value().to_string()) + .filter(|v| known.contains(v)) + .collect()) + } + + async fn existing_title_year_pairs( + &self, + pairs: &[(MovieTitle, ReleaseYear)], + ) -> Result, DomainError> { + let store = self.store.lock().unwrap(); + let known: std::collections::HashSet<(String, u16)> = store + .values() + .map(|m| (m.title().value().to_string(), m.release_year().value())) + .collect(); + Ok(pairs + .iter() + .map(|(t, y)| (t.value().to_string(), y.value())) + .filter(|p| known.contains(p)) + .collect()) + } + async fn list_movies( &self, _page: &crate::models::collections::PageParams, @@ -868,6 +900,19 @@ impl crate::ports::WatchEventRepository for PanicWatchEventRepository { ) -> Result, DomainError> { panic!("PanicWatchEventRepository called") } + async fn get_by_ids( + &self, + _: &[crate::value_objects::WatchEventId], + ) -> Result, DomainError> { + panic!("PanicWatchEventRepository called") + } + async fn update_status_batch( + &self, + _: &[crate::value_objects::WatchEventId], + _: crate::models::WatchEventStatus, + ) -> Result { + panic!("PanicWatchEventRepository called") + } async fn find_duplicate( &self, _: &UserId, diff --git a/crates/presentation/src/tests/extractors.rs b/crates/presentation/src/tests/extractors.rs index b123635..bcda303 100644 --- a/crates/presentation/src/tests/extractors.rs +++ b/crates/presentation/src/tests/extractors.rs @@ -60,6 +60,18 @@ impl MovieRepository for Panic { async fn delete_movie(&self, _: &MovieId) -> Result<(), DomainError> { panic!() } + async fn existing_external_ids( + &self, + _: &[ExternalMetadataId], + ) -> Result, DomainError> { + panic!() + } + async fn existing_title_year_pairs( + &self, + _: &[(MovieTitle, ReleaseYear)], + ) -> Result, DomainError> { + panic!() + } async fn list_movies( &self, _: &domain::models::collections::PageParams, @@ -496,6 +508,19 @@ impl domain::ports::WatchEventRepository for Panic { ) -> Result, DomainError> { panic!() } + async fn get_by_ids( + &self, + _: &[domain::value_objects::WatchEventId], + ) -> Result, DomainError> { + panic!() + } + async fn update_status_batch( + &self, + _: &[domain::value_objects::WatchEventId], + _: domain::models::WatchEventStatus, + ) -> Result { + panic!() + } async fn find_duplicate( &self, _: &domain::value_objects::UserId,