From b5ff43d9dcafb673dea3e2b36c59f4a4949daaa4 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 11 Jun 2026 21:49:15 +0200 Subject: [PATCH] refactor(import): scoped Arc deps, ImportSessionCleanupJob --- .../application/src/import/apply_mapping.rs | 35 ++++--- .../application/src/import/apply_profile.rs | 21 ++-- crates/application/src/import/cleanup.rs | 9 +- .../application/src/import/create_session.rs | 17 ++-- .../application/src/import/delete_profile.rs | 15 ++- crates/application/src/import/execute.rs | 16 +-- .../application/src/import/list_profiles.rs | 9 +- crates/application/src/import/save_profile.rs | 14 +-- .../src/import/tests/apply_mapping.rs | 47 +++++---- .../src/import/tests/apply_profile.rs | 25 ++--- .../application/src/import/tests/cleanup.rs | 6 +- .../src/import/tests/create_session.rs | 8 +- .../src/import/tests/delete_profile.rs | 6 +- .../application/src/import/tests/execute.rs | 99 ++++++------------- .../src/import/tests/list_profiles.rs | 6 +- .../src/import/tests/save_profile.rs | 18 ++-- crates/application/src/jobs/import_cleanup.rs | 13 ++- crates/presentation/src/handlers/import.rs | 47 ++++++--- crates/worker/src/main.rs | 2 +- 19 files changed, 198 insertions(+), 215 deletions(-) diff --git a/crates/application/src/import/apply_mapping.rs b/crates/application/src/import/apply_mapping.rs index f709bf0..a08f888 100644 --- a/crates/application/src/import/apply_mapping.rs +++ b/crates/application/src/import/apply_mapping.rs @@ -1,21 +1,24 @@ +use std::sync::Arc; + use domain::{ errors::DomainError, models::{AnnotatedRow, import::RowResult}, + ports::{DocumentParser, ImportSessionRepository, MovieRepository}, value_objects::{ExternalMetadataId, ImportSessionId, MovieTitle, ReleaseYear, UserId}, }; -use crate::{context::AppContext, import::commands::ApplyImportMappingCommand}; +use crate::import::commands::ApplyImportMappingCommand; pub async fn execute( - ctx: &AppContext, + import_session: Arc, + document_parser: Arc, + movie: Arc, cmd: ApplyImportMappingCommand, ) -> Result, DomainError> { let user_id = UserId::from_uuid(cmd.user_id); let session_id = ImportSessionId::from_uuid(cmd.session_id); let mappings = cmd.mappings; - let mut session = ctx - .repos - .import_session + let mut session = import_session .get(&session_id, &user_id) .await? .ok_or_else(|| DomainError::NotFound("import session".into()))?; @@ -25,22 +28,22 @@ pub async fn execute( .clone() .ok_or_else(|| DomainError::ValidationError("session has no parsed file".into()))?; - let mut annotated = ctx - .services - .document_parser - .apply_mapping(&parsed, &mappings); + let mut annotated = document_parser.apply_mapping(&parsed, &mappings); - mark_duplicates(ctx, &mut annotated).await?; + mark_duplicates(movie, &mut annotated).await?; session.field_mappings = Some(mappings); session.row_results = Some(annotated.clone()); - ctx.repos.import_session.update(&session).await?; + import_session.update(&session).await?; Ok(annotated) } -async fn mark_duplicates(ctx: &AppContext, rows: &mut [AnnotatedRow]) -> Result<(), DomainError> { +async fn mark_duplicates( + movie: Arc, + rows: &mut [AnnotatedRow], +) -> Result<(), DomainError> { let mut ext_ids = Vec::new(); let mut title_year_pairs = Vec::new(); @@ -63,12 +66,8 @@ async fn mark_duplicates(ctx: &AppContext, rows: &mut [AnnotatedRow]) -> Result< } } - let known_ext = ctx.repos.movie.existing_external_ids(&ext_ids).await?; - let known_ty = ctx - .repos - .movie - .existing_title_year_pairs(&title_year_pairs) - .await?; + let known_ext = movie.existing_external_ids(&ext_ids).await?; + let known_ty = movie.existing_title_year_pairs(&title_year_pairs).await?; for row in rows.iter_mut() { if let RowResult::Valid(ref r) = row.result { diff --git a/crates/application/src/import/apply_profile.rs b/crates/application/src/import/apply_profile.rs index 02a36b7..98f9617 100644 --- a/crates/application/src/import/apply_profile.rs +++ b/crates/application/src/import/apply_profile.rs @@ -1,31 +1,34 @@ -use crate::{context::AppContext, import::commands::ApplyImportProfileCommand}; +use std::sync::Arc; + +use crate::import::commands::ApplyImportProfileCommand; use domain::{ errors::DomainError, + ports::{ImportProfileRepository, ImportSessionRepository}, value_objects::{ImportProfileId, ImportSessionId, UserId}, }; /// Copies the profile's field_mappings onto the session. Caller must then invoke /// apply_import_mapping to regenerate row_results with the new mappings. -pub async fn execute(ctx: &AppContext, cmd: ApplyImportProfileCommand) -> Result<(), DomainError> { +pub async fn execute( + import_profile: Arc, + import_session: Arc, + cmd: ApplyImportProfileCommand, +) -> Result<(), DomainError> { let user_id = UserId::from_uuid(cmd.user_id); let session_id = ImportSessionId::from_uuid(cmd.session_id); let profile_id = ImportProfileId::from_uuid(cmd.profile_id); - let profile = ctx - .repos - .import_profile + let profile = import_profile .get(&profile_id, &user_id) .await? .ok_or_else(|| DomainError::NotFound("import profile".into()))?; - let mut session = ctx - .repos - .import_session + let mut session = import_session .get(&session_id, &user_id) .await? .ok_or_else(|| DomainError::NotFound("import session".into()))?; session.field_mappings = Some(profile.field_mappings); session.row_results = None; - ctx.repos.import_session.update(&session).await + import_session.update(&session).await } #[cfg(test)] diff --git a/crates/application/src/import/cleanup.rs b/crates/application/src/import/cleanup.rs index f98ce65..2bd6416 100644 --- a/crates/application/src/import/cleanup.rs +++ b/crates/application/src/import/cleanup.rs @@ -1,8 +1,9 @@ -use crate::context::AppContext; -use domain::errors::DomainError; +use std::sync::Arc; -pub async fn execute(ctx: &AppContext) -> Result { - ctx.repos.import_session.delete_expired().await +use domain::{errors::DomainError, ports::ImportSessionRepository}; + +pub async fn execute(import_session: Arc) -> Result { + import_session.delete_expired().await } #[cfg(test)] diff --git a/crates/application/src/import/create_session.rs b/crates/application/src/import/create_session.rs index 0bc9cc4..f54084b 100644 --- a/crates/application/src/import/create_session.rs +++ b/crates/application/src/import/create_session.rs @@ -1,11 +1,14 @@ +use std::sync::Arc; + use chrono::Utc; use domain::{ errors::DomainError, models::ImportSession, + ports::{DocumentParser, ImportSessionRepository}, value_objects::{ImportSessionId, UserId}, }; -use crate::{context::AppContext, import::commands::CreateImportSessionCommand}; +use crate::import::commands::CreateImportSessionCommand; pub struct CreateSessionResult { pub session_id: ImportSessionId, @@ -14,18 +17,16 @@ pub struct CreateSessionResult { } pub async fn execute( - ctx: &AppContext, + import_session: Arc, + document_parser: Arc, cmd: CreateImportSessionCommand, ) -> Result { let user_id = UserId::from_uuid(cmd.user_id); - ctx.repos - .import_session + import_session .delete_expired_for_user(&user_id) .await?; - let parsed = ctx - .services - .document_parser + let parsed = document_parser .parse(&cmd.bytes, cmd.format) .map_err(|e| DomainError::ValidationError(e.to_string()))?; @@ -37,7 +38,7 @@ pub async fn execute( let session_id = session.id.clone(); session.parsed_file = Some(parsed); - ctx.repos.import_session.create(&session).await?; + import_session.create(&session).await?; Ok(CreateSessionResult { session_id, diff --git a/crates/application/src/import/delete_profile.rs b/crates/application/src/import/delete_profile.rs index 5a793c3..2ccd9e3 100644 --- a/crates/application/src/import/delete_profile.rs +++ b/crates/application/src/import/delete_profile.rs @@ -1,19 +1,24 @@ -use crate::{context::AppContext, import::commands::DeleteImportProfileCommand}; +use std::sync::Arc; + +use crate::import::commands::DeleteImportProfileCommand; use domain::{ errors::DomainError, + ports::ImportProfileRepository, value_objects::{ImportProfileId, UserId}, }; -pub async fn execute(ctx: &AppContext, cmd: DeleteImportProfileCommand) -> Result<(), DomainError> { +pub async fn execute( + import_profile: Arc, + cmd: DeleteImportProfileCommand, +) -> Result<(), DomainError> { let user_id = UserId::from_uuid(cmd.user_id); let profile_id = ImportProfileId::from_uuid(cmd.profile_id); - ctx.repos - .import_profile + import_profile .get(&profile_id, &user_id) .await? .ok_or_else(|| DomainError::NotFound("import profile".into()))?; - ctx.repos.import_profile.delete(&profile_id).await + import_profile.delete(&profile_id).await } #[cfg(test)] diff --git a/crates/application/src/import/execute.rs b/crates/application/src/import/execute.rs index f207a94..7ff784d 100644 --- a/crates/application/src/import/execute.rs +++ b/crates/application/src/import/execute.rs @@ -1,15 +1,18 @@ +use std::sync::Arc; + use chrono::NaiveDateTime; use domain::{ errors::DomainError, models::{ImportRow, import::RowResult}, + ports::ImportSessionRepository, value_objects::{ImportSessionId, UserId}, }; use uuid::Uuid; use crate::{ - context::AppContext, diary::commands::{LogReviewCommand, MovieInput}, import::commands::ExecuteImportCommand, + ports::ReviewLogger, }; pub struct ImportSummary { @@ -19,15 +22,14 @@ pub struct ImportSummary { } pub async fn execute( - ctx: &AppContext, + import_session: Arc, + review_logger: Arc, cmd: ExecuteImportCommand, ) -> Result { let user_id = UserId::from_uuid(cmd.user_id); let session_id = ImportSessionId::from_uuid(cmd.session_id); let confirmed_indices = cmd.confirmed_indices; - let session = ctx - .repos - .import_session + let session = import_session .get(&session_id, &user_id) .await? .ok_or_else(|| DomainError::NotFound("import session".into()))?; @@ -46,7 +48,7 @@ pub async fn execute( } match annotated.result { RowResult::Valid(row) => match row_to_command(&row, user_id.value()) { - Ok(cmd) => match ctx.services.review_logger.log_review(cmd).await { + Ok(cmd) => match review_logger.log_review(cmd).await { Ok(_) => imported += 1, Err(e) => failed.push((idx, e.to_string())), }, @@ -58,7 +60,7 @@ pub async fn execute( } } - ctx.repos.import_session.delete(&session_id).await?; + import_session.delete(&session_id).await?; Ok(ImportSummary { imported, diff --git a/crates/application/src/import/list_profiles.rs b/crates/application/src/import/list_profiles.rs index 9ac0d11..7fa9d2d 100644 --- a/crates/application/src/import/list_profiles.rs +++ b/crates/application/src/import/list_profiles.rs @@ -1,11 +1,12 @@ -use crate::context::AppContext; -use domain::{errors::DomainError, models::ImportProfile, value_objects::UserId}; +use std::sync::Arc; + +use domain::{errors::DomainError, models::ImportProfile, ports::ImportProfileRepository, value_objects::UserId}; pub async fn execute( - ctx: &AppContext, + import_profile: Arc, user_id: &UserId, ) -> Result, DomainError> { - ctx.repos.import_profile.list_for_user(user_id).await + import_profile.list_for_user(user_id).await } #[cfg(test)] diff --git a/crates/application/src/import/save_profile.rs b/crates/application/src/import/save_profile.rs index 77a097f..b244cbd 100644 --- a/crates/application/src/import/save_profile.rs +++ b/crates/application/src/import/save_profile.rs @@ -1,21 +1,23 @@ -use crate::{context::AppContext, import::commands::SaveImportProfileCommand}; +use std::sync::Arc; + +use crate::import::commands::SaveImportProfileCommand; use chrono::Utc; use domain::{ errors::DomainError, models::ImportProfile, + ports::{ImportProfileRepository, ImportSessionRepository}, value_objects::{ImportProfileId, ImportSessionId, UserId}, }; pub async fn execute( - ctx: &AppContext, + import_session: Arc, + import_profile: Arc, cmd: SaveImportProfileCommand, ) -> Result { let user_id = UserId::from_uuid(cmd.user_id); let session_id = ImportSessionId::from_uuid(cmd.session_id); - let session = ctx - .repos - .import_session + let session = import_session .get(&session_id, &user_id) .await? .ok_or_else(|| DomainError::NotFound("import session".into()))?; @@ -30,7 +32,7 @@ pub async fn execute( Utc::now().naive_utc(), ); let id = profile.id.clone(); - ctx.repos.import_profile.save(&profile).await?; + import_profile.save(&profile).await?; Ok(id) } diff --git a/crates/application/src/import/tests/apply_mapping.rs b/crates/application/src/import/tests/apply_mapping.rs index 6fa6fb4..0d7f41c 100644 --- a/crates/application/src/import/tests/apply_mapping.rs +++ b/crates/application/src/import/tests/apply_mapping.rs @@ -8,7 +8,7 @@ use domain::{ import::{ImportRow, ParsedFile, RowResult}, }, ports::{DocumentParser, MovieRepository}, - testing::InMemoryMovieRepository, + testing::{InMemoryImportSessionRepository, InMemoryMovieRepository}, value_objects::{ExternalMetadataId, MovieTitle, ReleaseYear}, }; @@ -21,11 +21,13 @@ use crate::test_helpers::TestContextBuilder; #[tokio::test] async fn applies_mapping_to_session() { + let sessions = InMemoryImportSessionRepository::new(); let ctx = TestContextBuilder::new().build(); let user_id = Uuid::new_v4(); let session = create_session::execute( - &ctx, + Arc::clone(&sessions) as _, + ctx.services.document_parser.clone(), CreateImportSessionCommand { user_id, bytes: b"title\nTest".to_vec(), @@ -36,7 +38,9 @@ async fn applies_mapping_to_session() { .unwrap(); let rows = apply_mapping::execute( - &ctx, + Arc::clone(&sessions) as _, + ctx.services.document_parser.clone(), + ctx.repos.movie.clone(), ApplyImportMappingCommand { user_id, session_id: session.session_id.value(), @@ -51,10 +55,13 @@ async fn applies_mapping_to_session() { #[tokio::test] async fn fails_when_session_not_found() { + let sessions = InMemoryImportSessionRepository::new(); let ctx = TestContextBuilder::new().build(); let result = apply_mapping::execute( - &ctx, + Arc::clone(&sessions) as _, + ctx.services.document_parser.clone(), + ctx.repos.movie.clone(), ApplyImportMappingCommand { user_id: Uuid::new_v4(), session_id: Uuid::new_v4(), @@ -102,6 +109,7 @@ impl DocumentParser for DuplicateTestParser { #[tokio::test] async fn marks_duplicate_by_external_id() { let movies = InMemoryMovieRepository::new(); + let sessions = InMemoryImportSessionRepository::new(); let ext_id = ExternalMetadataId::new("tt1234567".into()).unwrap(); let movie = Movie::new( @@ -113,23 +121,20 @@ async fn marks_duplicate_by_external_id() { ); movies.upsert_movie(&movie).await.unwrap(); - let parser = DuplicateTestParser { + let parser = Arc::new(DuplicateTestParser { rows: vec![ImportRow { title: Some("Known Movie".into()), release_year: Some("2020".into()), external_metadata_id: Some("tt1234567".into()), ..ImportRow::default() }], - }; + }); - let ctx = TestContextBuilder::new() - .with_movies(Arc::clone(&movies) as _) - .with_document_parser(Arc::new(parser) as _) - .build(); let user_id = Uuid::new_v4(); let session = create_session::execute( - &ctx, + Arc::clone(&sessions) as _, + Arc::clone(&parser) as _, CreateImportSessionCommand { user_id, bytes: b"title\nKnown Movie".to_vec(), @@ -140,7 +145,9 @@ async fn marks_duplicate_by_external_id() { .unwrap(); let rows = apply_mapping::execute( - &ctx, + Arc::clone(&sessions) as _, + Arc::clone(&parser) as _, + Arc::clone(&movies) as _, ApplyImportMappingCommand { user_id, session_id: session.session_id.value(), @@ -157,6 +164,7 @@ async fn marks_duplicate_by_external_id() { #[tokio::test] async fn marks_duplicate_by_title_and_year() { let movies = InMemoryMovieRepository::new(); + let sessions = InMemoryImportSessionRepository::new(); let movie = Movie::new( None, @@ -167,22 +175,19 @@ async fn marks_duplicate_by_title_and_year() { ); movies.upsert_movie(&movie).await.unwrap(); - let parser = DuplicateTestParser { + let parser = Arc::new(DuplicateTestParser { rows: vec![ImportRow { title: Some("Duplicate Film".into()), release_year: Some("2022".into()), ..ImportRow::default() }], - }; + }); - let ctx = TestContextBuilder::new() - .with_movies(Arc::clone(&movies) as _) - .with_document_parser(Arc::new(parser) as _) - .build(); let user_id = Uuid::new_v4(); let session = create_session::execute( - &ctx, + Arc::clone(&sessions) as _, + Arc::clone(&parser) as _, CreateImportSessionCommand { user_id, bytes: b"title\nDuplicate Film".to_vec(), @@ -193,7 +198,9 @@ async fn marks_duplicate_by_title_and_year() { .unwrap(); let rows = apply_mapping::execute( - &ctx, + Arc::clone(&sessions) as _, + Arc::clone(&parser) as _, + Arc::clone(&movies) as _, ApplyImportMappingCommand { user_id, session_id: session.session_id.value(), diff --git a/crates/application/src/import/tests/apply_profile.rs b/crates/application/src/import/tests/apply_profile.rs index 6e8c6d6..7839ea0 100644 --- a/crates/application/src/import/tests/apply_profile.rs +++ b/crates/application/src/import/tests/apply_profile.rs @@ -3,19 +3,20 @@ use std::sync::Arc; use chrono::Utc; use domain::models::ImportProfile; use domain::ports::{ImportProfileRepository, ImportSessionRepository}; -use domain::testing::InMemoryImportProfileRepository; +use domain::testing::{InMemoryImportProfileRepository, InMemoryImportSessionRepository}; use domain::value_objects::{ImportProfileId, UserId}; use uuid::Uuid; use crate::import::{apply_profile, commands::ApplyImportProfileCommand}; -use crate::test_helpers::TestContextBuilder; #[tokio::test] async fn fails_when_profile_not_found() { - let ctx = TestContextBuilder::new().build(); + let profiles = InMemoryImportProfileRepository::new(); + let sessions = InMemoryImportSessionRepository::new(); let result = apply_profile::execute( - &ctx, + Arc::clone(&profiles) as _, + Arc::clone(&sessions) as _, ApplyImportProfileCommand { user_id: Uuid::new_v4(), session_id: Uuid::new_v4(), @@ -30,6 +31,7 @@ async fn fails_when_profile_not_found() { #[tokio::test] async fn fails_when_session_not_found() { let profiles = InMemoryImportProfileRepository::new(); + let sessions = InMemoryImportSessionRepository::new(); let user_id = Uuid::new_v4(); let profile = ImportProfile::new( @@ -42,12 +44,9 @@ async fn fails_when_session_not_found() { let profile_id = profile.id.clone(); profiles.save(&profile).await.unwrap(); - let ctx = TestContextBuilder::new() - .with_import_profiles(Arc::clone(&profiles) as _) - .build(); - let result = apply_profile::execute( - &ctx, + Arc::clone(&profiles) as _, + Arc::clone(&sessions) as _, ApplyImportProfileCommand { user_id, session_id: Uuid::new_v4(), @@ -87,13 +86,9 @@ async fn applies_profile_mappings_to_session() { let session_id = session.id.clone(); sessions.create(&session).await.unwrap(); - let ctx = TestContextBuilder::new() - .with_import_profiles(Arc::clone(&profiles) as _) - .with_import_sessions(Arc::clone(&sessions) as _) - .build(); - apply_profile::execute( - &ctx, + Arc::clone(&profiles) as _, + Arc::clone(&sessions) as _, ApplyImportProfileCommand { user_id, session_id: session_id.value(), diff --git a/crates/application/src/import/tests/cleanup.rs b/crates/application/src/import/tests/cleanup.rs index 5550367..9eb3b81 100644 --- a/crates/application/src/import/tests/cleanup.rs +++ b/crates/application/src/import/tests/cleanup.rs @@ -3,16 +3,12 @@ use std::sync::Arc; use domain::testing::InMemoryImportSessionRepository; use crate::import::cleanup; -use crate::test_helpers::TestContextBuilder; #[tokio::test] async fn returns_zero_when_nothing_expired() { let sessions = InMemoryImportSessionRepository::new(); - let ctx = TestContextBuilder::new() - .with_import_sessions(Arc::clone(&sessions) as _) - .build(); - let result = cleanup::execute(&ctx).await.unwrap(); + let result = cleanup::execute(Arc::clone(&sessions) as _).await.unwrap(); assert_eq!(result, 0); } diff --git a/crates/application/src/import/tests/create_session.rs b/crates/application/src/import/tests/create_session.rs index c899b3f..bd10fcd 100644 --- a/crates/application/src/import/tests/create_session.rs +++ b/crates/application/src/import/tests/create_session.rs @@ -1,14 +1,20 @@ +use std::sync::Arc; + use uuid::Uuid; +use domain::testing::InMemoryImportSessionRepository; + use crate::import::{commands::CreateImportSessionCommand, create_session}; use crate::test_helpers::TestContextBuilder; #[tokio::test] async fn creates_session_with_parsed_file() { + let sessions = InMemoryImportSessionRepository::new(); let ctx = TestContextBuilder::new().build(); let result = create_session::execute( - &ctx, + Arc::clone(&sessions) as _, + ctx.services.document_parser.clone(), CreateImportSessionCommand { user_id: Uuid::new_v4(), bytes: b"col1\nval1".to_vec(), diff --git a/crates/application/src/import/tests/delete_profile.rs b/crates/application/src/import/tests/delete_profile.rs index a0bfca1..c1e2115 100644 --- a/crates/application/src/import/tests/delete_profile.rs +++ b/crates/application/src/import/tests/delete_profile.rs @@ -4,17 +4,13 @@ use domain::testing::InMemoryImportProfileRepository; use uuid::Uuid; use crate::import::{commands::DeleteImportProfileCommand, delete_profile}; -use crate::test_helpers::TestContextBuilder; #[tokio::test] async fn fails_when_profile_not_found() { let profiles = InMemoryImportProfileRepository::new(); - let ctx = TestContextBuilder::new() - .with_import_profiles(Arc::clone(&profiles) as _) - .build(); let result = delete_profile::execute( - &ctx, + Arc::clone(&profiles) as _, DeleteImportProfileCommand { user_id: Uuid::new_v4(), profile_id: Uuid::new_v4(), diff --git a/crates/application/src/import/tests/execute.rs b/crates/application/src/import/tests/execute.rs index c6565bb..097fe1a 100644 --- a/crates/application/src/import/tests/execute.rs +++ b/crates/application/src/import/tests/execute.rs @@ -9,7 +9,7 @@ use uuid::Uuid; use crate::import::commands::ExecuteImportCommand; use crate::import::execute; -use crate::test_helpers::TestContextBuilder; +use crate::test_helpers::NoopReviewLogger; fn make_session_with_rows(user_id: UserId, session_id: ImportSessionId) -> ImportSession { let now = Utc::now().naive_utc(); @@ -52,12 +52,9 @@ async fn imports_confirmed_rows() { let session = make_session_with_rows(UserId::from_uuid(uid), sid.clone()); sessions.create(&session).await.unwrap(); - let ctx = TestContextBuilder::new() - .with_import_sessions(Arc::clone(&sessions) as _) - .build(); - let result = execute::execute( - &ctx, + Arc::clone(&sessions) as _, + Arc::new(NoopReviewLogger), ExecuteImportCommand { user_id: uid, session_id: sid.value(), @@ -81,12 +78,9 @@ async fn skips_unconfirmed_rows() { let session = make_session_with_rows(UserId::from_uuid(uid), sid.clone()); sessions.create(&session).await.unwrap(); - let ctx = TestContextBuilder::new() - .with_import_sessions(Arc::clone(&sessions) as _) - .build(); - let result = execute::execute( - &ctx, + Arc::clone(&sessions) as _, + Arc::new(NoopReviewLogger), ExecuteImportCommand { user_id: uid, session_id: sid.value(), @@ -102,9 +96,11 @@ async fn skips_unconfirmed_rows() { #[tokio::test] async fn fails_when_session_not_found() { - let ctx = TestContextBuilder::new().build(); + let sessions = InMemoryImportSessionRepository::new(); + let result = execute::execute( - &ctx, + Arc::clone(&sessions) as _, + Arc::new(NoopReviewLogger), ExecuteImportCommand { user_id: Uuid::new_v4(), session_id: Uuid::new_v4(), @@ -138,12 +134,9 @@ async fn handles_datetime_format() { }]); sessions.create(&session).await.unwrap(); - let ctx = TestContextBuilder::new() - .with_import_sessions(Arc::clone(&sessions) as _) - .build(); - let result = execute::execute( - &ctx, + Arc::clone(&sessions) as _, + Arc::new(NoopReviewLogger), ExecuteImportCommand { user_id: uid, session_id: sid.value(), @@ -179,12 +172,9 @@ async fn fails_on_invalid_rating() { }]); sessions.create(&session).await.unwrap(); - let ctx = TestContextBuilder::new() - .with_import_sessions(Arc::clone(&sessions) as _) - .build(); - let result = execute::execute( - &ctx, + Arc::clone(&sessions) as _, + Arc::new(NoopReviewLogger), ExecuteImportCommand { user_id: uid, session_id: sid.value(), @@ -220,12 +210,9 @@ async fn fails_on_missing_watched_at() { }]); sessions.create(&session).await.unwrap(); - let ctx = TestContextBuilder::new() - .with_import_sessions(Arc::clone(&sessions) as _) - .build(); - let result = execute::execute( - &ctx, + Arc::clone(&sessions) as _, + Arc::new(NoopReviewLogger), ExecuteImportCommand { user_id: uid, session_id: sid.value(), @@ -261,12 +248,9 @@ async fn imports_row_with_external_metadata_id() { }]); sessions.create(&session).await.unwrap(); - let ctx = TestContextBuilder::new() - .with_import_sessions(Arc::clone(&sessions) as _) - .build(); - let result = execute::execute( - &ctx, + Arc::clone(&sessions) as _, + Arc::new(NoopReviewLogger), ExecuteImportCommand { user_id: uid, session_id: sid.value(), @@ -302,12 +286,9 @@ async fn imports_row_with_director_and_comment() { }]); sessions.create(&session).await.unwrap(); - let ctx = TestContextBuilder::new() - .with_import_sessions(Arc::clone(&sessions) as _) - .build(); - let result = execute::execute( - &ctx, + Arc::clone(&sessions) as _, + Arc::new(NoopReviewLogger), ExecuteImportCommand { user_id: uid, session_id: sid.value(), @@ -343,12 +324,9 @@ async fn handles_space_separated_datetime_format() { }]); sessions.create(&session).await.unwrap(); - let ctx = TestContextBuilder::new() - .with_import_sessions(Arc::clone(&sessions) as _) - .build(); - let result = execute::execute( - &ctx, + Arc::clone(&sessions) as _, + Arc::new(NoopReviewLogger), ExecuteImportCommand { user_id: uid, session_id: sid.value(), @@ -379,12 +357,9 @@ async fn reports_invalid_row_result_errors() { }]); sessions.create(&session).await.unwrap(); - let ctx = TestContextBuilder::new() - .with_import_sessions(Arc::clone(&sessions) as _) - .build(); - let result = execute::execute( - &ctx, + Arc::clone(&sessions) as _, + Arc::new(NoopReviewLogger), ExecuteImportCommand { user_id: uid, session_id: sid.value(), @@ -422,12 +397,9 @@ async fn fails_on_missing_rating() { }]); sessions.create(&session).await.unwrap(); - let ctx = TestContextBuilder::new() - .with_import_sessions(Arc::clone(&sessions) as _) - .build(); - let result = execute::execute( - &ctx, + Arc::clone(&sessions) as _, + Arc::new(NoopReviewLogger), ExecuteImportCommand { user_id: uid, session_id: sid.value(), @@ -464,12 +436,9 @@ async fn fails_on_unparseable_date() { }]); sessions.create(&session).await.unwrap(); - let ctx = TestContextBuilder::new() - .with_import_sessions(Arc::clone(&sessions) as _) - .build(); - let result = execute::execute( - &ctx, + Arc::clone(&sessions) as _, + Arc::new(NoopReviewLogger), ExecuteImportCommand { user_id: uid, session_id: sid.value(), @@ -506,12 +475,9 @@ async fn imports_row_without_release_year() { }]); sessions.create(&session).await.unwrap(); - let ctx = TestContextBuilder::new() - .with_import_sessions(Arc::clone(&sessions) as _) - .build(); - let result = execute::execute( - &ctx, + Arc::clone(&sessions) as _, + Arc::new(NoopReviewLogger), ExecuteImportCommand { user_id: uid, session_id: sid.value(), @@ -535,12 +501,9 @@ async fn deletes_session_after_import() { sessions.create(&session).await.unwrap(); assert_eq!(sessions.count(), 1); - let ctx = TestContextBuilder::new() - .with_import_sessions(Arc::clone(&sessions) as _) - .build(); - execute::execute( - &ctx, + Arc::clone(&sessions) as _, + Arc::new(NoopReviewLogger), ExecuteImportCommand { user_id: uid, session_id: sid.value(), diff --git a/crates/application/src/import/tests/list_profiles.rs b/crates/application/src/import/tests/list_profiles.rs index fe65fc0..f0ab34a 100644 --- a/crates/application/src/import/tests/list_profiles.rs +++ b/crates/application/src/import/tests/list_profiles.rs @@ -5,17 +5,13 @@ use domain::value_objects::UserId; use uuid::Uuid; use crate::import::list_profiles; -use crate::test_helpers::TestContextBuilder; #[tokio::test] async fn returns_empty_when_no_profiles() { let profiles = InMemoryImportProfileRepository::new(); - let ctx = TestContextBuilder::new() - .with_import_profiles(Arc::clone(&profiles) as _) - .build(); let user_id = UserId::from_uuid(Uuid::new_v4()); - let result = list_profiles::execute(&ctx, &user_id).await.unwrap(); + let result = list_profiles::execute(Arc::clone(&profiles) as _, &user_id).await.unwrap(); assert!(result.is_empty()); } diff --git a/crates/application/src/import/tests/save_profile.rs b/crates/application/src/import/tests/save_profile.rs index 73768a0..9dca21c 100644 --- a/crates/application/src/import/tests/save_profile.rs +++ b/crates/application/src/import/tests/save_profile.rs @@ -3,22 +3,20 @@ use std::sync::Arc; use chrono::Utc; use domain::models::ImportSession; use domain::ports::ImportSessionRepository; -use domain::testing::InMemoryImportSessionRepository; +use domain::testing::{InMemoryImportProfileRepository, InMemoryImportSessionRepository}; use domain::value_objects::{ImportSessionId, UserId}; use uuid::Uuid; use crate::import::{commands::SaveImportProfileCommand, save_profile}; -use crate::test_helpers::TestContextBuilder; #[tokio::test] async fn fails_when_session_not_found() { let sessions = InMemoryImportSessionRepository::new(); - let ctx = TestContextBuilder::new() - .with_import_sessions(Arc::clone(&sessions) as _) - .build(); + let profiles = InMemoryImportProfileRepository::new(); let result = save_profile::execute( - &ctx, + Arc::clone(&sessions) as _, + Arc::clone(&profiles) as _, SaveImportProfileCommand { user_id: Uuid::new_v4(), session_id: Uuid::new_v4(), @@ -33,6 +31,7 @@ async fn fails_when_session_not_found() { #[tokio::test] async fn saves_profile_from_session() { let sessions = InMemoryImportSessionRepository::new(); + let profiles = InMemoryImportProfileRepository::new(); let user_id = Uuid::new_v4(); let sid = ImportSessionId::generate(); @@ -44,12 +43,9 @@ async fn saves_profile_from_session() { session.field_mappings = Some(vec![]); sessions.create(&session).await.unwrap(); - let ctx = TestContextBuilder::new() - .with_import_sessions(Arc::clone(&sessions) as _) - .build(); - let result = save_profile::execute( - &ctx, + Arc::clone(&sessions) as _, + Arc::clone(&profiles) as _, SaveImportProfileCommand { user_id, session_id: sid.value(), diff --git a/crates/application/src/jobs/import_cleanup.rs b/crates/application/src/jobs/import_cleanup.rs index 4daf018..511554e 100644 --- a/crates/application/src/jobs/import_cleanup.rs +++ b/crates/application/src/jobs/import_cleanup.rs @@ -1,17 +1,16 @@ +use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; -use domain::{errors::DomainError, ports::PeriodicJob}; - -use crate::context::AppContext; +use domain::{errors::DomainError, ports::{ImportSessionRepository, PeriodicJob}}; pub struct ImportSessionCleanupJob { - ctx: AppContext, + import_session: Arc, } impl ImportSessionCleanupJob { - pub fn new(ctx: AppContext) -> Self { - Self { ctx } + pub fn new(import_session: Arc) -> Self { + Self { import_session } } } @@ -22,7 +21,7 @@ impl PeriodicJob for ImportSessionCleanupJob { } async fn run(&self) -> Result<(), DomainError> { - let n = crate::import::cleanup::execute(&self.ctx).await?; + let n = crate::import::cleanup::execute(self.import_session.clone()).await?; tracing::info!("import session cleanup: removed {} expired sessions", n); Ok(()) } diff --git a/crates/presentation/src/handlers/import.rs b/crates/presentation/src/handlers/import.rs index 73ea4cb..b7e00fc 100644 --- a/crates/presentation/src/handlers/import.rs +++ b/crates/presentation/src/handlers/import.rs @@ -109,9 +109,12 @@ pub async fn get_import_page( Extension(csrf): Extension, ) -> impl IntoResponse { let ctx = super::helpers::build_page_context(&state, Some(user_id.clone()), csrf.0).await; - let profiles = list_import_profiles::execute(&state.app_ctx, &user_id) - .await - .unwrap_or_default() + let profiles = list_import_profiles::execute( + state.app_ctx.repos.import_profile.clone(), + &user_id, + ) + .await + .unwrap_or_default() .into_iter() .map(|p| ImportProfileView { id: p.id.value().to_string(), @@ -161,7 +164,8 @@ pub async fn post_upload( }; match create_import_session::execute( - &state.app_ctx, + state.app_ctx.repos.import_session.clone(), + state.app_ctx.services.document_parser.clone(), CreateImportSessionCommand { user_id: user_id.value(), bytes, @@ -250,7 +254,9 @@ pub async fn post_mapping( .into_response(); } match apply_import_mapping::execute( - &state.app_ctx, + state.app_ctx.repos.import_session.clone(), + state.app_ctx.services.document_parser.clone(), + state.app_ctx.repos.movie.clone(), ApplyImportMappingCommand { user_id: user_id.value(), session_id: session_id.value(), @@ -344,7 +350,8 @@ pub async fn post_confirm( .filter(|n| !n.trim().is_empty()); if let Some(name) = profile_name { let _ = save_import_profile::execute( - &state.app_ctx, + state.app_ctx.repos.import_session.clone(), + state.app_ctx.repos.import_profile.clone(), SaveImportProfileCommand { user_id: user_id.value(), session_id: session_id.value(), @@ -362,7 +369,8 @@ pub async fn post_confirm( .collect(); match execute_import::execute( - &state.app_ctx, + state.app_ctx.repos.import_session.clone(), + state.app_ctx.services.review_logger.clone(), ExecuteImportCommand { user_id: user_id.value(), session_id: session_id.value(), @@ -397,7 +405,7 @@ pub async fn post_delete_profile( } if let Ok(profile_id) = profile_id_str.parse::() { let _ = delete_import_profile::execute( - &state.app_ctx, + state.app_ctx.repos.import_profile.clone(), DeleteImportProfileCommand { user_id: user_id.value(), profile_id, @@ -489,7 +497,8 @@ pub async fn api_post_session( _ => FileFormat::Csv, }; match create_import_session::execute( - &state.app_ctx, + state.app_ctx.repos.import_session.clone(), + state.app_ctx.services.document_parser.clone(), CreateImportSessionCommand { user_id: user_id.value(), bytes, @@ -618,7 +627,9 @@ pub async fn api_put_mapping( .collect(); match apply_import_mapping::execute( - &state.app_ctx, + state.app_ctx.repos.import_session.clone(), + state.app_ctx.services.document_parser.clone(), + state.app_ctx.repos.movie.clone(), ApplyImportMappingCommand { user_id: user_id.value(), session_id: session_id.value(), @@ -725,7 +736,7 @@ pub async fn api_post_confirm( ) .into_response(); }; - match execute_import::execute(&state.app_ctx, ExecuteImportCommand { user_id: user_id.value(), session_id: session_id.value(), confirmed_indices: body.confirmed_indices }).await { + match execute_import::execute(state.app_ctx.repos.import_session.clone(), state.app_ctx.services.review_logger.clone(), ExecuteImportCommand { user_id: user_id.value(), session_id: session_id.value(), confirmed_indices: body.confirmed_indices }).await { Ok(s) => axum::Json(serde_json::json!({ "imported": s.imported, "skipped_duplicates": s.skipped_duplicates, @@ -754,7 +765,7 @@ pub async fn api_get_profiles( State(state): State, AuthenticatedUser(user_id): AuthenticatedUser, ) -> impl IntoResponse { - match list_import_profiles::execute(&state.app_ctx, &user_id).await { + match list_import_profiles::execute(state.app_ctx.repos.import_profile.clone(), &user_id).await { Ok(profiles) => axum::Json( profiles .into_iter() @@ -803,7 +814,8 @@ pub async fn api_post_profile( .into_response(); }; match save_import_profile::execute( - &state.app_ctx, + state.app_ctx.repos.import_session.clone(), + state.app_ctx.repos.import_profile.clone(), SaveImportProfileCommand { user_id: user_id.value(), session_id: session_id.value(), @@ -840,7 +852,7 @@ pub async fn api_delete_profile( return StatusCode::BAD_REQUEST.into_response(); }; match delete_import_profile::execute( - &state.app_ctx, + state.app_ctx.repos.import_profile.clone(), DeleteImportProfileCommand { user_id: user_id.value(), profile_id, @@ -896,7 +908,8 @@ pub async fn api_apply_profile( }; if let Err(e) = apply_import_profile::execute( - &state.app_ctx, + state.app_ctx.repos.import_profile.clone(), + state.app_ctx.repos.import_session.clone(), ApplyImportProfileCommand { user_id: user_id.value(), session_id, @@ -936,7 +949,9 @@ pub async fn api_apply_profile( let mappings = session.field_mappings.unwrap_or_default(); match apply_import_mapping::execute( - &state.app_ctx, + state.app_ctx.repos.import_session.clone(), + state.app_ctx.services.document_parser.clone(), + state.app_ctx.repos.movie.clone(), ApplyImportMappingCommand { user_id: user_id.value(), session_id, diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 8d97fc2..dcf8fa2 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -174,7 +174,7 @@ async fn main() -> anyhow::Result<()> { // ── Periodic jobs ───────────────────────────────────────────────────────── let mut periodic_jobs: Vec> = vec![ - Arc::new(application::jobs::ImportSessionCleanupJob::new(ctx.clone())), + Arc::new(application::jobs::ImportSessionCleanupJob::new(ctx.repos.import_session.clone())), Arc::new(application::jobs::WatchEventCleanupJob::new(ctx.clone())), Arc::new(application::jobs::WrapUpAutoGenerateJob::new(ctx.clone())), Arc::new(application::jobs::WrapUpCleanupJob::new(ctx.clone())),