perf(import): parallelize row processing with JoinSet + Semaphore (limit 10)
Some checks failed
CI / Check / Test (push) Has been cancelled
Some checks failed
CI / Check / Test (push) Has been cancelled
This commit is contained in:
@@ -15,6 +15,8 @@ use crate::{
|
|||||||
ports::ReviewLogger,
|
ports::ReviewLogger,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const CONCURRENCY_LIMIT: usize = 10;
|
||||||
|
|
||||||
pub struct ImportSummary {
|
pub struct ImportSummary {
|
||||||
pub imported: usize,
|
pub imported: usize,
|
||||||
pub skipped_duplicates: usize,
|
pub skipped_duplicates: usize,
|
||||||
@@ -41,22 +43,38 @@ pub async fn execute(
|
|||||||
let mut skipped_duplicates = 0;
|
let mut skipped_duplicates = 0;
|
||||||
let mut failed = Vec::new();
|
let mut failed = Vec::new();
|
||||||
|
|
||||||
|
let semaphore = Arc::new(tokio::sync::Semaphore::new(CONCURRENCY_LIMIT));
|
||||||
|
let mut tasks: tokio::task::JoinSet<(usize, Result<(), String>)> = tokio::task::JoinSet::new();
|
||||||
|
|
||||||
for (idx, annotated) in row_results.into_iter().enumerate() {
|
for (idx, annotated) in row_results.into_iter().enumerate() {
|
||||||
if !confirmed_set.contains(&idx) {
|
if !confirmed_set.contains(&idx) {
|
||||||
skipped_duplicates += 1;
|
skipped_duplicates += 1;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
match annotated.result {
|
match annotated.result {
|
||||||
RowResult::Valid(row) => match row_to_command(&row, user_id.value()) {
|
|
||||||
Ok(cmd) => match review_logger.log_review(cmd).await {
|
|
||||||
Ok(_) => imported += 1,
|
|
||||||
Err(e) => failed.push((idx, e.to_string())),
|
|
||||||
},
|
|
||||||
Err(e) => failed.push((idx, e)),
|
|
||||||
},
|
|
||||||
RowResult::Invalid { errors, .. } => {
|
RowResult::Invalid { errors, .. } => {
|
||||||
failed.push((idx, errors.join("; ")));
|
failed.push((idx, errors.join("; ")));
|
||||||
}
|
}
|
||||||
|
RowResult::Valid(row) => match row_to_command(&row, user_id.value()) {
|
||||||
|
Err(e) => failed.push((idx, e)),
|
||||||
|
Ok(log_cmd) => {
|
||||||
|
let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap();
|
||||||
|
let logger = Arc::clone(&review_logger);
|
||||||
|
tasks.spawn(async move {
|
||||||
|
let result = logger.log_review(log_cmd).await.map_err(|e| e.to_string());
|
||||||
|
drop(permit);
|
||||||
|
(idx, result)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
while let Some(res) = tasks.join_next().await {
|
||||||
|
let (idx, outcome) = res.expect("import task panicked");
|
||||||
|
match outcome {
|
||||||
|
Ok(()) => imported += 1,
|
||||||
|
Err(e) => failed.push((idx, e)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -519,3 +519,47 @@ async fn deletes_session_after_import() {
|
|||||||
"session should be deleted after import"
|
"session should be deleted after import"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn imports_more_rows_than_concurrency_limit() {
|
||||||
|
let sessions = InMemoryImportSessionRepository::new();
|
||||||
|
let uid = Uuid::new_v4();
|
||||||
|
let sid = ImportSessionId::generate();
|
||||||
|
|
||||||
|
let now = Utc::now().naive_utc();
|
||||||
|
let rows: Vec<_> = (0..15)
|
||||||
|
.map(|i| AnnotatedRow {
|
||||||
|
result: RowResult::Valid(domain::models::ImportRow {
|
||||||
|
title: Some(format!("Movie {i}")),
|
||||||
|
release_year: Some("2024".into()),
|
||||||
|
rating: Some("4".into()),
|
||||||
|
watched_at: Some("2024-06-01".into()),
|
||||||
|
external_metadata_id: None,
|
||||||
|
director: None,
|
||||||
|
comment: None,
|
||||||
|
}),
|
||||||
|
is_duplicate: false,
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let mut session = ImportSession::new(sid.clone(), UserId::from_uuid(uid), now);
|
||||||
|
session.row_results = Some(rows);
|
||||||
|
sessions.create(&session).await.unwrap();
|
||||||
|
|
||||||
|
let confirmed_indices: Vec<usize> = (0..15).collect();
|
||||||
|
let result = execute::execute(
|
||||||
|
Arc::clone(&sessions) as _,
|
||||||
|
Arc::new(NoopReviewLogger),
|
||||||
|
ExecuteImportCommand {
|
||||||
|
user_id: uid,
|
||||||
|
session_id: sid.value(),
|
||||||
|
confirmed_indices,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(result.imported, 15);
|
||||||
|
assert_eq!(result.skipped_duplicates, 0);
|
||||||
|
assert!(result.failed.is_empty());
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user