movie detail page + importer architecture fix

This commit is contained in:
2026-05-10 23:59:26 +02:00
parent f2f1317660
commit b2a2aa4262
49 changed files with 1670 additions and 264 deletions

View File

@@ -18,3 +18,5 @@ chrono = { workspace = true }
tracing = { workspace = true }
async-trait = { workspace = true }
tokio = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }

View File

@@ -2,12 +2,84 @@ use async_trait::async_trait;
use chrono::NaiveDateTime;
use domain::{
errors::DomainError,
models::ImportProfile,
models::{
FieldMapping, ImportProfile,
import::{DomainField, Transform},
},
ports::ImportProfileRepository,
value_objects::{ImportProfileId, UserId},
};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
#[derive(Serialize, Deserialize)]
enum DomainFieldJson {
Title, ReleaseYear, Director, Rating, WatchedAt, Comment, ExternalMetadataId,
}
#[derive(Serialize, Deserialize)]
enum TransformJson {
RatingScale(f64), DateFormat(String), Identity,
}
#[derive(Serialize, Deserialize)]
struct FieldMappingJson {
source_column: String,
domain_field: DomainFieldJson,
transform: TransformJson,
}
fn mapping_to_json(m: &FieldMapping) -> FieldMappingJson {
FieldMappingJson {
source_column: m.source_column.clone(),
domain_field: match &m.domain_field {
DomainField::Title => DomainFieldJson::Title,
DomainField::ReleaseYear => DomainFieldJson::ReleaseYear,
DomainField::Director => DomainFieldJson::Director,
DomainField::Rating => DomainFieldJson::Rating,
DomainField::WatchedAt => DomainFieldJson::WatchedAt,
DomainField::Comment => DomainFieldJson::Comment,
DomainField::ExternalMetadataId => DomainFieldJson::ExternalMetadataId,
},
transform: match &m.transform {
Transform::RatingScale(f) => TransformJson::RatingScale(*f),
Transform::DateFormat(s) => TransformJson::DateFormat(s.clone()),
Transform::Identity => TransformJson::Identity,
},
}
}
fn mapping_from_json(j: FieldMappingJson) -> FieldMapping {
FieldMapping {
source_column: j.source_column,
domain_field: match j.domain_field {
DomainFieldJson::Title => DomainField::Title,
DomainFieldJson::ReleaseYear => DomainField::ReleaseYear,
DomainFieldJson::Director => DomainField::Director,
DomainFieldJson::Rating => DomainField::Rating,
DomainFieldJson::WatchedAt => DomainField::WatchedAt,
DomainFieldJson::Comment => DomainField::Comment,
DomainFieldJson::ExternalMetadataId => DomainField::ExternalMetadataId,
},
transform: match j.transform {
TransformJson::RatingScale(f) => Transform::RatingScale(f),
TransformJson::DateFormat(s) => Transform::DateFormat(s),
TransformJson::Identity => Transform::Identity,
},
}
}
fn serialize_mappings(ms: &[FieldMapping]) -> Result<String, DomainError> {
serde_json::to_string(&ms.iter().map(mapping_to_json).collect::<Vec<_>>())
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
}
fn deserialize_mappings(s: &str) -> Result<Vec<FieldMapping>, DomainError> {
let js: Vec<FieldMappingJson> = serde_json::from_str(s)
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
Ok(js.into_iter().map(mapping_from_json).collect())
}
pub struct PostgresImportProfileRepository {
pool: PgPool,
}
@@ -26,15 +98,13 @@ impl ImportProfileRepository for PostgresImportProfileRepository {
async fn save(&self, p: &ImportProfile) -> Result<(), DomainError> {
let id = p.id.value().to_string();
let user_id = p.user_id.value().to_string();
let field_mappings = serialize_mappings(&p.field_mappings)?;
sqlx::query(
"INSERT INTO import_profiles (id, user_id, name, field_mappings, created_at)
VALUES ($1, $2, $3, $4, $5) ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, field_mappings = EXCLUDED.field_mappings",
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, field_mappings = EXCLUDED.field_mappings",
)
.bind(&id)
.bind(&user_id)
.bind(&p.name)
.bind(&p.field_mappings)
.bind(p.created_at)
.bind(&id).bind(&user_id).bind(&p.name).bind(&field_mappings).bind(p.created_at)
.execute(&self.pool)
.await
.map(|_| ())
@@ -45,13 +115,7 @@ impl ImportProfileRepository for PostgresImportProfileRepository {
let uid = user_id.value().to_string();
#[derive(sqlx::FromRow)]
struct Row {
id: String,
user_id: String,
name: String,
field_mappings: String,
created_at: NaiveDateTime,
}
struct Row { id: String, user_id: String, name: String, field_mappings: String, created_at: NaiveDateTime }
let rows = sqlx::query_as::<_, Row>(
"SELECT id, user_id, name, field_mappings, created_at FROM import_profiles WHERE user_id = $1 ORDER BY created_at DESC",
@@ -61,19 +125,17 @@ impl ImportProfileRepository for PostgresImportProfileRepository {
.await
.map_err(Self::map_err)?;
rows.into_iter().map(|r| -> Result<ImportProfile, DomainError> {
Ok(ImportProfile {
id: ImportProfileId::from_uuid(
r.id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
user_id: UserId::from_uuid(
r.user_id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
name: r.name,
field_mappings: r.field_mappings,
created_at: r.created_at,
})
}).collect()
rows.into_iter().map(|r| Ok(ImportProfile {
id: ImportProfileId::from_uuid(
r.id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
user_id: UserId::from_uuid(
r.user_id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
name: r.name,
field_mappings: deserialize_mappings(&r.field_mappings)?,
created_at: r.created_at,
})).collect()
}
async fn get(&self, id: &ImportProfileId, user_id: &UserId) -> Result<Option<ImportProfile>, DomainError> {
@@ -81,36 +143,27 @@ impl ImportProfileRepository for PostgresImportProfileRepository {
let uid_str = user_id.value().to_string();
#[derive(sqlx::FromRow)]
struct Row {
id: String,
user_id: String,
name: String,
field_mappings: String,
created_at: NaiveDateTime,
}
struct Row { id: String, user_id: String, name: String, field_mappings: String, created_at: NaiveDateTime }
let row = sqlx::query_as::<_, Row>(
"SELECT id, user_id, name, field_mappings, created_at FROM import_profiles WHERE id = $1 AND user_id = $2",
)
.bind(&id_str)
.bind(&uid_str)
.bind(&id_str).bind(&uid_str)
.fetch_optional(&self.pool)
.await
.map_err(Self::map_err)?;
Ok(row.map(|r| -> Result<ImportProfile, DomainError> {
Ok(ImportProfile {
id: ImportProfileId::from_uuid(
r.id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
user_id: UserId::from_uuid(
r.user_id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
name: r.name,
field_mappings: r.field_mappings,
created_at: r.created_at,
})
}).transpose()?)
row.map(|r| Ok(ImportProfile {
id: ImportProfileId::from_uuid(
r.id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
user_id: UserId::from_uuid(
r.user_id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
name: r.name,
field_mappings: deserialize_mappings(&r.field_mappings)?,
created_at: r.created_at,
})).transpose()
}
async fn delete(&self, id: &ImportProfileId) -> Result<(), DomainError> {

View File

@@ -2,12 +2,181 @@ use async_trait::async_trait;
use chrono::NaiveDateTime;
use domain::{
errors::DomainError,
models::ImportSession,
models::{
AnnotatedRow, FieldMapping, ImportSession, ParsedFile,
import::{DomainField, ImportRow, RowResult, Transform},
},
ports::ImportSessionRepository,
value_objects::{ImportSessionId, UserId},
};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
// ── serde mirror structs ──
#[derive(Serialize, Deserialize, Default)]
struct ParsedFileJson {
columns: Vec<String>,
rows: Vec<Vec<String>>,
}
#[derive(Serialize, Deserialize)]
enum DomainFieldJson {
Title, ReleaseYear, Director, Rating, WatchedAt, Comment, ExternalMetadataId,
}
#[derive(Serialize, Deserialize)]
enum TransformJson {
RatingScale(f64),
DateFormat(String),
Identity,
}
#[derive(Serialize, Deserialize)]
struct FieldMappingJson {
source_column: String,
domain_field: DomainFieldJson,
transform: TransformJson,
}
#[derive(Serialize, Deserialize, Default)]
struct ImportRowJson {
#[serde(skip_serializing_if = "Option::is_none")] title: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")] release_year: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")] director: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")] rating: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")] watched_at: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")] comment: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")] external_metadata_id: Option<String>,
}
#[derive(Serialize, Deserialize)]
enum RowResultJson {
Valid(ImportRowJson),
Invalid { errors: Vec<String>, raw: Vec<(String, String)> },
}
#[derive(Serialize, Deserialize)]
struct AnnotatedRowJson {
result: RowResultJson,
is_duplicate: bool,
}
// ── conversion helpers ──
fn domain_field_to_json(f: &DomainField) -> DomainFieldJson {
match f {
DomainField::Title => DomainFieldJson::Title,
DomainField::ReleaseYear => DomainFieldJson::ReleaseYear,
DomainField::Director => DomainFieldJson::Director,
DomainField::Rating => DomainFieldJson::Rating,
DomainField::WatchedAt => DomainFieldJson::WatchedAt,
DomainField::Comment => DomainFieldJson::Comment,
DomainField::ExternalMetadataId => DomainFieldJson::ExternalMetadataId,
}
}
fn domain_field_from_json(j: DomainFieldJson) -> DomainField {
match j {
DomainFieldJson::Title => DomainField::Title,
DomainFieldJson::ReleaseYear => DomainField::ReleaseYear,
DomainFieldJson::Director => DomainField::Director,
DomainFieldJson::Rating => DomainField::Rating,
DomainFieldJson::WatchedAt => DomainField::WatchedAt,
DomainFieldJson::Comment => DomainField::Comment,
DomainFieldJson::ExternalMetadataId => DomainField::ExternalMetadataId,
}
}
fn transform_to_json(t: &Transform) -> TransformJson {
match t {
Transform::RatingScale(f) => TransformJson::RatingScale(*f),
Transform::DateFormat(s) => TransformJson::DateFormat(s.clone()),
Transform::Identity => TransformJson::Identity,
}
}
fn transform_from_json(j: TransformJson) -> Transform {
match j {
TransformJson::RatingScale(f) => Transform::RatingScale(f),
TransformJson::DateFormat(s) => Transform::DateFormat(s),
TransformJson::Identity => Transform::Identity,
}
}
fn mapping_to_json(m: &FieldMapping) -> FieldMappingJson {
FieldMappingJson {
source_column: m.source_column.clone(),
domain_field: domain_field_to_json(&m.domain_field),
transform: transform_to_json(&m.transform),
}
}
fn mapping_from_json(j: FieldMappingJson) -> FieldMapping {
FieldMapping {
source_column: j.source_column,
domain_field: domain_field_from_json(j.domain_field),
transform: transform_from_json(j.transform),
}
}
fn import_row_to_json(r: &ImportRow) -> ImportRowJson {
ImportRowJson {
title: r.title.clone(),
release_year: r.release_year.clone(),
director: r.director.clone(),
rating: r.rating.clone(),
watched_at: r.watched_at.clone(),
comment: r.comment.clone(),
external_metadata_id: r.external_metadata_id.clone(),
}
}
fn import_row_from_json(j: ImportRowJson) -> ImportRow {
ImportRow {
title: j.title,
release_year: j.release_year,
director: j.director,
rating: j.rating,
watched_at: j.watched_at,
comment: j.comment,
external_metadata_id: j.external_metadata_id,
}
}
fn annotated_to_json(a: &AnnotatedRow) -> AnnotatedRowJson {
AnnotatedRowJson {
result: match &a.result {
RowResult::Valid(row) => RowResultJson::Valid(import_row_to_json(row)),
RowResult::Invalid { errors, raw } => RowResultJson::Invalid {
errors: errors.clone(),
raw: raw.clone(),
},
},
is_duplicate: a.is_duplicate,
}
}
fn annotated_from_json(j: AnnotatedRowJson) -> AnnotatedRow {
AnnotatedRow {
result: match j.result {
RowResultJson::Valid(row) => RowResult::Valid(import_row_from_json(row)),
RowResultJson::Invalid { errors, raw } => RowResult::Invalid { errors, raw },
},
is_duplicate: j.is_duplicate,
}
}
fn ser<T: Serialize>(v: &T) -> Result<String, DomainError> {
serde_json::to_string(v).map_err(|e| DomainError::InfrastructureError(e.to_string()))
}
fn de<T: for<'de> Deserialize<'de>>(s: &str) -> Result<T, DomainError> {
serde_json::from_str(s).map_err(|e| DomainError::InfrastructureError(e.to_string()))
}
// ── repository ──
pub struct PostgresImportSessionRepository {
pool: PgPool,
}
@@ -19,6 +188,62 @@ impl PostgresImportSessionRepository {
tracing::error!("DB error: {:?}", e);
DomainError::InfrastructureError("Database operation failed".into())
}
fn serialize_session(s: &ImportSession) -> Result<(String, Option<String>, Option<String>), DomainError> {
let parsed = s.parsed_file.as_ref()
.map(|f| ser(&ParsedFileJson { columns: f.columns.clone(), rows: f.rows.clone() }))
.transpose()?
.unwrap_or_default();
let mappings = s.field_mappings.as_ref()
.map(|ms| ser(&ms.iter().map(mapping_to_json).collect::<Vec<_>>()))
.transpose()?;
let results = s.row_results.as_ref()
.map(|rs| ser(&rs.iter().map(annotated_to_json).collect::<Vec<_>>()))
.transpose()?;
Ok((parsed, mappings, results))
}
fn deserialize_session(
id: String,
user_id: String,
parsed_data: String,
field_mappings: Option<String>,
row_results: Option<String>,
created_at: NaiveDateTime,
expires_at: NaiveDateTime,
) -> Result<ImportSession, DomainError> {
let parsed_file = if parsed_data.is_empty() {
None
} else {
let j: ParsedFileJson = de(&parsed_data)?;
Some(ParsedFile { columns: j.columns, rows: j.rows })
};
let field_mappings = field_mappings.as_deref()
.map(|s| -> Result<Vec<FieldMapping>, DomainError> {
let js: Vec<FieldMappingJson> = de(s)?;
Ok(js.into_iter().map(mapping_from_json).collect())
})
.transpose()?;
let row_results = row_results.as_deref()
.map(|s| -> Result<Vec<AnnotatedRow>, DomainError> {
let js: Vec<AnnotatedRowJson> = de(s)?;
Ok(js.into_iter().map(annotated_from_json).collect())
})
.transpose()?;
Ok(ImportSession {
id: ImportSessionId::from_uuid(
id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
user_id: UserId::from_uuid(
user_id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
parsed_file,
field_mappings,
row_results,
created_at,
expires_at,
})
}
}
#[async_trait]
@@ -26,17 +251,14 @@ impl ImportSessionRepository for PostgresImportSessionRepository {
async fn create(&self, s: &ImportSession) -> Result<(), DomainError> {
let id = s.id.value().to_string();
let user_id = s.user_id.value().to_string();
let (parsed_data, field_mappings, row_results) = Self::serialize_session(s)?;
sqlx::query(
"INSERT INTO import_sessions (id, user_id, parsed_data, field_mappings, row_results, created_at, expires_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)",
)
.bind(&id)
.bind(&user_id)
.bind(&s.parsed_data)
.bind(&s.field_mappings)
.bind(&s.row_results)
.bind(s.created_at)
.bind(s.expires_at)
.bind(&id).bind(&user_id).bind(&parsed_data)
.bind(&field_mappings).bind(&row_results)
.bind(s.created_at).bind(s.expires_at)
.execute(&self.pool)
.await
.map(|_| ())
@@ -62,41 +284,26 @@ impl ImportSessionRepository for PostgresImportSessionRepository {
"SELECT id, user_id, parsed_data, field_mappings, row_results, created_at, expires_at
FROM import_sessions WHERE id = $1 AND user_id = $2",
)
.bind(&id_str)
.bind(&uid_str)
.bind(&id_str).bind(&uid_str)
.fetch_optional(&self.pool)
.await
.map_err(Self::map_err)?;
Ok(row.map(|r| -> Result<ImportSession, DomainError> {
Ok(ImportSession {
id: ImportSessionId::from_uuid(
r.id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
user_id: UserId::from_uuid(
r.user_id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
parsed_data: r.parsed_data,
field_mappings: r.field_mappings,
row_results: r.row_results,
created_at: r.created_at,
expires_at: r.expires_at,
})
}).transpose()?)
row.map(|r| Self::deserialize_session(
r.id, r.user_id, r.parsed_data, r.field_mappings, r.row_results,
r.created_at, r.expires_at,
)).transpose()
}
async fn update(&self, s: &ImportSession) -> Result<(), DomainError> {
let id = s.id.value().to_string();
sqlx::query(
"UPDATE import_sessions SET field_mappings = $1, row_results = $2 WHERE id = $3",
)
.bind(&s.field_mappings)
.bind(&s.row_results)
.bind(&id)
.execute(&self.pool)
.await
.map(|_| ())
.map_err(Self::map_err)
let (_, field_mappings, row_results) = Self::serialize_session(s)?;
sqlx::query("UPDATE import_sessions SET field_mappings = $1, row_results = $2 WHERE id = $3")
.bind(&field_mappings).bind(&row_results).bind(&id)
.execute(&self.pool)
.await
.map(|_| ())
.map_err(Self::map_err)
}
async fn delete(&self, id: &ImportSessionId) -> Result<(), DomainError> {

View File

@@ -3,7 +3,7 @@ use domain::{
errors::DomainError,
events::DomainEvent,
models::{
DiaryEntry, DiaryFilter, DirectorStat, FeedEntry, MonthlyRating, Movie, Review,
DiaryEntry, DiaryFilter, DirectorStat, FeedEntry, MonthlyRating, Movie, MovieStats, Review,
ReviewHistory, ReviewSource, SortDirection, UserStats, UserTrends,
collections::{PageParams, Paginated},
},
@@ -18,8 +18,8 @@ mod models;
mod users;
use models::{
DiaryRow, DirectorCountRow, FeedRow, MonthlyRatingRow, MovieRow, ReviewRow, UserTotalsRow,
datetime_to_str,
DiaryRow, DirectorCountRow, FeedRow, MonthlyRatingRow, MovieRow, MovieStatsRow, ReviewRow,
UserTotalsRow, datetime_to_str,
};
pub use import_profile::PostgresImportProfileRepository;
@@ -692,6 +692,80 @@ impl DiaryRepository for PostgresRepository {
rows.into_iter().map(DiaryRow::to_domain).collect()
}
async fn get_movie_stats(&self, movie_id: &MovieId) -> Result<MovieStats, DomainError> {
let id_str = movie_id.value().to_string();
sqlx::query_as::<_, MovieStatsRow>(
"SELECT
COUNT(*) AS total_count,
AVG(CAST(rating AS FLOAT)) AS avg_rating,
COUNT(CASE WHEN remote_actor_url IS NOT NULL THEN 1 END) AS federated_count,
COUNT(CASE WHEN rating = 1 THEN 1 END) AS rating_1,
COUNT(CASE WHEN rating = 2 THEN 1 END) AS rating_2,
COUNT(CASE WHEN rating = 3 THEN 1 END) AS rating_3,
COUNT(CASE WHEN rating = 4 THEN 1 END) AS rating_4,
COUNT(CASE WHEN rating = 5 THEN 1 END) AS rating_5
FROM reviews WHERE movie_id = $1",
)
.bind(id_str)
.fetch_one(&self.pool)
.await
.map_err(Self::map_err)
.map(MovieStatsRow::to_domain)
}
async fn get_movie_social_feed(
&self,
movie_id: &MovieId,
page: &PageParams,
) -> Result<Paginated<FeedEntry>, DomainError> {
let id_str = movie_id.value().to_string();
let limit = page.limit as i64;
let offset = page.offset as i64;
let total: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM reviews WHERE movie_id = $1",
)
.bind(&id_str)
.fetch_one(&self.pool)
.await
.map_err(Self::map_err)?;
let rows = sqlx::query_as::<_, FeedRow>(
"SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,
r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment,
to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at,
to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at,
r.remote_actor_url,
CASE WHEN r.remote_actor_url IS NOT NULL THEN r.remote_actor_url
WHEN u.email IS NOT NULL THEN u.email
ELSE r.user_id END AS user_email
FROM reviews r
INNER JOIN movies m ON m.id = r.movie_id
LEFT JOIN users u ON u.id = r.user_id
WHERE r.movie_id = $1
ORDER BY r.watched_at DESC
LIMIT $2 OFFSET $3",
)
.bind(&id_str)
.bind(limit)
.bind(offset)
.fetch_all(&self.pool)
.await
.map_err(Self::map_err)?;
let items = rows
.into_iter()
.map(FeedRow::to_domain)
.collect::<Result<Vec<_>, _>>()?;
Ok(Paginated {
items,
total_count: total as u64,
limit: page.limit,
offset: page.offset,
})
}
}
#[async_trait]

View File

@@ -157,6 +157,35 @@ impl FeedRow {
}
}
#[derive(sqlx::FromRow)]
pub(crate) struct MovieStatsRow {
pub total_count: i64,
pub avg_rating: Option<f64>,
pub federated_count: i64,
pub rating_1: i64,
pub rating_2: i64,
pub rating_3: i64,
pub rating_4: i64,
pub rating_5: i64,
}
impl MovieStatsRow {
pub fn to_domain(self) -> domain::models::MovieStats {
domain::models::MovieStats {
total_count: self.total_count as u64,
avg_rating: self.avg_rating,
federated_count: self.federated_count as u64,
rating_histogram: [
self.rating_1 as u64,
self.rating_2 as u64,
self.rating_3 as u64,
self.rating_4 as u64,
self.rating_5 as u64,
],
}
}
}
#[derive(sqlx::FromRow)]
pub(crate) struct UserSummaryRow {
pub id: String,