From e082387f6e02029ab29d00d907538ae48eace4c6 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Sun, 31 May 2026 11:11:46 +0200 Subject: [PATCH] refactor: group postgres adapters by bounded context --- .../postgres/src/asset_metadata_repository.rs | 164 ----- .../adapters/postgres/src/asset_repository.rs | 165 ----- crates/adapters/postgres/src/catalog/mod.rs | 478 ++++++++++++++ .../postgres/src/duplicate_repository.rs | 153 ----- .../{user_repository.rs => identity/mod.rs} | 0 .../postgres/src/ingest_session_repository.rs | 126 ---- .../postgres/src/job_batch_repository.rs | 99 --- .../adapters/postgres/src/job_repository.rs | 209 ------ crates/adapters/postgres/src/lib.rs | 48 +- .../postgres/src/library_path_repository.rs | 136 ---- .../mod.rs} | 177 ++++- .../postgres/src/pipeline_repository.rs | 126 ---- .../postgres/src/plugin_repository.rs | 127 ---- .../adapters/postgres/src/processing/mod.rs | 532 +++++++++++++++ .../adapters/postgres/src/quota_repository.rs | 261 -------- .../{share_repository.rs => sharing/mod.rs} | 96 ++- .../{sidecar_repository.rs => sidecar/mod.rs} | 0 crates/adapters/postgres/src/storage/mod.rs | 607 ++++++++++++++++++ .../postgres/src/storage_volume_repository.rs | 96 --- .../adapters/postgres/src/tag_repository.rs | 174 ----- .../src/visibility_filter_repository.rs | 83 --- 21 files changed, 1891 insertions(+), 1966 deletions(-) delete mode 100644 crates/adapters/postgres/src/asset_metadata_repository.rs delete mode 100644 crates/adapters/postgres/src/asset_repository.rs create mode 100644 crates/adapters/postgres/src/catalog/mod.rs delete mode 100644 crates/adapters/postgres/src/duplicate_repository.rs rename crates/adapters/postgres/src/{user_repository.rs => identity/mod.rs} (100%) delete mode 100644 crates/adapters/postgres/src/ingest_session_repository.rs delete mode 100644 crates/adapters/postgres/src/job_batch_repository.rs delete mode 100644 crates/adapters/postgres/src/job_repository.rs delete mode 100644 crates/adapters/postgres/src/library_path_repository.rs rename crates/adapters/postgres/src/{album_repository.rs => organization/mod.rs} (51%) delete mode 100644 crates/adapters/postgres/src/pipeline_repository.rs delete mode 100644 crates/adapters/postgres/src/plugin_repository.rs create mode 100644 crates/adapters/postgres/src/processing/mod.rs delete mode 100644 crates/adapters/postgres/src/quota_repository.rs rename crates/adapters/postgres/src/{share_repository.rs => sharing/mod.rs} (81%) rename crates/adapters/postgres/src/{sidecar_repository.rs => sidecar/mod.rs} (100%) create mode 100644 crates/adapters/postgres/src/storage/mod.rs delete mode 100644 crates/adapters/postgres/src/storage_volume_repository.rs delete mode 100644 crates/adapters/postgres/src/tag_repository.rs delete mode 100644 crates/adapters/postgres/src/visibility_filter_repository.rs diff --git a/crates/adapters/postgres/src/asset_metadata_repository.rs b/crates/adapters/postgres/src/asset_metadata_repository.rs deleted file mode 100644 index 5fdb347..0000000 --- a/crates/adapters/postgres/src/asset_metadata_repository.rs +++ /dev/null @@ -1,164 +0,0 @@ -use crate::db::PgPool; -use async_trait::async_trait; -use chrono::{DateTime, Utc}; -use domain::{ - entities::{AssetMetadata, MetadataSource}, - errors::DomainError, - ports::AssetMetadataRepository, - value_objects::{DateTimeStamp, MetadataValue, StructuredData, SystemId}, -}; -use uuid::Uuid; - -#[derive(sqlx::FromRow)] -struct AssetMetadataRow { - asset_id: Uuid, - metadata_source: String, - data: serde_json::Value, - updated_at: DateTime, -} - -fn source_from_str(s: &str) -> MetadataSource { - match s { - "exif_extracted" => MetadataSource::ExifExtracted, - "ai_generated" => MetadataSource::AiGenerated, - "user_edited" => MetadataSource::UserEdited, - _ => MetadataSource::ExifExtracted, - } -} - -fn source_to_str(s: &MetadataSource) -> &'static str { - match s { - MetadataSource::ExifExtracted => "exif_extracted", - MetadataSource::AiGenerated => "ai_generated", - MetadataSource::UserEdited => "user_edited", - } -} - -fn json_to_structured_data(v: serde_json::Value) -> StructuredData { - let mut sd = StructuredData::new(); - if let serde_json::Value::Object(map) = v { - for (key, val) in map { - let mv = match val { - serde_json::Value::String(s) => MetadataValue::String(s), - serde_json::Value::Number(n) => { - if let Some(i) = n.as_i64() { - MetadataValue::Integer(i) - } else if let Some(f) = n.as_f64() { - MetadataValue::Float(f) - } else { - MetadataValue::Null - } - } - serde_json::Value::Bool(b) => MetadataValue::Boolean(b), - serde_json::Value::Null => MetadataValue::Null, - _ => MetadataValue::String(val.to_string()), - }; - sd.insert(key, mv); - } - } - sd -} - -fn structured_data_to_json(sd: &StructuredData) -> serde_json::Value { - let mut map = serde_json::Map::new(); - for (key, val) in sd.inner() { - let jv = match val { - MetadataValue::String(s) => serde_json::Value::String(s.clone()), - MetadataValue::Integer(i) => serde_json::Value::Number((*i).into()), - MetadataValue::Float(f) => serde_json::Number::from_f64(*f) - .map(serde_json::Value::Number) - .unwrap_or(serde_json::Value::Null), - MetadataValue::Boolean(b) => serde_json::Value::Bool(*b), - MetadataValue::Null => serde_json::Value::Null, - }; - map.insert(key.clone(), jv); - } - serde_json::Value::Object(map) -} - -impl From for AssetMetadata { - fn from(r: AssetMetadataRow) -> Self { - Self { - asset_id: SystemId::from_uuid(r.asset_id), - metadata_source: source_from_str(&r.metadata_source), - data: json_to_structured_data(r.data), - updated_at: DateTimeStamp::from_datetime(r.updated_at), - } - } -} - -pub struct PostgresAssetMetadataRepository { - pool: PgPool, -} - -impl PostgresAssetMetadataRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} - -#[async_trait] -impl AssetMetadataRepository for PostgresAssetMetadataRepository { - async fn find_by_asset(&self, asset_id: &SystemId) -> Result, DomainError> { - let rows = sqlx::query_as::<_, AssetMetadataRow>( - "SELECT asset_id, metadata_source, data, updated_at - FROM asset_metadata WHERE asset_id = $1", - ) - .bind(*asset_id.as_uuid()) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(rows.into_iter().map(Into::into).collect()) - } - - async fn find_by_asset_and_source( - &self, - asset_id: &SystemId, - source: MetadataSource, - ) -> Result, DomainError> { - let row = sqlx::query_as::<_, AssetMetadataRow>( - "SELECT asset_id, metadata_source, data, updated_at - FROM asset_metadata WHERE asset_id = $1 AND metadata_source = $2", - ) - .bind(*asset_id.as_uuid()) - .bind(source_to_str(&source)) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(row.map(Into::into)) - } - - async fn save(&self, metadata: &AssetMetadata) -> Result<(), DomainError> { - sqlx::query( - "INSERT INTO asset_metadata (asset_id, metadata_source, data, updated_at) - VALUES ($1, $2, $3, $4) - ON CONFLICT (asset_id, metadata_source) DO UPDATE SET - data = EXCLUDED.data, - updated_at = EXCLUDED.updated_at", - ) - .bind(*metadata.asset_id.as_uuid()) - .bind(source_to_str(&metadata.metadata_source)) - .bind(structured_data_to_json(&metadata.data)) - .bind(metadata.updated_at.as_datetime()) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - Ok(()) - } - - async fn delete_by_asset_and_source( - &self, - asset_id: &SystemId, - source: MetadataSource, - ) -> Result<(), DomainError> { - sqlx::query("DELETE FROM asset_metadata WHERE asset_id = $1 AND metadata_source = $2") - .bind(*asset_id.as_uuid()) - .bind(source_to_str(&source)) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - Ok(()) - } -} diff --git a/crates/adapters/postgres/src/asset_repository.rs b/crates/adapters/postgres/src/asset_repository.rs deleted file mode 100644 index 851b13e..0000000 --- a/crates/adapters/postgres/src/asset_repository.rs +++ /dev/null @@ -1,165 +0,0 @@ -use crate::db::PgPool; -use async_trait::async_trait; -use chrono::{DateTime, Utc}; -use domain::{ - entities::{Asset, AssetType, SourceReference}, - errors::DomainError, - ports::AssetRepository, - value_objects::{Checksum, DateTimeStamp, SystemId}, -}; -use uuid::Uuid; - -#[derive(sqlx::FromRow)] -struct AssetRow { - asset_id: Uuid, - volume_id: Uuid, - relative_path: String, - checksum: String, - asset_type: String, - mime_type: String, - file_size: i64, - is_processed: bool, - owner_user_id: Uuid, - created_at: DateTime, -} - -fn asset_type_from_str(s: &str) -> AssetType { - match s { - "image" => AssetType::Image, - "video" => AssetType::Video, - "live_photo" => AssetType::LivePhoto, - _ => AssetType::Image, - } -} - -fn asset_type_to_str(t: &AssetType) -> &'static str { - match t { - AssetType::Image => "image", - AssetType::Video => "video", - AssetType::LivePhoto => "live_photo", - } -} - -impl TryFrom for Asset { - type Error = DomainError; - fn try_from(r: AssetRow) -> Result { - Ok(Self { - asset_id: SystemId::from_uuid(r.asset_id), - source_reference: SourceReference { - volume_id: SystemId::from_uuid(r.volume_id), - relative_path: r.relative_path, - checksum: Checksum::new(r.checksum)?, - }, - asset_type: asset_type_from_str(&r.asset_type), - mime_type: r.mime_type, - file_size: r.file_size as u64, - is_processed: r.is_processed, - owner_user_id: SystemId::from_uuid(r.owner_user_id), - created_at: DateTimeStamp::from_datetime(r.created_at), - }) - } -} - -pub struct PostgresAssetRepository { - pool: PgPool, -} - -impl PostgresAssetRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} - -#[async_trait] -impl AssetRepository for PostgresAssetRepository { - async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { - let row = sqlx::query_as::<_, AssetRow>( - "SELECT asset_id, volume_id, relative_path, checksum, asset_type, mime_type, - file_size, is_processed, owner_user_id, created_at - FROM assets WHERE asset_id = $1", - ) - .bind(*id.as_uuid()) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - row.map(TryInto::try_into).transpose() - } - - async fn find_by_checksum(&self, checksum: &Checksum) -> Result, DomainError> { - let rows = sqlx::query_as::<_, AssetRow>( - "SELECT asset_id, volume_id, relative_path, checksum, asset_type, mime_type, - file_size, is_processed, owner_user_id, created_at - FROM assets WHERE checksum = $1", - ) - .bind(checksum.as_str()) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - rows.into_iter().map(TryInto::try_into).collect() - } - - async fn find_by_owner( - &self, - owner_id: &SystemId, - limit: u32, - offset: u32, - ) -> Result, DomainError> { - let rows = sqlx::query_as::<_, AssetRow>( - "SELECT asset_id, volume_id, relative_path, checksum, asset_type, mime_type, - file_size, is_processed, owner_user_id, created_at - FROM assets WHERE owner_user_id = $1 - ORDER BY created_at DESC - LIMIT $2 OFFSET $3", - ) - .bind(*owner_id.as_uuid()) - .bind(limit as i64) - .bind(offset as i64) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - rows.into_iter().map(TryInto::try_into).collect() - } - - async fn save(&self, asset: &Asset) -> Result<(), DomainError> { - sqlx::query( - "INSERT INTO assets (asset_id, volume_id, relative_path, checksum, asset_type, - mime_type, file_size, is_processed, owner_user_id, created_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) - ON CONFLICT (asset_id) DO UPDATE SET - volume_id = EXCLUDED.volume_id, - relative_path = EXCLUDED.relative_path, - checksum = EXCLUDED.checksum, - asset_type = EXCLUDED.asset_type, - mime_type = EXCLUDED.mime_type, - file_size = EXCLUDED.file_size, - is_processed = EXCLUDED.is_processed, - owner_user_id = EXCLUDED.owner_user_id", - ) - .bind(*asset.asset_id.as_uuid()) - .bind(*asset.source_reference.volume_id.as_uuid()) - .bind(&asset.source_reference.relative_path) - .bind(asset.source_reference.checksum.as_str()) - .bind(asset_type_to_str(&asset.asset_type)) - .bind(&asset.mime_type) - .bind(asset.file_size as i64) - .bind(asset.is_processed) - .bind(*asset.owner_user_id.as_uuid()) - .bind(asset.created_at.as_datetime()) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - Ok(()) - } - - async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { - sqlx::query("DELETE FROM assets WHERE asset_id = $1") - .bind(*id.as_uuid()) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - Ok(()) - } -} diff --git a/crates/adapters/postgres/src/catalog/mod.rs b/crates/adapters/postgres/src/catalog/mod.rs new file mode 100644 index 0000000..d33634f --- /dev/null +++ b/crates/adapters/postgres/src/catalog/mod.rs @@ -0,0 +1,478 @@ +use crate::db::PgPool; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use domain::{ + entities::{ + Asset, AssetMetadata, AssetType, DetectionMethod, DuplicateCandidate, DuplicateGroup, + DuplicateStatus, MetadataSource, SourceReference, + }, + errors::DomainError, + ports::{AssetMetadataRepository, AssetRepository, DuplicateRepository}, + value_objects::{Checksum, DateTimeStamp, MetadataValue, StructuredData, SystemId}, +}; +use uuid::Uuid; + +// ────────────────────────────────────────────── +// Asset +// ────────────────────────────────────────────── + +#[derive(sqlx::FromRow)] +struct AssetRow { + asset_id: Uuid, + volume_id: Uuid, + relative_path: String, + checksum: String, + asset_type: String, + mime_type: String, + file_size: i64, + is_processed: bool, + owner_user_id: Uuid, + created_at: DateTime, +} + +fn asset_type_from_str(s: &str) -> AssetType { + match s { + "image" => AssetType::Image, + "video" => AssetType::Video, + "live_photo" => AssetType::LivePhoto, + _ => AssetType::Image, + } +} + +fn asset_type_to_str(t: &AssetType) -> &'static str { + match t { + AssetType::Image => "image", + AssetType::Video => "video", + AssetType::LivePhoto => "live_photo", + } +} + +impl TryFrom for Asset { + type Error = DomainError; + fn try_from(r: AssetRow) -> Result { + Ok(Self { + asset_id: SystemId::from_uuid(r.asset_id), + source_reference: SourceReference { + volume_id: SystemId::from_uuid(r.volume_id), + relative_path: r.relative_path, + checksum: Checksum::new(r.checksum)?, + }, + asset_type: asset_type_from_str(&r.asset_type), + mime_type: r.mime_type, + file_size: r.file_size as u64, + is_processed: r.is_processed, + owner_user_id: SystemId::from_uuid(r.owner_user_id), + created_at: DateTimeStamp::from_datetime(r.created_at), + }) + } +} + +pub struct PostgresAssetRepository { + pool: PgPool, +} + +impl PostgresAssetRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl AssetRepository for PostgresAssetRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + let row = sqlx::query_as::<_, AssetRow>( + "SELECT asset_id, volume_id, relative_path, checksum, asset_type, mime_type, + file_size, is_processed, owner_user_id, created_at + FROM assets WHERE asset_id = $1", + ) + .bind(*id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + row.map(TryInto::try_into).transpose() + } + + async fn find_by_checksum(&self, checksum: &Checksum) -> Result, DomainError> { + let rows = sqlx::query_as::<_, AssetRow>( + "SELECT asset_id, volume_id, relative_path, checksum, asset_type, mime_type, + file_size, is_processed, owner_user_id, created_at + FROM assets WHERE checksum = $1", + ) + .bind(checksum.as_str()) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + rows.into_iter().map(TryInto::try_into).collect() + } + + async fn find_by_owner( + &self, + owner_id: &SystemId, + limit: u32, + offset: u32, + ) -> Result, DomainError> { + let rows = sqlx::query_as::<_, AssetRow>( + "SELECT asset_id, volume_id, relative_path, checksum, asset_type, mime_type, + file_size, is_processed, owner_user_id, created_at + FROM assets WHERE owner_user_id = $1 + ORDER BY created_at DESC + LIMIT $2 OFFSET $3", + ) + .bind(*owner_id.as_uuid()) + .bind(limit as i64) + .bind(offset as i64) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + rows.into_iter().map(TryInto::try_into).collect() + } + + async fn save(&self, asset: &Asset) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO assets (asset_id, volume_id, relative_path, checksum, asset_type, + mime_type, file_size, is_processed, owner_user_id, created_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + ON CONFLICT (asset_id) DO UPDATE SET + volume_id = EXCLUDED.volume_id, + relative_path = EXCLUDED.relative_path, + checksum = EXCLUDED.checksum, + asset_type = EXCLUDED.asset_type, + mime_type = EXCLUDED.mime_type, + file_size = EXCLUDED.file_size, + is_processed = EXCLUDED.is_processed, + owner_user_id = EXCLUDED.owner_user_id", + ) + .bind(*asset.asset_id.as_uuid()) + .bind(*asset.source_reference.volume_id.as_uuid()) + .bind(&asset.source_reference.relative_path) + .bind(asset.source_reference.checksum.as_str()) + .bind(asset_type_to_str(&asset.asset_type)) + .bind(&asset.mime_type) + .bind(asset.file_size as i64) + .bind(asset.is_processed) + .bind(*asset.owner_user_id.as_uuid()) + .bind(asset.created_at.as_datetime()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } + + async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { + sqlx::query("DELETE FROM assets WHERE asset_id = $1") + .bind(*id.as_uuid()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} + +// ────────────────────────────────────────────── +// AssetMetadata +// ────────────────────────────────────────────── + +#[derive(sqlx::FromRow)] +struct AssetMetadataRow { + asset_id: Uuid, + metadata_source: String, + data: serde_json::Value, + updated_at: DateTime, +} + +fn source_from_str(s: &str) -> MetadataSource { + match s { + "exif_extracted" => MetadataSource::ExifExtracted, + "ai_generated" => MetadataSource::AiGenerated, + "user_edited" => MetadataSource::UserEdited, + _ => MetadataSource::ExifExtracted, + } +} + +fn source_to_str(s: &MetadataSource) -> &'static str { + match s { + MetadataSource::ExifExtracted => "exif_extracted", + MetadataSource::AiGenerated => "ai_generated", + MetadataSource::UserEdited => "user_edited", + } +} + +fn json_to_structured_data(v: serde_json::Value) -> StructuredData { + let mut sd = StructuredData::new(); + if let serde_json::Value::Object(map) = v { + for (key, val) in map { + let mv = match val { + serde_json::Value::String(s) => MetadataValue::String(s), + serde_json::Value::Number(n) => { + if let Some(i) = n.as_i64() { + MetadataValue::Integer(i) + } else if let Some(f) = n.as_f64() { + MetadataValue::Float(f) + } else { + MetadataValue::Null + } + } + serde_json::Value::Bool(b) => MetadataValue::Boolean(b), + serde_json::Value::Null => MetadataValue::Null, + _ => MetadataValue::String(val.to_string()), + }; + sd.insert(key, mv); + } + } + sd +} + +fn structured_data_to_json(sd: &StructuredData) -> serde_json::Value { + let mut map = serde_json::Map::new(); + for (key, val) in sd.inner() { + let jv = match val { + MetadataValue::String(s) => serde_json::Value::String(s.clone()), + MetadataValue::Integer(i) => serde_json::Value::Number((*i).into()), + MetadataValue::Float(f) => serde_json::Number::from_f64(*f) + .map(serde_json::Value::Number) + .unwrap_or(serde_json::Value::Null), + MetadataValue::Boolean(b) => serde_json::Value::Bool(*b), + MetadataValue::Null => serde_json::Value::Null, + }; + map.insert(key.clone(), jv); + } + serde_json::Value::Object(map) +} + +impl From for AssetMetadata { + fn from(r: AssetMetadataRow) -> Self { + Self { + asset_id: SystemId::from_uuid(r.asset_id), + metadata_source: source_from_str(&r.metadata_source), + data: json_to_structured_data(r.data), + updated_at: DateTimeStamp::from_datetime(r.updated_at), + } + } +} + +pub struct PostgresAssetMetadataRepository { + pool: PgPool, +} + +impl PostgresAssetMetadataRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl AssetMetadataRepository for PostgresAssetMetadataRepository { + async fn find_by_asset(&self, asset_id: &SystemId) -> Result, DomainError> { + let rows = sqlx::query_as::<_, AssetMetadataRow>( + "SELECT asset_id, metadata_source, data, updated_at + FROM asset_metadata WHERE asset_id = $1", + ) + .bind(*asset_id.as_uuid()) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(rows.into_iter().map(Into::into).collect()) + } + + async fn find_by_asset_and_source( + &self, + asset_id: &SystemId, + source: MetadataSource, + ) -> Result, DomainError> { + let row = sqlx::query_as::<_, AssetMetadataRow>( + "SELECT asset_id, metadata_source, data, updated_at + FROM asset_metadata WHERE asset_id = $1 AND metadata_source = $2", + ) + .bind(*asset_id.as_uuid()) + .bind(source_to_str(&source)) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.map(Into::into)) + } + + async fn save(&self, metadata: &AssetMetadata) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO asset_metadata (asset_id, metadata_source, data, updated_at) + VALUES ($1, $2, $3, $4) + ON CONFLICT (asset_id, metadata_source) DO UPDATE SET + data = EXCLUDED.data, + updated_at = EXCLUDED.updated_at", + ) + .bind(*metadata.asset_id.as_uuid()) + .bind(source_to_str(&metadata.metadata_source)) + .bind(structured_data_to_json(&metadata.data)) + .bind(metadata.updated_at.as_datetime()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } + + async fn delete_by_asset_and_source( + &self, + asset_id: &SystemId, + source: MetadataSource, + ) -> Result<(), DomainError> { + sqlx::query("DELETE FROM asset_metadata WHERE asset_id = $1 AND metadata_source = $2") + .bind(*asset_id.as_uuid()) + .bind(source_to_str(&source)) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} + +// ────────────────────────────────────────────── +// Duplicate +// ────────────────────────────────────────────── + +#[derive(sqlx::FromRow)] +struct GroupRow { + group_id: Uuid, + detection_method: String, + status: String, + candidates: serde_json::Value, +} + +fn detection_from_str(s: &str) -> DetectionMethod { + match s { + "perceptual_hash" => DetectionMethod::PerceptualHash, + _ => DetectionMethod::ExactHash, + } +} + +fn detection_to_str(d: &DetectionMethod) -> &'static str { + match d { + DetectionMethod::ExactHash => "exact_hash", + DetectionMethod::PerceptualHash => "perceptual_hash", + } +} + +fn dup_status_from_str(s: &str) -> DuplicateStatus { + match s { + "resolved" => DuplicateStatus::Resolved, + _ => DuplicateStatus::Unresolved, + } +} + +fn dup_status_to_str(s: &DuplicateStatus) -> &'static str { + match s { + DuplicateStatus::Unresolved => "unresolved", + DuplicateStatus::Resolved => "resolved", + } +} + +#[derive(serde::Serialize, serde::Deserialize)] +struct CandidateJson { + asset_id: Uuid, + similarity_score: f64, +} + +fn candidates_from_json(v: serde_json::Value) -> Vec { + let arr: Vec = serde_json::from_value(v).unwrap_or_default(); + arr.into_iter() + .map(|c| DuplicateCandidate { + asset_id: SystemId::from_uuid(c.asset_id), + similarity_score: c.similarity_score, + }) + .collect() +} + +fn candidates_to_json(candidates: &[DuplicateCandidate]) -> serde_json::Value { + let arr: Vec = candidates + .iter() + .map(|c| CandidateJson { + asset_id: *c.asset_id.as_uuid(), + similarity_score: c.similarity_score, + }) + .collect(); + serde_json::to_value(arr).unwrap_or(serde_json::Value::Array(vec![])) +} + +impl From for DuplicateGroup { + fn from(r: GroupRow) -> Self { + Self { + group_id: SystemId::from_uuid(r.group_id), + detection_method: detection_from_str(&r.detection_method), + status: dup_status_from_str(&r.status), + candidates: candidates_from_json(r.candidates), + } + } +} + +pub struct PostgresDuplicateRepository { + pool: PgPool, +} + +impl PostgresDuplicateRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl DuplicateRepository for PostgresDuplicateRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + let row = sqlx::query_as::<_, GroupRow>( + "SELECT group_id, detection_method, status, candidates + FROM duplicate_groups WHERE group_id = $1", + ) + .bind(*id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.map(Into::into)) + } + + async fn find_unresolved(&self) -> Result, DomainError> { + let rows = sqlx::query_as::<_, GroupRow>( + "SELECT group_id, detection_method, status, candidates + FROM duplicate_groups WHERE status = 'unresolved'", + ) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(rows.into_iter().map(Into::into).collect()) + } + + async fn find_by_asset(&self, asset_id: &SystemId) -> Result, DomainError> { + let rows = sqlx::query_as::<_, GroupRow>( + "SELECT group_id, detection_method, status, candidates + FROM duplicate_groups WHERE candidates @> $1::jsonb", + ) + .bind(serde_json::json!([{"asset_id": asset_id.as_uuid()}])) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(rows.into_iter().map(Into::into).collect()) + } + + async fn save(&self, group: &DuplicateGroup) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO duplicate_groups (group_id, detection_method, status, candidates) + VALUES ($1, $2, $3, $4) + ON CONFLICT (group_id) DO UPDATE SET + detection_method = EXCLUDED.detection_method, + status = EXCLUDED.status, + candidates = EXCLUDED.candidates", + ) + .bind(*group.group_id.as_uuid()) + .bind(detection_to_str(&group.detection_method)) + .bind(dup_status_to_str(&group.status)) + .bind(candidates_to_json(&group.candidates)) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} diff --git a/crates/adapters/postgres/src/duplicate_repository.rs b/crates/adapters/postgres/src/duplicate_repository.rs deleted file mode 100644 index 2eabb0c..0000000 --- a/crates/adapters/postgres/src/duplicate_repository.rs +++ /dev/null @@ -1,153 +0,0 @@ -use crate::db::PgPool; -use async_trait::async_trait; -use domain::{ - entities::{DetectionMethod, DuplicateCandidate, DuplicateGroup, DuplicateStatus}, - errors::DomainError, - ports::DuplicateRepository, - value_objects::SystemId, -}; -use uuid::Uuid; - -#[derive(sqlx::FromRow)] -struct GroupRow { - group_id: Uuid, - detection_method: String, - status: String, - candidates: serde_json::Value, -} - -fn detection_from_str(s: &str) -> DetectionMethod { - match s { - "perceptual_hash" => DetectionMethod::PerceptualHash, - _ => DetectionMethod::ExactHash, - } -} - -fn detection_to_str(d: &DetectionMethod) -> &'static str { - match d { - DetectionMethod::ExactHash => "exact_hash", - DetectionMethod::PerceptualHash => "perceptual_hash", - } -} - -fn dup_status_from_str(s: &str) -> DuplicateStatus { - match s { - "resolved" => DuplicateStatus::Resolved, - _ => DuplicateStatus::Unresolved, - } -} - -fn dup_status_to_str(s: &DuplicateStatus) -> &'static str { - match s { - DuplicateStatus::Unresolved => "unresolved", - DuplicateStatus::Resolved => "resolved", - } -} - -#[derive(serde::Serialize, serde::Deserialize)] -struct CandidateJson { - asset_id: Uuid, - similarity_score: f64, -} - -fn candidates_from_json(v: serde_json::Value) -> Vec { - let arr: Vec = serde_json::from_value(v).unwrap_or_default(); - arr.into_iter() - .map(|c| DuplicateCandidate { - asset_id: SystemId::from_uuid(c.asset_id), - similarity_score: c.similarity_score, - }) - .collect() -} - -fn candidates_to_json(candidates: &[DuplicateCandidate]) -> serde_json::Value { - let arr: Vec = candidates - .iter() - .map(|c| CandidateJson { - asset_id: *c.asset_id.as_uuid(), - similarity_score: c.similarity_score, - }) - .collect(); - serde_json::to_value(arr).unwrap_or(serde_json::Value::Array(vec![])) -} - -impl From for DuplicateGroup { - fn from(r: GroupRow) -> Self { - Self { - group_id: SystemId::from_uuid(r.group_id), - detection_method: detection_from_str(&r.detection_method), - status: dup_status_from_str(&r.status), - candidates: candidates_from_json(r.candidates), - } - } -} - -pub struct PostgresDuplicateRepository { - pool: PgPool, -} - -impl PostgresDuplicateRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} - -#[async_trait] -impl DuplicateRepository for PostgresDuplicateRepository { - async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { - let row = sqlx::query_as::<_, GroupRow>( - "SELECT group_id, detection_method, status, candidates - FROM duplicate_groups WHERE group_id = $1", - ) - .bind(*id.as_uuid()) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(row.map(Into::into)) - } - - async fn find_unresolved(&self) -> Result, DomainError> { - let rows = sqlx::query_as::<_, GroupRow>( - "SELECT group_id, detection_method, status, candidates - FROM duplicate_groups WHERE status = 'unresolved'", - ) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(rows.into_iter().map(Into::into).collect()) - } - - async fn find_by_asset(&self, asset_id: &SystemId) -> Result, DomainError> { - let rows = sqlx::query_as::<_, GroupRow>( - "SELECT group_id, detection_method, status, candidates - FROM duplicate_groups WHERE candidates @> $1::jsonb", - ) - .bind(serde_json::json!([{"asset_id": asset_id.as_uuid()}])) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(rows.into_iter().map(Into::into).collect()) - } - - async fn save(&self, group: &DuplicateGroup) -> Result<(), DomainError> { - sqlx::query( - "INSERT INTO duplicate_groups (group_id, detection_method, status, candidates) - VALUES ($1, $2, $3, $4) - ON CONFLICT (group_id) DO UPDATE SET - detection_method = EXCLUDED.detection_method, - status = EXCLUDED.status, - candidates = EXCLUDED.candidates", - ) - .bind(*group.group_id.as_uuid()) - .bind(detection_to_str(&group.detection_method)) - .bind(dup_status_to_str(&group.status)) - .bind(candidates_to_json(&group.candidates)) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - Ok(()) - } -} diff --git a/crates/adapters/postgres/src/user_repository.rs b/crates/adapters/postgres/src/identity/mod.rs similarity index 100% rename from crates/adapters/postgres/src/user_repository.rs rename to crates/adapters/postgres/src/identity/mod.rs diff --git a/crates/adapters/postgres/src/ingest_session_repository.rs b/crates/adapters/postgres/src/ingest_session_repository.rs deleted file mode 100644 index 4659bed..0000000 --- a/crates/adapters/postgres/src/ingest_session_repository.rs +++ /dev/null @@ -1,126 +0,0 @@ -use crate::db::PgPool; -use async_trait::async_trait; -use chrono::{DateTime, Utc}; -use domain::{ - entities::{IngestSession, IngestStatus}, - errors::DomainError, - ports::IngestSessionRepository, - value_objects::{Checksum, DateTimeStamp, SystemId}, -}; -use uuid::Uuid; - -#[derive(sqlx::FromRow)] -struct IngestSessionRow { - session_id: Uuid, - uploader_user_id: Uuid, - client_device_id: String, - original_filename: String, - client_checksum: String, - target_library_path_id: Uuid, - status: String, - created_at: DateTime, - error_message: Option, -} - -fn status_from_str(s: &str) -> IngestStatus { - match s { - "uploading" => IngestStatus::Uploading, - "awaiting_processing" => IngestStatus::AwaitingProcessing, - "processing" => IngestStatus::Processing, - "completed" => IngestStatus::Completed, - "failed" => IngestStatus::Failed, - _ => IngestStatus::Uploading, - } -} - -fn status_to_str(s: &IngestStatus) -> &'static str { - match s { - IngestStatus::Uploading => "uploading", - IngestStatus::AwaitingProcessing => "awaiting_processing", - IngestStatus::Processing => "processing", - IngestStatus::Completed => "completed", - IngestStatus::Failed => "failed", - } -} - -impl TryFrom for IngestSession { - type Error = DomainError; - fn try_from(r: IngestSessionRow) -> Result { - Ok(Self { - session_id: SystemId::from_uuid(r.session_id), - uploader_user_id: SystemId::from_uuid(r.uploader_user_id), - client_device_id: r.client_device_id, - original_filename: r.original_filename, - client_checksum: Checksum::new(r.client_checksum)?, - target_library_path_id: SystemId::from_uuid(r.target_library_path_id), - status: status_from_str(&r.status), - created_at: DateTimeStamp::from_datetime(r.created_at), - error_message: r.error_message, - }) - } -} - -pub struct PostgresIngestSessionRepository { - pool: PgPool, -} - -impl PostgresIngestSessionRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} - -#[async_trait] -impl IngestSessionRepository for PostgresIngestSessionRepository { - async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { - let row = sqlx::query_as::<_, IngestSessionRow>( - "SELECT session_id, uploader_user_id, client_device_id, original_filename, - client_checksum, target_library_path_id, status, created_at, error_message - FROM ingest_sessions WHERE session_id = $1", - ) - .bind(*id.as_uuid()) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - row.map(TryInto::try_into).transpose() - } - - async fn find_by_user(&self, user_id: &SystemId) -> Result, DomainError> { - let rows = sqlx::query_as::<_, IngestSessionRow>( - "SELECT session_id, uploader_user_id, client_device_id, original_filename, - client_checksum, target_library_path_id, status, created_at, error_message - FROM ingest_sessions WHERE uploader_user_id = $1", - ) - .bind(*user_id.as_uuid()) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - rows.into_iter().map(TryInto::try_into).collect() - } - - async fn save(&self, session: &IngestSession) -> Result<(), DomainError> { - sqlx::query( - "INSERT INTO ingest_sessions (session_id, uploader_user_id, client_device_id, original_filename, - client_checksum, target_library_path_id, status, created_at, error_message) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) - ON CONFLICT (session_id) DO UPDATE SET - status = EXCLUDED.status, - error_message = EXCLUDED.error_message", - ) - .bind(*session.session_id.as_uuid()) - .bind(*session.uploader_user_id.as_uuid()) - .bind(&session.client_device_id) - .bind(&session.original_filename) - .bind(session.client_checksum.as_str()) - .bind(*session.target_library_path_id.as_uuid()) - .bind(status_to_str(&session.status)) - .bind(session.created_at.as_datetime()) - .bind(session.error_message.as_deref()) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - Ok(()) - } -} diff --git a/crates/adapters/postgres/src/job_batch_repository.rs b/crates/adapters/postgres/src/job_batch_repository.rs deleted file mode 100644 index a82f592..0000000 --- a/crates/adapters/postgres/src/job_batch_repository.rs +++ /dev/null @@ -1,99 +0,0 @@ -use crate::db::PgPool; -use async_trait::async_trait; -use domain::{ - entities::{BatchStatus, JobBatch}, - errors::DomainError, - ports::JobBatchRepository, - value_objects::SystemId, -}; -use uuid::Uuid; - -#[derive(sqlx::FromRow)] -struct BatchRow { - batch_id: Uuid, - batch_type: String, - total_jobs: i32, - completed_count: i32, - failed_count: i32, - status: String, -} - -fn batch_status_from_str(s: &str) -> BatchStatus { - match s { - "in_progress" => BatchStatus::InProgress, - "completed_with_errors" => BatchStatus::CompletedWithErrors, - "completed" => BatchStatus::Completed, - "cancelled" => BatchStatus::Cancelled, - _ => BatchStatus::InProgress, - } -} - -fn batch_status_to_str(s: &BatchStatus) -> &'static str { - match s { - BatchStatus::InProgress => "in_progress", - BatchStatus::CompletedWithErrors => "completed_with_errors", - BatchStatus::Completed => "completed", - BatchStatus::Cancelled => "cancelled", - } -} - -impl From for JobBatch { - fn from(r: BatchRow) -> Self { - Self { - batch_id: SystemId::from_uuid(r.batch_id), - batch_type: r.batch_type, - total_jobs: r.total_jobs as u32, - completed_count: r.completed_count as u32, - failed_count: r.failed_count as u32, - status: batch_status_from_str(&r.status), - } - } -} - -pub struct PostgresJobBatchRepository { - pool: PgPool, -} - -impl PostgresJobBatchRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} - -#[async_trait] -impl JobBatchRepository for PostgresJobBatchRepository { - async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { - let row = sqlx::query_as::<_, BatchRow>( - "SELECT batch_id, batch_type, total_jobs, completed_count, failed_count, status - FROM job_batches WHERE batch_id = $1", - ) - .bind(*id.as_uuid()) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(row.map(Into::into)) - } - - async fn save(&self, batch: &JobBatch) -> Result<(), DomainError> { - sqlx::query( - "INSERT INTO job_batches (batch_id, batch_type, total_jobs, completed_count, failed_count, status) - VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT (batch_id) DO UPDATE SET - total_jobs = EXCLUDED.total_jobs, - completed_count = EXCLUDED.completed_count, - failed_count = EXCLUDED.failed_count, - status = EXCLUDED.status", - ) - .bind(*batch.batch_id.as_uuid()) - .bind(&batch.batch_type) - .bind(batch.total_jobs as i32) - .bind(batch.completed_count as i32) - .bind(batch.failed_count as i32) - .bind(batch_status_to_str(&batch.status)) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - Ok(()) - } -} diff --git a/crates/adapters/postgres/src/job_repository.rs b/crates/adapters/postgres/src/job_repository.rs deleted file mode 100644 index b7d410a..0000000 --- a/crates/adapters/postgres/src/job_repository.rs +++ /dev/null @@ -1,209 +0,0 @@ -use crate::db::PgPool; -use async_trait::async_trait; -use chrono::{DateTime, Utc}; -use domain::{ - entities::{Job, JobStatus, JobType}, - errors::DomainError, - ports::JobRepository, - value_objects::{DateTimeStamp, StructuredData, SystemId}, -}; -use uuid::Uuid; - -#[derive(sqlx::FromRow)] -struct JobRow { - job_id: Uuid, - job_type: String, - target_asset_id: Option, - batch_id: Option, - status: String, - priority: i32, - payload: serde_json::Value, - result_data: Option, - retry_count: i32, - max_retries: i32, - created_at: DateTime, - started_at: Option>, - completed_at: Option>, - error_message: Option, -} - -fn job_type_from_str(s: &str) -> JobType { - match s { - "scan_directory" => JobType::ScanDirectory, - "extract_metadata" => JobType::ExtractMetadata, - "generate_derivative" => JobType::GenerateDerivative, - "sync_sidecar" => JobType::SyncSidecar, - "detect_duplicates" => JobType::DetectDuplicates, - other => JobType::Custom(other.to_string()), - } -} - -fn job_type_to_str(t: &JobType) -> String { - match t { - JobType::ScanDirectory => "scan_directory".to_string(), - JobType::ExtractMetadata => "extract_metadata".to_string(), - JobType::GenerateDerivative => "generate_derivative".to_string(), - JobType::SyncSidecar => "sync_sidecar".to_string(), - JobType::DetectDuplicates => "detect_duplicates".to_string(), - JobType::Custom(s) => s.clone(), - } -} - -fn job_status_from_str(s: &str) -> JobStatus { - match s { - "queued" => JobStatus::Queued, - "processing" => JobStatus::Processing, - "completed" => JobStatus::Completed, - "failed" => JobStatus::Failed, - "cancelled" => JobStatus::Cancelled, - _ => JobStatus::Queued, - } -} - -fn job_status_to_str(s: &JobStatus) -> &'static str { - match s { - JobStatus::Queued => "queued", - JobStatus::Processing => "processing", - JobStatus::Completed => "completed", - JobStatus::Failed => "failed", - JobStatus::Cancelled => "cancelled", - } -} - -fn structured_from_json(v: serde_json::Value) -> StructuredData { - if let serde_json::Value::Object(map) = v { - let mut sd = StructuredData::new(); - for (k, val) in map { - sd.insert(k, domain::value_objects::MetadataValue::from(val)); - } - sd - } else { - StructuredData::new() - } -} - -fn structured_to_json(sd: &StructuredData) -> serde_json::Value { - let map: serde_json::Map = sd - .inner() - .iter() - .map(|(k, v)| (k.clone(), serde_json::Value::from(v))) - .collect(); - serde_json::Value::Object(map) -} - -impl From for Job { - fn from(r: JobRow) -> Self { - Self { - job_id: SystemId::from_uuid(r.job_id), - job_type: job_type_from_str(&r.job_type), - target_asset_id: r.target_asset_id.map(SystemId::from_uuid), - batch_id: r.batch_id.map(SystemId::from_uuid), - status: job_status_from_str(&r.status), - priority: r.priority as u32, - payload: structured_from_json(r.payload), - result_data: r.result_data.map(structured_from_json), - retry_count: r.retry_count as u32, - max_retries: r.max_retries as u32, - created_at: DateTimeStamp::from_datetime(r.created_at), - started_at: r.started_at.map(DateTimeStamp::from_datetime), - completed_at: r.completed_at.map(DateTimeStamp::from_datetime), - error_message: r.error_message, - } - } -} - -pub struct PostgresJobRepository { - pool: PgPool, -} - -impl PostgresJobRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} - -#[async_trait] -impl JobRepository for PostgresJobRepository { - async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { - let row = sqlx::query_as::<_, JobRow>( - "SELECT job_id, job_type, target_asset_id, batch_id, status, priority, - payload, result_data, retry_count, max_retries, created_at, - started_at, completed_at, error_message - FROM jobs WHERE job_id = $1", - ) - .bind(*id.as_uuid()) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(row.map(Into::into)) - } - - async fn find_next_queued(&self) -> Result, DomainError> { - let row = sqlx::query_as::<_, JobRow>( - "SELECT job_id, job_type, target_asset_id, batch_id, status, priority, - payload, result_data, retry_count, max_retries, created_at, - started_at, completed_at, error_message - FROM jobs WHERE status = 'queued' - ORDER BY priority DESC, created_at ASC - LIMIT 1", - ) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(row.map(Into::into)) - } - - async fn find_by_batch(&self, batch_id: &SystemId) -> Result, DomainError> { - let rows = sqlx::query_as::<_, JobRow>( - "SELECT job_id, job_type, target_asset_id, batch_id, status, priority, - payload, result_data, retry_count, max_retries, created_at, - started_at, completed_at, error_message - FROM jobs WHERE batch_id = $1 - ORDER BY created_at ASC", - ) - .bind(*batch_id.as_uuid()) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(rows.into_iter().map(Into::into).collect()) - } - - async fn save(&self, job: &Job) -> Result<(), DomainError> { - sqlx::query( - "INSERT INTO jobs (job_id, job_type, target_asset_id, batch_id, status, priority, - payload, result_data, retry_count, max_retries, created_at, - started_at, completed_at, error_message) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) - ON CONFLICT (job_id) DO UPDATE SET - status = EXCLUDED.status, - priority = EXCLUDED.priority, - payload = EXCLUDED.payload, - result_data = EXCLUDED.result_data, - retry_count = EXCLUDED.retry_count, - started_at = EXCLUDED.started_at, - completed_at = EXCLUDED.completed_at, - error_message = EXCLUDED.error_message", - ) - .bind(*job.job_id.as_uuid()) - .bind(job_type_to_str(&job.job_type)) - .bind(job.target_asset_id.as_ref().map(|id| *id.as_uuid())) - .bind(job.batch_id.as_ref().map(|id| *id.as_uuid())) - .bind(job_status_to_str(&job.status)) - .bind(job.priority as i32) - .bind(structured_to_json(&job.payload)) - .bind(job.result_data.as_ref().map(structured_to_json)) - .bind(job.retry_count as i32) - .bind(job.max_retries as i32) - .bind(job.created_at.as_datetime()) - .bind(job.started_at.as_ref().map(|d| d.as_datetime())) - .bind(job.completed_at.as_ref().map(|d| d.as_datetime())) - .bind(&job.error_message) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - Ok(()) - } -} diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs index 9391c62..758dae4 100644 --- a/crates/adapters/postgres/src/lib.rs +++ b/crates/adapters/postgres/src/lib.rs @@ -1,39 +1,19 @@ pub mod db; -pub mod album_repository; -pub mod asset_metadata_repository; -pub mod asset_repository; -pub mod duplicate_repository; -pub mod ingest_session_repository; -pub mod job_batch_repository; -pub mod job_repository; -pub mod library_path_repository; -pub mod pipeline_repository; -pub mod plugin_repository; -pub mod quota_repository; -pub mod share_repository; -pub mod sidecar_repository; -pub mod storage_volume_repository; -pub mod tag_repository; -pub mod user_repository; -pub mod visibility_filter_repository; +pub mod identity; +pub mod storage; +pub mod catalog; +pub mod organization; +pub mod sharing; +pub mod sidecar; +pub mod processing; pub use db::{PgPool, connect, run_migrations}; -pub use album_repository::PostgresAlbumRepository; -pub use asset_metadata_repository::PostgresAssetMetadataRepository; -pub use asset_repository::PostgresAssetRepository; -pub use duplicate_repository::PostgresDuplicateRepository; -pub use ingest_session_repository::PostgresIngestSessionRepository; -pub use job_batch_repository::PostgresJobBatchRepository; -pub use job_repository::PostgresJobRepository; -pub use library_path_repository::PostgresLibraryPathRepository; -pub use pipeline_repository::PostgresPipelineRepository; -pub use plugin_repository::PostgresPluginRepository; -pub use quota_repository::{PostgresQuotaRepository, PostgresUsageLedgerRepository}; -pub use share_repository::PostgresShareRepository; -pub use sidecar_repository::PostgresSidecarRepository; -pub use storage_volume_repository::PostgresStorageVolumeRepository; -pub use tag_repository::PostgresTagRepository; -pub use user_repository::PostgresUserRepository; -pub use visibility_filter_repository::PostgresVisibilityFilterRepository; +pub use identity::*; +pub use storage::*; +pub use catalog::*; +pub use organization::*; +pub use sharing::*; +pub use sidecar::*; +pub use processing::*; diff --git a/crates/adapters/postgres/src/library_path_repository.rs b/crates/adapters/postgres/src/library_path_repository.rs deleted file mode 100644 index 72449e9..0000000 --- a/crates/adapters/postgres/src/library_path_repository.rs +++ /dev/null @@ -1,136 +0,0 @@ -use crate::db::PgPool; -use async_trait::async_trait; -use domain::{ - entities::{LibraryPath, OwnershipPolicy}, - errors::DomainError, - ports::LibraryPathRepository, - value_objects::SystemId, -}; -use uuid::Uuid; - -#[derive(sqlx::FromRow)] -struct LibraryPathRow { - path_id: Uuid, - volume_id: Uuid, - relative_path: String, - is_ingest_destination: bool, - ownership_policy: String, - designated_owner_id: Option, -} - -fn policy_from_str(s: &str) -> OwnershipPolicy { - match s { - "user_owned" => OwnershipPolicy::UserOwned, - "group_owned" => OwnershipPolicy::GroupOwned, - _ => OwnershipPolicy::Unassigned, - } -} - -fn policy_to_str(p: &OwnershipPolicy) -> &'static str { - match p { - OwnershipPolicy::UserOwned => "user_owned", - OwnershipPolicy::GroupOwned => "group_owned", - OwnershipPolicy::Unassigned => "unassigned", - } -} - -impl From for LibraryPath { - fn from(r: LibraryPathRow) -> Self { - Self { - path_id: SystemId::from_uuid(r.path_id), - volume_id: SystemId::from_uuid(r.volume_id), - relative_path: r.relative_path, - is_ingest_destination: r.is_ingest_destination, - ownership_policy: policy_from_str(&r.ownership_policy), - designated_owner_id: r.designated_owner_id.map(SystemId::from_uuid), - } - } -} - -pub struct PostgresLibraryPathRepository { - pool: PgPool, -} - -impl PostgresLibraryPathRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} - -#[async_trait] -impl LibraryPathRepository for PostgresLibraryPathRepository { - async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { - let row = sqlx::query_as::<_, LibraryPathRow>( - "SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id - FROM library_paths WHERE path_id = $1", - ) - .bind(*id.as_uuid()) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(row.map(Into::into)) - } - - async fn find_by_volume(&self, volume_id: &SystemId) -> Result, DomainError> { - let rows = sqlx::query_as::<_, LibraryPathRow>( - "SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id - FROM library_paths WHERE volume_id = $1", - ) - .bind(*volume_id.as_uuid()) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(rows.into_iter().map(Into::into).collect()) - } - - async fn find_ingest_destinations( - &self, - owner_id: &SystemId, - ) -> Result, DomainError> { - let rows = sqlx::query_as::<_, LibraryPathRow>( - "SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id - FROM library_paths - WHERE is_ingest_destination = true AND designated_owner_id = $1", - ) - .bind(*owner_id.as_uuid()) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(rows.into_iter().map(Into::into).collect()) - } - - async fn save(&self, path: &LibraryPath) -> Result<(), DomainError> { - sqlx::query( - "INSERT INTO library_paths (path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id) - VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT (path_id) DO UPDATE SET - volume_id = EXCLUDED.volume_id, - relative_path = EXCLUDED.relative_path, - is_ingest_destination = EXCLUDED.is_ingest_destination, - ownership_policy = EXCLUDED.ownership_policy, - designated_owner_id = EXCLUDED.designated_owner_id", - ) - .bind(*path.path_id.as_uuid()) - .bind(*path.volume_id.as_uuid()) - .bind(&path.relative_path) - .bind(path.is_ingest_destination) - .bind(policy_to_str(&path.ownership_policy)) - .bind(path.designated_owner_id.as_ref().map(|id| *id.as_uuid())) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - Ok(()) - } - - async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { - sqlx::query("DELETE FROM library_paths WHERE path_id = $1") - .bind(*id.as_uuid()) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - Ok(()) - } -} diff --git a/crates/adapters/postgres/src/album_repository.rs b/crates/adapters/postgres/src/organization/mod.rs similarity index 51% rename from crates/adapters/postgres/src/album_repository.rs rename to crates/adapters/postgres/src/organization/mod.rs index ce55f44..fa340f5 100644 --- a/crates/adapters/postgres/src/album_repository.rs +++ b/crates/adapters/postgres/src/organization/mod.rs @@ -2,13 +2,17 @@ use crate::db::PgPool; use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ - entities::{Album, AlbumEntry}, + entities::{Album, AlbumEntry, AssetTag, Tag, TagSource}, errors::DomainError, - ports::AlbumRepository, + ports::{AlbumRepository, TagRepository}, value_objects::{DateTimeStamp, SystemId}, }; use uuid::Uuid; +// ────────────────────────────────────────────── +// Album +// ────────────────────────────────────────────── + #[derive(sqlx::FromRow)] struct AlbumRow { album_id: Uuid, @@ -179,3 +183,172 @@ impl AlbumRepository for PostgresAlbumRepository { Ok(()) } } + +// ────────────────────────────────────────────── +// Tag +// ────────────────────────────────────────────── + +#[derive(sqlx::FromRow)] +struct TagRow { + tag_id: Uuid, + name: String, + tag_source: String, +} + +#[derive(sqlx::FromRow)] +struct AssetTagRow { + asset_id: Uuid, + tag_id: Uuid, + tagged_by_user_id: Option, + confidence: f64, +} + +fn tag_source_from_str(s: &str) -> TagSource { + match s { + "ai_generated" => TagSource::AiGenerated, + "exif_extracted" => TagSource::ExifExtracted, + _ => TagSource::UserManual, + } +} + +fn tag_source_to_str(s: &TagSource) -> &'static str { + match s { + TagSource::UserManual => "user_manual", + TagSource::AiGenerated => "ai_generated", + TagSource::ExifExtracted => "exif_extracted", + } +} + +impl From for Tag { + fn from(r: TagRow) -> Self { + Self { + tag_id: SystemId::from_uuid(r.tag_id), + name: r.name, + tag_source: tag_source_from_str(&r.tag_source), + } + } +} + +impl From for AssetTag { + fn from(r: AssetTagRow) -> Self { + Self { + asset_id: SystemId::from_uuid(r.asset_id), + tag_id: SystemId::from_uuid(r.tag_id), + tagged_by_user_id: r.tagged_by_user_id.map(SystemId::from_uuid), + confidence: r.confidence, + } + } +} + +pub struct PostgresTagRepository { + pool: PgPool, +} + +impl PostgresTagRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl TagRepository for PostgresTagRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + let row = sqlx::query_as::<_, TagRow>( + "SELECT tag_id, name, tag_source FROM tags WHERE tag_id = $1", + ) + .bind(*id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.map(Into::into)) + } + + async fn find_by_name(&self, name: &str) -> Result, DomainError> { + let row = sqlx::query_as::<_, TagRow>( + "SELECT tag_id, name, tag_source FROM tags WHERE name = $1", + ) + .bind(name) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.map(Into::into)) + } + + async fn find_tags_for_asset( + &self, + asset_id: &SystemId, + ) -> Result, DomainError> { + let rows = sqlx::query_as::<_, TagRow>( + "SELECT t.tag_id, t.name, t.tag_source + FROM tags t JOIN asset_tags at ON t.tag_id = at.tag_id + WHERE at.asset_id = $1", + ) + .bind(*asset_id.as_uuid()) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + let at_rows = sqlx::query_as::<_, AssetTagRow>( + "SELECT asset_id, tag_id, tagged_by_user_id, confidence + FROM asset_tags WHERE asset_id = $1", + ) + .bind(*asset_id.as_uuid()) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + let tags: Vec = rows.into_iter().map(Into::into).collect(); + let asset_tags: Vec = at_rows.into_iter().map(Into::into).collect(); + + Ok(tags.into_iter().zip(asset_tags).collect()) + } + + async fn save_tag(&self, tag: &Tag) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO tags (tag_id, name, tag_source) + VALUES ($1, $2, $3) + ON CONFLICT (tag_id) DO UPDATE SET name = EXCLUDED.name, tag_source = EXCLUDED.tag_source", + ) + .bind(*tag.tag_id.as_uuid()) + .bind(&tag.name) + .bind(tag_source_to_str(&tag.tag_source)) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } + + async fn save_asset_tag(&self, asset_tag: &AssetTag) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO asset_tags (asset_id, tag_id, tagged_by_user_id, confidence) + VALUES ($1, $2, $3, $4) + ON CONFLICT (asset_id, tag_id) DO UPDATE SET + tagged_by_user_id = EXCLUDED.tagged_by_user_id, + confidence = EXCLUDED.confidence", + ) + .bind(*asset_tag.asset_id.as_uuid()) + .bind(*asset_tag.tag_id.as_uuid()) + .bind(asset_tag.tagged_by_user_id.as_ref().map(|id| *id.as_uuid())) + .bind(asset_tag.confidence) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } + + async fn remove_asset_tag( + &self, + asset_id: &SystemId, + tag_id: &SystemId, + ) -> Result<(), DomainError> { + sqlx::query("DELETE FROM asset_tags WHERE asset_id = $1 AND tag_id = $2") + .bind(*asset_id.as_uuid()) + .bind(*tag_id.as_uuid()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} diff --git a/crates/adapters/postgres/src/pipeline_repository.rs b/crates/adapters/postgres/src/pipeline_repository.rs deleted file mode 100644 index 7143a59..0000000 --- a/crates/adapters/postgres/src/pipeline_repository.rs +++ /dev/null @@ -1,126 +0,0 @@ -use crate::db::PgPool; -use async_trait::async_trait; -use domain::{ - entities::{PipelineStep, ProcessingPipeline}, - errors::DomainError, - ports::PipelineRepository, - value_objects::{MetadataValue, StructuredData, SystemId}, -}; -use uuid::Uuid; - -#[derive(sqlx::FromRow)] -struct PipelineRow { - pipeline_id: Uuid, - trigger_event: String, - steps: serde_json::Value, -} - -#[derive(serde::Serialize, serde::Deserialize)] -struct StepJson { - plugin_id: Uuid, - step_order: u32, - configuration: serde_json::Map, -} - -fn steps_from_json(v: serde_json::Value) -> Vec { - let arr: Vec = serde_json::from_value(v).unwrap_or_default(); - arr.into_iter() - .map(|s| { - let mut config = StructuredData::new(); - for (k, val) in s.configuration { - config.insert(k, MetadataValue::from(val)); - } - PipelineStep { - plugin_id: SystemId::from_uuid(s.plugin_id), - step_order: s.step_order, - configuration: config, - } - }) - .collect() -} - -fn steps_to_json(steps: &[PipelineStep]) -> serde_json::Value { - let arr: Vec = steps - .iter() - .map(|s| { - let config: serde_json::Map = s - .configuration - .inner() - .iter() - .map(|(k, v)| (k.clone(), serde_json::Value::from(v))) - .collect(); - StepJson { - plugin_id: *s.plugin_id.as_uuid(), - step_order: s.step_order, - configuration: config, - } - }) - .collect(); - serde_json::to_value(arr).unwrap_or(serde_json::Value::Array(vec![])) -} - -impl From for ProcessingPipeline { - fn from(r: PipelineRow) -> Self { - Self { - pipeline_id: SystemId::from_uuid(r.pipeline_id), - trigger_event: r.trigger_event, - steps: steps_from_json(r.steps), - } - } -} - -pub struct PostgresPipelineRepository { - pool: PgPool, -} - -impl PostgresPipelineRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} - -#[async_trait] -impl PipelineRepository for PostgresPipelineRepository { - async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { - let row = sqlx::query_as::<_, PipelineRow>( - "SELECT pipeline_id, trigger_event, steps - FROM processing_pipelines WHERE pipeline_id = $1", - ) - .bind(*id.as_uuid()) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(row.map(Into::into)) - } - - async fn find_by_trigger(&self, event: &str) -> Result, DomainError> { - let rows = sqlx::query_as::<_, PipelineRow>( - "SELECT pipeline_id, trigger_event, steps - FROM processing_pipelines WHERE trigger_event = $1", - ) - .bind(event) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(rows.into_iter().map(Into::into).collect()) - } - - async fn save(&self, pipeline: &ProcessingPipeline) -> Result<(), DomainError> { - sqlx::query( - "INSERT INTO processing_pipelines (pipeline_id, trigger_event, steps) - VALUES ($1, $2, $3) - ON CONFLICT (pipeline_id) DO UPDATE SET - trigger_event = EXCLUDED.trigger_event, - steps = EXCLUDED.steps", - ) - .bind(*pipeline.pipeline_id.as_uuid()) - .bind(&pipeline.trigger_event) - .bind(steps_to_json(&pipeline.steps)) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - Ok(()) - } -} diff --git a/crates/adapters/postgres/src/plugin_repository.rs b/crates/adapters/postgres/src/plugin_repository.rs deleted file mode 100644 index 2fffd28..0000000 --- a/crates/adapters/postgres/src/plugin_repository.rs +++ /dev/null @@ -1,127 +0,0 @@ -use crate::db::PgPool; -use async_trait::async_trait; -use domain::{ - entities::{Plugin, PluginType}, - errors::DomainError, - ports::PluginRepository, - value_objects::{MetadataValue, StructuredData, SystemId}, -}; -use uuid::Uuid; - -#[derive(sqlx::FromRow)] -struct PluginRow { - plugin_id: Uuid, - name: String, - plugin_type: String, - is_enabled: bool, - configuration: serde_json::Value, -} - -fn plugin_type_from_str(s: &str) -> PluginType { - match s { - "media_processor" => PluginType::MediaProcessor, - "scheduled_task" => PluginType::ScheduledTask, - "sidecar_writer" => PluginType::SidecarWriter, - _ => PluginType::MediaProcessor, - } -} - -fn plugin_type_to_str(t: &PluginType) -> &'static str { - match t { - PluginType::MediaProcessor => "media_processor", - PluginType::ScheduledTask => "scheduled_task", - PluginType::SidecarWriter => "sidecar_writer", - } -} - -fn structured_from_json(v: serde_json::Value) -> StructuredData { - if let serde_json::Value::Object(map) = v { - let mut sd = StructuredData::new(); - for (k, val) in map { - sd.insert(k, MetadataValue::from(val)); - } - sd - } else { - StructuredData::new() - } -} - -fn structured_to_json(sd: &StructuredData) -> serde_json::Value { - let map: serde_json::Map = sd - .inner() - .iter() - .map(|(k, v)| (k.clone(), serde_json::Value::from(v))) - .collect(); - serde_json::Value::Object(map) -} - -impl From for Plugin { - fn from(r: PluginRow) -> Self { - Self { - plugin_id: SystemId::from_uuid(r.plugin_id), - name: r.name, - plugin_type: plugin_type_from_str(&r.plugin_type), - is_enabled: r.is_enabled, - configuration: structured_from_json(r.configuration), - } - } -} - -pub struct PostgresPluginRepository { - pool: PgPool, -} - -impl PostgresPluginRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} - -#[async_trait] -impl PluginRepository for PostgresPluginRepository { - async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { - let row = sqlx::query_as::<_, PluginRow>( - "SELECT plugin_id, name, plugin_type, is_enabled, configuration - FROM plugins WHERE plugin_id = $1", - ) - .bind(*id.as_uuid()) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(row.map(Into::into)) - } - - async fn find_enabled(&self) -> Result, DomainError> { - let rows = sqlx::query_as::<_, PluginRow>( - "SELECT plugin_id, name, plugin_type, is_enabled, configuration - FROM plugins WHERE is_enabled = true", - ) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(rows.into_iter().map(Into::into).collect()) - } - - async fn save(&self, plugin: &Plugin) -> Result<(), DomainError> { - sqlx::query( - "INSERT INTO plugins (plugin_id, name, plugin_type, is_enabled, configuration) - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (plugin_id) DO UPDATE SET - name = EXCLUDED.name, - plugin_type = EXCLUDED.plugin_type, - is_enabled = EXCLUDED.is_enabled, - configuration = EXCLUDED.configuration", - ) - .bind(*plugin.plugin_id.as_uuid()) - .bind(&plugin.name) - .bind(plugin_type_to_str(&plugin.plugin_type)) - .bind(plugin.is_enabled) - .bind(structured_to_json(&plugin.configuration)) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - Ok(()) - } -} diff --git a/crates/adapters/postgres/src/processing/mod.rs b/crates/adapters/postgres/src/processing/mod.rs new file mode 100644 index 0000000..ccdf4bb --- /dev/null +++ b/crates/adapters/postgres/src/processing/mod.rs @@ -0,0 +1,532 @@ +use crate::db::PgPool; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use domain::{ + entities::{ + BatchStatus, Job, JobBatch, JobStatus, JobType, PipelineStep, Plugin, PluginType, + ProcessingPipeline, + }, + errors::DomainError, + ports::{JobBatchRepository, JobRepository, PipelineRepository, PluginRepository}, + value_objects::{DateTimeStamp, MetadataValue, StructuredData, SystemId}, +}; +use uuid::Uuid; + +// ────────────────────────────────────────────── +// Job +// ────────────────────────────────────────────── + +#[derive(sqlx::FromRow)] +struct JobRow { + job_id: Uuid, + job_type: String, + target_asset_id: Option, + batch_id: Option, + status: String, + priority: i32, + payload: serde_json::Value, + result_data: Option, + retry_count: i32, + max_retries: i32, + created_at: DateTime, + started_at: Option>, + completed_at: Option>, + error_message: Option, +} + +fn job_type_from_str(s: &str) -> JobType { + match s { + "scan_directory" => JobType::ScanDirectory, + "extract_metadata" => JobType::ExtractMetadata, + "generate_derivative" => JobType::GenerateDerivative, + "sync_sidecar" => JobType::SyncSidecar, + "detect_duplicates" => JobType::DetectDuplicates, + other => JobType::Custom(other.to_string()), + } +} + +fn job_type_to_str(t: &JobType) -> String { + match t { + JobType::ScanDirectory => "scan_directory".to_string(), + JobType::ExtractMetadata => "extract_metadata".to_string(), + JobType::GenerateDerivative => "generate_derivative".to_string(), + JobType::SyncSidecar => "sync_sidecar".to_string(), + JobType::DetectDuplicates => "detect_duplicates".to_string(), + JobType::Custom(s) => s.clone(), + } +} + +fn job_status_from_str(s: &str) -> JobStatus { + match s { + "queued" => JobStatus::Queued, + "processing" => JobStatus::Processing, + "completed" => JobStatus::Completed, + "failed" => JobStatus::Failed, + "cancelled" => JobStatus::Cancelled, + _ => JobStatus::Queued, + } +} + +fn job_status_to_str(s: &JobStatus) -> &'static str { + match s { + JobStatus::Queued => "queued", + JobStatus::Processing => "processing", + JobStatus::Completed => "completed", + JobStatus::Failed => "failed", + JobStatus::Cancelled => "cancelled", + } +} + +fn structured_from_json(v: serde_json::Value) -> StructuredData { + if let serde_json::Value::Object(map) = v { + let mut sd = StructuredData::new(); + for (k, val) in map { + sd.insert(k, MetadataValue::from(val)); + } + sd + } else { + StructuredData::new() + } +} + +fn structured_to_json(sd: &StructuredData) -> serde_json::Value { + let map: serde_json::Map = sd + .inner() + .iter() + .map(|(k, v)| (k.clone(), serde_json::Value::from(v))) + .collect(); + serde_json::Value::Object(map) +} + +impl From for Job { + fn from(r: JobRow) -> Self { + Self { + job_id: SystemId::from_uuid(r.job_id), + job_type: job_type_from_str(&r.job_type), + target_asset_id: r.target_asset_id.map(SystemId::from_uuid), + batch_id: r.batch_id.map(SystemId::from_uuid), + status: job_status_from_str(&r.status), + priority: r.priority as u32, + payload: structured_from_json(r.payload), + result_data: r.result_data.map(structured_from_json), + retry_count: r.retry_count as u32, + max_retries: r.max_retries as u32, + created_at: DateTimeStamp::from_datetime(r.created_at), + started_at: r.started_at.map(DateTimeStamp::from_datetime), + completed_at: r.completed_at.map(DateTimeStamp::from_datetime), + error_message: r.error_message, + } + } +} + +pub struct PostgresJobRepository { + pool: PgPool, +} + +impl PostgresJobRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl JobRepository for PostgresJobRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + let row = sqlx::query_as::<_, JobRow>( + "SELECT job_id, job_type, target_asset_id, batch_id, status, priority, + payload, result_data, retry_count, max_retries, created_at, + started_at, completed_at, error_message + FROM jobs WHERE job_id = $1", + ) + .bind(*id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.map(Into::into)) + } + + async fn find_next_queued(&self) -> Result, DomainError> { + let row = sqlx::query_as::<_, JobRow>( + "SELECT job_id, job_type, target_asset_id, batch_id, status, priority, + payload, result_data, retry_count, max_retries, created_at, + started_at, completed_at, error_message + FROM jobs WHERE status = 'queued' + ORDER BY priority DESC, created_at ASC + LIMIT 1", + ) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.map(Into::into)) + } + + async fn find_by_batch(&self, batch_id: &SystemId) -> Result, DomainError> { + let rows = sqlx::query_as::<_, JobRow>( + "SELECT job_id, job_type, target_asset_id, batch_id, status, priority, + payload, result_data, retry_count, max_retries, created_at, + started_at, completed_at, error_message + FROM jobs WHERE batch_id = $1 + ORDER BY created_at ASC", + ) + .bind(*batch_id.as_uuid()) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(rows.into_iter().map(Into::into).collect()) + } + + async fn save(&self, job: &Job) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO jobs (job_id, job_type, target_asset_id, batch_id, status, priority, + payload, result_data, retry_count, max_retries, created_at, + started_at, completed_at, error_message) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) + ON CONFLICT (job_id) DO UPDATE SET + status = EXCLUDED.status, + priority = EXCLUDED.priority, + payload = EXCLUDED.payload, + result_data = EXCLUDED.result_data, + retry_count = EXCLUDED.retry_count, + started_at = EXCLUDED.started_at, + completed_at = EXCLUDED.completed_at, + error_message = EXCLUDED.error_message", + ) + .bind(*job.job_id.as_uuid()) + .bind(job_type_to_str(&job.job_type)) + .bind(job.target_asset_id.as_ref().map(|id| *id.as_uuid())) + .bind(job.batch_id.as_ref().map(|id| *id.as_uuid())) + .bind(job_status_to_str(&job.status)) + .bind(job.priority as i32) + .bind(structured_to_json(&job.payload)) + .bind(job.result_data.as_ref().map(structured_to_json)) + .bind(job.retry_count as i32) + .bind(job.max_retries as i32) + .bind(job.created_at.as_datetime()) + .bind(job.started_at.as_ref().map(|d| d.as_datetime())) + .bind(job.completed_at.as_ref().map(|d| d.as_datetime())) + .bind(&job.error_message) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} + +// ────────────────────────────────────────────── +// JobBatch +// ────────────────────────────────────────────── + +#[derive(sqlx::FromRow)] +struct BatchRow { + batch_id: Uuid, + batch_type: String, + total_jobs: i32, + completed_count: i32, + failed_count: i32, + status: String, +} + +fn batch_status_from_str(s: &str) -> BatchStatus { + match s { + "in_progress" => BatchStatus::InProgress, + "completed_with_errors" => BatchStatus::CompletedWithErrors, + "completed" => BatchStatus::Completed, + "cancelled" => BatchStatus::Cancelled, + _ => BatchStatus::InProgress, + } +} + +fn batch_status_to_str(s: &BatchStatus) -> &'static str { + match s { + BatchStatus::InProgress => "in_progress", + BatchStatus::CompletedWithErrors => "completed_with_errors", + BatchStatus::Completed => "completed", + BatchStatus::Cancelled => "cancelled", + } +} + +impl From for JobBatch { + fn from(r: BatchRow) -> Self { + Self { + batch_id: SystemId::from_uuid(r.batch_id), + batch_type: r.batch_type, + total_jobs: r.total_jobs as u32, + completed_count: r.completed_count as u32, + failed_count: r.failed_count as u32, + status: batch_status_from_str(&r.status), + } + } +} + +pub struct PostgresJobBatchRepository { + pool: PgPool, +} + +impl PostgresJobBatchRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl JobBatchRepository for PostgresJobBatchRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + let row = sqlx::query_as::<_, BatchRow>( + "SELECT batch_id, batch_type, total_jobs, completed_count, failed_count, status + FROM job_batches WHERE batch_id = $1", + ) + .bind(*id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.map(Into::into)) + } + + async fn save(&self, batch: &JobBatch) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO job_batches (batch_id, batch_type, total_jobs, completed_count, failed_count, status) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (batch_id) DO UPDATE SET + total_jobs = EXCLUDED.total_jobs, + completed_count = EXCLUDED.completed_count, + failed_count = EXCLUDED.failed_count, + status = EXCLUDED.status", + ) + .bind(*batch.batch_id.as_uuid()) + .bind(&batch.batch_type) + .bind(batch.total_jobs as i32) + .bind(batch.completed_count as i32) + .bind(batch.failed_count as i32) + .bind(batch_status_to_str(&batch.status)) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} + +// ────────────────────────────────────────────── +// Plugin +// ────────────────────────────────────────────── + +#[derive(sqlx::FromRow)] +struct PluginRow { + plugin_id: Uuid, + name: String, + plugin_type: String, + is_enabled: bool, + configuration: serde_json::Value, +} + +fn plugin_type_from_str(s: &str) -> PluginType { + match s { + "media_processor" => PluginType::MediaProcessor, + "scheduled_task" => PluginType::ScheduledTask, + "sidecar_writer" => PluginType::SidecarWriter, + _ => PluginType::MediaProcessor, + } +} + +fn plugin_type_to_str(t: &PluginType) -> &'static str { + match t { + PluginType::MediaProcessor => "media_processor", + PluginType::ScheduledTask => "scheduled_task", + PluginType::SidecarWriter => "sidecar_writer", + } +} + +impl From for Plugin { + fn from(r: PluginRow) -> Self { + Self { + plugin_id: SystemId::from_uuid(r.plugin_id), + name: r.name, + plugin_type: plugin_type_from_str(&r.plugin_type), + is_enabled: r.is_enabled, + configuration: structured_from_json(r.configuration), + } + } +} + +pub struct PostgresPluginRepository { + pool: PgPool, +} + +impl PostgresPluginRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl PluginRepository for PostgresPluginRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + let row = sqlx::query_as::<_, PluginRow>( + "SELECT plugin_id, name, plugin_type, is_enabled, configuration + FROM plugins WHERE plugin_id = $1", + ) + .bind(*id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.map(Into::into)) + } + + async fn find_enabled(&self) -> Result, DomainError> { + let rows = sqlx::query_as::<_, PluginRow>( + "SELECT plugin_id, name, plugin_type, is_enabled, configuration + FROM plugins WHERE is_enabled = true", + ) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(rows.into_iter().map(Into::into).collect()) + } + + async fn save(&self, plugin: &Plugin) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO plugins (plugin_id, name, plugin_type, is_enabled, configuration) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (plugin_id) DO UPDATE SET + name = EXCLUDED.name, + plugin_type = EXCLUDED.plugin_type, + is_enabled = EXCLUDED.is_enabled, + configuration = EXCLUDED.configuration", + ) + .bind(*plugin.plugin_id.as_uuid()) + .bind(&plugin.name) + .bind(plugin_type_to_str(&plugin.plugin_type)) + .bind(plugin.is_enabled) + .bind(structured_to_json(&plugin.configuration)) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} + +// ────────────────────────────────────────────── +// Pipeline +// ────────────────────────────────────────────── + +#[derive(sqlx::FromRow)] +struct PipelineRow { + pipeline_id: Uuid, + trigger_event: String, + steps: serde_json::Value, +} + +#[derive(serde::Serialize, serde::Deserialize)] +struct StepJson { + plugin_id: Uuid, + step_order: u32, + configuration: serde_json::Map, +} + +fn steps_from_json(v: serde_json::Value) -> Vec { + let arr: Vec = serde_json::from_value(v).unwrap_or_default(); + arr.into_iter() + .map(|s| { + let mut config = StructuredData::new(); + for (k, val) in s.configuration { + config.insert(k, MetadataValue::from(val)); + } + PipelineStep { + plugin_id: SystemId::from_uuid(s.plugin_id), + step_order: s.step_order, + configuration: config, + } + }) + .collect() +} + +fn steps_to_json(steps: &[PipelineStep]) -> serde_json::Value { + let arr: Vec = steps + .iter() + .map(|s| { + let config: serde_json::Map = s + .configuration + .inner() + .iter() + .map(|(k, v)| (k.clone(), serde_json::Value::from(v))) + .collect(); + StepJson { + plugin_id: *s.plugin_id.as_uuid(), + step_order: s.step_order, + configuration: config, + } + }) + .collect(); + serde_json::to_value(arr).unwrap_or(serde_json::Value::Array(vec![])) +} + +impl From for ProcessingPipeline { + fn from(r: PipelineRow) -> Self { + Self { + pipeline_id: SystemId::from_uuid(r.pipeline_id), + trigger_event: r.trigger_event, + steps: steps_from_json(r.steps), + } + } +} + +pub struct PostgresPipelineRepository { + pool: PgPool, +} + +impl PostgresPipelineRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl PipelineRepository for PostgresPipelineRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + let row = sqlx::query_as::<_, PipelineRow>( + "SELECT pipeline_id, trigger_event, steps + FROM processing_pipelines WHERE pipeline_id = $1", + ) + .bind(*id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.map(Into::into)) + } + + async fn find_by_trigger(&self, event: &str) -> Result, DomainError> { + let rows = sqlx::query_as::<_, PipelineRow>( + "SELECT pipeline_id, trigger_event, steps + FROM processing_pipelines WHERE trigger_event = $1", + ) + .bind(event) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(rows.into_iter().map(Into::into).collect()) + } + + async fn save(&self, pipeline: &ProcessingPipeline) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO processing_pipelines (pipeline_id, trigger_event, steps) + VALUES ($1, $2, $3) + ON CONFLICT (pipeline_id) DO UPDATE SET + trigger_event = EXCLUDED.trigger_event, + steps = EXCLUDED.steps", + ) + .bind(*pipeline.pipeline_id.as_uuid()) + .bind(&pipeline.trigger_event) + .bind(steps_to_json(&pipeline.steps)) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} diff --git a/crates/adapters/postgres/src/quota_repository.rs b/crates/adapters/postgres/src/quota_repository.rs deleted file mode 100644 index 8779330..0000000 --- a/crates/adapters/postgres/src/quota_repository.rs +++ /dev/null @@ -1,261 +0,0 @@ -use crate::db::PgPool; -use async_trait::async_trait; -use chrono::{DateTime, Utc}; -use domain::{ - entities::{QuotaDefinition, QuotaRule, TimePeriod, UsageLedgerEntry, UsageType}, - errors::DomainError, - ports::{QuotaRepository, UsageLedgerRepository}, - value_objects::{DateTimeStamp, SystemId}, -}; -use uuid::Uuid; - -// --- Enum mappings --- - -fn usage_type_from_str(s: &str) -> UsageType { - match s { - "storage_bytes" => UsageType::StorageBytes, - "process_jobs" => UsageType::ProcessJobs, - "api_calls" => UsageType::ApiCalls, - "indexing_size" => UsageType::IndexingSize, - _ => UsageType::StorageBytes, - } -} - -fn usage_type_to_str(t: &UsageType) -> &'static str { - match t { - UsageType::StorageBytes => "storage_bytes", - UsageType::ProcessJobs => "process_jobs", - UsageType::ApiCalls => "api_calls", - UsageType::IndexingSize => "indexing_size", - } -} - -fn time_period_from_str(s: &str) -> TimePeriod { - match s { - "daily" => TimePeriod::Daily, - "monthly" => TimePeriod::Monthly, - "lifetime" => TimePeriod::Lifetime, - _ => TimePeriod::Lifetime, - } -} - -fn time_period_to_str(p: &TimePeriod) -> &'static str { - match p { - TimePeriod::Daily => "daily", - TimePeriod::Monthly => "monthly", - TimePeriod::Lifetime => "lifetime", - } -} - -// --- Row structs --- - -#[derive(sqlx::FromRow)] -struct QuotaDefRow { - quota_id: Uuid, - owner_scope: Uuid, - is_enforced: bool, -} - -#[derive(sqlx::FromRow)] -#[allow(dead_code)] -struct QuotaRuleRow { - rule_id: Uuid, - quota_id: Uuid, - dimension: String, - limit_value: i64, - time_period: String, - is_unlimited: bool, -} - -#[derive(sqlx::FromRow)] -struct UsageLedgerRow { - entry_id: Uuid, - user_id: Uuid, - usage_type: String, - consumed_amount: i64, - timestamp: DateTime, - context: String, -} - -#[derive(sqlx::FromRow)] -struct SumRow { - total: i64, -} - -impl From for QuotaRule { - fn from(r: QuotaRuleRow) -> Self { - Self { - rule_id: SystemId::from_uuid(r.rule_id), - dimension: usage_type_from_str(&r.dimension), - limit_value: r.limit_value as u64, - time_period: time_period_from_str(&r.time_period), - is_unlimited: r.is_unlimited, - } - } -} - -impl From for UsageLedgerEntry { - fn from(r: UsageLedgerRow) -> Self { - Self { - entry_id: SystemId::from_uuid(r.entry_id), - user_id: SystemId::from_uuid(r.user_id), - usage_type: usage_type_from_str(&r.usage_type), - consumed_amount: r.consumed_amount as u64, - timestamp: DateTimeStamp::from_datetime(r.timestamp), - context: r.context, - } - } -} - -// --- PostgresQuotaRepository --- - -pub struct PostgresQuotaRepository { - pool: PgPool, -} - -impl PostgresQuotaRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} - -#[async_trait] -impl QuotaRepository for PostgresQuotaRepository { - async fn find_by_owner( - &self, - owner_id: &SystemId, - ) -> Result, DomainError> { - let def_row = sqlx::query_as::<_, QuotaDefRow>( - "SELECT quota_id, owner_scope, is_enforced FROM quota_definitions WHERE owner_scope = $1", - ) - .bind(*owner_id.as_uuid()) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - let Some(def) = def_row else { - return Ok(None); - }; - - let rule_rows = sqlx::query_as::<_, QuotaRuleRow>( - "SELECT rule_id, quota_id, dimension, limit_value, time_period, is_unlimited - FROM quota_rules WHERE quota_id = $1", - ) - .bind(def.quota_id) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(Some(QuotaDefinition { - quota_id: SystemId::from_uuid(def.quota_id), - owner_scope: SystemId::from_uuid(def.owner_scope), - is_enforced: def.is_enforced, - rules: rule_rows.into_iter().map(Into::into).collect(), - })) - } - - async fn save(&self, quota: &QuotaDefinition) -> Result<(), DomainError> { - sqlx::query( - "INSERT INTO quota_definitions (quota_id, owner_scope, is_enforced) - VALUES ($1, $2, $3) - ON CONFLICT (quota_id) DO UPDATE SET - owner_scope = EXCLUDED.owner_scope, - is_enforced = EXCLUDED.is_enforced", - ) - .bind(*quota.quota_id.as_uuid()) - .bind(*quota.owner_scope.as_uuid()) - .bind(quota.is_enforced) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - // Delete old rules then re-insert - sqlx::query("DELETE FROM quota_rules WHERE quota_id = $1") - .bind(*quota.quota_id.as_uuid()) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - for rule in "a.rules { - sqlx::query( - "INSERT INTO quota_rules (rule_id, quota_id, dimension, limit_value, time_period, is_unlimited) - VALUES ($1, $2, $3, $4, $5, $6)", - ) - .bind(*rule.rule_id.as_uuid()) - .bind(*quota.quota_id.as_uuid()) - .bind(usage_type_to_str(&rule.dimension)) - .bind(rule.limit_value as i64) - .bind(time_period_to_str(&rule.time_period)) - .bind(rule.is_unlimited) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - } - - Ok(()) - } - - async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { - // Rules cascade-delete - sqlx::query("DELETE FROM quota_definitions WHERE quota_id = $1") - .bind(*id.as_uuid()) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - Ok(()) - } -} - -// --- PostgresUsageLedgerRepository --- - -pub struct PostgresUsageLedgerRepository { - pool: PgPool, -} - -impl PostgresUsageLedgerRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} - -#[async_trait] -impl UsageLedgerRepository for PostgresUsageLedgerRepository { - async fn record(&self, entry: &UsageLedgerEntry) -> Result<(), DomainError> { - sqlx::query( - "INSERT INTO usage_ledger (entry_id, user_id, usage_type, consumed_amount, timestamp, context) - VALUES ($1, $2, $3, $4, $5, $6)", - ) - .bind(*entry.entry_id.as_uuid()) - .bind(*entry.user_id.as_uuid()) - .bind(usage_type_to_str(&entry.usage_type)) - .bind(entry.consumed_amount as i64) - .bind(entry.timestamp.as_datetime()) - .bind(&entry.context) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - Ok(()) - } - - async fn sum_usage( - &self, - user_id: &SystemId, - usage_type: UsageType, - since: Option, - ) -> Result { - let since_dt: Option> = since.map(|s| *s.as_datetime()); - let row = sqlx::query_as::<_, SumRow>( - "SELECT COALESCE(SUM(consumed_amount), 0) as total - FROM usage_ledger - WHERE user_id = $1 AND usage_type = $2 AND ($3::timestamptz IS NULL OR timestamp >= $3)", - ) - .bind(*user_id.as_uuid()) - .bind(usage_type_to_str(&usage_type)) - .bind(since_dt) - .fetch_one(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(row.total as u64) - } -} diff --git a/crates/adapters/postgres/src/share_repository.rs b/crates/adapters/postgres/src/sharing/mod.rs similarity index 81% rename from crates/adapters/postgres/src/share_repository.rs rename to crates/adapters/postgres/src/sharing/mod.rs index cae6cfc..eee9f75 100644 --- a/crates/adapters/postgres/src/share_repository.rs +++ b/crates/adapters/postgres/src/sharing/mod.rs @@ -4,15 +4,17 @@ use chrono::{DateTime, Utc}; use domain::{ entities::{ InviteCode, LinkAccessLevel, ScopeType, ShareLink, ShareScope, ShareTarget, ShareableType, - TargetType, + TargetType, VisibilityFilter, }, errors::DomainError, - ports::ShareRepository, + ports::{ShareRepository, VisibilityFilterRepository}, value_objects::{DateTimeStamp, SystemId}, }; use uuid::Uuid; -// --- String constants for DB enum mapping --- +// ────────────────────────────────────────────── +// Share +// ────────────────────────────────────────────── const SCOPE_PRIVATE: &str = "private"; const SCOPE_USER: &str = "user"; @@ -31,8 +33,6 @@ const TARGET_GROUP: &str = "group"; const ACCESS_VIEW_ONLY: &str = "view_only"; const ACCESS_LIMITED_SEARCH: &str = "limited_search"; -// --- Row structs --- - #[derive(sqlx::FromRow)] struct ShareScopeRow { scope_id: Uuid, @@ -74,8 +74,6 @@ struct InviteCodeRow { assigned_role_id: Uuid, } -// --- Enum conversions --- - fn scope_type_to_str(t: ScopeType) -> &'static str { match t { ScopeType::Private => SCOPE_PRIVATE, @@ -148,8 +146,6 @@ fn access_level_from_str(s: &str) -> Result { } } -// --- Row → Domain conversions --- - impl TryFrom for ShareScope { type Error = DomainError; @@ -211,8 +207,6 @@ impl TryFrom for InviteCode { } } -// --- Repository --- - pub struct PostgresShareRepository { pool: PgPool, } @@ -415,3 +409,83 @@ impl ShareRepository for PostgresShareRepository { row.map(TryInto::try_into).transpose() } } + +// ────────────────────────────────────────────── +// VisibilityFilter +// ────────────────────────────────────────────── + +#[derive(sqlx::FromRow)] +struct VisibilityFilterRow { + filter_id: Uuid, + scope_id: Uuid, + role_id: Uuid, + hidden_fields: Vec, +} + +impl From for VisibilityFilter { + fn from(r: VisibilityFilterRow) -> Self { + Self { + filter_id: SystemId::from_uuid(r.filter_id), + scope_id: SystemId::from_uuid(r.scope_id), + role_id: SystemId::from_uuid(r.role_id), + hidden_fields: r.hidden_fields, + } + } +} + +pub struct PostgresVisibilityFilterRepository { + pool: PgPool, +} + +impl PostgresVisibilityFilterRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl VisibilityFilterRepository for PostgresVisibilityFilterRepository { + async fn find_by_scope_and_role( + &self, + scope_id: &SystemId, + role_id: &SystemId, + ) -> Result, DomainError> { + let row = sqlx::query_as::<_, VisibilityFilterRow>( + "SELECT filter_id, scope_id, role_id, hidden_fields + FROM visibility_filters WHERE scope_id = $1 AND role_id = $2", + ) + .bind(*scope_id.as_uuid()) + .bind(*role_id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.map(Into::into)) + } + + async fn save(&self, filter: &VisibilityFilter) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO visibility_filters (filter_id, scope_id, role_id, hidden_fields) + VALUES ($1, $2, $3, $4) + ON CONFLICT (filter_id) DO UPDATE SET + hidden_fields = EXCLUDED.hidden_fields", + ) + .bind(*filter.filter_id.as_uuid()) + .bind(*filter.scope_id.as_uuid()) + .bind(*filter.role_id.as_uuid()) + .bind(&filter.hidden_fields) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } + + async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { + sqlx::query("DELETE FROM visibility_filters WHERE filter_id = $1") + .bind(*id.as_uuid()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} diff --git a/crates/adapters/postgres/src/sidecar_repository.rs b/crates/adapters/postgres/src/sidecar/mod.rs similarity index 100% rename from crates/adapters/postgres/src/sidecar_repository.rs rename to crates/adapters/postgres/src/sidecar/mod.rs diff --git a/crates/adapters/postgres/src/storage/mod.rs b/crates/adapters/postgres/src/storage/mod.rs new file mode 100644 index 0000000..0dd2cb5 --- /dev/null +++ b/crates/adapters/postgres/src/storage/mod.rs @@ -0,0 +1,607 @@ +use crate::db::PgPool; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use domain::{ + entities::{ + IngestSession, IngestStatus, LibraryPath, OwnershipPolicy, QuotaDefinition, QuotaRule, + StorageVolume, TimePeriod, UsageLedgerEntry, UsageType, + }, + errors::DomainError, + ports::{ + IngestSessionRepository, LibraryPathRepository, QuotaRepository, StorageVolumeRepository, + UsageLedgerRepository, + }, + value_objects::{Checksum, DateTimeStamp, SystemId}, +}; +use uuid::Uuid; + +// ────────────────────────────────────────────── +// StorageVolume +// ────────────────────────────────────────────── + +#[derive(sqlx::FromRow)] +struct StorageVolumeRow { + volume_id: Uuid, + volume_name: String, + uri_prefix: String, + is_writable: bool, + available_bytes: i64, +} + +impl From for StorageVolume { + fn from(r: StorageVolumeRow) -> Self { + Self { + volume_id: SystemId::from_uuid(r.volume_id), + volume_name: r.volume_name, + uri_prefix: r.uri_prefix, + is_writable: r.is_writable, + available_bytes: r.available_bytes as u64, + } + } +} + +pub struct PostgresStorageVolumeRepository { + pool: PgPool, +} + +impl PostgresStorageVolumeRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl StorageVolumeRepository for PostgresStorageVolumeRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + let row = sqlx::query_as::<_, StorageVolumeRow>( + "SELECT volume_id, volume_name, uri_prefix, is_writable, available_bytes + FROM storage_volumes WHERE volume_id = $1", + ) + .bind(*id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.map(Into::into)) + } + + async fn find_all(&self) -> Result, DomainError> { + let rows = sqlx::query_as::<_, StorageVolumeRow>( + "SELECT volume_id, volume_name, uri_prefix, is_writable, available_bytes + FROM storage_volumes", + ) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(rows.into_iter().map(Into::into).collect()) + } + + async fn save(&self, volume: &StorageVolume) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO storage_volumes (volume_id, volume_name, uri_prefix, is_writable, available_bytes) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (volume_id) DO UPDATE SET + volume_name = EXCLUDED.volume_name, + uri_prefix = EXCLUDED.uri_prefix, + is_writable = EXCLUDED.is_writable, + available_bytes = EXCLUDED.available_bytes", + ) + .bind(*volume.volume_id.as_uuid()) + .bind(&volume.volume_name) + .bind(&volume.uri_prefix) + .bind(volume.is_writable) + .bind(volume.available_bytes as i64) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } + + async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { + sqlx::query("DELETE FROM storage_volumes WHERE volume_id = $1") + .bind(*id.as_uuid()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} + +// ────────────────────────────────────────────── +// LibraryPath +// ────────────────────────────────────────────── + +#[derive(sqlx::FromRow)] +struct LibraryPathRow { + path_id: Uuid, + volume_id: Uuid, + relative_path: String, + is_ingest_destination: bool, + ownership_policy: String, + designated_owner_id: Option, +} + +fn policy_from_str(s: &str) -> OwnershipPolicy { + match s { + "user_owned" => OwnershipPolicy::UserOwned, + "group_owned" => OwnershipPolicy::GroupOwned, + _ => OwnershipPolicy::Unassigned, + } +} + +fn policy_to_str(p: &OwnershipPolicy) -> &'static str { + match p { + OwnershipPolicy::UserOwned => "user_owned", + OwnershipPolicy::GroupOwned => "group_owned", + OwnershipPolicy::Unassigned => "unassigned", + } +} + +impl From for LibraryPath { + fn from(r: LibraryPathRow) -> Self { + Self { + path_id: SystemId::from_uuid(r.path_id), + volume_id: SystemId::from_uuid(r.volume_id), + relative_path: r.relative_path, + is_ingest_destination: r.is_ingest_destination, + ownership_policy: policy_from_str(&r.ownership_policy), + designated_owner_id: r.designated_owner_id.map(SystemId::from_uuid), + } + } +} + +pub struct PostgresLibraryPathRepository { + pool: PgPool, +} + +impl PostgresLibraryPathRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl LibraryPathRepository for PostgresLibraryPathRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + let row = sqlx::query_as::<_, LibraryPathRow>( + "SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id + FROM library_paths WHERE path_id = $1", + ) + .bind(*id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.map(Into::into)) + } + + async fn find_by_volume(&self, volume_id: &SystemId) -> Result, DomainError> { + let rows = sqlx::query_as::<_, LibraryPathRow>( + "SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id + FROM library_paths WHERE volume_id = $1", + ) + .bind(*volume_id.as_uuid()) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(rows.into_iter().map(Into::into).collect()) + } + + async fn find_ingest_destinations( + &self, + owner_id: &SystemId, + ) -> Result, DomainError> { + let rows = sqlx::query_as::<_, LibraryPathRow>( + "SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id + FROM library_paths + WHERE is_ingest_destination = true AND designated_owner_id = $1", + ) + .bind(*owner_id.as_uuid()) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(rows.into_iter().map(Into::into).collect()) + } + + async fn save(&self, path: &LibraryPath) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO library_paths (path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (path_id) DO UPDATE SET + volume_id = EXCLUDED.volume_id, + relative_path = EXCLUDED.relative_path, + is_ingest_destination = EXCLUDED.is_ingest_destination, + ownership_policy = EXCLUDED.ownership_policy, + designated_owner_id = EXCLUDED.designated_owner_id", + ) + .bind(*path.path_id.as_uuid()) + .bind(*path.volume_id.as_uuid()) + .bind(&path.relative_path) + .bind(path.is_ingest_destination) + .bind(policy_to_str(&path.ownership_policy)) + .bind(path.designated_owner_id.as_ref().map(|id| *id.as_uuid())) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } + + async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { + sqlx::query("DELETE FROM library_paths WHERE path_id = $1") + .bind(*id.as_uuid()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} + +// ────────────────────────────────────────────── +// IngestSession +// ────────────────────────────────────────────── + +#[derive(sqlx::FromRow)] +struct IngestSessionRow { + session_id: Uuid, + uploader_user_id: Uuid, + client_device_id: String, + original_filename: String, + client_checksum: String, + target_library_path_id: Uuid, + status: String, + created_at: DateTime, + error_message: Option, +} + +fn ingest_status_from_str(s: &str) -> IngestStatus { + match s { + "uploading" => IngestStatus::Uploading, + "awaiting_processing" => IngestStatus::AwaitingProcessing, + "processing" => IngestStatus::Processing, + "completed" => IngestStatus::Completed, + "failed" => IngestStatus::Failed, + _ => IngestStatus::Uploading, + } +} + +fn ingest_status_to_str(s: &IngestStatus) -> &'static str { + match s { + IngestStatus::Uploading => "uploading", + IngestStatus::AwaitingProcessing => "awaiting_processing", + IngestStatus::Processing => "processing", + IngestStatus::Completed => "completed", + IngestStatus::Failed => "failed", + } +} + +impl TryFrom for IngestSession { + type Error = DomainError; + fn try_from(r: IngestSessionRow) -> Result { + Ok(Self { + session_id: SystemId::from_uuid(r.session_id), + uploader_user_id: SystemId::from_uuid(r.uploader_user_id), + client_device_id: r.client_device_id, + original_filename: r.original_filename, + client_checksum: Checksum::new(r.client_checksum)?, + target_library_path_id: SystemId::from_uuid(r.target_library_path_id), + status: ingest_status_from_str(&r.status), + created_at: DateTimeStamp::from_datetime(r.created_at), + error_message: r.error_message, + }) + } +} + +pub struct PostgresIngestSessionRepository { + pool: PgPool, +} + +impl PostgresIngestSessionRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl IngestSessionRepository for PostgresIngestSessionRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + let row = sqlx::query_as::<_, IngestSessionRow>( + "SELECT session_id, uploader_user_id, client_device_id, original_filename, + client_checksum, target_library_path_id, status, created_at, error_message + FROM ingest_sessions WHERE session_id = $1", + ) + .bind(*id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + row.map(TryInto::try_into).transpose() + } + + async fn find_by_user(&self, user_id: &SystemId) -> Result, DomainError> { + let rows = sqlx::query_as::<_, IngestSessionRow>( + "SELECT session_id, uploader_user_id, client_device_id, original_filename, + client_checksum, target_library_path_id, status, created_at, error_message + FROM ingest_sessions WHERE uploader_user_id = $1", + ) + .bind(*user_id.as_uuid()) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + rows.into_iter().map(TryInto::try_into).collect() + } + + async fn save(&self, session: &IngestSession) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO ingest_sessions (session_id, uploader_user_id, client_device_id, original_filename, + client_checksum, target_library_path_id, status, created_at, error_message) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT (session_id) DO UPDATE SET + status = EXCLUDED.status, + error_message = EXCLUDED.error_message", + ) + .bind(*session.session_id.as_uuid()) + .bind(*session.uploader_user_id.as_uuid()) + .bind(&session.client_device_id) + .bind(&session.original_filename) + .bind(session.client_checksum.as_str()) + .bind(*session.target_library_path_id.as_uuid()) + .bind(ingest_status_to_str(&session.status)) + .bind(session.created_at.as_datetime()) + .bind(session.error_message.as_deref()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} + +// ────────────────────────────────────────────── +// Quota + UsageLedger +// ────────────────────────────────────────────── + +fn usage_type_from_str(s: &str) -> UsageType { + match s { + "storage_bytes" => UsageType::StorageBytes, + "process_jobs" => UsageType::ProcessJobs, + "api_calls" => UsageType::ApiCalls, + "indexing_size" => UsageType::IndexingSize, + _ => UsageType::StorageBytes, + } +} + +fn usage_type_to_str(t: &UsageType) -> &'static str { + match t { + UsageType::StorageBytes => "storage_bytes", + UsageType::ProcessJobs => "process_jobs", + UsageType::ApiCalls => "api_calls", + UsageType::IndexingSize => "indexing_size", + } +} + +fn time_period_from_str(s: &str) -> TimePeriod { + match s { + "daily" => TimePeriod::Daily, + "monthly" => TimePeriod::Monthly, + "lifetime" => TimePeriod::Lifetime, + _ => TimePeriod::Lifetime, + } +} + +fn time_period_to_str(p: &TimePeriod) -> &'static str { + match p { + TimePeriod::Daily => "daily", + TimePeriod::Monthly => "monthly", + TimePeriod::Lifetime => "lifetime", + } +} + +#[derive(sqlx::FromRow)] +struct QuotaDefRow { + quota_id: Uuid, + owner_scope: Uuid, + is_enforced: bool, +} + +#[derive(sqlx::FromRow)] +#[allow(dead_code)] +struct QuotaRuleRow { + rule_id: Uuid, + quota_id: Uuid, + dimension: String, + limit_value: i64, + time_period: String, + is_unlimited: bool, +} + +#[derive(sqlx::FromRow)] +struct UsageLedgerRow { + entry_id: Uuid, + user_id: Uuid, + usage_type: String, + consumed_amount: i64, + timestamp: DateTime, + context: String, +} + +#[derive(sqlx::FromRow)] +struct SumRow { + total: i64, +} + +impl From for QuotaRule { + fn from(r: QuotaRuleRow) -> Self { + Self { + rule_id: SystemId::from_uuid(r.rule_id), + dimension: usage_type_from_str(&r.dimension), + limit_value: r.limit_value as u64, + time_period: time_period_from_str(&r.time_period), + is_unlimited: r.is_unlimited, + } + } +} + +impl From for UsageLedgerEntry { + fn from(r: UsageLedgerRow) -> Self { + Self { + entry_id: SystemId::from_uuid(r.entry_id), + user_id: SystemId::from_uuid(r.user_id), + usage_type: usage_type_from_str(&r.usage_type), + consumed_amount: r.consumed_amount as u64, + timestamp: DateTimeStamp::from_datetime(r.timestamp), + context: r.context, + } + } +} + +pub struct PostgresQuotaRepository { + pool: PgPool, +} + +impl PostgresQuotaRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl QuotaRepository for PostgresQuotaRepository { + async fn find_by_owner( + &self, + owner_id: &SystemId, + ) -> Result, DomainError> { + let def_row = sqlx::query_as::<_, QuotaDefRow>( + "SELECT quota_id, owner_scope, is_enforced FROM quota_definitions WHERE owner_scope = $1", + ) + .bind(*owner_id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + let Some(def) = def_row else { + return Ok(None); + }; + + let rule_rows = sqlx::query_as::<_, QuotaRuleRow>( + "SELECT rule_id, quota_id, dimension, limit_value, time_period, is_unlimited + FROM quota_rules WHERE quota_id = $1", + ) + .bind(def.quota_id) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(Some(QuotaDefinition { + quota_id: SystemId::from_uuid(def.quota_id), + owner_scope: SystemId::from_uuid(def.owner_scope), + is_enforced: def.is_enforced, + rules: rule_rows.into_iter().map(Into::into).collect(), + })) + } + + async fn save(&self, quota: &QuotaDefinition) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO quota_definitions (quota_id, owner_scope, is_enforced) + VALUES ($1, $2, $3) + ON CONFLICT (quota_id) DO UPDATE SET + owner_scope = EXCLUDED.owner_scope, + is_enforced = EXCLUDED.is_enforced", + ) + .bind(*quota.quota_id.as_uuid()) + .bind(*quota.owner_scope.as_uuid()) + .bind(quota.is_enforced) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + // Delete old rules then re-insert + sqlx::query("DELETE FROM quota_rules WHERE quota_id = $1") + .bind(*quota.quota_id.as_uuid()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + for rule in "a.rules { + sqlx::query( + "INSERT INTO quota_rules (rule_id, quota_id, dimension, limit_value, time_period, is_unlimited) + VALUES ($1, $2, $3, $4, $5, $6)", + ) + .bind(*rule.rule_id.as_uuid()) + .bind(*quota.quota_id.as_uuid()) + .bind(usage_type_to_str(&rule.dimension)) + .bind(rule.limit_value as i64) + .bind(time_period_to_str(&rule.time_period)) + .bind(rule.is_unlimited) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + } + + Ok(()) + } + + async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { + // Rules cascade-delete + sqlx::query("DELETE FROM quota_definitions WHERE quota_id = $1") + .bind(*id.as_uuid()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} + +pub struct PostgresUsageLedgerRepository { + pool: PgPool, +} + +impl PostgresUsageLedgerRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl UsageLedgerRepository for PostgresUsageLedgerRepository { + async fn record(&self, entry: &UsageLedgerEntry) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO usage_ledger (entry_id, user_id, usage_type, consumed_amount, timestamp, context) + VALUES ($1, $2, $3, $4, $5, $6)", + ) + .bind(*entry.entry_id.as_uuid()) + .bind(*entry.user_id.as_uuid()) + .bind(usage_type_to_str(&entry.usage_type)) + .bind(entry.consumed_amount as i64) + .bind(entry.timestamp.as_datetime()) + .bind(&entry.context) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } + + async fn sum_usage( + &self, + user_id: &SystemId, + usage_type: UsageType, + since: Option, + ) -> Result { + let since_dt: Option> = since.map(|s| *s.as_datetime()); + let row = sqlx::query_as::<_, SumRow>( + "SELECT COALESCE(SUM(consumed_amount), 0) as total + FROM usage_ledger + WHERE user_id = $1 AND usage_type = $2 AND ($3::timestamptz IS NULL OR timestamp >= $3)", + ) + .bind(*user_id.as_uuid()) + .bind(usage_type_to_str(&usage_type)) + .bind(since_dt) + .fetch_one(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.total as u64) + } +} diff --git a/crates/adapters/postgres/src/storage_volume_repository.rs b/crates/adapters/postgres/src/storage_volume_repository.rs deleted file mode 100644 index a42df8a..0000000 --- a/crates/adapters/postgres/src/storage_volume_repository.rs +++ /dev/null @@ -1,96 +0,0 @@ -use crate::db::PgPool; -use async_trait::async_trait; -use domain::{ - entities::StorageVolume, errors::DomainError, ports::StorageVolumeRepository, - value_objects::SystemId, -}; -use uuid::Uuid; - -#[derive(sqlx::FromRow)] -struct StorageVolumeRow { - volume_id: Uuid, - volume_name: String, - uri_prefix: String, - is_writable: bool, - available_bytes: i64, -} - -impl From for StorageVolume { - fn from(r: StorageVolumeRow) -> Self { - Self { - volume_id: SystemId::from_uuid(r.volume_id), - volume_name: r.volume_name, - uri_prefix: r.uri_prefix, - is_writable: r.is_writable, - available_bytes: r.available_bytes as u64, - } - } -} - -pub struct PostgresStorageVolumeRepository { - pool: PgPool, -} - -impl PostgresStorageVolumeRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} - -#[async_trait] -impl StorageVolumeRepository for PostgresStorageVolumeRepository { - async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { - let row = sqlx::query_as::<_, StorageVolumeRow>( - "SELECT volume_id, volume_name, uri_prefix, is_writable, available_bytes - FROM storage_volumes WHERE volume_id = $1", - ) - .bind(*id.as_uuid()) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(row.map(Into::into)) - } - - async fn find_all(&self) -> Result, DomainError> { - let rows = sqlx::query_as::<_, StorageVolumeRow>( - "SELECT volume_id, volume_name, uri_prefix, is_writable, available_bytes - FROM storage_volumes", - ) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(rows.into_iter().map(Into::into).collect()) - } - - async fn save(&self, volume: &StorageVolume) -> Result<(), DomainError> { - sqlx::query( - "INSERT INTO storage_volumes (volume_id, volume_name, uri_prefix, is_writable, available_bytes) - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (volume_id) DO UPDATE SET - volume_name = EXCLUDED.volume_name, - uri_prefix = EXCLUDED.uri_prefix, - is_writable = EXCLUDED.is_writable, - available_bytes = EXCLUDED.available_bytes", - ) - .bind(*volume.volume_id.as_uuid()) - .bind(&volume.volume_name) - .bind(&volume.uri_prefix) - .bind(volume.is_writable) - .bind(volume.available_bytes as i64) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - Ok(()) - } - - async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { - sqlx::query("DELETE FROM storage_volumes WHERE volume_id = $1") - .bind(*id.as_uuid()) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - Ok(()) - } -} diff --git a/crates/adapters/postgres/src/tag_repository.rs b/crates/adapters/postgres/src/tag_repository.rs deleted file mode 100644 index e345073..0000000 --- a/crates/adapters/postgres/src/tag_repository.rs +++ /dev/null @@ -1,174 +0,0 @@ -use crate::db::PgPool; -use async_trait::async_trait; -use domain::{ - entities::{AssetTag, Tag, TagSource}, - errors::DomainError, - ports::TagRepository, - value_objects::SystemId, -}; -use uuid::Uuid; - -#[derive(sqlx::FromRow)] -struct TagRow { - tag_id: Uuid, - name: String, - tag_source: String, -} - -#[derive(sqlx::FromRow)] -struct AssetTagRow { - asset_id: Uuid, - tag_id: Uuid, - tagged_by_user_id: Option, - confidence: f64, -} - -fn tag_source_from_str(s: &str) -> TagSource { - match s { - "ai_generated" => TagSource::AiGenerated, - "exif_extracted" => TagSource::ExifExtracted, - _ => TagSource::UserManual, - } -} - -fn tag_source_to_str(s: &TagSource) -> &'static str { - match s { - TagSource::UserManual => "user_manual", - TagSource::AiGenerated => "ai_generated", - TagSource::ExifExtracted => "exif_extracted", - } -} - -impl From for Tag { - fn from(r: TagRow) -> Self { - Self { - tag_id: SystemId::from_uuid(r.tag_id), - name: r.name, - tag_source: tag_source_from_str(&r.tag_source), - } - } -} - -impl From for AssetTag { - fn from(r: AssetTagRow) -> Self { - Self { - asset_id: SystemId::from_uuid(r.asset_id), - tag_id: SystemId::from_uuid(r.tag_id), - tagged_by_user_id: r.tagged_by_user_id.map(SystemId::from_uuid), - confidence: r.confidence, - } - } -} - -pub struct PostgresTagRepository { - pool: PgPool, -} - -impl PostgresTagRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} - -#[async_trait] -impl TagRepository for PostgresTagRepository { - async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { - let row = sqlx::query_as::<_, TagRow>( - "SELECT tag_id, name, tag_source FROM tags WHERE tag_id = $1", - ) - .bind(*id.as_uuid()) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(row.map(Into::into)) - } - - async fn find_by_name(&self, name: &str) -> Result, DomainError> { - let row = sqlx::query_as::<_, TagRow>( - "SELECT tag_id, name, tag_source FROM tags WHERE name = $1", - ) - .bind(name) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(row.map(Into::into)) - } - - async fn find_tags_for_asset( - &self, - asset_id: &SystemId, - ) -> Result, DomainError> { - let rows = sqlx::query_as::<_, TagRow>( - "SELECT t.tag_id, t.name, t.tag_source - FROM tags t JOIN asset_tags at ON t.tag_id = at.tag_id - WHERE at.asset_id = $1", - ) - .bind(*asset_id.as_uuid()) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - let at_rows = sqlx::query_as::<_, AssetTagRow>( - "SELECT asset_id, tag_id, tagged_by_user_id, confidence - FROM asset_tags WHERE asset_id = $1", - ) - .bind(*asset_id.as_uuid()) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - let tags: Vec = rows.into_iter().map(Into::into).collect(); - let asset_tags: Vec = at_rows.into_iter().map(Into::into).collect(); - - Ok(tags.into_iter().zip(asset_tags).collect()) - } - - async fn save_tag(&self, tag: &Tag) -> Result<(), DomainError> { - sqlx::query( - "INSERT INTO tags (tag_id, name, tag_source) - VALUES ($1, $2, $3) - ON CONFLICT (tag_id) DO UPDATE SET name = EXCLUDED.name, tag_source = EXCLUDED.tag_source", - ) - .bind(*tag.tag_id.as_uuid()) - .bind(&tag.name) - .bind(tag_source_to_str(&tag.tag_source)) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - Ok(()) - } - - async fn save_asset_tag(&self, asset_tag: &AssetTag) -> Result<(), DomainError> { - sqlx::query( - "INSERT INTO asset_tags (asset_id, tag_id, tagged_by_user_id, confidence) - VALUES ($1, $2, $3, $4) - ON CONFLICT (asset_id, tag_id) DO UPDATE SET - tagged_by_user_id = EXCLUDED.tagged_by_user_id, - confidence = EXCLUDED.confidence", - ) - .bind(*asset_tag.asset_id.as_uuid()) - .bind(*asset_tag.tag_id.as_uuid()) - .bind(asset_tag.tagged_by_user_id.as_ref().map(|id| *id.as_uuid())) - .bind(asset_tag.confidence) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - Ok(()) - } - - async fn remove_asset_tag( - &self, - asset_id: &SystemId, - tag_id: &SystemId, - ) -> Result<(), DomainError> { - sqlx::query("DELETE FROM asset_tags WHERE asset_id = $1 AND tag_id = $2") - .bind(*asset_id.as_uuid()) - .bind(*tag_id.as_uuid()) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - Ok(()) - } -} diff --git a/crates/adapters/postgres/src/visibility_filter_repository.rs b/crates/adapters/postgres/src/visibility_filter_repository.rs deleted file mode 100644 index eceb1eb..0000000 --- a/crates/adapters/postgres/src/visibility_filter_repository.rs +++ /dev/null @@ -1,83 +0,0 @@ -use crate::db::PgPool; -use async_trait::async_trait; -use domain::{ - entities::VisibilityFilter, errors::DomainError, ports::VisibilityFilterRepository, - value_objects::SystemId, -}; -use uuid::Uuid; - -#[derive(sqlx::FromRow)] -struct VisibilityFilterRow { - filter_id: Uuid, - scope_id: Uuid, - role_id: Uuid, - hidden_fields: Vec, -} - -impl From for VisibilityFilter { - fn from(r: VisibilityFilterRow) -> Self { - Self { - filter_id: SystemId::from_uuid(r.filter_id), - scope_id: SystemId::from_uuid(r.scope_id), - role_id: SystemId::from_uuid(r.role_id), - hidden_fields: r.hidden_fields, - } - } -} - -pub struct PostgresVisibilityFilterRepository { - pool: PgPool, -} - -impl PostgresVisibilityFilterRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} - -#[async_trait] -impl VisibilityFilterRepository for PostgresVisibilityFilterRepository { - async fn find_by_scope_and_role( - &self, - scope_id: &SystemId, - role_id: &SystemId, - ) -> Result, DomainError> { - let row = sqlx::query_as::<_, VisibilityFilterRow>( - "SELECT filter_id, scope_id, role_id, hidden_fields - FROM visibility_filters WHERE scope_id = $1 AND role_id = $2", - ) - .bind(*scope_id.as_uuid()) - .bind(*role_id.as_uuid()) - .fetch_optional(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(row.map(Into::into)) - } - - async fn save(&self, filter: &VisibilityFilter) -> Result<(), DomainError> { - sqlx::query( - "INSERT INTO visibility_filters (filter_id, scope_id, role_id, hidden_fields) - VALUES ($1, $2, $3, $4) - ON CONFLICT (filter_id) DO UPDATE SET - hidden_fields = EXCLUDED.hidden_fields", - ) - .bind(*filter.filter_id.as_uuid()) - .bind(*filter.scope_id.as_uuid()) - .bind(*filter.role_id.as_uuid()) - .bind(&filter.hidden_fields) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - Ok(()) - } - - async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { - sqlx::query("DELETE FROM visibility_filters WHERE filter_id = $1") - .bind(*id.as_uuid()) - .execute(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - Ok(()) - } -}