Compare commits

...

3 Commits

Author SHA1 Message Date
33aa5bdab3 fmt
All checks were successful
CI / Check / Test (push) Successful in 38m21s
2026-06-12 01:46:16 +02:00
b844339795 fix(domain): ImportSession::new() generates own ID, add from_persistence() 2026-06-12 01:41:03 +02:00
cedb13d7a8 fix(domain): typed VOs in MovieEnrichmentRequested and PersonEnrichmentRequested 2026-06-12 01:34:47 +02:00
16 changed files with 92 additions and 81 deletions

View File

@@ -2,7 +2,7 @@ use chrono::NaiveDateTime;
use domain::{
errors::DomainError,
events::DomainEvent,
models::PersonId,
models::{ExternalPersonId, PersonId},
value_objects::{
ExternalMetadataId, GoalId, MovieId, PosterPath, Rating, ReviewId, UserId, WrapUpId,
},
@@ -210,7 +210,7 @@ impl From<&DomainEvent> for EventPayload {
external_metadata_id,
} => EventPayload::MovieEnrichmentRequested {
movie_id: movie_id.value().to_string(),
external_metadata_id: external_metadata_id.clone(),
external_metadata_id: external_metadata_id.value().to_string(),
},
DomainEvent::ImageStored { key } => EventPayload::ImageStored { key: key.clone() },
DomainEvent::WatchlistEntryAdded {
@@ -322,7 +322,7 @@ impl From<&DomainEvent> for EventPayload {
external_person_id,
} => EventPayload::PersonEnrichmentRequested {
person_id: person_id.value().to_string(),
external_person_id: external_person_id.clone(),
external_person_id: external_person_id.value().to_string(),
},
}
}
@@ -391,7 +391,8 @@ impl TryFrom<EventPayload> for DomainEvent {
external_metadata_id,
} => Ok(DomainEvent::MovieEnrichmentRequested {
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
external_metadata_id,
external_metadata_id: ExternalMetadataId::new(external_metadata_id)
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?,
}),
EventPayload::ImageStored { key } => Ok(DomainEvent::ImageStored { key }),
EventPayload::WatchlistEntryAdded {
@@ -514,7 +515,7 @@ impl TryFrom<EventPayload> for DomainEvent {
external_person_id,
} => Ok(DomainEvent::PersonEnrichmentRequested {
person_id: PersonId::from_uuid(parse_uuid(&person_id, "person_id")?),
external_person_id,
external_person_id: ExternalPersonId::new(external_person_id),
}),
}
}

View File

@@ -123,7 +123,7 @@ impl EventHandler for PosterSyncHandler {
if already_has_poster {
return Ok(());
}
(movie_id.value(), external_metadata_id.clone())
(movie_id.value(), external_metadata_id.value().to_owned())
}
_ => return Ok(()),
};

View File

@@ -5,6 +5,7 @@ use domain::{
models::{
AnnotatedRow, FieldMapping, ImportSession, ParsedFile,
import::{DomainField, ImportRow, RowResult, Transform},
import_session::PersistedImportSession,
},
ports::ImportSessionRepository,
value_objects::{ImportSessionId, UserId},
@@ -266,7 +267,7 @@ impl PostgresImportSessionRepository {
Ok(js.into_iter().map(annotated_from_json).collect())
})
.transpose()?;
Ok(ImportSession {
Ok(ImportSession::from_persistence(PersistedImportSession {
id: ImportSessionId::from_uuid(
id.parse::<uuid::Uuid>()
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?,
@@ -281,7 +282,7 @@ impl PostgresImportSessionRepository {
row_results,
created_at,
expires_at,
})
}))
}
}

View File

@@ -5,6 +5,7 @@ use domain::{
models::{
AnnotatedRow, FieldMapping, ImportSession, ParsedFile,
import::{DomainField, ImportRow, RowResult, Transform},
import_session::PersistedImportSession,
},
ports::ImportSessionRepository,
value_objects::{ImportSessionId, UserId},
@@ -275,7 +276,7 @@ impl SqliteImportSessionRepository {
})
.transpose()?;
Ok(ImportSession {
Ok(ImportSession::from_persistence(PersistedImportSession {
id: ImportSessionId::from_uuid(
id.parse::<uuid::Uuid>()
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?,
@@ -290,7 +291,7 @@ impl SqliteImportSessionRepository {
row_results,
created_at: Self::parse_dt(created_at)?,
expires_at: Self::parse_dt(expires_at)?,
})
}))
}
}

View File

@@ -83,7 +83,7 @@ impl EventHandler for MovieEnrichmentHandler {
self.enrichment_client.as_ref(),
&self.profile_repo,
movie_id.clone(),
&external_metadata_id,
external_metadata_id.value(),
)
.await?
else {

View File

@@ -40,6 +40,7 @@ impl EventHandler for PersonEnrichmentHandler {
_ => return Ok(()),
};
application::person::enrich::execute(&self.deps, person_id, &external_person_id).await
application::person::enrich::execute(&self.deps, person_id, external_person_id.value())
.await
}
}

View File

@@ -112,7 +112,7 @@ async fn publish_events(
publisher
.publish(&DomainEvent::MovieEnrichmentRequested {
movie_id: movie.id().clone(),
external_metadata_id: ext_id.value().to_string(),
external_metadata_id: ext_id.clone(),
})
.await?;
}

View File

@@ -1,6 +1,5 @@
use std::sync::Arc;
use chrono::Utc;
use domain::{
errors::DomainError,
models::ImportSession,
@@ -31,8 +30,7 @@ pub async fn execute(
let sample_rows = parsed.rows.iter().take(5).cloned().collect();
let columns = parsed.columns.clone();
let now = Utc::now().naive_utc();
let mut session = ImportSession::new(ImportSessionId::generate(), user_id, now);
let mut session = ImportSession::new(user_id);
let session_id = session.id.clone();
session.parsed_file = Some(parsed);

View File

@@ -78,11 +78,7 @@ async fn applies_profile_mappings_to_session() {
let profile_id = profile.id.clone();
profiles.save(&profile).await.unwrap();
let session = domain::models::ImportSession::new(
domain::value_objects::ImportSessionId::generate(),
UserId::from_uuid(user_id),
Utc::now().naive_utc(),
);
let session = domain::models::ImportSession::new(UserId::from_uuid(user_id));
let session_id = session.id.clone();
sessions.create(&session).await.unwrap();

View File

@@ -1,19 +1,17 @@
use std::sync::Arc;
use chrono::Utc;
use domain::models::{AnnotatedRow, ImportSession, import::RowResult};
use domain::ports::ImportSessionRepository;
use domain::testing::InMemoryImportSessionRepository;
use domain::value_objects::{ImportSessionId, UserId};
use domain::value_objects::UserId;
use uuid::Uuid;
use crate::import::commands::ExecuteImportCommand;
use crate::import::execute;
use crate::test_helpers::NoopReviewLogger;
fn make_session_with_rows(user_id: UserId, session_id: ImportSessionId) -> ImportSession {
let now = Utc::now().naive_utc();
let mut session = ImportSession::new(session_id, user_id, now);
fn make_session_with_rows(user_id: UserId) -> ImportSession {
let mut session = ImportSession::new(user_id);
session.row_results = Some(vec![
AnnotatedRow {
result: RowResult::Valid(domain::models::ImportRow {
@@ -47,9 +45,9 @@ fn make_session_with_rows(user_id: UserId, session_id: ImportSessionId) -> Impor
async fn imports_confirmed_rows() {
let sessions = InMemoryImportSessionRepository::new();
let uid = Uuid::new_v4();
let sid = ImportSessionId::generate();
let session = make_session_with_rows(UserId::from_uuid(uid), sid.clone());
let session = make_session_with_rows(UserId::from_uuid(uid));
let sid = session.id.clone();
sessions.create(&session).await.unwrap();
let result = execute::execute(
@@ -73,9 +71,9 @@ async fn imports_confirmed_rows() {
async fn skips_unconfirmed_rows() {
let sessions = InMemoryImportSessionRepository::new();
let uid = Uuid::new_v4();
let sid = ImportSessionId::generate();
let session = make_session_with_rows(UserId::from_uuid(uid), sid.clone());
let session = make_session_with_rows(UserId::from_uuid(uid));
let sid = session.id.clone();
sessions.create(&session).await.unwrap();
let result = execute::execute(
@@ -116,10 +114,9 @@ async fn fails_when_session_not_found() {
async fn handles_datetime_format() {
let sessions = InMemoryImportSessionRepository::new();
let uid = Uuid::new_v4();
let sid = ImportSessionId::generate();
let now = Utc::now().naive_utc();
let mut session = ImportSession::new(sid.clone(), UserId::from_uuid(uid), now);
let mut session = ImportSession::new(UserId::from_uuid(uid));
let sid = session.id.clone();
session.row_results = Some(vec![AnnotatedRow {
result: RowResult::Valid(domain::models::ImportRow {
title: Some("DateTime Movie".into()),
@@ -154,10 +151,9 @@ async fn handles_datetime_format() {
async fn fails_on_invalid_rating() {
let sessions = InMemoryImportSessionRepository::new();
let uid = Uuid::new_v4();
let sid = ImportSessionId::generate();
let now = Utc::now().naive_utc();
let mut session = ImportSession::new(sid.clone(), UserId::from_uuid(uid), now);
let mut session = ImportSession::new(UserId::from_uuid(uid));
let sid = session.id.clone();
session.row_results = Some(vec![AnnotatedRow {
result: RowResult::Valid(domain::models::ImportRow {
title: Some("Bad Rating Movie".into()),
@@ -192,10 +188,9 @@ async fn fails_on_invalid_rating() {
async fn fails_on_missing_watched_at() {
let sessions = InMemoryImportSessionRepository::new();
let uid = Uuid::new_v4();
let sid = ImportSessionId::generate();
let now = Utc::now().naive_utc();
let mut session = ImportSession::new(sid.clone(), UserId::from_uuid(uid), now);
let mut session = ImportSession::new(UserId::from_uuid(uid));
let sid = session.id.clone();
session.row_results = Some(vec![AnnotatedRow {
result: RowResult::Valid(domain::models::ImportRow {
title: Some("No Date Movie".into()),
@@ -230,10 +225,9 @@ async fn fails_on_missing_watched_at() {
async fn imports_row_with_external_metadata_id() {
let sessions = InMemoryImportSessionRepository::new();
let uid = Uuid::new_v4();
let sid = ImportSessionId::generate();
let now = Utc::now().naive_utc();
let mut session = ImportSession::new(sid.clone(), UserId::from_uuid(uid), now);
let mut session = ImportSession::new(UserId::from_uuid(uid));
let sid = session.id.clone();
session.row_results = Some(vec![AnnotatedRow {
result: RowResult::Valid(domain::models::ImportRow {
title: Some("TMDB Movie".into()),
@@ -268,10 +262,9 @@ async fn imports_row_with_external_metadata_id() {
async fn imports_row_with_director_and_comment() {
let sessions = InMemoryImportSessionRepository::new();
let uid = Uuid::new_v4();
let sid = ImportSessionId::generate();
let now = Utc::now().naive_utc();
let mut session = ImportSession::new(sid.clone(), UserId::from_uuid(uid), now);
let mut session = ImportSession::new(UserId::from_uuid(uid));
let sid = session.id.clone();
session.row_results = Some(vec![AnnotatedRow {
result: RowResult::Valid(domain::models::ImportRow {
title: Some("Directed Movie".into()),
@@ -306,10 +299,9 @@ async fn imports_row_with_director_and_comment() {
async fn handles_space_separated_datetime_format() {
let sessions = InMemoryImportSessionRepository::new();
let uid = Uuid::new_v4();
let sid = ImportSessionId::generate();
let now = Utc::now().naive_utc();
let mut session = ImportSession::new(sid.clone(), UserId::from_uuid(uid), now);
let mut session = ImportSession::new(UserId::from_uuid(uid));
let sid = session.id.clone();
session.row_results = Some(vec![AnnotatedRow {
result: RowResult::Valid(domain::models::ImportRow {
title: Some("Space DateTime".into()),
@@ -344,10 +336,9 @@ async fn handles_space_separated_datetime_format() {
async fn reports_invalid_row_result_errors() {
let sessions = InMemoryImportSessionRepository::new();
let uid = Uuid::new_v4();
let sid = ImportSessionId::generate();
let now = Utc::now().naive_utc();
let mut session = ImportSession::new(sid.clone(), UserId::from_uuid(uid), now);
let mut session = ImportSession::new(UserId::from_uuid(uid));
let sid = session.id.clone();
session.row_results = Some(vec![AnnotatedRow {
result: RowResult::Invalid {
errors: vec!["missing title".into(), "bad year".into()],
@@ -379,10 +370,9 @@ async fn reports_invalid_row_result_errors() {
async fn fails_on_missing_rating() {
let sessions = InMemoryImportSessionRepository::new();
let uid = Uuid::new_v4();
let sid = ImportSessionId::generate();
let now = Utc::now().naive_utc();
let mut session = ImportSession::new(sid.clone(), UserId::from_uuid(uid), now);
let mut session = ImportSession::new(UserId::from_uuid(uid));
let sid = session.id.clone();
session.row_results = Some(vec![AnnotatedRow {
result: RowResult::Valid(domain::models::ImportRow {
title: Some("No Rating Movie".into()),
@@ -418,10 +408,9 @@ async fn fails_on_missing_rating() {
async fn fails_on_unparseable_date() {
let sessions = InMemoryImportSessionRepository::new();
let uid = Uuid::new_v4();
let sid = ImportSessionId::generate();
let now = Utc::now().naive_utc();
let mut session = ImportSession::new(sid.clone(), UserId::from_uuid(uid), now);
let mut session = ImportSession::new(UserId::from_uuid(uid));
let sid = session.id.clone();
session.row_results = Some(vec![AnnotatedRow {
result: RowResult::Valid(domain::models::ImportRow {
title: Some("Bad Date Movie".into()),
@@ -457,10 +446,9 @@ async fn fails_on_unparseable_date() {
async fn imports_row_without_release_year() {
let sessions = InMemoryImportSessionRepository::new();
let uid = Uuid::new_v4();
let sid = ImportSessionId::generate();
let now = Utc::now().naive_utc();
let mut session = ImportSession::new(sid.clone(), UserId::from_uuid(uid), now);
let mut session = ImportSession::new(UserId::from_uuid(uid));
let sid = session.id.clone();
session.row_results = Some(vec![AnnotatedRow {
result: RowResult::Valid(domain::models::ImportRow {
title: Some("No Year Movie".into()),
@@ -495,9 +483,9 @@ async fn imports_row_without_release_year() {
async fn deletes_session_after_import() {
let sessions = InMemoryImportSessionRepository::new();
let uid = Uuid::new_v4();
let sid = ImportSessionId::generate();
let session = make_session_with_rows(UserId::from_uuid(uid), sid.clone());
let session = make_session_with_rows(UserId::from_uuid(uid));
let sid = session.id.clone();
sessions.create(&session).await.unwrap();
assert_eq!(sessions.count(), 1);
@@ -524,9 +512,7 @@ async fn deletes_session_after_import() {
async fn imports_more_rows_than_concurrency_limit() {
let sessions = InMemoryImportSessionRepository::new();
let uid = Uuid::new_v4();
let sid = ImportSessionId::generate();
let now = Utc::now().naive_utc();
let rows: Vec<_> = (0..15)
.map(|i| AnnotatedRow {
result: RowResult::Valid(domain::models::ImportRow {
@@ -542,7 +528,8 @@ async fn imports_more_rows_than_concurrency_limit() {
})
.collect();
let mut session = ImportSession::new(sid.clone(), UserId::from_uuid(uid), now);
let mut session = ImportSession::new(UserId::from_uuid(uid));
let sid = session.id.clone();
session.row_results = Some(rows);
sessions.create(&session).await.unwrap();

View File

@@ -1,10 +1,9 @@
use std::sync::Arc;
use chrono::Utc;
use domain::models::ImportSession;
use domain::ports::ImportSessionRepository;
use domain::testing::{InMemoryImportProfileRepository, InMemoryImportSessionRepository};
use domain::value_objects::{ImportSessionId, UserId};
use domain::value_objects::UserId;
use uuid::Uuid;
use crate::import::{commands::SaveImportProfileCommand, save_profile};
@@ -33,13 +32,9 @@ async fn saves_profile_from_session() {
let sessions = InMemoryImportSessionRepository::new();
let profiles = InMemoryImportProfileRepository::new();
let user_id = Uuid::new_v4();
let sid = ImportSessionId::generate();
let mut session = ImportSession::new(
sid.clone(),
UserId::from_uuid(user_id),
Utc::now().naive_utc(),
);
let mut session = ImportSession::new(UserId::from_uuid(user_id));
let sid = session.id.clone();
session.field_mappings = Some(vec![]);
sessions.create(&session).await.unwrap();

View File

@@ -6,6 +6,7 @@ use domain::{
errors::DomainError,
events::DomainEvent,
ports::{EventPublisher, MovieProfileRepository, PeriodicJob},
value_objects::ExternalMetadataId,
};
pub struct EnrichmentStalenessJob {
@@ -38,9 +39,16 @@ impl PeriodicJob for EnrichmentStalenessJob {
}
tracing::info!("enrichment scan: {} stale movies", stale.len());
for (movie_id, external_metadata_id) in stale {
let ext_id = match ExternalMetadataId::new(external_metadata_id) {
Ok(id) => id,
Err(e) => {
tracing::warn!("skipping stale movie with malformed external_metadata_id: {e}");
continue;
}
};
let event = DomainEvent::MovieEnrichmentRequested {
movie_id,
external_metadata_id,
external_metadata_id: ext_id,
};
self.event_publisher.publish(&event).await?;
}

View File

@@ -18,7 +18,7 @@ pub async fn execute(deps: &GetPersonDeps, id: PersonId) -> Result<Option<Person
.event_publisher
.publish(&DomainEvent::PersonEnrichmentRequested {
person_id: id,
external_person_id: p.external_id().value().to_string(),
external_person_id: p.external_id().clone(),
})
.await;
}

View File

@@ -16,7 +16,7 @@ pub async fn execute(deps: &GetPersonDeps, id: PersonId) -> Result<PersonCredits
.event_publisher
.publish(&DomainEvent::PersonEnrichmentRequested {
person_id: id,
external_person_id: credits.person.external_id().value().to_string(),
external_person_id: credits.person.external_id().clone(),
})
.await;
}

View File

@@ -3,7 +3,7 @@ use chrono::NaiveDateTime;
use crate::{
errors::DomainError,
models::PersonId,
models::{ExternalPersonId, PersonId},
value_objects::{
ExternalMetadataId, GoalId, MovieId, PosterPath, Rating, ReviewId, UserId, WrapUpId,
},
@@ -42,11 +42,11 @@ pub enum DomainEvent {
},
MovieEnrichmentRequested {
movie_id: MovieId,
external_metadata_id: String,
external_metadata_id: ExternalMetadataId,
},
PersonEnrichmentRequested {
person_id: PersonId,
external_person_id: String,
external_person_id: ExternalPersonId,
},
ImageStored {
key: String,

View File

@@ -15,11 +15,22 @@ pub struct ImportSession {
pub expires_at: NaiveDateTime,
}
pub struct PersistedImportSession {
pub id: ImportSessionId,
pub user_id: UserId,
pub parsed_file: Option<ParsedFile>,
pub field_mappings: Option<Vec<FieldMapping>>,
pub row_results: Option<Vec<AnnotatedRow>>,
pub created_at: NaiveDateTime,
pub expires_at: NaiveDateTime,
}
impl ImportSession {
pub fn new(id: ImportSessionId, user_id: UserId, created_at: NaiveDateTime) -> Self {
pub fn new(user_id: UserId) -> Self {
let created_at = chrono::Utc::now().naive_utc();
let expires_at = created_at + chrono::Duration::hours(24);
Self {
id,
id: ImportSessionId::generate(),
user_id,
parsed_file: None,
field_mappings: None,
@@ -28,4 +39,16 @@ impl ImportSession {
expires_at,
}
}
pub fn from_persistence(p: PersistedImportSession) -> Self {
Self {
id: p.id,
user_id: p.user_id,
parsed_file: p.parsed_file,
field_mappings: p.field_mappings,
row_results: p.row_results,
created_at: p.created_at,
expires_at: p.expires_at,
}
}
}