importer feature

This commit is contained in:
2026-05-10 21:23:56 +02:00
parent a47e3ae4e6
commit f2f1317660
77 changed files with 4884 additions and 1810 deletions

View File

@@ -4,8 +4,9 @@ version = "0.1.0"
edition = "2024"
[dependencies]
domain = { workspace = true }
serde = { workspace = true }
chrono = { workspace = true }
uuid = { workspace = true }
anyhow = { workspace = true }
domain = { workspace = true }
serde = { workspace = true }
chrono = { workspace = true }
uuid = { workspace = true }
anyhow = { workspace = true }
serde_json = { workspace = true }

View File

@@ -0,0 +1,14 @@
[package]
name = "importer"
version = "0.1.0"
edition = "2024"
[features]
xlsx = ["dep:calamine"]
[dependencies]
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
thiserror = { workspace = true }
csv = { workspace = true }
calamine = { version = "0.26", optional = true }

View File

@@ -0,0 +1,13 @@
#[derive(Debug, thiserror::Error)]
pub enum ImportError {
#[error("CSV parse error: {0}")]
Csv(String),
#[error("JSON parse error: {0}")]
Json(String),
#[error("XLSX parse error: {0}")]
Xlsx(String),
#[error("Empty file")]
Empty,
#[error("Missing header row")]
NoHeader,
}

View File

@@ -0,0 +1,12 @@
pub mod error;
pub mod mapper;
pub mod parsers;
pub mod types;
pub use error::ImportError;
pub use mapper::apply_mapping;
pub use parsers::{parse_csv, parse_json};
pub use types::{AnnotatedRow, DomainField, FieldMapping, ImportRow, ParsedFile, RowResult, Transform};
#[cfg(feature = "xlsx")]
pub use parsers::parse_xlsx;

View File

@@ -0,0 +1,192 @@
use crate::types::{AnnotatedRow, DomainField, FieldMapping, ImportRow, ParsedFile, RowResult, Transform};
pub fn apply_mapping(file: &ParsedFile, mappings: &[FieldMapping]) -> Vec<AnnotatedRow> {
file.rows.iter().map(|row| {
let result = map_row(row, &file.columns, mappings);
AnnotatedRow { result, is_duplicate: false }
}).collect()
}
fn map_row(row: &[String], columns: &[String], mappings: &[FieldMapping]) -> RowResult {
let mut import_row = ImportRow::default();
let mut errors = Vec::new();
for mapping in mappings {
let Some(col_idx) = columns.iter().position(|c| c == &mapping.source_column) else {
continue;
};
let raw_value = row.get(col_idx).map(|s| s.as_str()).unwrap_or("").trim();
if raw_value.is_empty() {
continue;
}
if let Some(value) = apply_transform(raw_value, &mapping.transform, &mut errors) {
set_field(&mut import_row, &mapping.domain_field, value);
}
}
if import_row.title.is_none() && import_row.external_metadata_id.is_none() {
errors.push("missing required field: title or external_metadata_id".into());
}
if import_row.rating.is_none() {
errors.push("missing required field: rating".into());
}
if import_row.watched_at.is_none() {
errors.push("missing required field: watched_at".into());
}
if errors.is_empty() {
RowResult::Valid(import_row)
} else {
let raw = columns.iter()
.zip(row.iter())
.map(|(c, v)| (c.clone(), v.clone()))
.collect();
RowResult::Invalid { errors, raw }
}
}
fn apply_transform(value: &str, transform: &Transform, errors: &mut Vec<String>) -> Option<String> {
match transform {
Transform::Identity => Some(value.to_string()),
Transform::DateFormat(_) => Some(value.to_string()),
Transform::RatingScale(factor) => {
match value.parse::<f64>() {
Ok(n) => Some((n * factor).round().to_string()),
Err(_) => {
errors.push(format!("rating '{}' is not a number", value));
None
}
}
}
}
}
fn set_field(row: &mut ImportRow, field: &DomainField, value: String) {
match field {
DomainField::Title => row.title = Some(value),
DomainField::ReleaseYear => row.release_year = Some(value),
DomainField::Director => row.director = Some(value),
DomainField::Rating => row.rating = Some(value),
DomainField::WatchedAt => row.watched_at = Some(value),
DomainField::Comment => row.comment = Some(value),
DomainField::ExternalMetadataId => row.external_metadata_id = Some(value),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::{DomainField, FieldMapping, ParsedFile, RowResult, Transform};
fn sample_file() -> ParsedFile {
ParsedFile {
columns: vec!["Name".into(), "Stars".into(), "Date".into()],
rows: vec![
vec!["Inception".into(), "10".into(), "2024-01-15".into()],
vec!["Dune".into(), "8".into(), "2024-02-20".into()],
vec!["".into(), "3".into(), "2024-03-01".into()], // missing title → invalid
],
}
}
fn full_mappings() -> Vec<FieldMapping> {
vec![
FieldMapping { source_column: "Name".into(), domain_field: DomainField::Title, transform: Transform::Identity },
FieldMapping { source_column: "Stars".into(), domain_field: DomainField::Rating, transform: Transform::RatingScale(0.5) },
FieldMapping { source_column: "Date".into(), domain_field: DomainField::WatchedAt, transform: Transform::Identity },
]
}
#[test]
fn maps_valid_rows() {
let results = apply_mapping(&sample_file(), &full_mappings());
assert_eq!(results.len(), 3);
// First two rows are valid
assert!(matches!(results[0].result, RowResult::Valid(_)));
assert!(matches!(results[1].result, RowResult::Valid(_)));
// is_duplicate defaults to false
assert!(!results[0].is_duplicate);
}
#[test]
fn applies_rating_scale_transform() {
let results = apply_mapping(&sample_file(), &full_mappings());
if let RowResult::Valid(row) = &results[0].result {
// 10 * 0.5 = 5
assert_eq!(row.rating.as_deref(), Some("5"));
} else {
panic!("expected Valid");
}
}
#[test]
fn marks_missing_required_fields_invalid() {
let results = apply_mapping(&sample_file(), &full_mappings());
// Row 2 has empty title
assert!(matches!(results[2].result, RowResult::Invalid { .. }));
}
#[test]
fn ignores_unmapped_columns() {
let mappings = vec![
FieldMapping { source_column: "Name".into(), domain_field: DomainField::Title, transform: Transform::Identity },
];
let file = ParsedFile {
columns: vec!["Name".into(), "Extra".into()],
rows: vec![vec!["Inception".into(), "ignored".into()]],
};
let results = apply_mapping(&file, &mappings);
assert_eq!(results.len(), 1);
// Missing rating and watched_at → invalid
assert!(matches!(results[0].result, RowResult::Invalid { .. }));
}
#[test]
fn nonexistent_source_column_skipped() {
let mappings = vec![
FieldMapping { source_column: "DoesNotExist".into(), domain_field: DomainField::Title, transform: Transform::Identity },
];
let file = ParsedFile {
columns: vec!["Name".into()],
rows: vec![vec!["Inception".into()]],
};
let results = apply_mapping(&file, &mappings);
// Column not found → field not set → invalid (missing title, rating, watched_at)
assert!(matches!(results[0].result, RowResult::Invalid { .. }));
}
#[test]
fn collects_all_errors_not_just_first() {
let mappings = vec![
FieldMapping { source_column: "Name".into(), domain_field: DomainField::Title, transform: Transform::Identity },
FieldMapping { source_column: "Stars".into(), domain_field: DomainField::Rating, transform: Transform::RatingScale(0.5) },
// no watched_at mapping
];
let file = ParsedFile {
columns: vec!["Name".into(), "Stars".into()],
rows: vec![vec!["Inception".into(), "notanumber".into()]],
};
let results = apply_mapping(&file, &mappings);
if let RowResult::Invalid { errors, .. } = &results[0].result {
assert!(errors.iter().any(|e| e.contains("not a number")), "expected rating error, got: {:?}", errors);
assert!(errors.iter().any(|e| e.contains("watched_at")), "expected watched_at error, got: {:?}", errors);
} else {
panic!("expected Invalid");
}
}
#[test]
fn non_numeric_rating_produces_error_in_row() {
let mappings = vec![
FieldMapping { source_column: "Name".into(), domain_field: DomainField::Title, transform: Transform::Identity },
FieldMapping { source_column: "Stars".into(), domain_field: DomainField::Rating, transform: Transform::RatingScale(0.5) },
FieldMapping { source_column: "Date".into(), domain_field: DomainField::WatchedAt, transform: Transform::Identity },
];
let file = ParsedFile {
columns: vec!["Name".into(), "Stars".into(), "Date".into()],
rows: vec![vec!["Inception".into(), "five".into(), "2024-01-15".into()]],
};
let results = apply_mapping(&file, &mappings);
assert!(matches!(results[0].result, RowResult::Invalid { .. }));
}
}

View File

@@ -0,0 +1,49 @@
use crate::{ImportError, types::ParsedFile};
pub fn parse_csv(bytes: &[u8]) -> Result<ParsedFile, ImportError> {
if bytes.is_empty() {
return Err(ImportError::Empty);
}
let delimiter = detect_delimiter(bytes);
let mut rdr = csv::ReaderBuilder::new()
.delimiter(delimiter)
.from_reader(bytes);
let columns: Vec<String> = rdr
.headers()
.map_err(|e| ImportError::Csv(e.to_string()))?
.iter()
.map(|s| s.trim().to_string())
.collect();
if columns.is_empty() {
return Err(ImportError::NoHeader);
}
let rows: Vec<Vec<String>> = rdr
.records()
.map(|r| {
r.map_err(|e| ImportError::Csv(e.to_string()))
.map(|rec| {
let mut cells: Vec<String> = rec.iter().map(|f| f.trim().to_string()).collect();
cells.resize(columns.len(), String::new());
cells.truncate(columns.len());
cells
})
})
.collect::<Result<_, _>>()?;
if rows.is_empty() {
return Err(ImportError::Empty);
}
Ok(ParsedFile { columns, rows })
}
fn detect_delimiter(bytes: &[u8]) -> u8 {
let first_line = bytes.split(|&b| b == b'\n').next().unwrap_or(bytes);
let tabs = first_line.iter().filter(|&&b| b == b'\t').count();
let commas = first_line.iter().filter(|&&b| b == b',').count();
if tabs > commas { b'\t' } else { b',' }
}

View File

@@ -0,0 +1,43 @@
use serde_json::Value;
use crate::{ImportError, types::ParsedFile};
pub fn parse_json(bytes: &[u8]) -> Result<ParsedFile, ImportError> {
let value: Value = serde_json::from_slice(bytes)
.map_err(|e| ImportError::Json(e.to_string()))?;
let arr = value.as_array()
.ok_or_else(|| ImportError::Json("expected a JSON array".into()))?;
if arr.is_empty() {
return Err(ImportError::Empty);
}
let first = arr[0].as_object()
.ok_or_else(|| ImportError::Json("array elements must be objects".into()))?;
let columns: Vec<String> = first.keys().cloned().collect();
if columns.is_empty() {
return Err(ImportError::NoHeader);
}
let rows: Vec<Vec<String>> = arr.iter()
.enumerate()
.map(|(idx, item)| {
let obj = item.as_object()
.ok_or_else(|| ImportError::Json(format!("element at index {} is not an object", idx)))?;
Ok(columns.iter()
.map(|col| obj.get(col).map(value_to_string).unwrap_or_default())
.collect())
})
.collect::<Result<_, ImportError>>()?;
Ok(ParsedFile { columns, rows })
}
fn value_to_string(v: &Value) -> String {
match v {
Value::String(s) => s.clone(),
Value::Null => String::new(),
other => other.to_string(),
}
}

View File

@@ -0,0 +1,50 @@
mod csv;
mod json;
#[cfg(feature = "xlsx")]
mod xlsx;
pub use csv::parse_csv;
pub use json::parse_json;
#[cfg(feature = "xlsx")]
pub use xlsx::parse_xlsx;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn csv_parses_headers_and_rows() {
let data = b"title,rating,watched_at\nInception,5,2024-01-01\nDune,4,2024-02-15\n";
let file = parse_csv(data).unwrap();
assert_eq!(file.columns, vec!["title", "rating", "watched_at"]);
assert_eq!(file.rows.len(), 2);
assert_eq!(file.rows[0], vec!["Inception", "5", "2024-01-01"]);
}
#[test]
fn csv_rejects_empty() {
assert!(parse_csv(b"").is_err());
}
#[test]
fn tsv_parses_correctly() {
let data = b"title\trating\nInception\t5\n";
let file = parse_csv(data).unwrap();
assert_eq!(file.columns, vec!["title", "rating"]);
assert_eq!(file.rows[0], vec!["Inception", "5"]);
}
#[test]
fn json_array_of_objects() {
let data = br#"[{"title":"Inception","rating":"5"},{"title":"Dune","rating":"4"}]"#;
let file = parse_json(data).unwrap();
assert_eq!(file.columns.len(), 2);
assert!(file.columns.contains(&"title".to_string()));
assert_eq!(file.rows.len(), 2);
}
#[test]
fn json_empty_array_errors() {
assert!(parse_json(b"[]").is_err());
}
}

View File

@@ -0,0 +1,64 @@
use calamine::{Reader, open_workbook_from_rs, Xlsx, Data};
use std::io::Cursor;
use crate::{ImportError, types::ParsedFile};
pub fn parse_xlsx(bytes: &[u8]) -> Result<ParsedFile, ImportError> {
let cursor = Cursor::new(bytes);
let mut workbook: Xlsx<_> = open_workbook_from_rs(cursor)
.map_err(|e: calamine::XlsxError| ImportError::Xlsx(e.to_string()))?;
let sheet_name = workbook.sheet_names()
.first()
.cloned()
.ok_or(ImportError::Empty)?;
let range = workbook.worksheet_range(&sheet_name)
.map_err(|e| ImportError::Xlsx(e.to_string()))?;
let mut iter = range.rows();
let header = iter.next().ok_or(ImportError::NoHeader)?;
let columns: Vec<String> = header.iter()
.map(|c| cell_to_string(c).trim().to_string())
.collect();
if columns.is_empty() {
return Err(ImportError::NoHeader);
}
let rows: Vec<Vec<String>> = iter
.map(|row| {
let mut cells: Vec<String> = row.iter().map(cell_to_string).collect();
cells.resize(columns.len(), String::new());
cells.truncate(columns.len());
cells
})
.collect();
if rows.is_empty() {
return Err(ImportError::Empty);
}
Ok(ParsedFile { columns, rows })
}
fn cell_to_string(cell: &Data) -> String {
match cell {
Data::String(s) => s.clone(),
Data::Float(f) => {
if f.fract() == 0.0 { format!("{}", *f as i64) } else { format!("{}", f) }
}
Data::Int(i) => i.to_string(),
Data::Bool(b) => b.to_string(),
Data::DateTime(dt) => {
// ExcelDateTime::to_ymd_hms_milli() works without the chrono feature.
let (year, month, day, _, _, _, _) = dt.to_ymd_hms_milli();
format!("{:04}-{:02}-{:02}", year, month, day)
}
Data::DateTimeIso(s) => s.clone(),
Data::DurationIso(s) => s.clone(),
Data::Empty | Data::Error(_) => String::new(),
// Fallback for unexpected calamine Data variants; renders as debug string
other => format!("{other:?}"),
}
}

View File

@@ -0,0 +1,57 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ParsedFile {
pub columns: Vec<String>,
pub rows: Vec<Vec<String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum DomainField {
Title,
ReleaseYear,
Director,
Rating,
WatchedAt,
Comment,
ExternalMetadataId,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Transform {
RatingScale(f64),
DateFormat(String),
Identity,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FieldMapping {
pub source_column: String,
pub domain_field: DomainField,
pub transform: Transform,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ImportRow {
pub title: Option<String>,
pub release_year: Option<String>,
pub director: Option<String>,
pub rating: Option<String>,
pub watched_at: Option<String>,
pub comment: Option<String>,
pub external_metadata_id: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RowResult {
Valid(ImportRow),
Invalid { errors: Vec<String>, raw: Vec<(String, String)> },
}
/// Wraps a RowResult with a duplicate flag so this information persists when
/// serialised as JSON into the import_sessions.row_results DB column.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnnotatedRow {
pub result: RowResult,
pub is_duplicate: bool,
}

View File

@@ -0,0 +1,10 @@
[package]
name = "poster-sync"
version = "0.1.0"
edition = "2024"
[dependencies]
domain = { workspace = true }
async-trait = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true }

View File

@@ -0,0 +1,93 @@
use std::{sync::Arc, time::Duration};
use async_trait::async_trait;
use domain::{
errors::DomainError,
events::DomainEvent,
ports::{EventHandler, MetadataClient, MovieRepository, PosterFetcherClient, PosterStorage},
value_objects::{ExternalMetadataId, MovieId},
};
pub struct PosterSyncHandler {
movie_repository: Arc<dyn MovieRepository>,
metadata_client: Arc<dyn MetadataClient>,
poster_fetcher: Arc<dyn PosterFetcherClient>,
poster_storage: Arc<dyn PosterStorage>,
max_retries: u32,
}
impl PosterSyncHandler {
pub fn new(
movie_repository: Arc<dyn MovieRepository>,
metadata_client: Arc<dyn MetadataClient>,
poster_fetcher: Arc<dyn PosterFetcherClient>,
poster_storage: Arc<dyn PosterStorage>,
max_retries: u32,
) -> Self {
Self { movie_repository, metadata_client, poster_fetcher, poster_storage, max_retries }
}
async fn sync(&self, movie_id: MovieId, external_metadata_id: ExternalMetadataId) -> Result<(), DomainError> {
let mut movie = match self.movie_repository.get_movie_by_id(&movie_id).await? {
Some(m) => m,
None => {
tracing::warn!("Sync cancelled: Movie {} not found", movie_id.value());
return Err(DomainError::NotFound("Movie not found".into()));
}
};
let poster_url = match self.metadata_client.get_poster_url(&external_metadata_id).await {
Ok(Some(url)) => url,
Ok(None) => return Ok(()),
Err(e) => {
tracing::warn!("Failed to find poster URL: {:?}", e);
return Err(e);
}
};
let image_bytes = self.poster_fetcher.fetch_poster_bytes(&poster_url).await?;
let stored_path = self.poster_storage.store_poster(&movie_id, &image_bytes).await?;
movie.update_poster(stored_path);
self.movie_repository.upsert_movie(&movie).await
}
}
#[async_trait]
impl EventHandler for PosterSyncHandler {
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
let (movie_id, external_metadata_id) = match event {
DomainEvent::MovieDiscovered { movie_id, external_metadata_id } => {
(movie_id.value(), external_metadata_id.value().to_owned())
}
_ => return Ok(()),
};
let movie_id = MovieId::from_uuid(movie_id);
let external_metadata_id = ExternalMetadataId::new(external_metadata_id)?;
let mut last_err: Option<DomainError> = None;
for attempt in 0..=self.max_retries {
match self.sync(movie_id.clone(), external_metadata_id.clone()).await {
Ok(()) => return Ok(()),
Err(e) => {
if attempt < self.max_retries {
let delay = Duration::from_secs(2u64.pow(attempt));
tracing::warn!(
attempt = attempt + 1,
max_attempts = self.max_retries + 1,
delay_secs = delay.as_secs(),
"poster sync failed, retrying: {e}"
);
tokio::time::sleep(delay).await;
}
last_err = Some(e);
}
}
}
let err = last_err.expect("loop runs at least once");
tracing::error!(attempts = self.max_retries + 1, "poster sync failed after all attempts: {err}");
Err(err)
}
}

View File

@@ -0,0 +1,21 @@
CREATE TABLE IF NOT EXISTS import_sessions (
id TEXT PRIMARY KEY NOT NULL,
user_id TEXT NOT NULL,
parsed_data TEXT NOT NULL,
field_mappings TEXT,
row_results TEXT,
created_at TIMESTAMPTZ NOT NULL,
expires_at TIMESTAMPTZ NOT NULL
);
CREATE TABLE IF NOT EXISTS import_profiles (
id TEXT PRIMARY KEY NOT NULL,
user_id TEXT NOT NULL,
name TEXT NOT NULL,
field_mappings TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_import_sessions_user_id ON import_sessions (user_id);
CREATE INDEX IF NOT EXISTS idx_import_sessions_expires_at ON import_sessions (expires_at);
CREATE INDEX IF NOT EXISTS idx_import_profiles_user_id ON import_profiles (user_id);

View File

@@ -0,0 +1,125 @@
use async_trait::async_trait;
use chrono::NaiveDateTime;
use domain::{
errors::DomainError,
models::ImportProfile,
ports::ImportProfileRepository,
value_objects::{ImportProfileId, UserId},
};
use sqlx::PgPool;
pub struct PostgresImportProfileRepository {
pool: PgPool,
}
impl PostgresImportProfileRepository {
pub fn new(pool: PgPool) -> Self { Self { pool } }
fn map_err(e: sqlx::Error) -> DomainError {
tracing::error!("DB error: {:?}", e);
DomainError::InfrastructureError("Database operation failed".into())
}
}
#[async_trait]
impl ImportProfileRepository for PostgresImportProfileRepository {
async fn save(&self, p: &ImportProfile) -> Result<(), DomainError> {
let id = p.id.value().to_string();
let user_id = p.user_id.value().to_string();
sqlx::query(
"INSERT INTO import_profiles (id, user_id, name, field_mappings, created_at)
VALUES ($1, $2, $3, $4, $5) ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, field_mappings = EXCLUDED.field_mappings",
)
.bind(&id)
.bind(&user_id)
.bind(&p.name)
.bind(&p.field_mappings)
.bind(p.created_at)
.execute(&self.pool)
.await
.map(|_| ())
.map_err(Self::map_err)
}
async fn list_for_user(&self, user_id: &UserId) -> Result<Vec<ImportProfile>, DomainError> {
let uid = user_id.value().to_string();
#[derive(sqlx::FromRow)]
struct Row {
id: String,
user_id: String,
name: String,
field_mappings: String,
created_at: NaiveDateTime,
}
let rows = sqlx::query_as::<_, Row>(
"SELECT id, user_id, name, field_mappings, created_at FROM import_profiles WHERE user_id = $1 ORDER BY created_at DESC",
)
.bind(&uid)
.fetch_all(&self.pool)
.await
.map_err(Self::map_err)?;
rows.into_iter().map(|r| -> Result<ImportProfile, DomainError> {
Ok(ImportProfile {
id: ImportProfileId::from_uuid(
r.id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
user_id: UserId::from_uuid(
r.user_id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
name: r.name,
field_mappings: r.field_mappings,
created_at: r.created_at,
})
}).collect()
}
async fn get(&self, id: &ImportProfileId, user_id: &UserId) -> Result<Option<ImportProfile>, DomainError> {
let id_str = id.value().to_string();
let uid_str = user_id.value().to_string();
#[derive(sqlx::FromRow)]
struct Row {
id: String,
user_id: String,
name: String,
field_mappings: String,
created_at: NaiveDateTime,
}
let row = sqlx::query_as::<_, Row>(
"SELECT id, user_id, name, field_mappings, created_at FROM import_profiles WHERE id = $1 AND user_id = $2",
)
.bind(&id_str)
.bind(&uid_str)
.fetch_optional(&self.pool)
.await
.map_err(Self::map_err)?;
Ok(row.map(|r| -> Result<ImportProfile, DomainError> {
Ok(ImportProfile {
id: ImportProfileId::from_uuid(
r.id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
user_id: UserId::from_uuid(
r.user_id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
name: r.name,
field_mappings: r.field_mappings,
created_at: r.created_at,
})
}).transpose()?)
}
async fn delete(&self, id: &ImportProfileId) -> Result<(), DomainError> {
let id_str = id.value().to_string();
sqlx::query("DELETE FROM import_profiles WHERE id = $1")
.bind(&id_str)
.execute(&self.pool)
.await
.map(|_| ())
.map_err(Self::map_err)
}
}

View File

@@ -0,0 +1,129 @@
use async_trait::async_trait;
use chrono::NaiveDateTime;
use domain::{
errors::DomainError,
models::ImportSession,
ports::ImportSessionRepository,
value_objects::{ImportSessionId, UserId},
};
use sqlx::PgPool;
pub struct PostgresImportSessionRepository {
pool: PgPool,
}
impl PostgresImportSessionRepository {
pub fn new(pool: PgPool) -> Self { Self { pool } }
fn map_err(e: sqlx::Error) -> DomainError {
tracing::error!("DB error: {:?}", e);
DomainError::InfrastructureError("Database operation failed".into())
}
}
#[async_trait]
impl ImportSessionRepository for PostgresImportSessionRepository {
async fn create(&self, s: &ImportSession) -> Result<(), DomainError> {
let id = s.id.value().to_string();
let user_id = s.user_id.value().to_string();
sqlx::query(
"INSERT INTO import_sessions (id, user_id, parsed_data, field_mappings, row_results, created_at, expires_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)",
)
.bind(&id)
.bind(&user_id)
.bind(&s.parsed_data)
.bind(&s.field_mappings)
.bind(&s.row_results)
.bind(s.created_at)
.bind(s.expires_at)
.execute(&self.pool)
.await
.map(|_| ())
.map_err(Self::map_err)
}
async fn get(&self, id: &ImportSessionId, user_id: &UserId) -> Result<Option<ImportSession>, DomainError> {
let id_str = id.value().to_string();
let uid_str = user_id.value().to_string();
#[derive(sqlx::FromRow)]
struct Row {
id: String,
user_id: String,
parsed_data: String,
field_mappings: Option<String>,
row_results: Option<String>,
created_at: NaiveDateTime,
expires_at: NaiveDateTime,
}
let row = sqlx::query_as::<_, Row>(
"SELECT id, user_id, parsed_data, field_mappings, row_results, created_at, expires_at
FROM import_sessions WHERE id = $1 AND user_id = $2",
)
.bind(&id_str)
.bind(&uid_str)
.fetch_optional(&self.pool)
.await
.map_err(Self::map_err)?;
Ok(row.map(|r| -> Result<ImportSession, DomainError> {
Ok(ImportSession {
id: ImportSessionId::from_uuid(
r.id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
user_id: UserId::from_uuid(
r.user_id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
parsed_data: r.parsed_data,
field_mappings: r.field_mappings,
row_results: r.row_results,
created_at: r.created_at,
expires_at: r.expires_at,
})
}).transpose()?)
}
async fn update(&self, s: &ImportSession) -> Result<(), DomainError> {
let id = s.id.value().to_string();
sqlx::query(
"UPDATE import_sessions SET field_mappings = $1, row_results = $2 WHERE id = $3",
)
.bind(&s.field_mappings)
.bind(&s.row_results)
.bind(&id)
.execute(&self.pool)
.await
.map(|_| ())
.map_err(Self::map_err)
}
async fn delete(&self, id: &ImportSessionId) -> Result<(), DomainError> {
let id_str = id.value().to_string();
sqlx::query("DELETE FROM import_sessions WHERE id = $1")
.bind(&id_str)
.execute(&self.pool)
.await
.map(|_| ())
.map_err(Self::map_err)
}
async fn delete_expired(&self) -> Result<u64, DomainError> {
let result = sqlx::query("DELETE FROM import_sessions WHERE expires_at < NOW()")
.execute(&self.pool)
.await
.map_err(Self::map_err)?;
Ok(result.rows_affected())
}
async fn delete_expired_for_user(&self, user_id: &UserId) -> Result<(), DomainError> {
let uid = user_id.value().to_string();
sqlx::query("DELETE FROM import_sessions WHERE user_id = $1 AND expires_at < NOW()")
.bind(&uid)
.execute(&self.pool)
.await
.map(|_| ())
.map_err(Self::map_err)
}
}

View File

@@ -12,6 +12,8 @@ use domain::{
};
use sqlx::PgPool;
mod import_profile;
mod import_session;
mod models;
mod users;
@@ -20,6 +22,8 @@ use models::{
datetime_to_str,
};
pub use import_profile::PostgresImportProfileRepository;
pub use import_session::PostgresImportSessionRepository;
pub use users::PostgresUserRepository;
fn format_year_month(ym: &str) -> String {
@@ -775,6 +779,8 @@ pub async fn wire(database_url: &str) -> anyhow::Result<(
std::sync::Arc<dyn domain::ports::DiaryRepository>,
std::sync::Arc<dyn domain::ports::StatsRepository>,
std::sync::Arc<dyn domain::ports::UserRepository>,
std::sync::Arc<dyn domain::ports::ImportSessionRepository>,
std::sync::Arc<dyn domain::ports::ImportProfileRepository>,
)> {
use anyhow::Context;
@@ -788,6 +794,9 @@ pub async fn wire(database_url: &str) -> anyhow::Result<(
.map_err(|e| anyhow::anyhow!("{e}"))
.context("Database migration failed")?;
let import_session_repo = std::sync::Arc::new(PostgresImportSessionRepository::new(pool.clone()));
let import_profile_repo = std::sync::Arc::new(PostgresImportProfileRepository::new(pool.clone()));
Ok((
pool.clone(),
std::sync::Arc::clone(&repo) as _,
@@ -795,5 +804,7 @@ pub async fn wire(database_url: &str) -> anyhow::Result<(
std::sync::Arc::clone(&repo) as _,
std::sync::Arc::clone(&repo) as _,
std::sync::Arc::new(PostgresUserRepository::new(pool)) as _,
import_session_repo as _,
import_profile_repo as _,
))
}

View File

@@ -0,0 +1,21 @@
CREATE TABLE IF NOT EXISTS import_sessions (
id TEXT PRIMARY KEY NOT NULL,
user_id TEXT NOT NULL,
parsed_data TEXT NOT NULL,
field_mappings TEXT,
row_results TEXT,
created_at TEXT NOT NULL,
expires_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS import_profiles (
id TEXT PRIMARY KEY NOT NULL,
user_id TEXT NOT NULL,
name TEXT NOT NULL,
field_mappings TEXT NOT NULL,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_import_sessions_user_id ON import_sessions (user_id);
CREATE INDEX IF NOT EXISTS idx_import_sessions_expires_at ON import_sessions (expires_at);
CREATE INDEX IF NOT EXISTS idx_import_profiles_user_id ON import_profiles (user_id);

View File

@@ -0,0 +1,106 @@
use async_trait::async_trait;
use chrono::NaiveDateTime;
use domain::{
errors::DomainError,
models::ImportProfile,
ports::ImportProfileRepository,
value_objects::{ImportProfileId, UserId},
};
use sqlx::SqlitePool;
pub struct SqliteImportProfileRepository {
pool: SqlitePool,
}
impl SqliteImportProfileRepository {
pub fn new(pool: SqlitePool) -> Self { Self { pool } }
fn map_err(e: sqlx::Error) -> DomainError {
tracing::error!("DB error: {:?}", e);
DomainError::InfrastructureError("Database operation failed".into())
}
fn parse_dt(s: &str) -> Result<NaiveDateTime, DomainError> {
NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")
.or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S"))
.map_err(|e| DomainError::InfrastructureError(format!("invalid datetime '{}': {}", s, e)))
}
}
#[async_trait]
impl ImportProfileRepository for SqliteImportProfileRepository {
async fn save(&self, p: &ImportProfile) -> Result<(), DomainError> {
let id = p.id.value().to_string();
let user_id = p.user_id.value().to_string();
let created_at = p.created_at.format("%Y-%m-%d %H:%M:%S").to_string();
sqlx::query!(
"INSERT OR REPLACE INTO import_profiles (id, user_id, name, field_mappings, created_at)
VALUES (?, ?, ?, ?, ?)",
id, user_id, p.name, p.field_mappings, created_at
)
.execute(&self.pool)
.await
.map(|_| ())
.map_err(Self::map_err)
}
async fn list_for_user(&self, user_id: &UserId) -> Result<Vec<ImportProfile>, DomainError> {
let uid = user_id.value().to_string();
let rows = sqlx::query!(
"SELECT id, user_id, name, field_mappings, created_at FROM import_profiles WHERE user_id = ? ORDER BY created_at DESC",
uid
)
.fetch_all(&self.pool)
.await
.map_err(Self::map_err)?;
rows.into_iter().map(|r| -> Result<ImportProfile, DomainError> {
Ok(ImportProfile {
id: ImportProfileId::from_uuid(
r.id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
user_id: UserId::from_uuid(
r.user_id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
name: r.name,
field_mappings: r.field_mappings,
created_at: Self::parse_dt(&r.created_at)?,
})
}).collect()
}
async fn get(&self, id: &ImportProfileId, user_id: &UserId) -> Result<Option<ImportProfile>, DomainError> {
let id_str = id.value().to_string();
let uid_str = user_id.value().to_string();
let row = sqlx::query!(
"SELECT id, user_id, name, field_mappings, created_at FROM import_profiles WHERE id = ? AND user_id = ?",
id_str, uid_str
)
.fetch_optional(&self.pool)
.await
.map_err(Self::map_err)?;
Ok(row.map(|r| -> Result<ImportProfile, DomainError> {
Ok(ImportProfile {
id: ImportProfileId::from_uuid(
r.id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
user_id: UserId::from_uuid(
r.user_id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
name: r.name,
field_mappings: r.field_mappings,
created_at: Self::parse_dt(&r.created_at)?,
})
}).transpose()?)
}
async fn delete(&self, id: &ImportProfileId) -> Result<(), DomainError> {
let id_str = id.value().to_string();
sqlx::query!("DELETE FROM import_profiles WHERE id = ?", id_str)
.execute(&self.pool)
.await
.map(|_| ())
.map_err(Self::map_err)
}
}

View File

@@ -0,0 +1,114 @@
use async_trait::async_trait;
use chrono::NaiveDateTime;
use domain::{
errors::DomainError,
models::ImportSession,
ports::ImportSessionRepository,
value_objects::{ImportSessionId, UserId},
};
use sqlx::SqlitePool;
pub struct SqliteImportSessionRepository {
pool: SqlitePool,
}
impl SqliteImportSessionRepository {
pub fn new(pool: SqlitePool) -> Self { Self { pool } }
fn map_err(e: sqlx::Error) -> DomainError {
tracing::error!("DB error: {:?}", e);
DomainError::InfrastructureError("Database operation failed".into())
}
fn parse_dt(s: &str) -> Result<NaiveDateTime, DomainError> {
NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")
.or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S"))
.map_err(|e| DomainError::InfrastructureError(format!("invalid datetime '{}': {}", s, e)))
}
}
#[async_trait]
impl ImportSessionRepository for SqliteImportSessionRepository {
async fn create(&self, s: &ImportSession) -> Result<(), DomainError> {
let id = s.id.value().to_string();
let user_id = s.user_id.value().to_string();
let created_at = s.created_at.format("%Y-%m-%d %H:%M:%S").to_string();
let expires_at = s.expires_at.format("%Y-%m-%d %H:%M:%S").to_string();
sqlx::query!(
"INSERT INTO import_sessions (id, user_id, parsed_data, field_mappings, row_results, created_at, expires_at)
VALUES (?, ?, ?, ?, ?, ?, ?)",
id, user_id, s.parsed_data, s.field_mappings, s.row_results, created_at, expires_at
)
.execute(&self.pool)
.await
.map(|_| ())
.map_err(Self::map_err)
}
async fn get(&self, id: &ImportSessionId, user_id: &UserId) -> Result<Option<ImportSession>, DomainError> {
let id_str = id.value().to_string();
let uid_str = user_id.value().to_string();
let row = sqlx::query!(
"SELECT id, user_id, parsed_data, field_mappings, row_results, created_at, expires_at
FROM import_sessions WHERE id = ? AND user_id = ?",
id_str, uid_str
)
.fetch_optional(&self.pool)
.await
.map_err(Self::map_err)?;
Ok(row.map(|r| -> Result<ImportSession, DomainError> {
Ok(ImportSession {
id: ImportSessionId::from_uuid(
r.id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
user_id: UserId::from_uuid(
r.user_id.parse::<uuid::Uuid>().map_err(|e| DomainError::InfrastructureError(e.to_string()))?
),
parsed_data: r.parsed_data,
field_mappings: r.field_mappings,
row_results: r.row_results,
created_at: Self::parse_dt(&r.created_at)?,
expires_at: Self::parse_dt(&r.expires_at)?,
})
}).transpose()?)
}
async fn update(&self, s: &ImportSession) -> Result<(), DomainError> {
let id = s.id.value().to_string();
sqlx::query!(
"UPDATE import_sessions SET field_mappings = ?, row_results = ? WHERE id = ?",
s.field_mappings, s.row_results, id
)
.execute(&self.pool)
.await
.map(|_| ())
.map_err(Self::map_err)
}
async fn delete(&self, id: &ImportSessionId) -> Result<(), DomainError> {
let id_str = id.value().to_string();
sqlx::query!("DELETE FROM import_sessions WHERE id = ?", id_str)
.execute(&self.pool)
.await
.map(|_| ())
.map_err(Self::map_err)
}
async fn delete_expired(&self) -> Result<u64, DomainError> {
let result = sqlx::query!("DELETE FROM import_sessions WHERE expires_at < datetime('now')")
.execute(&self.pool)
.await
.map_err(Self::map_err)?;
Ok(result.rows_affected())
}
async fn delete_expired_for_user(&self, user_id: &UserId) -> Result<(), DomainError> {
let uid = user_id.value().to_string();
sqlx::query!("DELETE FROM import_sessions WHERE user_id = ? AND expires_at < datetime('now')", uid)
.execute(&self.pool)
.await
.map(|_| ())
.map_err(Self::map_err)
}
}

View File

@@ -12,6 +12,8 @@ use domain::{
};
use sqlx::SqlitePool;
mod import_profile;
mod import_session;
mod migrations;
mod models;
mod users;
@@ -21,6 +23,8 @@ use models::{
datetime_to_str,
};
pub use import_profile::SqliteImportProfileRepository;
pub use import_session::SqliteImportSessionRepository;
pub use users::SqliteUserRepository;
fn format_year_month(ym: &str) -> String {
@@ -766,6 +770,8 @@ pub async fn wire(database_url: &str) -> anyhow::Result<(
std::sync::Arc<dyn domain::ports::DiaryRepository>,
std::sync::Arc<dyn domain::ports::StatsRepository>,
std::sync::Arc<dyn domain::ports::UserRepository>,
std::sync::Arc<dyn domain::ports::ImportSessionRepository>,
std::sync::Arc<dyn domain::ports::ImportProfileRepository>,
)> {
use std::str::FromStr;
use anyhow::Context;
@@ -786,6 +792,9 @@ pub async fn wire(database_url: &str) -> anyhow::Result<(
.map_err(|e| anyhow::anyhow!("{e}"))
.context("Database migration failed")?;
let import_session_repo = std::sync::Arc::new(SqliteImportSessionRepository::new(pool.clone()));
let import_profile_repo = std::sync::Arc::new(SqliteImportProfileRepository::new(pool.clone()));
Ok((
pool.clone(),
std::sync::Arc::clone(&repo) as _,
@@ -793,6 +802,8 @@ pub async fn wire(database_url: &str) -> anyhow::Result<(
std::sync::Arc::clone(&repo) as _,
std::sync::Arc::clone(&repo) as _,
std::sync::Arc::new(SqliteUserRepository::new(pool)) as _,
import_session_repo as _,
import_profile_repo as _,
))
}

View File

@@ -1,6 +1,8 @@
use application::ports::{
ActivityFeedPageData, FollowersPageData, FollowingPageData, HtmlPageContext, HtmlRenderer,
LoginPageData, NewReviewPageData, ProfilePageData, RegisterPageData, UsersPageData,
ImportMappingPageData, ImportPreviewPageData, ImportPreviewRow, ImportProfileView,
ImportRowStatus, ImportUploadPageData, LoginPageData, NewReviewPageData, ProfilePageData,
RegisterPageData, UsersPageData,
};
use askama::Template;
use chrono::Datelike;
@@ -290,6 +292,34 @@ fn bar_height_px(avg_rating: f64) -> i64 {
(avg_rating / 5.0 * 60.0) as i64
}
#[derive(Template)]
#[template(path = "import_upload.html")]
struct ImportUploadTemplate<'a> {
ctx: &'a HtmlPageContext,
profiles: &'a [ImportProfileView],
error: Option<&'a str>,
}
#[derive(Template)]
#[template(path = "import_mapping.html")]
struct ImportMappingTemplate<'a> {
ctx: &'a HtmlPageContext,
session_id: &'a str,
columns: &'a [String],
sample_rows: &'a [Vec<String>],
domain_fields: &'a [(&'static str, &'static str)],
error: Option<&'a str>,
}
#[derive(Template)]
#[template(path = "import_preview.html")]
struct ImportPreviewTemplate<'a> {
ctx: &'a HtmlPageContext,
session_id: &'a str,
columns: &'a [String],
rows: &'a [ImportPreviewRow],
}
pub struct AskamaHtmlRenderer;
impl AskamaHtmlRenderer {
@@ -557,4 +587,38 @@ impl HtmlRenderer for AskamaHtmlRenderer {
.render()
.map_err(|e| e.to_string())
}
fn render_import_upload_page(&self, data: ImportUploadPageData) -> Result<String, String> {
ImportUploadTemplate {
ctx: &data.ctx,
profiles: &data.profiles,
error: data.error.as_deref(),
}
.render()
.map_err(|e| e.to_string())
}
fn render_import_mapping_page(&self, data: ImportMappingPageData) -> Result<String, String> {
ImportMappingTemplate {
ctx: &data.ctx,
session_id: &data.session_id,
columns: &data.columns,
sample_rows: &data.sample_rows,
domain_fields: &data.domain_fields,
error: data.error.as_deref(),
}
.render()
.map_err(|e| e.to_string())
}
fn render_import_preview_page(&self, data: ImportPreviewPageData) -> Result<String, String> {
ImportPreviewTemplate {
ctx: &data.ctx,
session_id: &data.session_id,
columns: &data.columns,
rows: &data.rows,
}
.render()
.map_err(|e| e.to_string())
}
}

View File

@@ -33,6 +33,7 @@
{% if let Some(uid) = ctx.user_id %}
<a href="/users/{{ uid }}">Profile</a>
<a href="/reviews/new">Add Review</a>
<a href="/import">Import</a>
<a href="/logout">Logout</a>
{% else %}
<a href="/login">Login</a>

View File

@@ -0,0 +1,52 @@
{% extends "base.html" %}
{% block content %}
<h1>Map Columns</h1>
{% if let Some(err) = error %}
<p class="error">{{ err }}</p>
{% endif %}
<p>Showing up to 5 sample rows. Map each column to a diary field.</p>
<form method="POST" action="/import/{{ session_id }}/mapping">
<table>
<thead>
<tr>
{% for col in columns %}<th>{{ col }}</th>{% endfor %}
</tr>
</thead>
<tbody>
{% for row in sample_rows %}
<tr>{% for cell in row %}<td>{{ cell }}</td>{% endfor %}</tr>
{% endfor %}
</tbody>
</table>
{% for col in columns %}
<fieldset>
<legend>{{ col }}</legend>
<label>Maps to
<select name="mapping_{{ loop.index0 }}_field">
<option value="">— skip —</option>
{% for (val, label) in domain_fields %}
<option value="{{ val }}">{{ label }}</option>
{% endfor %}
</select>
</label>
<label>Rating scale
<select name="mapping_{{ loop.index0 }}_scale">
<option value="1.0">Same (05)</option>
<option value="0.5">10-point (/2)</option>
<option value="0.05">Percentage (/20)</option>
</select>
</label>
<label>Date format
<input type="text" name="mapping_{{ loop.index0 }}_datefmt" placeholder="%Y-%m-%d">
</label>
<input type="hidden" name="mapping_{{ loop.index0 }}_col" value="{{ col }}">
</fieldset>
{% endfor %}
<input type="hidden" name="_csrf" value="{{ ctx.csrf_token }}">
<button type="submit">Preview Import</button>
</form>
{% endblock %}

View File

@@ -0,0 +1,45 @@
{% extends "base.html" %}
{% block content %}
<h1>Preview Import</h1>
<form method="POST" action="/import/{{ session_id }}/confirm">
<table>
<thead>
<tr>
<th>Include?</th>
{% for col in columns %}<th>{{ col }}</th>{% endfor %}
<th>Status</th>
</tr>
</thead>
<tbody>
{% for row in rows %}
<tr>
<td>
{% match row.status %}
{% when ImportRowStatus::Invalid with (_e) %}
<input type="checkbox" disabled>
{% when _ %}
<input type="checkbox" name="confirmed" value="{{ row.index }}" checked>
{% endmatch %}
</td>
{% for cell in row.cells %}<td>{{ cell }}</td>{% endfor %}
<td>
{% match row.status %}
{% when ImportRowStatus::Valid %}&#10003;
{% when ImportRowStatus::Duplicate %}&#9888; duplicate
{% when ImportRowStatus::Invalid with (e) %}&#10007; {{ e }}
{% endmatch %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
<label>Save this mapping as a profile?
<input type="text" name="profile_name" placeholder="e.g. Letterboxd">
</label>
<input type="hidden" name="_csrf" value="{{ ctx.csrf_token }}">
<button type="submit">Import Selected</button>
</form>
{% endblock %}

View File

@@ -0,0 +1,42 @@
{% extends "base.html" %}
{% block content %}
<h1>Import Reviews</h1>
{% if let Some(err) = error %}
<p class="error">{{ err }}</p>
{% endif %}
{% if !profiles.is_empty() %}
<section>
<h2>Saved Profiles</h2>
<ul>
{% for p in profiles %}
<li>
{{ p.name }}
<form method="POST" action="/import/profiles/{{ p.id }}/delete" style="display:inline">
<input type="hidden" name="_csrf" value="{{ ctx.csrf_token }}">
<button type="submit">Delete</button>
</form>
</li>
{% endfor %}
</ul>
</section>
{% endif %}
<h2>Upload File</h2>
<form method="POST" action="/import/upload" enctype="multipart/form-data">
<label>
File (CSV, TSV, JSON, XLSX)<br>
<input type="file" name="file" accept=".csv,.tsv,.json,.xlsx" required>
</label>
<label>
Format<br>
<select name="format">
<option value="csv">CSV / TSV</option>
<option value="json">JSON</option>
<option value="xlsx">XLSX</option>
</select>
</label>
<input type="hidden" name="_csrf" value="{{ ctx.csrf_token }}">
<button type="submit">Upload</button>
</form>
{% endblock %}