refactor(import): scoped Arc deps, ImportSessionCleanupJob
This commit is contained in:
@@ -1,21 +1,24 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
models::{AnnotatedRow, import::RowResult},
|
||||
ports::{DocumentParser, ImportSessionRepository, MovieRepository},
|
||||
value_objects::{ExternalMetadataId, ImportSessionId, MovieTitle, ReleaseYear, UserId},
|
||||
};
|
||||
|
||||
use crate::{context::AppContext, import::commands::ApplyImportMappingCommand};
|
||||
use crate::import::commands::ApplyImportMappingCommand;
|
||||
|
||||
pub async fn execute(
|
||||
ctx: &AppContext,
|
||||
import_session: Arc<dyn ImportSessionRepository>,
|
||||
document_parser: Arc<dyn DocumentParser>,
|
||||
movie: Arc<dyn MovieRepository>,
|
||||
cmd: ApplyImportMappingCommand,
|
||||
) -> Result<Vec<AnnotatedRow>, DomainError> {
|
||||
let user_id = UserId::from_uuid(cmd.user_id);
|
||||
let session_id = ImportSessionId::from_uuid(cmd.session_id);
|
||||
let mappings = cmd.mappings;
|
||||
let mut session = ctx
|
||||
.repos
|
||||
.import_session
|
||||
let mut session = import_session
|
||||
.get(&session_id, &user_id)
|
||||
.await?
|
||||
.ok_or_else(|| DomainError::NotFound("import session".into()))?;
|
||||
@@ -25,22 +28,22 @@ pub async fn execute(
|
||||
.clone()
|
||||
.ok_or_else(|| DomainError::ValidationError("session has no parsed file".into()))?;
|
||||
|
||||
let mut annotated = ctx
|
||||
.services
|
||||
.document_parser
|
||||
.apply_mapping(&parsed, &mappings);
|
||||
let mut annotated = document_parser.apply_mapping(&parsed, &mappings);
|
||||
|
||||
mark_duplicates(ctx, &mut annotated).await?;
|
||||
mark_duplicates(movie, &mut annotated).await?;
|
||||
|
||||
session.field_mappings = Some(mappings);
|
||||
session.row_results = Some(annotated.clone());
|
||||
|
||||
ctx.repos.import_session.update(&session).await?;
|
||||
import_session.update(&session).await?;
|
||||
|
||||
Ok(annotated)
|
||||
}
|
||||
|
||||
async fn mark_duplicates(ctx: &AppContext, rows: &mut [AnnotatedRow]) -> Result<(), DomainError> {
|
||||
async fn mark_duplicates(
|
||||
movie: Arc<dyn MovieRepository>,
|
||||
rows: &mut [AnnotatedRow],
|
||||
) -> Result<(), DomainError> {
|
||||
let mut ext_ids = Vec::new();
|
||||
let mut title_year_pairs = Vec::new();
|
||||
|
||||
@@ -63,12 +66,8 @@ async fn mark_duplicates(ctx: &AppContext, rows: &mut [AnnotatedRow]) -> Result<
|
||||
}
|
||||
}
|
||||
|
||||
let known_ext = ctx.repos.movie.existing_external_ids(&ext_ids).await?;
|
||||
let known_ty = ctx
|
||||
.repos
|
||||
.movie
|
||||
.existing_title_year_pairs(&title_year_pairs)
|
||||
.await?;
|
||||
let known_ext = movie.existing_external_ids(&ext_ids).await?;
|
||||
let known_ty = movie.existing_title_year_pairs(&title_year_pairs).await?;
|
||||
|
||||
for row in rows.iter_mut() {
|
||||
if let RowResult::Valid(ref r) = row.result {
|
||||
|
||||
@@ -1,31 +1,34 @@
|
||||
use crate::{context::AppContext, import::commands::ApplyImportProfileCommand};
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::import::commands::ApplyImportProfileCommand;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
ports::{ImportProfileRepository, ImportSessionRepository},
|
||||
value_objects::{ImportProfileId, ImportSessionId, UserId},
|
||||
};
|
||||
|
||||
/// Copies the profile's field_mappings onto the session. Caller must then invoke
|
||||
/// apply_import_mapping to regenerate row_results with the new mappings.
|
||||
pub async fn execute(ctx: &AppContext, cmd: ApplyImportProfileCommand) -> Result<(), DomainError> {
|
||||
pub async fn execute(
|
||||
import_profile: Arc<dyn ImportProfileRepository>,
|
||||
import_session: Arc<dyn ImportSessionRepository>,
|
||||
cmd: ApplyImportProfileCommand,
|
||||
) -> Result<(), DomainError> {
|
||||
let user_id = UserId::from_uuid(cmd.user_id);
|
||||
let session_id = ImportSessionId::from_uuid(cmd.session_id);
|
||||
let profile_id = ImportProfileId::from_uuid(cmd.profile_id);
|
||||
|
||||
let profile = ctx
|
||||
.repos
|
||||
.import_profile
|
||||
let profile = import_profile
|
||||
.get(&profile_id, &user_id)
|
||||
.await?
|
||||
.ok_or_else(|| DomainError::NotFound("import profile".into()))?;
|
||||
let mut session = ctx
|
||||
.repos
|
||||
.import_session
|
||||
let mut session = import_session
|
||||
.get(&session_id, &user_id)
|
||||
.await?
|
||||
.ok_or_else(|| DomainError::NotFound("import session".into()))?;
|
||||
session.field_mappings = Some(profile.field_mappings);
|
||||
session.row_results = None;
|
||||
ctx.repos.import_session.update(&session).await
|
||||
import_session.update(&session).await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
use crate::context::AppContext;
|
||||
use domain::errors::DomainError;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub async fn execute(ctx: &AppContext) -> Result<u64, DomainError> {
|
||||
ctx.repos.import_session.delete_expired().await
|
||||
use domain::{errors::DomainError, ports::ImportSessionRepository};
|
||||
|
||||
pub async fn execute(import_session: Arc<dyn ImportSessionRepository>) -> Result<u64, DomainError> {
|
||||
import_session.delete_expired().await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -1,11 +1,14 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use chrono::Utc;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
models::ImportSession,
|
||||
ports::{DocumentParser, ImportSessionRepository},
|
||||
value_objects::{ImportSessionId, UserId},
|
||||
};
|
||||
|
||||
use crate::{context::AppContext, import::commands::CreateImportSessionCommand};
|
||||
use crate::import::commands::CreateImportSessionCommand;
|
||||
|
||||
pub struct CreateSessionResult {
|
||||
pub session_id: ImportSessionId,
|
||||
@@ -14,18 +17,16 @@ pub struct CreateSessionResult {
|
||||
}
|
||||
|
||||
pub async fn execute(
|
||||
ctx: &AppContext,
|
||||
import_session: Arc<dyn ImportSessionRepository>,
|
||||
document_parser: Arc<dyn DocumentParser>,
|
||||
cmd: CreateImportSessionCommand,
|
||||
) -> Result<CreateSessionResult, DomainError> {
|
||||
let user_id = UserId::from_uuid(cmd.user_id);
|
||||
ctx.repos
|
||||
.import_session
|
||||
import_session
|
||||
.delete_expired_for_user(&user_id)
|
||||
.await?;
|
||||
|
||||
let parsed = ctx
|
||||
.services
|
||||
.document_parser
|
||||
let parsed = document_parser
|
||||
.parse(&cmd.bytes, cmd.format)
|
||||
.map_err(|e| DomainError::ValidationError(e.to_string()))?;
|
||||
|
||||
@@ -37,7 +38,7 @@ pub async fn execute(
|
||||
let session_id = session.id.clone();
|
||||
session.parsed_file = Some(parsed);
|
||||
|
||||
ctx.repos.import_session.create(&session).await?;
|
||||
import_session.create(&session).await?;
|
||||
|
||||
Ok(CreateSessionResult {
|
||||
session_id,
|
||||
|
||||
@@ -1,19 +1,24 @@
|
||||
use crate::{context::AppContext, import::commands::DeleteImportProfileCommand};
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::import::commands::DeleteImportProfileCommand;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
ports::ImportProfileRepository,
|
||||
value_objects::{ImportProfileId, UserId},
|
||||
};
|
||||
|
||||
pub async fn execute(ctx: &AppContext, cmd: DeleteImportProfileCommand) -> Result<(), DomainError> {
|
||||
pub async fn execute(
|
||||
import_profile: Arc<dyn ImportProfileRepository>,
|
||||
cmd: DeleteImportProfileCommand,
|
||||
) -> Result<(), DomainError> {
|
||||
let user_id = UserId::from_uuid(cmd.user_id);
|
||||
let profile_id = ImportProfileId::from_uuid(cmd.profile_id);
|
||||
|
||||
ctx.repos
|
||||
.import_profile
|
||||
import_profile
|
||||
.get(&profile_id, &user_id)
|
||||
.await?
|
||||
.ok_or_else(|| DomainError::NotFound("import profile".into()))?;
|
||||
ctx.repos.import_profile.delete(&profile_id).await
|
||||
import_profile.delete(&profile_id).await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -1,15 +1,18 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use chrono::NaiveDateTime;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
models::{ImportRow, import::RowResult},
|
||||
ports::ImportSessionRepository,
|
||||
value_objects::{ImportSessionId, UserId},
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
context::AppContext,
|
||||
diary::commands::{LogReviewCommand, MovieInput},
|
||||
import::commands::ExecuteImportCommand,
|
||||
ports::ReviewLogger,
|
||||
};
|
||||
|
||||
pub struct ImportSummary {
|
||||
@@ -19,15 +22,14 @@ pub struct ImportSummary {
|
||||
}
|
||||
|
||||
pub async fn execute(
|
||||
ctx: &AppContext,
|
||||
import_session: Arc<dyn ImportSessionRepository>,
|
||||
review_logger: Arc<dyn ReviewLogger>,
|
||||
cmd: ExecuteImportCommand,
|
||||
) -> Result<ImportSummary, DomainError> {
|
||||
let user_id = UserId::from_uuid(cmd.user_id);
|
||||
let session_id = ImportSessionId::from_uuid(cmd.session_id);
|
||||
let confirmed_indices = cmd.confirmed_indices;
|
||||
let session = ctx
|
||||
.repos
|
||||
.import_session
|
||||
let session = import_session
|
||||
.get(&session_id, &user_id)
|
||||
.await?
|
||||
.ok_or_else(|| DomainError::NotFound("import session".into()))?;
|
||||
@@ -46,7 +48,7 @@ pub async fn execute(
|
||||
}
|
||||
match annotated.result {
|
||||
RowResult::Valid(row) => match row_to_command(&row, user_id.value()) {
|
||||
Ok(cmd) => match ctx.services.review_logger.log_review(cmd).await {
|
||||
Ok(cmd) => match review_logger.log_review(cmd).await {
|
||||
Ok(_) => imported += 1,
|
||||
Err(e) => failed.push((idx, e.to_string())),
|
||||
},
|
||||
@@ -58,7 +60,7 @@ pub async fn execute(
|
||||
}
|
||||
}
|
||||
|
||||
ctx.repos.import_session.delete(&session_id).await?;
|
||||
import_session.delete(&session_id).await?;
|
||||
|
||||
Ok(ImportSummary {
|
||||
imported,
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
use crate::context::AppContext;
|
||||
use domain::{errors::DomainError, models::ImportProfile, value_objects::UserId};
|
||||
use std::sync::Arc;
|
||||
|
||||
use domain::{errors::DomainError, models::ImportProfile, ports::ImportProfileRepository, value_objects::UserId};
|
||||
|
||||
pub async fn execute(
|
||||
ctx: &AppContext,
|
||||
import_profile: Arc<dyn ImportProfileRepository>,
|
||||
user_id: &UserId,
|
||||
) -> Result<Vec<ImportProfile>, DomainError> {
|
||||
ctx.repos.import_profile.list_for_user(user_id).await
|
||||
import_profile.list_for_user(user_id).await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -1,21 +1,23 @@
|
||||
use crate::{context::AppContext, import::commands::SaveImportProfileCommand};
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::import::commands::SaveImportProfileCommand;
|
||||
use chrono::Utc;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
models::ImportProfile,
|
||||
ports::{ImportProfileRepository, ImportSessionRepository},
|
||||
value_objects::{ImportProfileId, ImportSessionId, UserId},
|
||||
};
|
||||
|
||||
pub async fn execute(
|
||||
ctx: &AppContext,
|
||||
import_session: Arc<dyn ImportSessionRepository>,
|
||||
import_profile: Arc<dyn ImportProfileRepository>,
|
||||
cmd: SaveImportProfileCommand,
|
||||
) -> Result<ImportProfileId, DomainError> {
|
||||
let user_id = UserId::from_uuid(cmd.user_id);
|
||||
let session_id = ImportSessionId::from_uuid(cmd.session_id);
|
||||
|
||||
let session = ctx
|
||||
.repos
|
||||
.import_session
|
||||
let session = import_session
|
||||
.get(&session_id, &user_id)
|
||||
.await?
|
||||
.ok_or_else(|| DomainError::NotFound("import session".into()))?;
|
||||
@@ -30,7 +32,7 @@ pub async fn execute(
|
||||
Utc::now().naive_utc(),
|
||||
);
|
||||
let id = profile.id.clone();
|
||||
ctx.repos.import_profile.save(&profile).await?;
|
||||
import_profile.save(&profile).await?;
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ use domain::{
|
||||
import::{ImportRow, ParsedFile, RowResult},
|
||||
},
|
||||
ports::{DocumentParser, MovieRepository},
|
||||
testing::InMemoryMovieRepository,
|
||||
testing::{InMemoryImportSessionRepository, InMemoryMovieRepository},
|
||||
value_objects::{ExternalMetadataId, MovieTitle, ReleaseYear},
|
||||
};
|
||||
|
||||
@@ -21,11 +21,13 @@ use crate::test_helpers::TestContextBuilder;
|
||||
|
||||
#[tokio::test]
|
||||
async fn applies_mapping_to_session() {
|
||||
let sessions = InMemoryImportSessionRepository::new();
|
||||
let ctx = TestContextBuilder::new().build();
|
||||
let user_id = Uuid::new_v4();
|
||||
|
||||
let session = create_session::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
ctx.services.document_parser.clone(),
|
||||
CreateImportSessionCommand {
|
||||
user_id,
|
||||
bytes: b"title\nTest".to_vec(),
|
||||
@@ -36,7 +38,9 @@ async fn applies_mapping_to_session() {
|
||||
.unwrap();
|
||||
|
||||
let rows = apply_mapping::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
ctx.services.document_parser.clone(),
|
||||
ctx.repos.movie.clone(),
|
||||
ApplyImportMappingCommand {
|
||||
user_id,
|
||||
session_id: session.session_id.value(),
|
||||
@@ -51,10 +55,13 @@ async fn applies_mapping_to_session() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn fails_when_session_not_found() {
|
||||
let sessions = InMemoryImportSessionRepository::new();
|
||||
let ctx = TestContextBuilder::new().build();
|
||||
|
||||
let result = apply_mapping::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
ctx.services.document_parser.clone(),
|
||||
ctx.repos.movie.clone(),
|
||||
ApplyImportMappingCommand {
|
||||
user_id: Uuid::new_v4(),
|
||||
session_id: Uuid::new_v4(),
|
||||
@@ -102,6 +109,7 @@ impl DocumentParser for DuplicateTestParser {
|
||||
#[tokio::test]
|
||||
async fn marks_duplicate_by_external_id() {
|
||||
let movies = InMemoryMovieRepository::new();
|
||||
let sessions = InMemoryImportSessionRepository::new();
|
||||
|
||||
let ext_id = ExternalMetadataId::new("tt1234567".into()).unwrap();
|
||||
let movie = Movie::new(
|
||||
@@ -113,23 +121,20 @@ async fn marks_duplicate_by_external_id() {
|
||||
);
|
||||
movies.upsert_movie(&movie).await.unwrap();
|
||||
|
||||
let parser = DuplicateTestParser {
|
||||
let parser = Arc::new(DuplicateTestParser {
|
||||
rows: vec![ImportRow {
|
||||
title: Some("Known Movie".into()),
|
||||
release_year: Some("2020".into()),
|
||||
external_metadata_id: Some("tt1234567".into()),
|
||||
..ImportRow::default()
|
||||
}],
|
||||
};
|
||||
});
|
||||
|
||||
let ctx = TestContextBuilder::new()
|
||||
.with_movies(Arc::clone(&movies) as _)
|
||||
.with_document_parser(Arc::new(parser) as _)
|
||||
.build();
|
||||
let user_id = Uuid::new_v4();
|
||||
|
||||
let session = create_session::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
Arc::clone(&parser) as _,
|
||||
CreateImportSessionCommand {
|
||||
user_id,
|
||||
bytes: b"title\nKnown Movie".to_vec(),
|
||||
@@ -140,7 +145,9 @@ async fn marks_duplicate_by_external_id() {
|
||||
.unwrap();
|
||||
|
||||
let rows = apply_mapping::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
Arc::clone(&parser) as _,
|
||||
Arc::clone(&movies) as _,
|
||||
ApplyImportMappingCommand {
|
||||
user_id,
|
||||
session_id: session.session_id.value(),
|
||||
@@ -157,6 +164,7 @@ async fn marks_duplicate_by_external_id() {
|
||||
#[tokio::test]
|
||||
async fn marks_duplicate_by_title_and_year() {
|
||||
let movies = InMemoryMovieRepository::new();
|
||||
let sessions = InMemoryImportSessionRepository::new();
|
||||
|
||||
let movie = Movie::new(
|
||||
None,
|
||||
@@ -167,22 +175,19 @@ async fn marks_duplicate_by_title_and_year() {
|
||||
);
|
||||
movies.upsert_movie(&movie).await.unwrap();
|
||||
|
||||
let parser = DuplicateTestParser {
|
||||
let parser = Arc::new(DuplicateTestParser {
|
||||
rows: vec![ImportRow {
|
||||
title: Some("Duplicate Film".into()),
|
||||
release_year: Some("2022".into()),
|
||||
..ImportRow::default()
|
||||
}],
|
||||
};
|
||||
});
|
||||
|
||||
let ctx = TestContextBuilder::new()
|
||||
.with_movies(Arc::clone(&movies) as _)
|
||||
.with_document_parser(Arc::new(parser) as _)
|
||||
.build();
|
||||
let user_id = Uuid::new_v4();
|
||||
|
||||
let session = create_session::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
Arc::clone(&parser) as _,
|
||||
CreateImportSessionCommand {
|
||||
user_id,
|
||||
bytes: b"title\nDuplicate Film".to_vec(),
|
||||
@@ -193,7 +198,9 @@ async fn marks_duplicate_by_title_and_year() {
|
||||
.unwrap();
|
||||
|
||||
let rows = apply_mapping::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
Arc::clone(&parser) as _,
|
||||
Arc::clone(&movies) as _,
|
||||
ApplyImportMappingCommand {
|
||||
user_id,
|
||||
session_id: session.session_id.value(),
|
||||
|
||||
@@ -3,19 +3,20 @@ use std::sync::Arc;
|
||||
use chrono::Utc;
|
||||
use domain::models::ImportProfile;
|
||||
use domain::ports::{ImportProfileRepository, ImportSessionRepository};
|
||||
use domain::testing::InMemoryImportProfileRepository;
|
||||
use domain::testing::{InMemoryImportProfileRepository, InMemoryImportSessionRepository};
|
||||
use domain::value_objects::{ImportProfileId, UserId};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::import::{apply_profile, commands::ApplyImportProfileCommand};
|
||||
use crate::test_helpers::TestContextBuilder;
|
||||
|
||||
#[tokio::test]
|
||||
async fn fails_when_profile_not_found() {
|
||||
let ctx = TestContextBuilder::new().build();
|
||||
let profiles = InMemoryImportProfileRepository::new();
|
||||
let sessions = InMemoryImportSessionRepository::new();
|
||||
|
||||
let result = apply_profile::execute(
|
||||
&ctx,
|
||||
Arc::clone(&profiles) as _,
|
||||
Arc::clone(&sessions) as _,
|
||||
ApplyImportProfileCommand {
|
||||
user_id: Uuid::new_v4(),
|
||||
session_id: Uuid::new_v4(),
|
||||
@@ -30,6 +31,7 @@ async fn fails_when_profile_not_found() {
|
||||
#[tokio::test]
|
||||
async fn fails_when_session_not_found() {
|
||||
let profiles = InMemoryImportProfileRepository::new();
|
||||
let sessions = InMemoryImportSessionRepository::new();
|
||||
let user_id = Uuid::new_v4();
|
||||
|
||||
let profile = ImportProfile::new(
|
||||
@@ -42,12 +44,9 @@ async fn fails_when_session_not_found() {
|
||||
let profile_id = profile.id.clone();
|
||||
profiles.save(&profile).await.unwrap();
|
||||
|
||||
let ctx = TestContextBuilder::new()
|
||||
.with_import_profiles(Arc::clone(&profiles) as _)
|
||||
.build();
|
||||
|
||||
let result = apply_profile::execute(
|
||||
&ctx,
|
||||
Arc::clone(&profiles) as _,
|
||||
Arc::clone(&sessions) as _,
|
||||
ApplyImportProfileCommand {
|
||||
user_id,
|
||||
session_id: Uuid::new_v4(),
|
||||
@@ -87,13 +86,9 @@ async fn applies_profile_mappings_to_session() {
|
||||
let session_id = session.id.clone();
|
||||
sessions.create(&session).await.unwrap();
|
||||
|
||||
let ctx = TestContextBuilder::new()
|
||||
.with_import_profiles(Arc::clone(&profiles) as _)
|
||||
.with_import_sessions(Arc::clone(&sessions) as _)
|
||||
.build();
|
||||
|
||||
apply_profile::execute(
|
||||
&ctx,
|
||||
Arc::clone(&profiles) as _,
|
||||
Arc::clone(&sessions) as _,
|
||||
ApplyImportProfileCommand {
|
||||
user_id,
|
||||
session_id: session_id.value(),
|
||||
|
||||
@@ -3,16 +3,12 @@ use std::sync::Arc;
|
||||
use domain::testing::InMemoryImportSessionRepository;
|
||||
|
||||
use crate::import::cleanup;
|
||||
use crate::test_helpers::TestContextBuilder;
|
||||
|
||||
#[tokio::test]
|
||||
async fn returns_zero_when_nothing_expired() {
|
||||
let sessions = InMemoryImportSessionRepository::new();
|
||||
let ctx = TestContextBuilder::new()
|
||||
.with_import_sessions(Arc::clone(&sessions) as _)
|
||||
.build();
|
||||
|
||||
let result = cleanup::execute(&ctx).await.unwrap();
|
||||
let result = cleanup::execute(Arc::clone(&sessions) as _).await.unwrap();
|
||||
|
||||
assert_eq!(result, 0);
|
||||
}
|
||||
|
||||
@@ -1,14 +1,20 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use uuid::Uuid;
|
||||
|
||||
use domain::testing::InMemoryImportSessionRepository;
|
||||
|
||||
use crate::import::{commands::CreateImportSessionCommand, create_session};
|
||||
use crate::test_helpers::TestContextBuilder;
|
||||
|
||||
#[tokio::test]
|
||||
async fn creates_session_with_parsed_file() {
|
||||
let sessions = InMemoryImportSessionRepository::new();
|
||||
let ctx = TestContextBuilder::new().build();
|
||||
|
||||
let result = create_session::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
ctx.services.document_parser.clone(),
|
||||
CreateImportSessionCommand {
|
||||
user_id: Uuid::new_v4(),
|
||||
bytes: b"col1\nval1".to_vec(),
|
||||
|
||||
@@ -4,17 +4,13 @@ use domain::testing::InMemoryImportProfileRepository;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::import::{commands::DeleteImportProfileCommand, delete_profile};
|
||||
use crate::test_helpers::TestContextBuilder;
|
||||
|
||||
#[tokio::test]
|
||||
async fn fails_when_profile_not_found() {
|
||||
let profiles = InMemoryImportProfileRepository::new();
|
||||
let ctx = TestContextBuilder::new()
|
||||
.with_import_profiles(Arc::clone(&profiles) as _)
|
||||
.build();
|
||||
|
||||
let result = delete_profile::execute(
|
||||
&ctx,
|
||||
Arc::clone(&profiles) as _,
|
||||
DeleteImportProfileCommand {
|
||||
user_id: Uuid::new_v4(),
|
||||
profile_id: Uuid::new_v4(),
|
||||
|
||||
@@ -9,7 +9,7 @@ use uuid::Uuid;
|
||||
|
||||
use crate::import::commands::ExecuteImportCommand;
|
||||
use crate::import::execute;
|
||||
use crate::test_helpers::TestContextBuilder;
|
||||
use crate::test_helpers::NoopReviewLogger;
|
||||
|
||||
fn make_session_with_rows(user_id: UserId, session_id: ImportSessionId) -> ImportSession {
|
||||
let now = Utc::now().naive_utc();
|
||||
@@ -52,12 +52,9 @@ async fn imports_confirmed_rows() {
|
||||
let session = make_session_with_rows(UserId::from_uuid(uid), sid.clone());
|
||||
sessions.create(&session).await.unwrap();
|
||||
|
||||
let ctx = TestContextBuilder::new()
|
||||
.with_import_sessions(Arc::clone(&sessions) as _)
|
||||
.build();
|
||||
|
||||
let result = execute::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
Arc::new(NoopReviewLogger),
|
||||
ExecuteImportCommand {
|
||||
user_id: uid,
|
||||
session_id: sid.value(),
|
||||
@@ -81,12 +78,9 @@ async fn skips_unconfirmed_rows() {
|
||||
let session = make_session_with_rows(UserId::from_uuid(uid), sid.clone());
|
||||
sessions.create(&session).await.unwrap();
|
||||
|
||||
let ctx = TestContextBuilder::new()
|
||||
.with_import_sessions(Arc::clone(&sessions) as _)
|
||||
.build();
|
||||
|
||||
let result = execute::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
Arc::new(NoopReviewLogger),
|
||||
ExecuteImportCommand {
|
||||
user_id: uid,
|
||||
session_id: sid.value(),
|
||||
@@ -102,9 +96,11 @@ async fn skips_unconfirmed_rows() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn fails_when_session_not_found() {
|
||||
let ctx = TestContextBuilder::new().build();
|
||||
let sessions = InMemoryImportSessionRepository::new();
|
||||
|
||||
let result = execute::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
Arc::new(NoopReviewLogger),
|
||||
ExecuteImportCommand {
|
||||
user_id: Uuid::new_v4(),
|
||||
session_id: Uuid::new_v4(),
|
||||
@@ -138,12 +134,9 @@ async fn handles_datetime_format() {
|
||||
}]);
|
||||
sessions.create(&session).await.unwrap();
|
||||
|
||||
let ctx = TestContextBuilder::new()
|
||||
.with_import_sessions(Arc::clone(&sessions) as _)
|
||||
.build();
|
||||
|
||||
let result = execute::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
Arc::new(NoopReviewLogger),
|
||||
ExecuteImportCommand {
|
||||
user_id: uid,
|
||||
session_id: sid.value(),
|
||||
@@ -179,12 +172,9 @@ async fn fails_on_invalid_rating() {
|
||||
}]);
|
||||
sessions.create(&session).await.unwrap();
|
||||
|
||||
let ctx = TestContextBuilder::new()
|
||||
.with_import_sessions(Arc::clone(&sessions) as _)
|
||||
.build();
|
||||
|
||||
let result = execute::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
Arc::new(NoopReviewLogger),
|
||||
ExecuteImportCommand {
|
||||
user_id: uid,
|
||||
session_id: sid.value(),
|
||||
@@ -220,12 +210,9 @@ async fn fails_on_missing_watched_at() {
|
||||
}]);
|
||||
sessions.create(&session).await.unwrap();
|
||||
|
||||
let ctx = TestContextBuilder::new()
|
||||
.with_import_sessions(Arc::clone(&sessions) as _)
|
||||
.build();
|
||||
|
||||
let result = execute::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
Arc::new(NoopReviewLogger),
|
||||
ExecuteImportCommand {
|
||||
user_id: uid,
|
||||
session_id: sid.value(),
|
||||
@@ -261,12 +248,9 @@ async fn imports_row_with_external_metadata_id() {
|
||||
}]);
|
||||
sessions.create(&session).await.unwrap();
|
||||
|
||||
let ctx = TestContextBuilder::new()
|
||||
.with_import_sessions(Arc::clone(&sessions) as _)
|
||||
.build();
|
||||
|
||||
let result = execute::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
Arc::new(NoopReviewLogger),
|
||||
ExecuteImportCommand {
|
||||
user_id: uid,
|
||||
session_id: sid.value(),
|
||||
@@ -302,12 +286,9 @@ async fn imports_row_with_director_and_comment() {
|
||||
}]);
|
||||
sessions.create(&session).await.unwrap();
|
||||
|
||||
let ctx = TestContextBuilder::new()
|
||||
.with_import_sessions(Arc::clone(&sessions) as _)
|
||||
.build();
|
||||
|
||||
let result = execute::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
Arc::new(NoopReviewLogger),
|
||||
ExecuteImportCommand {
|
||||
user_id: uid,
|
||||
session_id: sid.value(),
|
||||
@@ -343,12 +324,9 @@ async fn handles_space_separated_datetime_format() {
|
||||
}]);
|
||||
sessions.create(&session).await.unwrap();
|
||||
|
||||
let ctx = TestContextBuilder::new()
|
||||
.with_import_sessions(Arc::clone(&sessions) as _)
|
||||
.build();
|
||||
|
||||
let result = execute::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
Arc::new(NoopReviewLogger),
|
||||
ExecuteImportCommand {
|
||||
user_id: uid,
|
||||
session_id: sid.value(),
|
||||
@@ -379,12 +357,9 @@ async fn reports_invalid_row_result_errors() {
|
||||
}]);
|
||||
sessions.create(&session).await.unwrap();
|
||||
|
||||
let ctx = TestContextBuilder::new()
|
||||
.with_import_sessions(Arc::clone(&sessions) as _)
|
||||
.build();
|
||||
|
||||
let result = execute::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
Arc::new(NoopReviewLogger),
|
||||
ExecuteImportCommand {
|
||||
user_id: uid,
|
||||
session_id: sid.value(),
|
||||
@@ -422,12 +397,9 @@ async fn fails_on_missing_rating() {
|
||||
}]);
|
||||
sessions.create(&session).await.unwrap();
|
||||
|
||||
let ctx = TestContextBuilder::new()
|
||||
.with_import_sessions(Arc::clone(&sessions) as _)
|
||||
.build();
|
||||
|
||||
let result = execute::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
Arc::new(NoopReviewLogger),
|
||||
ExecuteImportCommand {
|
||||
user_id: uid,
|
||||
session_id: sid.value(),
|
||||
@@ -464,12 +436,9 @@ async fn fails_on_unparseable_date() {
|
||||
}]);
|
||||
sessions.create(&session).await.unwrap();
|
||||
|
||||
let ctx = TestContextBuilder::new()
|
||||
.with_import_sessions(Arc::clone(&sessions) as _)
|
||||
.build();
|
||||
|
||||
let result = execute::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
Arc::new(NoopReviewLogger),
|
||||
ExecuteImportCommand {
|
||||
user_id: uid,
|
||||
session_id: sid.value(),
|
||||
@@ -506,12 +475,9 @@ async fn imports_row_without_release_year() {
|
||||
}]);
|
||||
sessions.create(&session).await.unwrap();
|
||||
|
||||
let ctx = TestContextBuilder::new()
|
||||
.with_import_sessions(Arc::clone(&sessions) as _)
|
||||
.build();
|
||||
|
||||
let result = execute::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
Arc::new(NoopReviewLogger),
|
||||
ExecuteImportCommand {
|
||||
user_id: uid,
|
||||
session_id: sid.value(),
|
||||
@@ -535,12 +501,9 @@ async fn deletes_session_after_import() {
|
||||
sessions.create(&session).await.unwrap();
|
||||
assert_eq!(sessions.count(), 1);
|
||||
|
||||
let ctx = TestContextBuilder::new()
|
||||
.with_import_sessions(Arc::clone(&sessions) as _)
|
||||
.build();
|
||||
|
||||
execute::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
Arc::new(NoopReviewLogger),
|
||||
ExecuteImportCommand {
|
||||
user_id: uid,
|
||||
session_id: sid.value(),
|
||||
|
||||
@@ -5,17 +5,13 @@ use domain::value_objects::UserId;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::import::list_profiles;
|
||||
use crate::test_helpers::TestContextBuilder;
|
||||
|
||||
#[tokio::test]
|
||||
async fn returns_empty_when_no_profiles() {
|
||||
let profiles = InMemoryImportProfileRepository::new();
|
||||
let ctx = TestContextBuilder::new()
|
||||
.with_import_profiles(Arc::clone(&profiles) as _)
|
||||
.build();
|
||||
|
||||
let user_id = UserId::from_uuid(Uuid::new_v4());
|
||||
let result = list_profiles::execute(&ctx, &user_id).await.unwrap();
|
||||
let result = list_profiles::execute(Arc::clone(&profiles) as _, &user_id).await.unwrap();
|
||||
|
||||
assert!(result.is_empty());
|
||||
}
|
||||
|
||||
@@ -3,22 +3,20 @@ use std::sync::Arc;
|
||||
use chrono::Utc;
|
||||
use domain::models::ImportSession;
|
||||
use domain::ports::ImportSessionRepository;
|
||||
use domain::testing::InMemoryImportSessionRepository;
|
||||
use domain::testing::{InMemoryImportProfileRepository, InMemoryImportSessionRepository};
|
||||
use domain::value_objects::{ImportSessionId, UserId};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::import::{commands::SaveImportProfileCommand, save_profile};
|
||||
use crate::test_helpers::TestContextBuilder;
|
||||
|
||||
#[tokio::test]
|
||||
async fn fails_when_session_not_found() {
|
||||
let sessions = InMemoryImportSessionRepository::new();
|
||||
let ctx = TestContextBuilder::new()
|
||||
.with_import_sessions(Arc::clone(&sessions) as _)
|
||||
.build();
|
||||
let profiles = InMemoryImportProfileRepository::new();
|
||||
|
||||
let result = save_profile::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
Arc::clone(&profiles) as _,
|
||||
SaveImportProfileCommand {
|
||||
user_id: Uuid::new_v4(),
|
||||
session_id: Uuid::new_v4(),
|
||||
@@ -33,6 +31,7 @@ async fn fails_when_session_not_found() {
|
||||
#[tokio::test]
|
||||
async fn saves_profile_from_session() {
|
||||
let sessions = InMemoryImportSessionRepository::new();
|
||||
let profiles = InMemoryImportProfileRepository::new();
|
||||
let user_id = Uuid::new_v4();
|
||||
let sid = ImportSessionId::generate();
|
||||
|
||||
@@ -44,12 +43,9 @@ async fn saves_profile_from_session() {
|
||||
session.field_mappings = Some(vec![]);
|
||||
sessions.create(&session).await.unwrap();
|
||||
|
||||
let ctx = TestContextBuilder::new()
|
||||
.with_import_sessions(Arc::clone(&sessions) as _)
|
||||
.build();
|
||||
|
||||
let result = save_profile::execute(
|
||||
&ctx,
|
||||
Arc::clone(&sessions) as _,
|
||||
Arc::clone(&profiles) as _,
|
||||
SaveImportProfileCommand {
|
||||
user_id,
|
||||
session_id: sid.value(),
|
||||
|
||||
@@ -1,17 +1,16 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use domain::{errors::DomainError, ports::PeriodicJob};
|
||||
|
||||
use crate::context::AppContext;
|
||||
use domain::{errors::DomainError, ports::{ImportSessionRepository, PeriodicJob}};
|
||||
|
||||
pub struct ImportSessionCleanupJob {
|
||||
ctx: AppContext,
|
||||
import_session: Arc<dyn ImportSessionRepository>,
|
||||
}
|
||||
|
||||
impl ImportSessionCleanupJob {
|
||||
pub fn new(ctx: AppContext) -> Self {
|
||||
Self { ctx }
|
||||
pub fn new(import_session: Arc<dyn ImportSessionRepository>) -> Self {
|
||||
Self { import_session }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,7 +21,7 @@ impl PeriodicJob for ImportSessionCleanupJob {
|
||||
}
|
||||
|
||||
async fn run(&self) -> Result<(), DomainError> {
|
||||
let n = crate::import::cleanup::execute(&self.ctx).await?;
|
||||
let n = crate::import::cleanup::execute(self.import_session.clone()).await?;
|
||||
tracing::info!("import session cleanup: removed {} expired sessions", n);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -109,9 +109,12 @@ pub async fn get_import_page(
|
||||
Extension(csrf): Extension<CsrfToken>,
|
||||
) -> impl IntoResponse {
|
||||
let ctx = super::helpers::build_page_context(&state, Some(user_id.clone()), csrf.0).await;
|
||||
let profiles = list_import_profiles::execute(&state.app_ctx, &user_id)
|
||||
.await
|
||||
.unwrap_or_default()
|
||||
let profiles = list_import_profiles::execute(
|
||||
state.app_ctx.repos.import_profile.clone(),
|
||||
&user_id,
|
||||
)
|
||||
.await
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.map(|p| ImportProfileView {
|
||||
id: p.id.value().to_string(),
|
||||
@@ -161,7 +164,8 @@ pub async fn post_upload(
|
||||
};
|
||||
|
||||
match create_import_session::execute(
|
||||
&state.app_ctx,
|
||||
state.app_ctx.repos.import_session.clone(),
|
||||
state.app_ctx.services.document_parser.clone(),
|
||||
CreateImportSessionCommand {
|
||||
user_id: user_id.value(),
|
||||
bytes,
|
||||
@@ -250,7 +254,9 @@ pub async fn post_mapping(
|
||||
.into_response();
|
||||
}
|
||||
match apply_import_mapping::execute(
|
||||
&state.app_ctx,
|
||||
state.app_ctx.repos.import_session.clone(),
|
||||
state.app_ctx.services.document_parser.clone(),
|
||||
state.app_ctx.repos.movie.clone(),
|
||||
ApplyImportMappingCommand {
|
||||
user_id: user_id.value(),
|
||||
session_id: session_id.value(),
|
||||
@@ -344,7 +350,8 @@ pub async fn post_confirm(
|
||||
.filter(|n| !n.trim().is_empty());
|
||||
if let Some(name) = profile_name {
|
||||
let _ = save_import_profile::execute(
|
||||
&state.app_ctx,
|
||||
state.app_ctx.repos.import_session.clone(),
|
||||
state.app_ctx.repos.import_profile.clone(),
|
||||
SaveImportProfileCommand {
|
||||
user_id: user_id.value(),
|
||||
session_id: session_id.value(),
|
||||
@@ -362,7 +369,8 @@ pub async fn post_confirm(
|
||||
.collect();
|
||||
|
||||
match execute_import::execute(
|
||||
&state.app_ctx,
|
||||
state.app_ctx.repos.import_session.clone(),
|
||||
state.app_ctx.services.review_logger.clone(),
|
||||
ExecuteImportCommand {
|
||||
user_id: user_id.value(),
|
||||
session_id: session_id.value(),
|
||||
@@ -397,7 +405,7 @@ pub async fn post_delete_profile(
|
||||
}
|
||||
if let Ok(profile_id) = profile_id_str.parse::<uuid::Uuid>() {
|
||||
let _ = delete_import_profile::execute(
|
||||
&state.app_ctx,
|
||||
state.app_ctx.repos.import_profile.clone(),
|
||||
DeleteImportProfileCommand {
|
||||
user_id: user_id.value(),
|
||||
profile_id,
|
||||
@@ -489,7 +497,8 @@ pub async fn api_post_session(
|
||||
_ => FileFormat::Csv,
|
||||
};
|
||||
match create_import_session::execute(
|
||||
&state.app_ctx,
|
||||
state.app_ctx.repos.import_session.clone(),
|
||||
state.app_ctx.services.document_parser.clone(),
|
||||
CreateImportSessionCommand {
|
||||
user_id: user_id.value(),
|
||||
bytes,
|
||||
@@ -618,7 +627,9 @@ pub async fn api_put_mapping(
|
||||
.collect();
|
||||
|
||||
match apply_import_mapping::execute(
|
||||
&state.app_ctx,
|
||||
state.app_ctx.repos.import_session.clone(),
|
||||
state.app_ctx.services.document_parser.clone(),
|
||||
state.app_ctx.repos.movie.clone(),
|
||||
ApplyImportMappingCommand {
|
||||
user_id: user_id.value(),
|
||||
session_id: session_id.value(),
|
||||
@@ -725,7 +736,7 @@ pub async fn api_post_confirm(
|
||||
)
|
||||
.into_response();
|
||||
};
|
||||
match execute_import::execute(&state.app_ctx, ExecuteImportCommand { user_id: user_id.value(), session_id: session_id.value(), confirmed_indices: body.confirmed_indices }).await {
|
||||
match execute_import::execute(state.app_ctx.repos.import_session.clone(), state.app_ctx.services.review_logger.clone(), ExecuteImportCommand { user_id: user_id.value(), session_id: session_id.value(), confirmed_indices: body.confirmed_indices }).await {
|
||||
Ok(s) => axum::Json(serde_json::json!({
|
||||
"imported": s.imported,
|
||||
"skipped_duplicates": s.skipped_duplicates,
|
||||
@@ -754,7 +765,7 @@ pub async fn api_get_profiles(
|
||||
State(state): State<AppState>,
|
||||
AuthenticatedUser(user_id): AuthenticatedUser,
|
||||
) -> impl IntoResponse {
|
||||
match list_import_profiles::execute(&state.app_ctx, &user_id).await {
|
||||
match list_import_profiles::execute(state.app_ctx.repos.import_profile.clone(), &user_id).await {
|
||||
Ok(profiles) => axum::Json(
|
||||
profiles
|
||||
.into_iter()
|
||||
@@ -803,7 +814,8 @@ pub async fn api_post_profile(
|
||||
.into_response();
|
||||
};
|
||||
match save_import_profile::execute(
|
||||
&state.app_ctx,
|
||||
state.app_ctx.repos.import_session.clone(),
|
||||
state.app_ctx.repos.import_profile.clone(),
|
||||
SaveImportProfileCommand {
|
||||
user_id: user_id.value(),
|
||||
session_id: session_id.value(),
|
||||
@@ -840,7 +852,7 @@ pub async fn api_delete_profile(
|
||||
return StatusCode::BAD_REQUEST.into_response();
|
||||
};
|
||||
match delete_import_profile::execute(
|
||||
&state.app_ctx,
|
||||
state.app_ctx.repos.import_profile.clone(),
|
||||
DeleteImportProfileCommand {
|
||||
user_id: user_id.value(),
|
||||
profile_id,
|
||||
@@ -896,7 +908,8 @@ pub async fn api_apply_profile(
|
||||
};
|
||||
|
||||
if let Err(e) = apply_import_profile::execute(
|
||||
&state.app_ctx,
|
||||
state.app_ctx.repos.import_profile.clone(),
|
||||
state.app_ctx.repos.import_session.clone(),
|
||||
ApplyImportProfileCommand {
|
||||
user_id: user_id.value(),
|
||||
session_id,
|
||||
@@ -936,7 +949,9 @@ pub async fn api_apply_profile(
|
||||
|
||||
let mappings = session.field_mappings.unwrap_or_default();
|
||||
match apply_import_mapping::execute(
|
||||
&state.app_ctx,
|
||||
state.app_ctx.repos.import_session.clone(),
|
||||
state.app_ctx.services.document_parser.clone(),
|
||||
state.app_ctx.repos.movie.clone(),
|
||||
ApplyImportMappingCommand {
|
||||
user_id: user_id.value(),
|
||||
session_id,
|
||||
|
||||
@@ -174,7 +174,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
// ── Periodic jobs ─────────────────────────────────────────────────────────
|
||||
|
||||
let mut periodic_jobs: Vec<Arc<dyn PeriodicJob>> = vec![
|
||||
Arc::new(application::jobs::ImportSessionCleanupJob::new(ctx.clone())),
|
||||
Arc::new(application::jobs::ImportSessionCleanupJob::new(ctx.repos.import_session.clone())),
|
||||
Arc::new(application::jobs::WatchEventCleanupJob::new(ctx.clone())),
|
||||
Arc::new(application::jobs::WrapUpAutoGenerateJob::new(ctx.clone())),
|
||||
Arc::new(application::jobs::WrapUpCleanupJob::new(ctx.clone())),
|
||||
|
||||
Reference in New Issue
Block a user