diff --git a/crates/application/src/import/execute.rs b/crates/application/src/import/execute.rs index 7ff784d..8729fc8 100644 --- a/crates/application/src/import/execute.rs +++ b/crates/application/src/import/execute.rs @@ -15,6 +15,8 @@ use crate::{ ports::ReviewLogger, }; +const CONCURRENCY_LIMIT: usize = 10; + pub struct ImportSummary { pub imported: usize, pub skipped_duplicates: usize, @@ -41,22 +43,38 @@ pub async fn execute( let mut skipped_duplicates = 0; let mut failed = Vec::new(); + let semaphore = Arc::new(tokio::sync::Semaphore::new(CONCURRENCY_LIMIT)); + let mut tasks: tokio::task::JoinSet<(usize, Result<(), String>)> = tokio::task::JoinSet::new(); + for (idx, annotated) in row_results.into_iter().enumerate() { if !confirmed_set.contains(&idx) { skipped_duplicates += 1; continue; } match annotated.result { - RowResult::Valid(row) => match row_to_command(&row, user_id.value()) { - Ok(cmd) => match review_logger.log_review(cmd).await { - Ok(_) => imported += 1, - Err(e) => failed.push((idx, e.to_string())), - }, - Err(e) => failed.push((idx, e)), - }, RowResult::Invalid { errors, .. } => { failed.push((idx, errors.join("; "))); } + RowResult::Valid(row) => match row_to_command(&row, user_id.value()) { + Err(e) => failed.push((idx, e)), + Ok(log_cmd) => { + let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap(); + let logger = Arc::clone(&review_logger); + tasks.spawn(async move { + let result = logger.log_review(log_cmd).await.map_err(|e| e.to_string()); + drop(permit); + (idx, result) + }); + } + }, + } + } + + while let Some(res) = tasks.join_next().await { + let (idx, outcome) = res.expect("import task panicked"); + match outcome { + Ok(()) => imported += 1, + Err(e) => failed.push((idx, e)), } } diff --git a/crates/application/src/import/tests/execute.rs b/crates/application/src/import/tests/execute.rs index 097fe1a..90c4fef 100644 --- a/crates/application/src/import/tests/execute.rs +++ b/crates/application/src/import/tests/execute.rs @@ -519,3 +519,47 @@ async fn deletes_session_after_import() { "session should be deleted after import" ); } + +#[tokio::test] +async fn imports_more_rows_than_concurrency_limit() { + let sessions = InMemoryImportSessionRepository::new(); + let uid = Uuid::new_v4(); + let sid = ImportSessionId::generate(); + + let now = Utc::now().naive_utc(); + let rows: Vec<_> = (0..15) + .map(|i| AnnotatedRow { + result: RowResult::Valid(domain::models::ImportRow { + title: Some(format!("Movie {i}")), + release_year: Some("2024".into()), + rating: Some("4".into()), + watched_at: Some("2024-06-01".into()), + external_metadata_id: None, + director: None, + comment: None, + }), + is_duplicate: false, + }) + .collect(); + + let mut session = ImportSession::new(sid.clone(), UserId::from_uuid(uid), now); + session.row_results = Some(rows); + sessions.create(&session).await.unwrap(); + + let confirmed_indices: Vec = (0..15).collect(); + let result = execute::execute( + Arc::clone(&sessions) as _, + Arc::new(NoopReviewLogger), + ExecuteImportCommand { + user_id: uid, + session_id: sid.value(), + confirmed_indices, + }, + ) + .await + .unwrap(); + + assert_eq!(result.imported, 15); + assert_eq!(result.skipped_duplicates, 0); + assert!(result.failed.is_empty()); +}