use crate::helpers::{MapDomainError, pg_repo}; use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ entities::{ Asset, AssetFilters, AssetMetadata, AssetStack, AssetStackMember, AssetType, DerivativeAsset, DerivativeProfile, DetectionMethod, DuplicateCandidate, DuplicateGroup, DuplicateStatus, GenerationStatus, MetadataSource, SourceReference, StackMemberRole, StackType, }, errors::DomainError, ports::{ AssetMetadataRepository, AssetRepository, AssetStackRepository, DerivativeRepository, DuplicateRepository, }, value_objects::{Checksum, DateTimeStamp, MetadataValue, StructuredData, SystemId}, }; use uuid::Uuid; // ────────────────────────────────────────────── // Asset // ────────────────────────────────────────────── #[derive(sqlx::FromRow)] struct AssetRow { asset_id: Uuid, volume_id: Uuid, relative_path: String, checksum: String, asset_type: String, mime_type: String, file_size: i64, is_processed: bool, owner_user_id: Uuid, created_at: DateTime, deleted_at: Option>, deleted_by: Option, } fn asset_type_from_str(s: &str) -> AssetType { match s { "image" => AssetType::Image, "video" => AssetType::Video, "live_photo" => AssetType::LivePhoto, _ => AssetType::Image, } } fn asset_type_to_str(t: &AssetType) -> &'static str { match t { AssetType::Image => "image", AssetType::Video => "video", AssetType::LivePhoto => "live_photo", } } impl TryFrom for Asset { type Error = DomainError; fn try_from(r: AssetRow) -> Result { Ok(Self { asset_id: SystemId::from_uuid(r.asset_id), source_reference: SourceReference { volume_id: SystemId::from_uuid(r.volume_id), relative_path: r.relative_path, checksum: Checksum::new(r.checksum)?, }, asset_type: asset_type_from_str(&r.asset_type), mime_type: r.mime_type, file_size: r.file_size as u64, is_processed: r.is_processed, owner_user_id: SystemId::from_uuid(r.owner_user_id), created_at: DateTimeStamp::from_datetime(r.created_at), deleted_at: r.deleted_at.map(DateTimeStamp::from_datetime), deleted_by: r.deleted_by.map(SystemId::from_uuid), }) } } pg_repo!(PostgresAssetRepository); fn build_search_where(filters: &AssetFilters) -> (String, bool) { let mut clause = String::new(); let mut idx = 2u32; if filters.asset_type.is_some() { clause.push_str(&format!(" AND a.asset_type = ${idx}")); idx += 1; } if filters.mime_type.is_some() { clause.push_str(&format!(" AND a.mime_type = ${idx}")); idx += 1; } if filters.date_from.is_some() { clause.push_str(&format!(" AND a.created_at >= ${idx}")); idx += 1; } if filters.date_to.is_some() { clause.push_str(&format!(" AND a.created_at <= ${idx}")); idx += 1; } if filters.is_processed.is_some() { clause.push_str(&format!(" AND a.is_processed = ${idx}")); idx += 1; } let has_tag = filters.tag_name.is_some(); if has_tag { clause.push_str(&format!(" AND t.name = ${idx}")); } (clause, has_tag) } fn count_filter_params(filters: &AssetFilters) -> u32 { let mut n = 0u32; if filters.asset_type.is_some() { n += 1; } if filters.mime_type.is_some() { n += 1; } if filters.date_from.is_some() { n += 1; } if filters.date_to.is_some() { n += 1; } if filters.is_processed.is_some() { n += 1; } if filters.tag_name.is_some() { n += 1; } n } fn bind_filters<'q, O>( mut query: sqlx::query::QueryAs<'q, sqlx::Postgres, O, sqlx::postgres::PgArguments>, filters: &'q AssetFilters, ) -> sqlx::query::QueryAs<'q, sqlx::Postgres, O, sqlx::postgres::PgArguments> { if let Some(ref t) = filters.asset_type { query = query.bind(asset_type_to_str(t)); } if let Some(ref m) = filters.mime_type { query = query.bind(m.as_str()); } if let Some(ref d) = filters.date_from { query = query.bind(d.as_datetime()); } if let Some(ref d) = filters.date_to { query = query.bind(d.as_datetime()); } if let Some(p) = filters.is_processed { query = query.bind(p); } if let Some(ref tag) = filters.tag_name { query = query.bind(tag.as_str()); } query } #[async_trait] impl AssetRepository for PostgresAssetRepository { async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { let row = sqlx::query_as::<_, AssetRow>( "SELECT asset_id, volume_id, relative_path, checksum, asset_type, mime_type, file_size, is_processed, owner_user_id, created_at, deleted_at, deleted_by FROM assets WHERE asset_id = $1", ) .bind(*id.as_uuid()) .fetch_optional(&self.pool) .await .map_pg()?; row.map(TryInto::try_into).transpose() } async fn find_by_checksum(&self, checksum: &Checksum) -> Result, DomainError> { let rows = sqlx::query_as::<_, AssetRow>( "SELECT asset_id, volume_id, relative_path, checksum, asset_type, mime_type, file_size, is_processed, owner_user_id, created_at, deleted_at, deleted_by FROM assets WHERE checksum = $1", ) .bind(checksum.as_str()) .fetch_all(&self.pool) .await .map_pg()?; rows.into_iter().map(TryInto::try_into).collect() } async fn find_by_owner( &self, owner_id: &SystemId, limit: u32, offset: u32, ) -> Result, DomainError> { let rows = sqlx::query_as::<_, AssetRow>( "SELECT a.asset_id, a.volume_id, a.relative_path, a.checksum, a.asset_type, a.mime_type, a.file_size, a.is_processed, a.owner_user_id, a.created_at, a.deleted_at, a.deleted_by FROM assets a LEFT JOIN asset_metadata am ON am.asset_id = a.asset_id AND am.metadata_source = 'exif_extracted' WHERE a.owner_user_id = $1 AND a.deleted_at IS NULL ORDER BY COALESCE( (am.data->>'DateTimeOriginal')::timestamptz, a.created_at ) DESC LIMIT $2 OFFSET $3", ) .bind(*owner_id.as_uuid()) .bind(limit as i64) .bind(offset as i64) .fetch_all(&self.pool) .await .map_pg()?; rows.into_iter().map(TryInto::try_into).collect() } async fn search( &self, owner_id: &SystemId, filters: &AssetFilters, limit: u32, offset: u32, ) -> Result, DomainError> { let (where_clause, has_tag) = build_search_where(filters); let mut sql = format!( "SELECT a.asset_id, a.volume_id, a.relative_path, a.checksum, a.asset_type, a.mime_type, a.file_size, a.is_processed, a.owner_user_id, a.created_at, a.deleted_at, a.deleted_by FROM assets a{} WHERE a.owner_user_id = $1 AND a.deleted_at IS NULL{}", if has_tag { " JOIN asset_tags at ON at.asset_id = a.asset_id JOIN tags t ON t.tag_id = at.tag_id" } else { "" }, where_clause ); let param_count = count_filter_params(filters); sql.push_str(&format!( " ORDER BY a.created_at DESC LIMIT ${} OFFSET ${}", param_count + 2, param_count + 3 )); let mut query = sqlx::query_as::<_, AssetRow>(&sql).bind(*owner_id.as_uuid()); query = bind_filters(query, filters); let rows = query .bind(limit as i64) .bind(offset as i64) .fetch_all(&self.pool) .await .map_pg()?; rows.into_iter().map(TryInto::try_into).collect() } async fn count_by_owner(&self, owner_id: &SystemId) -> Result { let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM assets WHERE owner_user_id = $1 AND deleted_at IS NULL") .bind(*owner_id.as_uuid()) .fetch_one(&self.pool) .await .map_pg()?; Ok(count as u64) } async fn count_search( &self, owner_id: &SystemId, filters: &AssetFilters, ) -> Result { let (where_clause, has_tag) = build_search_where(filters); let sql = format!( "SELECT COUNT(*) FROM assets a{} WHERE a.owner_user_id = $1 AND a.deleted_at IS NULL{}", if has_tag { " JOIN asset_tags at ON at.asset_id = a.asset_id JOIN tags t ON t.tag_id = at.tag_id" } else { "" }, where_clause ); let mut query = sqlx::query_as::<_, (i64,)>(&sql).bind(*owner_id.as_uuid()); if let Some(ref t) = filters.asset_type { query = query.bind(asset_type_to_str(t)); } if let Some(ref m) = filters.mime_type { query = query.bind(m.as_str()); } if let Some(ref d) = filters.date_from { query = query.bind(d.as_datetime()); } if let Some(ref d) = filters.date_to { query = query.bind(d.as_datetime()); } if let Some(p) = filters.is_processed { query = query.bind(p); } if let Some(ref tag) = filters.tag_name { query = query.bind(tag.as_str()); } let (count,) = query.fetch_one(&self.pool).await.map_pg()?; Ok(count as u64) } async fn date_summary( &self, owner_id: &SystemId, ) -> Result, DomainError> { let rows: Vec<(chrono::NaiveDate, i64)> = sqlx::query_as( "SELECT COALESCE( (am.data->>'DateTimeOriginal')::timestamptz, a.created_at )::date AS day, COUNT(*) AS cnt FROM assets a LEFT JOIN asset_metadata am ON am.asset_id = a.asset_id AND am.metadata_source = 'exif_extracted' WHERE a.owner_user_id = $1 AND a.deleted_at IS NULL GROUP BY day ORDER BY day DESC", ) .bind(*owner_id.as_uuid()) .fetch_all(&self.pool) .await .map_pg()?; Ok(rows.into_iter().map(|(d, c)| (d, c as u64)).collect()) } async fn save(&self, asset: &Asset) -> Result<(), DomainError> { sqlx::query( "INSERT INTO assets (asset_id, volume_id, relative_path, checksum, asset_type, mime_type, file_size, is_processed, owner_user_id, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT (asset_id) DO UPDATE SET volume_id = EXCLUDED.volume_id, relative_path = EXCLUDED.relative_path, checksum = EXCLUDED.checksum, asset_type = EXCLUDED.asset_type, mime_type = EXCLUDED.mime_type, file_size = EXCLUDED.file_size, is_processed = EXCLUDED.is_processed, owner_user_id = EXCLUDED.owner_user_id", ) .bind(*asset.asset_id.as_uuid()) .bind(*asset.source_reference.volume_id.as_uuid()) .bind(&asset.source_reference.relative_path) .bind(asset.source_reference.checksum.as_str()) .bind(asset_type_to_str(&asset.asset_type)) .bind(&asset.mime_type) .bind(asset.file_size as i64) .bind(asset.is_processed) .bind(*asset.owner_user_id.as_uuid()) .bind(asset.created_at.as_datetime()) .execute(&self.pool) .await .map_pg()?; Ok(()) } async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { sqlx::query("DELETE FROM assets WHERE asset_id = $1") .bind(*id.as_uuid()) .execute(&self.pool) .await .map_pg()?; Ok(()) } async fn soft_delete( &self, id: &SystemId, deleted_by: &SystemId, ) -> Result<(), DomainError> { sqlx::query( "UPDATE assets SET deleted_at = NOW(), deleted_by = $2 WHERE asset_id = $1", ) .bind(*id.as_uuid()) .bind(*deleted_by.as_uuid()) .execute(&self.pool) .await .map_pg()?; Ok(()) } async fn restore(&self, id: &SystemId) -> Result<(), DomainError> { sqlx::query( "UPDATE assets SET deleted_at = NULL, deleted_by = NULL WHERE asset_id = $1", ) .bind(*id.as_uuid()) .execute(&self.pool) .await .map_pg()?; Ok(()) } async fn find_trashed_before( &self, cutoff: chrono::DateTime, ) -> Result, DomainError> { let rows = sqlx::query_as::<_, AssetRow>( "SELECT asset_id, volume_id, relative_path, checksum, asset_type, mime_type, file_size, is_processed, owner_user_id, created_at, deleted_at, deleted_by FROM assets WHERE deleted_at IS NOT NULL AND deleted_at < $1", ) .bind(cutoff) .fetch_all(&self.pool) .await .map_pg()?; rows.into_iter().map(TryInto::try_into).collect() } async fn count_trashed(&self, owner_id: &SystemId) -> Result { let (count,): (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM assets WHERE owner_user_id = $1 AND deleted_at IS NOT NULL", ) .bind(*owner_id.as_uuid()) .fetch_one(&self.pool) .await .map_pg()?; Ok(count as u64) } async fn find_trashed_by_owner( &self, owner_id: &SystemId, limit: u32, offset: u32, ) -> Result, DomainError> { let rows = sqlx::query_as::<_, AssetRow>( "SELECT asset_id, volume_id, relative_path, checksum, asset_type, mime_type, file_size, is_processed, owner_user_id, created_at, deleted_at, deleted_by FROM assets WHERE owner_user_id = $1 AND deleted_at IS NOT NULL ORDER BY deleted_at DESC LIMIT $2 OFFSET $3", ) .bind(*owner_id.as_uuid()) .bind(limit as i64) .bind(offset as i64) .fetch_all(&self.pool) .await .map_pg()?; rows.into_iter().map(TryInto::try_into).collect() } } // ────────────────────────────────────────────── // AssetMetadata // ────────────────────────────────────────────── #[derive(sqlx::FromRow)] struct AssetMetadataRow { asset_id: Uuid, metadata_source: String, data: serde_json::Value, updated_at: DateTime, } fn source_from_str(s: &str) -> MetadataSource { match s { "exif_extracted" => MetadataSource::ExifExtracted, "ai_generated" => MetadataSource::AiGenerated, "user_edited" => MetadataSource::UserEdited, _ => MetadataSource::ExifExtracted, } } fn source_to_str(s: &MetadataSource) -> &'static str { match s { MetadataSource::ExifExtracted => "exif_extracted", MetadataSource::AiGenerated => "ai_generated", MetadataSource::UserEdited => "user_edited", } } fn json_to_structured_data(v: serde_json::Value) -> StructuredData { let mut sd = StructuredData::new(); if let serde_json::Value::Object(map) = v { for (key, val) in map { let mv = match val { serde_json::Value::String(s) => MetadataValue::String(s), serde_json::Value::Number(n) => { if let Some(i) = n.as_i64() { MetadataValue::Integer(i) } else if let Some(f) = n.as_f64() { MetadataValue::Float(f) } else { MetadataValue::Null } } serde_json::Value::Bool(b) => MetadataValue::Boolean(b), serde_json::Value::Null => MetadataValue::Null, _ => MetadataValue::String(val.to_string()), }; sd.insert(key, mv); } } sd } fn structured_data_to_json(sd: &StructuredData) -> serde_json::Value { let mut map = serde_json::Map::new(); for (key, val) in sd.inner() { let jv = match val { MetadataValue::String(s) => serde_json::Value::String(s.clone()), MetadataValue::Integer(i) => serde_json::Value::Number((*i).into()), MetadataValue::Float(f) => serde_json::Number::from_f64(*f) .map(serde_json::Value::Number) .unwrap_or(serde_json::Value::Null), MetadataValue::Boolean(b) => serde_json::Value::Bool(*b), MetadataValue::Null => serde_json::Value::Null, }; map.insert(key.clone(), jv); } serde_json::Value::Object(map) } impl From for AssetMetadata { fn from(r: AssetMetadataRow) -> Self { Self { asset_id: SystemId::from_uuid(r.asset_id), metadata_source: source_from_str(&r.metadata_source), data: json_to_structured_data(r.data), updated_at: DateTimeStamp::from_datetime(r.updated_at), } } } pg_repo!(PostgresAssetMetadataRepository); #[async_trait] impl AssetMetadataRepository for PostgresAssetMetadataRepository { async fn find_by_asset(&self, asset_id: &SystemId) -> Result, DomainError> { let rows = sqlx::query_as::<_, AssetMetadataRow>( "SELECT asset_id, metadata_source, data, updated_at FROM asset_metadata WHERE asset_id = $1", ) .bind(*asset_id.as_uuid()) .fetch_all(&self.pool) .await .map_pg()?; Ok(rows.into_iter().map(Into::into).collect()) } async fn find_by_assets( &self, asset_ids: &[SystemId], ) -> Result, DomainError> { let uuids: Vec = asset_ids.iter().map(|id| *id.as_uuid()).collect(); let rows = sqlx::query_as::<_, AssetMetadataRow>( "SELECT asset_id, metadata_source, data, updated_at FROM asset_metadata WHERE asset_id = ANY($1)", ) .bind(&uuids) .fetch_all(&self.pool) .await .map_pg()?; Ok(rows.into_iter().map(Into::into).collect()) } async fn find_by_asset_and_source( &self, asset_id: &SystemId, source: MetadataSource, ) -> Result, DomainError> { let row = sqlx::query_as::<_, AssetMetadataRow>( "SELECT asset_id, metadata_source, data, updated_at FROM asset_metadata WHERE asset_id = $1 AND metadata_source = $2", ) .bind(*asset_id.as_uuid()) .bind(source_to_str(&source)) .fetch_optional(&self.pool) .await .map_pg()?; Ok(row.map(Into::into)) } async fn save(&self, metadata: &AssetMetadata) -> Result<(), DomainError> { sqlx::query( "INSERT INTO asset_metadata (asset_id, metadata_source, data, updated_at) VALUES ($1, $2, $3, $4) ON CONFLICT (asset_id, metadata_source) DO UPDATE SET data = EXCLUDED.data, updated_at = EXCLUDED.updated_at", ) .bind(*metadata.asset_id.as_uuid()) .bind(source_to_str(&metadata.metadata_source)) .bind(structured_data_to_json(&metadata.data)) .bind(metadata.updated_at.as_datetime()) .execute(&self.pool) .await .map_pg()?; Ok(()) } async fn delete_by_asset_and_source( &self, asset_id: &SystemId, source: MetadataSource, ) -> Result<(), DomainError> { sqlx::query("DELETE FROM asset_metadata WHERE asset_id = $1 AND metadata_source = $2") .bind(*asset_id.as_uuid()) .bind(source_to_str(&source)) .execute(&self.pool) .await .map_pg()?; Ok(()) } } // ────────────────────────────────────────────── // Duplicate // ────────────────────────────────────────────── #[derive(sqlx::FromRow)] struct GroupRow { group_id: Uuid, detection_method: String, status: String, candidates: serde_json::Value, } fn detection_from_str(s: &str) -> DetectionMethod { match s { "perceptual_hash" => DetectionMethod::PerceptualHash, _ => DetectionMethod::ExactHash, } } fn detection_to_str(d: &DetectionMethod) -> &'static str { match d { DetectionMethod::ExactHash => "exact_hash", DetectionMethod::PerceptualHash => "perceptual_hash", } } fn dup_status_from_str(s: &str) -> DuplicateStatus { match s { "resolved" => DuplicateStatus::Resolved, _ => DuplicateStatus::Unresolved, } } fn dup_status_to_str(s: &DuplicateStatus) -> &'static str { match s { DuplicateStatus::Unresolved => "unresolved", DuplicateStatus::Resolved => "resolved", } } #[derive(serde::Serialize, serde::Deserialize)] struct CandidateJson { asset_id: Uuid, similarity_score: f64, } fn candidates_from_json(v: serde_json::Value) -> Vec { let arr: Vec = serde_json::from_value(v).unwrap_or_default(); arr.into_iter() .map(|c| DuplicateCandidate { asset_id: SystemId::from_uuid(c.asset_id), similarity_score: c.similarity_score, }) .collect() } fn candidates_to_json(candidates: &[DuplicateCandidate]) -> serde_json::Value { let arr: Vec = candidates .iter() .map(|c| CandidateJson { asset_id: *c.asset_id.as_uuid(), similarity_score: c.similarity_score, }) .collect(); serde_json::to_value(arr).unwrap_or(serde_json::Value::Array(vec![])) } impl From for DuplicateGroup { fn from(r: GroupRow) -> Self { Self { group_id: SystemId::from_uuid(r.group_id), detection_method: detection_from_str(&r.detection_method), status: dup_status_from_str(&r.status), candidates: candidates_from_json(r.candidates), } } } pg_repo!(PostgresDuplicateRepository); #[async_trait] impl DuplicateRepository for PostgresDuplicateRepository { async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { let row = sqlx::query_as::<_, GroupRow>( "SELECT group_id, detection_method, status, candidates FROM duplicate_groups WHERE group_id = $1", ) .bind(*id.as_uuid()) .fetch_optional(&self.pool) .await .map_pg()?; Ok(row.map(Into::into)) } async fn find_unresolved( &self, limit: u32, offset: u32, ) -> Result, DomainError> { let rows = sqlx::query_as::<_, GroupRow>( "SELECT group_id, detection_method, status, candidates FROM duplicate_groups WHERE status = 'unresolved' ORDER BY group_id LIMIT $1 OFFSET $2", ) .bind(limit as i64) .bind(offset as i64) .fetch_all(&self.pool) .await .map_pg()?; Ok(rows.into_iter().map(Into::into).collect()) } async fn find_by_asset(&self, asset_id: &SystemId) -> Result, DomainError> { let rows = sqlx::query_as::<_, GroupRow>( "SELECT group_id, detection_method, status, candidates FROM duplicate_groups WHERE candidates @> $1::jsonb", ) .bind(serde_json::json!([{"asset_id": asset_id.as_uuid()}])) .fetch_all(&self.pool) .await .map_pg()?; Ok(rows.into_iter().map(Into::into).collect()) } async fn save(&self, group: &DuplicateGroup) -> Result<(), DomainError> { sqlx::query( "INSERT INTO duplicate_groups (group_id, detection_method, status, candidates) VALUES ($1, $2, $3, $4) ON CONFLICT (group_id) DO UPDATE SET detection_method = EXCLUDED.detection_method, status = EXCLUDED.status, candidates = EXCLUDED.candidates", ) .bind(*group.group_id.as_uuid()) .bind(detection_to_str(&group.detection_method)) .bind(dup_status_to_str(&group.status)) .bind(candidates_to_json(&group.candidates)) .execute(&self.pool) .await .map_pg()?; Ok(()) } } // ── DerivativeRepository ────────────────────────────────────────────── #[derive(sqlx::FromRow)] struct DerivativeRow { derivative_id: Uuid, parent_asset_id: Uuid, profile_type: String, storage_path: String, mime_type: String, file_size: i64, width: i32, height: i32, generation_status: String, } fn profile_from_str(s: &str) -> DerivativeProfile { match s { "thumbnail_square" => DerivativeProfile::ThumbnailSquare, "thumbnail_large" => DerivativeProfile::ThumbnailLarge, "web_optimized" => DerivativeProfile::WebOptimized, "video_sd" => DerivativeProfile::VideoSd, _ => DerivativeProfile::ThumbnailSquare, } } fn profile_to_str(p: &DerivativeProfile) -> &'static str { match p { DerivativeProfile::ThumbnailSquare => "thumbnail_square", DerivativeProfile::ThumbnailLarge => "thumbnail_large", DerivativeProfile::WebOptimized => "web_optimized", DerivativeProfile::VideoSd => "video_sd", } } fn gen_status_from_str(s: &str) -> GenerationStatus { match s { "pending" => GenerationStatus::Pending, "ready" => GenerationStatus::Ready, "failed" => GenerationStatus::Failed, _ => GenerationStatus::Pending, } } fn gen_status_to_str(s: &GenerationStatus) -> &'static str { match s { GenerationStatus::Pending => "pending", GenerationStatus::Ready => "ready", GenerationStatus::Failed => "failed", } } impl From for DerivativeAsset { fn from(r: DerivativeRow) -> Self { Self { derivative_id: SystemId::from_uuid(r.derivative_id), parent_asset_id: SystemId::from_uuid(r.parent_asset_id), profile_type: profile_from_str(&r.profile_type), storage_path: r.storage_path, mime_type: r.mime_type, file_size: r.file_size as u64, dimensions: (r.width as u32, r.height as u32), generation_status: gen_status_from_str(&r.generation_status), } } } pg_repo!(PostgresDerivativeRepository); #[async_trait] impl DerivativeRepository for PostgresDerivativeRepository { async fn find_by_asset( &self, asset_id: &SystemId, ) -> Result, DomainError> { let rows = sqlx::query_as::<_, DerivativeRow>( "SELECT derivative_id, parent_asset_id, profile_type, storage_path, mime_type, file_size, width, height, generation_status FROM derivatives WHERE parent_asset_id = $1", ) .bind(*asset_id.as_uuid()) .fetch_all(&self.pool) .await .map_pg()?; Ok(rows.into_iter().map(Into::into).collect()) } async fn find_by_asset_and_profile( &self, asset_id: &SystemId, profile: DerivativeProfile, ) -> Result, DomainError> { let row = sqlx::query_as::<_, DerivativeRow>( "SELECT derivative_id, parent_asset_id, profile_type, storage_path, mime_type, file_size, width, height, generation_status FROM derivatives WHERE parent_asset_id = $1 AND profile_type = $2", ) .bind(*asset_id.as_uuid()) .bind(profile_to_str(&profile)) .fetch_optional(&self.pool) .await .map_pg()?; Ok(row.map(Into::into)) } async fn save(&self, d: &DerivativeAsset) -> Result<(), DomainError> { sqlx::query( "INSERT INTO derivatives (derivative_id, parent_asset_id, profile_type, storage_path, mime_type, file_size, width, height, generation_status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (derivative_id) DO UPDATE SET storage_path = EXCLUDED.storage_path, mime_type = EXCLUDED.mime_type, file_size = EXCLUDED.file_size, width = EXCLUDED.width, height = EXCLUDED.height, generation_status = EXCLUDED.generation_status", ) .bind(*d.derivative_id.as_uuid()) .bind(*d.parent_asset_id.as_uuid()) .bind(profile_to_str(&d.profile_type)) .bind(&d.storage_path) .bind(&d.mime_type) .bind(d.file_size as i64) .bind(d.dimensions.0 as i32) .bind(d.dimensions.1 as i32) .bind(gen_status_to_str(&d.generation_status)) .execute(&self.pool) .await .map_pg()?; Ok(()) } async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { sqlx::query("DELETE FROM derivatives WHERE derivative_id = $1") .bind(*id.as_uuid()) .execute(&self.pool) .await .map_pg()?; Ok(()) } } // ── AssetStack ────────────────────────────────────────────────────── #[derive(sqlx::FromRow)] struct StackRow { stack_id: Uuid, stack_type: String, primary_asset_id: Uuid, owner_user_id: Uuid, members: serde_json::Value, } fn stack_type_from_str(s: &str) -> StackType { match s { "live_photo" => StackType::LivePhoto, "format_pair" => StackType::FormatPair, "burst_sequence" => StackType::BurstSequence, "exposure_bracket" => StackType::ExposureBracket, "manual_group" => StackType::ManualGroup, _ => StackType::ManualGroup, } } fn stack_type_to_str(t: &StackType) -> &'static str { match t { StackType::LivePhoto => "live_photo", StackType::FormatPair => "format_pair", StackType::BurstSequence => "burst_sequence", StackType::ExposureBracket => "exposure_bracket", StackType::ManualGroup => "manual_group", } } fn member_role_from_str(s: &str) -> StackMemberRole { match s { "primary_display" => StackMemberRole::PrimaryDisplay, "high_res_source" => StackMemberRole::HighResSource, "motion_clip" => StackMemberRole::MotionClip, "alternate_frame" => StackMemberRole::AlternateFrame, _ => StackMemberRole::AlternateFrame, } } fn member_role_to_str(r: &StackMemberRole) -> &'static str { match r { StackMemberRole::PrimaryDisplay => "primary_display", StackMemberRole::HighResSource => "high_res_source", StackMemberRole::MotionClip => "motion_clip", StackMemberRole::AlternateFrame => "alternate_frame", } } #[derive(serde::Serialize, serde::Deserialize)] struct MemberJson { asset_id: Uuid, role: String, sort_order: u32, } fn members_from_json(v: serde_json::Value) -> Vec { let arr: Vec = serde_json::from_value(v).unwrap_or_default(); arr.into_iter() .map(|m| AssetStackMember { asset_id: SystemId::from_uuid(m.asset_id), role: member_role_from_str(&m.role), sort_order: m.sort_order, }) .collect() } fn members_to_json(members: &[AssetStackMember]) -> serde_json::Value { let arr: Vec = members .iter() .map(|m| MemberJson { asset_id: *m.asset_id.as_uuid(), role: member_role_to_str(&m.role).to_string(), sort_order: m.sort_order, }) .collect(); serde_json::to_value(arr).unwrap_or(serde_json::Value::Array(vec![])) } impl From for AssetStack { fn from(r: StackRow) -> Self { Self { stack_id: SystemId::from_uuid(r.stack_id), stack_type: stack_type_from_str(&r.stack_type), primary_asset_id: SystemId::from_uuid(r.primary_asset_id), owner_user_id: SystemId::from_uuid(r.owner_user_id), members: members_from_json(r.members), } } } pg_repo!(PostgresAssetStackRepository); #[async_trait] impl AssetStackRepository for PostgresAssetStackRepository { async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { let row = sqlx::query_as::<_, StackRow>( "SELECT stack_id, stack_type, primary_asset_id, owner_user_id, members FROM asset_stacks WHERE stack_id = $1", ) .bind(*id.as_uuid()) .fetch_optional(&self.pool) .await .map_pg()?; Ok(row.map(Into::into)) } async fn find_by_owner(&self, owner_id: &SystemId) -> Result, DomainError> { let rows = sqlx::query_as::<_, StackRow>( "SELECT stack_id, stack_type, primary_asset_id, owner_user_id, members FROM asset_stacks WHERE owner_user_id = $1", ) .bind(*owner_id.as_uuid()) .fetch_all(&self.pool) .await .map_pg()?; Ok(rows.into_iter().map(Into::into).collect()) } async fn find_by_asset(&self, asset_id: &SystemId) -> Result, DomainError> { let rows = sqlx::query_as::<_, StackRow>( "SELECT stack_id, stack_type, primary_asset_id, owner_user_id, members FROM asset_stacks WHERE members @> $1::jsonb", ) .bind(serde_json::json!([{"asset_id": asset_id.as_uuid()}])) .fetch_all(&self.pool) .await .map_pg()?; Ok(rows.into_iter().map(Into::into).collect()) } async fn save(&self, stack: &AssetStack) -> Result<(), DomainError> { sqlx::query( "INSERT INTO asset_stacks (stack_id, stack_type, primary_asset_id, owner_user_id, members) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (stack_id) DO UPDATE SET stack_type = EXCLUDED.stack_type, primary_asset_id = EXCLUDED.primary_asset_id, members = EXCLUDED.members", ) .bind(*stack.stack_id.as_uuid()) .bind(stack_type_to_str(&stack.stack_type)) .bind(*stack.primary_asset_id.as_uuid()) .bind(*stack.owner_user_id.as_uuid()) .bind(members_to_json(&stack.members)) .execute(&self.pool) .await .map_pg()?; Ok(()) } async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { sqlx::query("DELETE FROM asset_stacks WHERE stack_id = $1") .bind(*id.as_uuid()) .execute(&self.pool) .await .map_pg()?; Ok(()) } }