use crate::db::PgPool; use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ entities::{ IngestSession, IngestStatus, LibraryPath, OwnershipPolicy, QuotaDefinition, QuotaRule, StorageVolume, TimePeriod, UsageLedgerEntry, UsageType, }, errors::DomainError, ports::{ IngestSessionRepository, LibraryPathRepository, QuotaRepository, StorageVolumeRepository, UsageLedgerRepository, }, value_objects::{Checksum, DateTimeStamp, SystemId}, }; use uuid::Uuid; // ────────────────────────────────────────────── // StorageVolume // ────────────────────────────────────────────── #[derive(sqlx::FromRow)] struct StorageVolumeRow { volume_id: Uuid, volume_name: String, uri_prefix: String, is_writable: bool, available_bytes: i64, } impl From for StorageVolume { fn from(r: StorageVolumeRow) -> Self { Self { volume_id: SystemId::from_uuid(r.volume_id), volume_name: r.volume_name, uri_prefix: r.uri_prefix, is_writable: r.is_writable, available_bytes: r.available_bytes as u64, } } } pub struct PostgresStorageVolumeRepository { pool: PgPool, } impl PostgresStorageVolumeRepository { pub fn new(pool: PgPool) -> Self { Self { pool } } } #[async_trait] impl StorageVolumeRepository for PostgresStorageVolumeRepository { async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { let row = sqlx::query_as::<_, StorageVolumeRow>( "SELECT volume_id, volume_name, uri_prefix, is_writable, available_bytes FROM storage_volumes WHERE volume_id = $1", ) .bind(*id.as_uuid()) .fetch_optional(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; Ok(row.map(Into::into)) } async fn find_all(&self) -> Result, DomainError> { let rows = sqlx::query_as::<_, StorageVolumeRow>( "SELECT volume_id, volume_name, uri_prefix, is_writable, available_bytes FROM storage_volumes", ) .fetch_all(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; Ok(rows.into_iter().map(Into::into).collect()) } async fn save(&self, volume: &StorageVolume) -> Result<(), DomainError> { sqlx::query( "INSERT INTO storage_volumes (volume_id, volume_name, uri_prefix, is_writable, available_bytes) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (volume_id) DO UPDATE SET volume_name = EXCLUDED.volume_name, uri_prefix = EXCLUDED.uri_prefix, is_writable = EXCLUDED.is_writable, available_bytes = EXCLUDED.available_bytes", ) .bind(*volume.volume_id.as_uuid()) .bind(&volume.volume_name) .bind(&volume.uri_prefix) .bind(volume.is_writable) .bind(volume.available_bytes as i64) .execute(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; Ok(()) } async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { sqlx::query("DELETE FROM storage_volumes WHERE volume_id = $1") .bind(*id.as_uuid()) .execute(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; Ok(()) } } // ────────────────────────────────────────────── // LibraryPath // ────────────────────────────────────────────── #[derive(sqlx::FromRow)] struct LibraryPathRow { path_id: Uuid, volume_id: Uuid, relative_path: String, is_ingest_destination: bool, ownership_policy: String, designated_owner_id: Option, } fn policy_from_str(s: &str) -> OwnershipPolicy { match s { "user_owned" => OwnershipPolicy::UserOwned, "group_owned" => OwnershipPolicy::GroupOwned, _ => OwnershipPolicy::Unassigned, } } fn policy_to_str(p: &OwnershipPolicy) -> &'static str { match p { OwnershipPolicy::UserOwned => "user_owned", OwnershipPolicy::GroupOwned => "group_owned", OwnershipPolicy::Unassigned => "unassigned", } } impl From for LibraryPath { fn from(r: LibraryPathRow) -> Self { Self { path_id: SystemId::from_uuid(r.path_id), volume_id: SystemId::from_uuid(r.volume_id), relative_path: r.relative_path, is_ingest_destination: r.is_ingest_destination, ownership_policy: policy_from_str(&r.ownership_policy), designated_owner_id: r.designated_owner_id.map(SystemId::from_uuid), } } } pub struct PostgresLibraryPathRepository { pool: PgPool, } impl PostgresLibraryPathRepository { pub fn new(pool: PgPool) -> Self { Self { pool } } } #[async_trait] impl LibraryPathRepository for PostgresLibraryPathRepository { async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { let row = sqlx::query_as::<_, LibraryPathRow>( "SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id FROM library_paths WHERE path_id = $1", ) .bind(*id.as_uuid()) .fetch_optional(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; Ok(row.map(Into::into)) } async fn find_by_volume(&self, volume_id: &SystemId) -> Result, DomainError> { let rows = sqlx::query_as::<_, LibraryPathRow>( "SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id FROM library_paths WHERE volume_id = $1", ) .bind(*volume_id.as_uuid()) .fetch_all(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; Ok(rows.into_iter().map(Into::into).collect()) } async fn find_ingest_destinations( &self, owner_id: &SystemId, ) -> Result, DomainError> { let rows = sqlx::query_as::<_, LibraryPathRow>( "SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id FROM library_paths WHERE is_ingest_destination = true AND designated_owner_id = $1", ) .bind(*owner_id.as_uuid()) .fetch_all(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; Ok(rows.into_iter().map(Into::into).collect()) } async fn save(&self, path: &LibraryPath) -> Result<(), DomainError> { sqlx::query( "INSERT INTO library_paths (path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (path_id) DO UPDATE SET volume_id = EXCLUDED.volume_id, relative_path = EXCLUDED.relative_path, is_ingest_destination = EXCLUDED.is_ingest_destination, ownership_policy = EXCLUDED.ownership_policy, designated_owner_id = EXCLUDED.designated_owner_id", ) .bind(*path.path_id.as_uuid()) .bind(*path.volume_id.as_uuid()) .bind(&path.relative_path) .bind(path.is_ingest_destination) .bind(policy_to_str(&path.ownership_policy)) .bind(path.designated_owner_id.as_ref().map(|id| *id.as_uuid())) .execute(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; Ok(()) } async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { sqlx::query("DELETE FROM library_paths WHERE path_id = $1") .bind(*id.as_uuid()) .execute(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; Ok(()) } } // ────────────────────────────────────────────── // IngestSession // ────────────────────────────────────────────── #[derive(sqlx::FromRow)] struct IngestSessionRow { session_id: Uuid, uploader_user_id: Uuid, client_device_id: String, original_filename: String, client_checksum: String, target_library_path_id: Uuid, status: String, created_at: DateTime, error_message: Option, } fn ingest_status_from_str(s: &str) -> IngestStatus { match s { "uploading" => IngestStatus::Uploading, "awaiting_processing" => IngestStatus::AwaitingProcessing, "processing" => IngestStatus::Processing, "completed" => IngestStatus::Completed, "failed" => IngestStatus::Failed, _ => IngestStatus::Uploading, } } fn ingest_status_to_str(s: &IngestStatus) -> &'static str { match s { IngestStatus::Uploading => "uploading", IngestStatus::AwaitingProcessing => "awaiting_processing", IngestStatus::Processing => "processing", IngestStatus::Completed => "completed", IngestStatus::Failed => "failed", } } impl TryFrom for IngestSession { type Error = DomainError; fn try_from(r: IngestSessionRow) -> Result { Ok(Self { session_id: SystemId::from_uuid(r.session_id), uploader_user_id: SystemId::from_uuid(r.uploader_user_id), client_device_id: r.client_device_id, original_filename: r.original_filename, client_checksum: Checksum::new(r.client_checksum)?, target_library_path_id: SystemId::from_uuid(r.target_library_path_id), status: ingest_status_from_str(&r.status), created_at: DateTimeStamp::from_datetime(r.created_at), error_message: r.error_message, }) } } pub struct PostgresIngestSessionRepository { pool: PgPool, } impl PostgresIngestSessionRepository { pub fn new(pool: PgPool) -> Self { Self { pool } } } #[async_trait] impl IngestSessionRepository for PostgresIngestSessionRepository { async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { let row = sqlx::query_as::<_, IngestSessionRow>( "SELECT session_id, uploader_user_id, client_device_id, original_filename, client_checksum, target_library_path_id, status, created_at, error_message FROM ingest_sessions WHERE session_id = $1", ) .bind(*id.as_uuid()) .fetch_optional(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; row.map(TryInto::try_into).transpose() } async fn find_by_user(&self, user_id: &SystemId) -> Result, DomainError> { let rows = sqlx::query_as::<_, IngestSessionRow>( "SELECT session_id, uploader_user_id, client_device_id, original_filename, client_checksum, target_library_path_id, status, created_at, error_message FROM ingest_sessions WHERE uploader_user_id = $1", ) .bind(*user_id.as_uuid()) .fetch_all(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; rows.into_iter().map(TryInto::try_into).collect() } async fn save(&self, session: &IngestSession) -> Result<(), DomainError> { sqlx::query( "INSERT INTO ingest_sessions (session_id, uploader_user_id, client_device_id, original_filename, client_checksum, target_library_path_id, status, created_at, error_message) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (session_id) DO UPDATE SET status = EXCLUDED.status, error_message = EXCLUDED.error_message", ) .bind(*session.session_id.as_uuid()) .bind(*session.uploader_user_id.as_uuid()) .bind(&session.client_device_id) .bind(&session.original_filename) .bind(session.client_checksum.as_str()) .bind(*session.target_library_path_id.as_uuid()) .bind(ingest_status_to_str(&session.status)) .bind(session.created_at.as_datetime()) .bind(session.error_message.as_deref()) .execute(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; Ok(()) } } // ────────────────────────────────────────────── // Quota + UsageLedger // ────────────────────────────────────────────── fn usage_type_from_str(s: &str) -> UsageType { match s { "storage_bytes" => UsageType::StorageBytes, "process_jobs" => UsageType::ProcessJobs, "api_calls" => UsageType::ApiCalls, "indexing_size" => UsageType::IndexingSize, _ => UsageType::StorageBytes, } } fn usage_type_to_str(t: &UsageType) -> &'static str { match t { UsageType::StorageBytes => "storage_bytes", UsageType::ProcessJobs => "process_jobs", UsageType::ApiCalls => "api_calls", UsageType::IndexingSize => "indexing_size", } } fn time_period_from_str(s: &str) -> TimePeriod { match s { "daily" => TimePeriod::Daily, "monthly" => TimePeriod::Monthly, "lifetime" => TimePeriod::Lifetime, _ => TimePeriod::Lifetime, } } fn time_period_to_str(p: &TimePeriod) -> &'static str { match p { TimePeriod::Daily => "daily", TimePeriod::Monthly => "monthly", TimePeriod::Lifetime => "lifetime", } } #[derive(sqlx::FromRow)] struct QuotaDefRow { quota_id: Uuid, owner_scope: Uuid, is_enforced: bool, } #[derive(sqlx::FromRow)] #[allow(dead_code)] struct QuotaRuleRow { rule_id: Uuid, quota_id: Uuid, dimension: String, limit_value: i64, time_period: String, is_unlimited: bool, } #[derive(sqlx::FromRow)] struct UsageLedgerRow { entry_id: Uuid, user_id: Uuid, usage_type: String, consumed_amount: i64, timestamp: DateTime, context: String, } #[derive(sqlx::FromRow)] struct SumRow { total: i64, } impl From for QuotaRule { fn from(r: QuotaRuleRow) -> Self { Self { rule_id: SystemId::from_uuid(r.rule_id), dimension: usage_type_from_str(&r.dimension), limit_value: r.limit_value as u64, time_period: time_period_from_str(&r.time_period), is_unlimited: r.is_unlimited, } } } impl From for UsageLedgerEntry { fn from(r: UsageLedgerRow) -> Self { Self { entry_id: SystemId::from_uuid(r.entry_id), user_id: SystemId::from_uuid(r.user_id), usage_type: usage_type_from_str(&r.usage_type), consumed_amount: r.consumed_amount as u64, timestamp: DateTimeStamp::from_datetime(r.timestamp), context: r.context, } } } pub struct PostgresQuotaRepository { pool: PgPool, } impl PostgresQuotaRepository { pub fn new(pool: PgPool) -> Self { Self { pool } } } #[async_trait] impl QuotaRepository for PostgresQuotaRepository { async fn find_by_owner( &self, owner_id: &SystemId, ) -> Result, DomainError> { let def_row = sqlx::query_as::<_, QuotaDefRow>( "SELECT quota_id, owner_scope, is_enforced FROM quota_definitions WHERE owner_scope = $1", ) .bind(*owner_id.as_uuid()) .fetch_optional(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; let Some(def) = def_row else { return Ok(None); }; let rule_rows = sqlx::query_as::<_, QuotaRuleRow>( "SELECT rule_id, quota_id, dimension, limit_value, time_period, is_unlimited FROM quota_rules WHERE quota_id = $1", ) .bind(def.quota_id) .fetch_all(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; Ok(Some(QuotaDefinition { quota_id: SystemId::from_uuid(def.quota_id), owner_scope: SystemId::from_uuid(def.owner_scope), is_enforced: def.is_enforced, rules: rule_rows.into_iter().map(Into::into).collect(), })) } async fn save(&self, quota: &QuotaDefinition) -> Result<(), DomainError> { sqlx::query( "INSERT INTO quota_definitions (quota_id, owner_scope, is_enforced) VALUES ($1, $2, $3) ON CONFLICT (quota_id) DO UPDATE SET owner_scope = EXCLUDED.owner_scope, is_enforced = EXCLUDED.is_enforced", ) .bind(*quota.quota_id.as_uuid()) .bind(*quota.owner_scope.as_uuid()) .bind(quota.is_enforced) .execute(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; // Delete old rules then re-insert sqlx::query("DELETE FROM quota_rules WHERE quota_id = $1") .bind(*quota.quota_id.as_uuid()) .execute(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; for rule in "a.rules { sqlx::query( "INSERT INTO quota_rules (rule_id, quota_id, dimension, limit_value, time_period, is_unlimited) VALUES ($1, $2, $3, $4, $5, $6)", ) .bind(*rule.rule_id.as_uuid()) .bind(*quota.quota_id.as_uuid()) .bind(usage_type_to_str(&rule.dimension)) .bind(rule.limit_value as i64) .bind(time_period_to_str(&rule.time_period)) .bind(rule.is_unlimited) .execute(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; } Ok(()) } async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { // Rules cascade-delete sqlx::query("DELETE FROM quota_definitions WHERE quota_id = $1") .bind(*id.as_uuid()) .execute(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; Ok(()) } } pub struct PostgresUsageLedgerRepository { pool: PgPool, } impl PostgresUsageLedgerRepository { pub fn new(pool: PgPool) -> Self { Self { pool } } } #[async_trait] impl UsageLedgerRepository for PostgresUsageLedgerRepository { async fn record(&self, entry: &UsageLedgerEntry) -> Result<(), DomainError> { sqlx::query( "INSERT INTO usage_ledger (entry_id, user_id, usage_type, consumed_amount, timestamp, context) VALUES ($1, $2, $3, $4, $5, $6)", ) .bind(*entry.entry_id.as_uuid()) .bind(*entry.user_id.as_uuid()) .bind(usage_type_to_str(&entry.usage_type)) .bind(entry.consumed_amount as i64) .bind(entry.timestamp.as_datetime()) .bind(&entry.context) .execute(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; Ok(()) } async fn sum_usage( &self, user_id: &SystemId, usage_type: UsageType, since: Option, ) -> Result { let since_dt: Option> = since.map(|s| *s.as_datetime()); let row = sqlx::query_as::<_, SumRow>( "SELECT COALESCE(SUM(consumed_amount), 0) as total FROM usage_ledger WHERE user_id = $1 AND usage_type = $2 AND ($3::timestamptz IS NULL OR timestamp >= $3)", ) .bind(*user_id.as_uuid()) .bind(usage_type_to_str(&usage_type)) .bind(since_dt) .fetch_one(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; Ok(row.total as u64) } }