diff --git a/Cargo.lock b/Cargo.lock index 9ed2fb4..cf5675e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -25,6 +25,7 @@ dependencies = [ "async-trait", "chrono", "domain", + "serde", "serde_json", "sqlx", "uuid", @@ -78,6 +79,7 @@ checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" name = "api-types" version = "0.1.0" dependencies = [ + "application", "chrono", "domain", "serde", diff --git a/crates/adapters/postgres/Cargo.toml b/crates/adapters/postgres/Cargo.toml index 95179f1..ee72413 100644 --- a/crates/adapters/postgres/Cargo.toml +++ b/crates/adapters/postgres/Cargo.toml @@ -10,4 +10,5 @@ uuid = { workspace = true } chrono = { workspace = true } anyhow = { workspace = true } async-trait = { workspace = true } +serde = { workspace = true } serde_json = { workspace = true } diff --git a/crates/adapters/postgres/migrations/007_sidecar.sql b/crates/adapters/postgres/migrations/007_sidecar.sql new file mode 100644 index 0000000..efb2746 --- /dev/null +++ b/crates/adapters/postgres/migrations/007_sidecar.sql @@ -0,0 +1,8 @@ +CREATE TABLE sidecar_records ( + asset_id UUID PRIMARY KEY REFERENCES assets(asset_id), + sync_status TEXT NOT NULL DEFAULT 'pending_write', + sidecar_storage_path TEXT NOT NULL, + last_synced_at TIMESTAMPTZ, + last_known_file_hash TEXT, + error_message TEXT +); diff --git a/crates/adapters/postgres/migrations/008_processing.sql b/crates/adapters/postgres/migrations/008_processing.sql new file mode 100644 index 0000000..2a04934 --- /dev/null +++ b/crates/adapters/postgres/migrations/008_processing.sql @@ -0,0 +1,42 @@ +CREATE TABLE jobs ( + job_id UUID PRIMARY KEY, + job_type TEXT NOT NULL, + target_asset_id UUID, + batch_id UUID, + status TEXT NOT NULL DEFAULT 'queued', + priority INTEGER NOT NULL DEFAULT 0, + payload JSONB NOT NULL DEFAULT '{}', + result_data JSONB, + retry_count INTEGER NOT NULL DEFAULT 0, + max_retries INTEGER NOT NULL DEFAULT 3, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + started_at TIMESTAMPTZ, + completed_at TIMESTAMPTZ, + error_message TEXT +); + +CREATE INDEX idx_jobs_status_priority ON jobs(status, priority DESC); +CREATE INDEX idx_jobs_batch ON jobs(batch_id); + +CREATE TABLE job_batches ( + batch_id UUID PRIMARY KEY, + batch_type TEXT NOT NULL, + total_jobs INTEGER NOT NULL DEFAULT 0, + completed_count INTEGER NOT NULL DEFAULT 0, + failed_count INTEGER NOT NULL DEFAULT 0, + status TEXT NOT NULL DEFAULT 'in_progress' +); + +CREATE TABLE plugins ( + plugin_id UUID PRIMARY KEY, + name TEXT NOT NULL, + plugin_type TEXT NOT NULL, + is_enabled BOOLEAN NOT NULL DEFAULT true, + configuration JSONB NOT NULL DEFAULT '{}' +); + +CREATE TABLE processing_pipelines ( + pipeline_id UUID PRIMARY KEY, + trigger_event TEXT NOT NULL, + steps JSONB NOT NULL DEFAULT '[]' +); diff --git a/crates/adapters/postgres/migrations/009_duplicate_groups.sql b/crates/adapters/postgres/migrations/009_duplicate_groups.sql new file mode 100644 index 0000000..8e94d8c --- /dev/null +++ b/crates/adapters/postgres/migrations/009_duplicate_groups.sql @@ -0,0 +1,6 @@ +CREATE TABLE duplicate_groups ( + group_id UUID PRIMARY KEY, + detection_method TEXT NOT NULL DEFAULT 'exact_hash', + status TEXT NOT NULL DEFAULT 'unresolved', + candidates JSONB NOT NULL DEFAULT '[]' +); diff --git a/crates/adapters/postgres/src/duplicate_repository.rs b/crates/adapters/postgres/src/duplicate_repository.rs new file mode 100644 index 0000000..2eabb0c --- /dev/null +++ b/crates/adapters/postgres/src/duplicate_repository.rs @@ -0,0 +1,153 @@ +use crate::db::PgPool; +use async_trait::async_trait; +use domain::{ + entities::{DetectionMethod, DuplicateCandidate, DuplicateGroup, DuplicateStatus}, + errors::DomainError, + ports::DuplicateRepository, + value_objects::SystemId, +}; +use uuid::Uuid; + +#[derive(sqlx::FromRow)] +struct GroupRow { + group_id: Uuid, + detection_method: String, + status: String, + candidates: serde_json::Value, +} + +fn detection_from_str(s: &str) -> DetectionMethod { + match s { + "perceptual_hash" => DetectionMethod::PerceptualHash, + _ => DetectionMethod::ExactHash, + } +} + +fn detection_to_str(d: &DetectionMethod) -> &'static str { + match d { + DetectionMethod::ExactHash => "exact_hash", + DetectionMethod::PerceptualHash => "perceptual_hash", + } +} + +fn dup_status_from_str(s: &str) -> DuplicateStatus { + match s { + "resolved" => DuplicateStatus::Resolved, + _ => DuplicateStatus::Unresolved, + } +} + +fn dup_status_to_str(s: &DuplicateStatus) -> &'static str { + match s { + DuplicateStatus::Unresolved => "unresolved", + DuplicateStatus::Resolved => "resolved", + } +} + +#[derive(serde::Serialize, serde::Deserialize)] +struct CandidateJson { + asset_id: Uuid, + similarity_score: f64, +} + +fn candidates_from_json(v: serde_json::Value) -> Vec { + let arr: Vec = serde_json::from_value(v).unwrap_or_default(); + arr.into_iter() + .map(|c| DuplicateCandidate { + asset_id: SystemId::from_uuid(c.asset_id), + similarity_score: c.similarity_score, + }) + .collect() +} + +fn candidates_to_json(candidates: &[DuplicateCandidate]) -> serde_json::Value { + let arr: Vec = candidates + .iter() + .map(|c| CandidateJson { + asset_id: *c.asset_id.as_uuid(), + similarity_score: c.similarity_score, + }) + .collect(); + serde_json::to_value(arr).unwrap_or(serde_json::Value::Array(vec![])) +} + +impl From for DuplicateGroup { + fn from(r: GroupRow) -> Self { + Self { + group_id: SystemId::from_uuid(r.group_id), + detection_method: detection_from_str(&r.detection_method), + status: dup_status_from_str(&r.status), + candidates: candidates_from_json(r.candidates), + } + } +} + +pub struct PostgresDuplicateRepository { + pool: PgPool, +} + +impl PostgresDuplicateRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl DuplicateRepository for PostgresDuplicateRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + let row = sqlx::query_as::<_, GroupRow>( + "SELECT group_id, detection_method, status, candidates + FROM duplicate_groups WHERE group_id = $1", + ) + .bind(*id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.map(Into::into)) + } + + async fn find_unresolved(&self) -> Result, DomainError> { + let rows = sqlx::query_as::<_, GroupRow>( + "SELECT group_id, detection_method, status, candidates + FROM duplicate_groups WHERE status = 'unresolved'", + ) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(rows.into_iter().map(Into::into).collect()) + } + + async fn find_by_asset(&self, asset_id: &SystemId) -> Result, DomainError> { + let rows = sqlx::query_as::<_, GroupRow>( + "SELECT group_id, detection_method, status, candidates + FROM duplicate_groups WHERE candidates @> $1::jsonb", + ) + .bind(serde_json::json!([{"asset_id": asset_id.as_uuid()}])) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(rows.into_iter().map(Into::into).collect()) + } + + async fn save(&self, group: &DuplicateGroup) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO duplicate_groups (group_id, detection_method, status, candidates) + VALUES ($1, $2, $3, $4) + ON CONFLICT (group_id) DO UPDATE SET + detection_method = EXCLUDED.detection_method, + status = EXCLUDED.status, + candidates = EXCLUDED.candidates", + ) + .bind(*group.group_id.as_uuid()) + .bind(detection_to_str(&group.detection_method)) + .bind(dup_status_to_str(&group.status)) + .bind(candidates_to_json(&group.candidates)) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} diff --git a/crates/adapters/postgres/src/job_batch_repository.rs b/crates/adapters/postgres/src/job_batch_repository.rs new file mode 100644 index 0000000..a82f592 --- /dev/null +++ b/crates/adapters/postgres/src/job_batch_repository.rs @@ -0,0 +1,99 @@ +use crate::db::PgPool; +use async_trait::async_trait; +use domain::{ + entities::{BatchStatus, JobBatch}, + errors::DomainError, + ports::JobBatchRepository, + value_objects::SystemId, +}; +use uuid::Uuid; + +#[derive(sqlx::FromRow)] +struct BatchRow { + batch_id: Uuid, + batch_type: String, + total_jobs: i32, + completed_count: i32, + failed_count: i32, + status: String, +} + +fn batch_status_from_str(s: &str) -> BatchStatus { + match s { + "in_progress" => BatchStatus::InProgress, + "completed_with_errors" => BatchStatus::CompletedWithErrors, + "completed" => BatchStatus::Completed, + "cancelled" => BatchStatus::Cancelled, + _ => BatchStatus::InProgress, + } +} + +fn batch_status_to_str(s: &BatchStatus) -> &'static str { + match s { + BatchStatus::InProgress => "in_progress", + BatchStatus::CompletedWithErrors => "completed_with_errors", + BatchStatus::Completed => "completed", + BatchStatus::Cancelled => "cancelled", + } +} + +impl From for JobBatch { + fn from(r: BatchRow) -> Self { + Self { + batch_id: SystemId::from_uuid(r.batch_id), + batch_type: r.batch_type, + total_jobs: r.total_jobs as u32, + completed_count: r.completed_count as u32, + failed_count: r.failed_count as u32, + status: batch_status_from_str(&r.status), + } + } +} + +pub struct PostgresJobBatchRepository { + pool: PgPool, +} + +impl PostgresJobBatchRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl JobBatchRepository for PostgresJobBatchRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + let row = sqlx::query_as::<_, BatchRow>( + "SELECT batch_id, batch_type, total_jobs, completed_count, failed_count, status + FROM job_batches WHERE batch_id = $1", + ) + .bind(*id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.map(Into::into)) + } + + async fn save(&self, batch: &JobBatch) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO job_batches (batch_id, batch_type, total_jobs, completed_count, failed_count, status) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (batch_id) DO UPDATE SET + total_jobs = EXCLUDED.total_jobs, + completed_count = EXCLUDED.completed_count, + failed_count = EXCLUDED.failed_count, + status = EXCLUDED.status", + ) + .bind(*batch.batch_id.as_uuid()) + .bind(&batch.batch_type) + .bind(batch.total_jobs as i32) + .bind(batch.completed_count as i32) + .bind(batch.failed_count as i32) + .bind(batch_status_to_str(&batch.status)) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} diff --git a/crates/adapters/postgres/src/job_repository.rs b/crates/adapters/postgres/src/job_repository.rs new file mode 100644 index 0000000..b7d410a --- /dev/null +++ b/crates/adapters/postgres/src/job_repository.rs @@ -0,0 +1,209 @@ +use crate::db::PgPool; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use domain::{ + entities::{Job, JobStatus, JobType}, + errors::DomainError, + ports::JobRepository, + value_objects::{DateTimeStamp, StructuredData, SystemId}, +}; +use uuid::Uuid; + +#[derive(sqlx::FromRow)] +struct JobRow { + job_id: Uuid, + job_type: String, + target_asset_id: Option, + batch_id: Option, + status: String, + priority: i32, + payload: serde_json::Value, + result_data: Option, + retry_count: i32, + max_retries: i32, + created_at: DateTime, + started_at: Option>, + completed_at: Option>, + error_message: Option, +} + +fn job_type_from_str(s: &str) -> JobType { + match s { + "scan_directory" => JobType::ScanDirectory, + "extract_metadata" => JobType::ExtractMetadata, + "generate_derivative" => JobType::GenerateDerivative, + "sync_sidecar" => JobType::SyncSidecar, + "detect_duplicates" => JobType::DetectDuplicates, + other => JobType::Custom(other.to_string()), + } +} + +fn job_type_to_str(t: &JobType) -> String { + match t { + JobType::ScanDirectory => "scan_directory".to_string(), + JobType::ExtractMetadata => "extract_metadata".to_string(), + JobType::GenerateDerivative => "generate_derivative".to_string(), + JobType::SyncSidecar => "sync_sidecar".to_string(), + JobType::DetectDuplicates => "detect_duplicates".to_string(), + JobType::Custom(s) => s.clone(), + } +} + +fn job_status_from_str(s: &str) -> JobStatus { + match s { + "queued" => JobStatus::Queued, + "processing" => JobStatus::Processing, + "completed" => JobStatus::Completed, + "failed" => JobStatus::Failed, + "cancelled" => JobStatus::Cancelled, + _ => JobStatus::Queued, + } +} + +fn job_status_to_str(s: &JobStatus) -> &'static str { + match s { + JobStatus::Queued => "queued", + JobStatus::Processing => "processing", + JobStatus::Completed => "completed", + JobStatus::Failed => "failed", + JobStatus::Cancelled => "cancelled", + } +} + +fn structured_from_json(v: serde_json::Value) -> StructuredData { + if let serde_json::Value::Object(map) = v { + let mut sd = StructuredData::new(); + for (k, val) in map { + sd.insert(k, domain::value_objects::MetadataValue::from(val)); + } + sd + } else { + StructuredData::new() + } +} + +fn structured_to_json(sd: &StructuredData) -> serde_json::Value { + let map: serde_json::Map = sd + .inner() + .iter() + .map(|(k, v)| (k.clone(), serde_json::Value::from(v))) + .collect(); + serde_json::Value::Object(map) +} + +impl From for Job { + fn from(r: JobRow) -> Self { + Self { + job_id: SystemId::from_uuid(r.job_id), + job_type: job_type_from_str(&r.job_type), + target_asset_id: r.target_asset_id.map(SystemId::from_uuid), + batch_id: r.batch_id.map(SystemId::from_uuid), + status: job_status_from_str(&r.status), + priority: r.priority as u32, + payload: structured_from_json(r.payload), + result_data: r.result_data.map(structured_from_json), + retry_count: r.retry_count as u32, + max_retries: r.max_retries as u32, + created_at: DateTimeStamp::from_datetime(r.created_at), + started_at: r.started_at.map(DateTimeStamp::from_datetime), + completed_at: r.completed_at.map(DateTimeStamp::from_datetime), + error_message: r.error_message, + } + } +} + +pub struct PostgresJobRepository { + pool: PgPool, +} + +impl PostgresJobRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl JobRepository for PostgresJobRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + let row = sqlx::query_as::<_, JobRow>( + "SELECT job_id, job_type, target_asset_id, batch_id, status, priority, + payload, result_data, retry_count, max_retries, created_at, + started_at, completed_at, error_message + FROM jobs WHERE job_id = $1", + ) + .bind(*id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.map(Into::into)) + } + + async fn find_next_queued(&self) -> Result, DomainError> { + let row = sqlx::query_as::<_, JobRow>( + "SELECT job_id, job_type, target_asset_id, batch_id, status, priority, + payload, result_data, retry_count, max_retries, created_at, + started_at, completed_at, error_message + FROM jobs WHERE status = 'queued' + ORDER BY priority DESC, created_at ASC + LIMIT 1", + ) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.map(Into::into)) + } + + async fn find_by_batch(&self, batch_id: &SystemId) -> Result, DomainError> { + let rows = sqlx::query_as::<_, JobRow>( + "SELECT job_id, job_type, target_asset_id, batch_id, status, priority, + payload, result_data, retry_count, max_retries, created_at, + started_at, completed_at, error_message + FROM jobs WHERE batch_id = $1 + ORDER BY created_at ASC", + ) + .bind(*batch_id.as_uuid()) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(rows.into_iter().map(Into::into).collect()) + } + + async fn save(&self, job: &Job) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO jobs (job_id, job_type, target_asset_id, batch_id, status, priority, + payload, result_data, retry_count, max_retries, created_at, + started_at, completed_at, error_message) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) + ON CONFLICT (job_id) DO UPDATE SET + status = EXCLUDED.status, + priority = EXCLUDED.priority, + payload = EXCLUDED.payload, + result_data = EXCLUDED.result_data, + retry_count = EXCLUDED.retry_count, + started_at = EXCLUDED.started_at, + completed_at = EXCLUDED.completed_at, + error_message = EXCLUDED.error_message", + ) + .bind(*job.job_id.as_uuid()) + .bind(job_type_to_str(&job.job_type)) + .bind(job.target_asset_id.as_ref().map(|id| *id.as_uuid())) + .bind(job.batch_id.as_ref().map(|id| *id.as_uuid())) + .bind(job_status_to_str(&job.status)) + .bind(job.priority as i32) + .bind(structured_to_json(&job.payload)) + .bind(job.result_data.as_ref().map(structured_to_json)) + .bind(job.retry_count as i32) + .bind(job.max_retries as i32) + .bind(job.created_at.as_datetime()) + .bind(job.started_at.as_ref().map(|d| d.as_datetime())) + .bind(job.completed_at.as_ref().map(|d| d.as_datetime())) + .bind(&job.error_message) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs index 610b55b..9391c62 100644 --- a/crates/adapters/postgres/src/lib.rs +++ b/crates/adapters/postgres/src/lib.rs @@ -3,11 +3,18 @@ pub mod db; pub mod album_repository; pub mod asset_metadata_repository; pub mod asset_repository; +pub mod duplicate_repository; pub mod ingest_session_repository; +pub mod job_batch_repository; +pub mod job_repository; pub mod library_path_repository; +pub mod pipeline_repository; +pub mod plugin_repository; pub mod quota_repository; pub mod share_repository; +pub mod sidecar_repository; pub mod storage_volume_repository; +pub mod tag_repository; pub mod user_repository; pub mod visibility_filter_repository; @@ -16,10 +23,17 @@ pub use db::{PgPool, connect, run_migrations}; pub use album_repository::PostgresAlbumRepository; pub use asset_metadata_repository::PostgresAssetMetadataRepository; pub use asset_repository::PostgresAssetRepository; +pub use duplicate_repository::PostgresDuplicateRepository; pub use ingest_session_repository::PostgresIngestSessionRepository; +pub use job_batch_repository::PostgresJobBatchRepository; +pub use job_repository::PostgresJobRepository; pub use library_path_repository::PostgresLibraryPathRepository; +pub use pipeline_repository::PostgresPipelineRepository; +pub use plugin_repository::PostgresPluginRepository; pub use quota_repository::{PostgresQuotaRepository, PostgresUsageLedgerRepository}; pub use share_repository::PostgresShareRepository; +pub use sidecar_repository::PostgresSidecarRepository; pub use storage_volume_repository::PostgresStorageVolumeRepository; +pub use tag_repository::PostgresTagRepository; pub use user_repository::PostgresUserRepository; pub use visibility_filter_repository::PostgresVisibilityFilterRepository; diff --git a/crates/adapters/postgres/src/pipeline_repository.rs b/crates/adapters/postgres/src/pipeline_repository.rs new file mode 100644 index 0000000..7143a59 --- /dev/null +++ b/crates/adapters/postgres/src/pipeline_repository.rs @@ -0,0 +1,126 @@ +use crate::db::PgPool; +use async_trait::async_trait; +use domain::{ + entities::{PipelineStep, ProcessingPipeline}, + errors::DomainError, + ports::PipelineRepository, + value_objects::{MetadataValue, StructuredData, SystemId}, +}; +use uuid::Uuid; + +#[derive(sqlx::FromRow)] +struct PipelineRow { + pipeline_id: Uuid, + trigger_event: String, + steps: serde_json::Value, +} + +#[derive(serde::Serialize, serde::Deserialize)] +struct StepJson { + plugin_id: Uuid, + step_order: u32, + configuration: serde_json::Map, +} + +fn steps_from_json(v: serde_json::Value) -> Vec { + let arr: Vec = serde_json::from_value(v).unwrap_or_default(); + arr.into_iter() + .map(|s| { + let mut config = StructuredData::new(); + for (k, val) in s.configuration { + config.insert(k, MetadataValue::from(val)); + } + PipelineStep { + plugin_id: SystemId::from_uuid(s.plugin_id), + step_order: s.step_order, + configuration: config, + } + }) + .collect() +} + +fn steps_to_json(steps: &[PipelineStep]) -> serde_json::Value { + let arr: Vec = steps + .iter() + .map(|s| { + let config: serde_json::Map = s + .configuration + .inner() + .iter() + .map(|(k, v)| (k.clone(), serde_json::Value::from(v))) + .collect(); + StepJson { + plugin_id: *s.plugin_id.as_uuid(), + step_order: s.step_order, + configuration: config, + } + }) + .collect(); + serde_json::to_value(arr).unwrap_or(serde_json::Value::Array(vec![])) +} + +impl From for ProcessingPipeline { + fn from(r: PipelineRow) -> Self { + Self { + pipeline_id: SystemId::from_uuid(r.pipeline_id), + trigger_event: r.trigger_event, + steps: steps_from_json(r.steps), + } + } +} + +pub struct PostgresPipelineRepository { + pool: PgPool, +} + +impl PostgresPipelineRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl PipelineRepository for PostgresPipelineRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + let row = sqlx::query_as::<_, PipelineRow>( + "SELECT pipeline_id, trigger_event, steps + FROM processing_pipelines WHERE pipeline_id = $1", + ) + .bind(*id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.map(Into::into)) + } + + async fn find_by_trigger(&self, event: &str) -> Result, DomainError> { + let rows = sqlx::query_as::<_, PipelineRow>( + "SELECT pipeline_id, trigger_event, steps + FROM processing_pipelines WHERE trigger_event = $1", + ) + .bind(event) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(rows.into_iter().map(Into::into).collect()) + } + + async fn save(&self, pipeline: &ProcessingPipeline) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO processing_pipelines (pipeline_id, trigger_event, steps) + VALUES ($1, $2, $3) + ON CONFLICT (pipeline_id) DO UPDATE SET + trigger_event = EXCLUDED.trigger_event, + steps = EXCLUDED.steps", + ) + .bind(*pipeline.pipeline_id.as_uuid()) + .bind(&pipeline.trigger_event) + .bind(steps_to_json(&pipeline.steps)) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} diff --git a/crates/adapters/postgres/src/plugin_repository.rs b/crates/adapters/postgres/src/plugin_repository.rs new file mode 100644 index 0000000..2fffd28 --- /dev/null +++ b/crates/adapters/postgres/src/plugin_repository.rs @@ -0,0 +1,127 @@ +use crate::db::PgPool; +use async_trait::async_trait; +use domain::{ + entities::{Plugin, PluginType}, + errors::DomainError, + ports::PluginRepository, + value_objects::{MetadataValue, StructuredData, SystemId}, +}; +use uuid::Uuid; + +#[derive(sqlx::FromRow)] +struct PluginRow { + plugin_id: Uuid, + name: String, + plugin_type: String, + is_enabled: bool, + configuration: serde_json::Value, +} + +fn plugin_type_from_str(s: &str) -> PluginType { + match s { + "media_processor" => PluginType::MediaProcessor, + "scheduled_task" => PluginType::ScheduledTask, + "sidecar_writer" => PluginType::SidecarWriter, + _ => PluginType::MediaProcessor, + } +} + +fn plugin_type_to_str(t: &PluginType) -> &'static str { + match t { + PluginType::MediaProcessor => "media_processor", + PluginType::ScheduledTask => "scheduled_task", + PluginType::SidecarWriter => "sidecar_writer", + } +} + +fn structured_from_json(v: serde_json::Value) -> StructuredData { + if let serde_json::Value::Object(map) = v { + let mut sd = StructuredData::new(); + for (k, val) in map { + sd.insert(k, MetadataValue::from(val)); + } + sd + } else { + StructuredData::new() + } +} + +fn structured_to_json(sd: &StructuredData) -> serde_json::Value { + let map: serde_json::Map = sd + .inner() + .iter() + .map(|(k, v)| (k.clone(), serde_json::Value::from(v))) + .collect(); + serde_json::Value::Object(map) +} + +impl From for Plugin { + fn from(r: PluginRow) -> Self { + Self { + plugin_id: SystemId::from_uuid(r.plugin_id), + name: r.name, + plugin_type: plugin_type_from_str(&r.plugin_type), + is_enabled: r.is_enabled, + configuration: structured_from_json(r.configuration), + } + } +} + +pub struct PostgresPluginRepository { + pool: PgPool, +} + +impl PostgresPluginRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl PluginRepository for PostgresPluginRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + let row = sqlx::query_as::<_, PluginRow>( + "SELECT plugin_id, name, plugin_type, is_enabled, configuration + FROM plugins WHERE plugin_id = $1", + ) + .bind(*id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.map(Into::into)) + } + + async fn find_enabled(&self) -> Result, DomainError> { + let rows = sqlx::query_as::<_, PluginRow>( + "SELECT plugin_id, name, plugin_type, is_enabled, configuration + FROM plugins WHERE is_enabled = true", + ) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(rows.into_iter().map(Into::into).collect()) + } + + async fn save(&self, plugin: &Plugin) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO plugins (plugin_id, name, plugin_type, is_enabled, configuration) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (plugin_id) DO UPDATE SET + name = EXCLUDED.name, + plugin_type = EXCLUDED.plugin_type, + is_enabled = EXCLUDED.is_enabled, + configuration = EXCLUDED.configuration", + ) + .bind(*plugin.plugin_id.as_uuid()) + .bind(&plugin.name) + .bind(plugin_type_to_str(&plugin.plugin_type)) + .bind(plugin.is_enabled) + .bind(structured_to_json(&plugin.configuration)) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} diff --git a/crates/adapters/postgres/src/sidecar_repository.rs b/crates/adapters/postgres/src/sidecar_repository.rs new file mode 100644 index 0000000..88f17a8 --- /dev/null +++ b/crates/adapters/postgres/src/sidecar_repository.rs @@ -0,0 +1,133 @@ +use crate::db::PgPool; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use domain::{ + entities::{SidecarRecord, SyncStatus}, + errors::DomainError, + ports::SidecarRepository, + value_objects::{Checksum, DateTimeStamp, SystemId}, +}; +use uuid::Uuid; + +#[derive(sqlx::FromRow)] +struct SidecarRow { + asset_id: Uuid, + sync_status: String, + sidecar_storage_path: String, + last_synced_at: Option>, + last_known_file_hash: Option, + error_message: Option, +} + +fn sync_status_from_str(s: &str) -> SyncStatus { + match s { + "in_sync" => SyncStatus::InSync, + "pending_write" => SyncStatus::PendingWrite, + "pending_read" => SyncStatus::PendingRead, + "conflict" => SyncStatus::Conflict, + "error" => SyncStatus::Error, + _ => SyncStatus::PendingWrite, + } +} + +fn sync_status_to_str(s: &SyncStatus) -> &'static str { + match s { + SyncStatus::InSync => "in_sync", + SyncStatus::PendingWrite => "pending_write", + SyncStatus::PendingRead => "pending_read", + SyncStatus::Conflict => "conflict", + SyncStatus::Error => "error", + } +} + +impl TryFrom for SidecarRecord { + type Error = DomainError; + fn try_from(r: SidecarRow) -> Result { + let hash = r.last_known_file_hash.map(Checksum::new).transpose()?; + Ok(Self { + asset_id: SystemId::from_uuid(r.asset_id), + sync_status: sync_status_from_str(&r.sync_status), + sidecar_storage_path: r.sidecar_storage_path, + last_synced_at: r.last_synced_at.map(DateTimeStamp::from_datetime), + last_known_file_hash: hash, + error_message: r.error_message, + }) + } +} + +pub struct PostgresSidecarRepository { + pool: PgPool, +} + +impl PostgresSidecarRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl SidecarRepository for PostgresSidecarRepository { + async fn find_by_asset( + &self, + asset_id: &SystemId, + ) -> Result, DomainError> { + let row = sqlx::query_as::<_, SidecarRow>( + "SELECT asset_id, sync_status, sidecar_storage_path, last_synced_at, + last_known_file_hash, error_message + FROM sidecar_records WHERE asset_id = $1", + ) + .bind(*asset_id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + row.map(TryInto::try_into).transpose() + } + + async fn find_by_status(&self, status: SyncStatus) -> Result, DomainError> { + let rows = sqlx::query_as::<_, SidecarRow>( + "SELECT asset_id, sync_status, sidecar_storage_path, last_synced_at, + last_known_file_hash, error_message + FROM sidecar_records WHERE sync_status = $1", + ) + .bind(sync_status_to_str(&status)) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + rows.into_iter().map(TryInto::try_into).collect() + } + + async fn save(&self, record: &SidecarRecord) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO sidecar_records (asset_id, sync_status, sidecar_storage_path, + last_synced_at, last_known_file_hash, error_message) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (asset_id) DO UPDATE SET + sync_status = EXCLUDED.sync_status, + sidecar_storage_path = EXCLUDED.sidecar_storage_path, + last_synced_at = EXCLUDED.last_synced_at, + last_known_file_hash = EXCLUDED.last_known_file_hash, + error_message = EXCLUDED.error_message", + ) + .bind(*record.asset_id.as_uuid()) + .bind(sync_status_to_str(&record.sync_status)) + .bind(&record.sidecar_storage_path) + .bind(record.last_synced_at.as_ref().map(|d| d.as_datetime())) + .bind(record.last_known_file_hash.as_ref().map(|c| c.as_str())) + .bind(&record.error_message) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } + + async fn delete(&self, asset_id: &SystemId) -> Result<(), DomainError> { + sqlx::query("DELETE FROM sidecar_records WHERE asset_id = $1") + .bind(*asset_id.as_uuid()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} diff --git a/crates/adapters/postgres/src/tag_repository.rs b/crates/adapters/postgres/src/tag_repository.rs new file mode 100644 index 0000000..e345073 --- /dev/null +++ b/crates/adapters/postgres/src/tag_repository.rs @@ -0,0 +1,174 @@ +use crate::db::PgPool; +use async_trait::async_trait; +use domain::{ + entities::{AssetTag, Tag, TagSource}, + errors::DomainError, + ports::TagRepository, + value_objects::SystemId, +}; +use uuid::Uuid; + +#[derive(sqlx::FromRow)] +struct TagRow { + tag_id: Uuid, + name: String, + tag_source: String, +} + +#[derive(sqlx::FromRow)] +struct AssetTagRow { + asset_id: Uuid, + tag_id: Uuid, + tagged_by_user_id: Option, + confidence: f64, +} + +fn tag_source_from_str(s: &str) -> TagSource { + match s { + "ai_generated" => TagSource::AiGenerated, + "exif_extracted" => TagSource::ExifExtracted, + _ => TagSource::UserManual, + } +} + +fn tag_source_to_str(s: &TagSource) -> &'static str { + match s { + TagSource::UserManual => "user_manual", + TagSource::AiGenerated => "ai_generated", + TagSource::ExifExtracted => "exif_extracted", + } +} + +impl From for Tag { + fn from(r: TagRow) -> Self { + Self { + tag_id: SystemId::from_uuid(r.tag_id), + name: r.name, + tag_source: tag_source_from_str(&r.tag_source), + } + } +} + +impl From for AssetTag { + fn from(r: AssetTagRow) -> Self { + Self { + asset_id: SystemId::from_uuid(r.asset_id), + tag_id: SystemId::from_uuid(r.tag_id), + tagged_by_user_id: r.tagged_by_user_id.map(SystemId::from_uuid), + confidence: r.confidence, + } + } +} + +pub struct PostgresTagRepository { + pool: PgPool, +} + +impl PostgresTagRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl TagRepository for PostgresTagRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + let row = sqlx::query_as::<_, TagRow>( + "SELECT tag_id, name, tag_source FROM tags WHERE tag_id = $1", + ) + .bind(*id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.map(Into::into)) + } + + async fn find_by_name(&self, name: &str) -> Result, DomainError> { + let row = sqlx::query_as::<_, TagRow>( + "SELECT tag_id, name, tag_source FROM tags WHERE name = $1", + ) + .bind(name) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.map(Into::into)) + } + + async fn find_tags_for_asset( + &self, + asset_id: &SystemId, + ) -> Result, DomainError> { + let rows = sqlx::query_as::<_, TagRow>( + "SELECT t.tag_id, t.name, t.tag_source + FROM tags t JOIN asset_tags at ON t.tag_id = at.tag_id + WHERE at.asset_id = $1", + ) + .bind(*asset_id.as_uuid()) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + let at_rows = sqlx::query_as::<_, AssetTagRow>( + "SELECT asset_id, tag_id, tagged_by_user_id, confidence + FROM asset_tags WHERE asset_id = $1", + ) + .bind(*asset_id.as_uuid()) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + let tags: Vec = rows.into_iter().map(Into::into).collect(); + let asset_tags: Vec = at_rows.into_iter().map(Into::into).collect(); + + Ok(tags.into_iter().zip(asset_tags).collect()) + } + + async fn save_tag(&self, tag: &Tag) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO tags (tag_id, name, tag_source) + VALUES ($1, $2, $3) + ON CONFLICT (tag_id) DO UPDATE SET name = EXCLUDED.name, tag_source = EXCLUDED.tag_source", + ) + .bind(*tag.tag_id.as_uuid()) + .bind(&tag.name) + .bind(tag_source_to_str(&tag.tag_source)) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } + + async fn save_asset_tag(&self, asset_tag: &AssetTag) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO asset_tags (asset_id, tag_id, tagged_by_user_id, confidence) + VALUES ($1, $2, $3, $4) + ON CONFLICT (asset_id, tag_id) DO UPDATE SET + tagged_by_user_id = EXCLUDED.tagged_by_user_id, + confidence = EXCLUDED.confidence", + ) + .bind(*asset_tag.asset_id.as_uuid()) + .bind(*asset_tag.tag_id.as_uuid()) + .bind(asset_tag.tagged_by_user_id.as_ref().map(|id| *id.as_uuid())) + .bind(asset_tag.confidence) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } + + async fn remove_asset_tag( + &self, + asset_id: &SystemId, + tag_id: &SystemId, + ) -> Result<(), DomainError> { + sqlx::query("DELETE FROM asset_tags WHERE asset_id = $1 AND tag_id = $2") + .bind(*asset_id.as_uuid()) + .bind(*tag_id.as_uuid()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +}