From b67e5952807ea04332d96e607614fc2b006bfa88 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Sun, 31 May 2026 03:35:41 +0200 Subject: [PATCH] domain: add Processing entities and ports (Job, JobBatch, Plugin, Pipeline) --- crates/domain/src/entities/job.rs | 123 ++++++++++++++++++ crates/domain/src/entities/job_batch.rs | 59 +++++++++ crates/domain/src/entities/mod.rs | 37 ++++-- crates/domain/src/entities/plugin.rs | 37 ++++++ .../src/entities/processing_pipeline.rs | 35 +++++ crates/domain/src/ports/job_batch_repo.rs | 8 ++ crates/domain/src/ports/job_repo.rs | 10 ++ crates/domain/src/ports/mod.rs | 41 ++++-- crates/domain/src/ports/pipeline_repo.rs | 9 ++ crates/domain/src/ports/plugin_repo.rs | 9 ++ crates/domain/tests/entities/job.rs | 53 ++++++++ crates/domain/tests/entities/job_batch.rs | 31 +++++ crates/domain/tests/entities/mod.rs | 3 + .../tests/entities/processing_pipeline.rs | 17 +++ 14 files changed, 454 insertions(+), 18 deletions(-) create mode 100644 crates/domain/src/entities/job.rs create mode 100644 crates/domain/src/entities/job_batch.rs create mode 100644 crates/domain/src/entities/plugin.rs create mode 100644 crates/domain/src/entities/processing_pipeline.rs create mode 100644 crates/domain/src/ports/job_batch_repo.rs create mode 100644 crates/domain/src/ports/job_repo.rs create mode 100644 crates/domain/src/ports/pipeline_repo.rs create mode 100644 crates/domain/src/ports/plugin_repo.rs create mode 100644 crates/domain/tests/entities/job.rs create mode 100644 crates/domain/tests/entities/job_batch.rs create mode 100644 crates/domain/tests/entities/processing_pipeline.rs diff --git a/crates/domain/src/entities/job.rs b/crates/domain/src/entities/job.rs new file mode 100644 index 0000000..5f4eda7 --- /dev/null +++ b/crates/domain/src/entities/job.rs @@ -0,0 +1,123 @@ +use crate::errors::DomainError; +use crate::value_objects::{DateTimeStamp, StructuredData, SystemId}; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum JobType { + ScanDirectory, + ExtractMetadata, + GenerateDerivative, + SyncSidecar, + DetectDuplicates, + Custom(String), +} + +impl PartialEq for JobType { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::ScanDirectory, Self::ScanDirectory) => true, + (Self::ExtractMetadata, Self::ExtractMetadata) => true, + (Self::GenerateDerivative, Self::GenerateDerivative) => true, + (Self::SyncSidecar, Self::SyncSidecar) => true, + (Self::DetectDuplicates, Self::DetectDuplicates) => true, + (Self::Custom(a), Self::Custom(b)) => a == b, + _ => false, + } + } +} + +impl Eq for JobType {} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum JobStatus { + Queued, + Processing, + Completed, + Failed, + Cancelled, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct Job { + pub job_id: SystemId, + pub job_type: JobType, + pub target_asset_id: Option, + pub batch_id: Option, + pub status: JobStatus, + pub priority: u32, + pub payload: StructuredData, + pub result_data: Option, + pub retry_count: u32, + pub max_retries: u32, + pub created_at: DateTimeStamp, + pub started_at: Option, + pub completed_at: Option, + pub error_message: Option, +} + +impl Job { + pub fn new(job_type: JobType, priority: u32, payload: StructuredData) -> Self { + Self { + job_id: SystemId::new(), + job_type, + target_asset_id: None, + batch_id: None, + status: JobStatus::Queued, + priority, + payload, + result_data: None, + retry_count: 0, + max_retries: 3, + created_at: DateTimeStamp::now(), + started_at: None, + completed_at: None, + error_message: None, + } + } + + pub fn with_target(mut self, asset_id: SystemId) -> Self { + self.target_asset_id = Some(asset_id); + self + } + + pub fn with_batch(mut self, batch_id: SystemId) -> Self { + self.batch_id = Some(batch_id); + self + } + + pub fn start(&mut self) -> Result<(), DomainError> { + if self.status != JobStatus::Queued { + return Err(DomainError::Conflict( + format!("Job can only start from Queued, currently {:?}", self.status), + )); + } + self.status = JobStatus::Processing; + self.started_at = Some(DateTimeStamp::now()); + Ok(()) + } + + pub fn complete(&mut self, result: StructuredData) { + self.status = JobStatus::Completed; + self.result_data = Some(result); + self.completed_at = Some(DateTimeStamp::now()); + } + + pub fn fail(&mut self, error: impl Into) { + self.retry_count += 1; + self.error_message = Some(error.into()); + self.started_at = None; + if self.retry_count >= self.max_retries { + self.status = JobStatus::Failed; + } else { + self.status = JobStatus::Queued; + } + } + + pub fn cancel(&mut self) { + self.status = JobStatus::Cancelled; + self.completed_at = Some(DateTimeStamp::now()); + } + + pub fn can_retry(&self) -> bool { + self.retry_count < self.max_retries + } +} diff --git a/crates/domain/src/entities/job_batch.rs b/crates/domain/src/entities/job_batch.rs new file mode 100644 index 0000000..a045dcb --- /dev/null +++ b/crates/domain/src/entities/job_batch.rs @@ -0,0 +1,59 @@ +use crate::value_objects::SystemId; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum BatchStatus { + InProgress, + CompletedWithErrors, + Completed, + Cancelled, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct JobBatch { + pub batch_id: SystemId, + pub batch_type: String, + pub total_jobs: u32, + pub completed_count: u32, + pub failed_count: u32, + pub status: BatchStatus, +} + +impl JobBatch { + pub fn new(batch_type: impl Into, total_jobs: u32) -> Self { + Self { + batch_id: SystemId::new(), + batch_type: batch_type.into(), + total_jobs, + completed_count: 0, + failed_count: 0, + status: BatchStatus::InProgress, + } + } + + pub fn record_completion(&mut self) { + self.completed_count += 1; + self.check_finished(); + } + + pub fn record_failure(&mut self) { + self.failed_count += 1; + self.check_finished(); + } + + pub fn progress_percent(&self) -> f64 { + if self.total_jobs == 0 { + return 100.0; + } + ((self.completed_count + self.failed_count) as f64 / self.total_jobs as f64) * 100.0 + } + + fn check_finished(&mut self) { + if self.completed_count + self.failed_count >= self.total_jobs { + self.status = if self.failed_count > 0 { + BatchStatus::CompletedWithErrors + } else { + BatchStatus::Completed + }; + } + } +} diff --git a/crates/domain/src/entities/mod.rs b/crates/domain/src/entities/mod.rs index 2a7f1e3..945c550 100644 --- a/crates/domain/src/entities/mod.rs +++ b/crates/domain/src/entities/mod.rs @@ -1,31 +1,39 @@ +// Identity & Access (Tasks 3-4) pub mod permission; pub mod role; mod user; mod group; + +pub use permission::{Permission, PermissionAction, ResourceType}; +pub use role::Role; +pub use user::User; +pub use group::Group; + +// Storage & Sources (Task 6) mod storage_volume; mod library_path; mod ingest_session; mod quota; + +pub use storage_volume::StorageVolume; +pub use library_path::{LibraryPath, OwnershipPolicy}; +pub use ingest_session::{IngestSession, IngestStatus}; +pub use quota::{QuotaDefinition, QuotaRule, TimePeriod, UsageLedgerEntry, UsageType}; + +// Media Catalog (Task 8) mod asset; mod asset_metadata; mod asset_stack; mod derivative_asset; mod duplicate; -pub use permission::{Permission, PermissionAction, ResourceType}; -pub use role::Role; -pub use user::User; -pub use group::Group; -pub use storage_volume::StorageVolume; -pub use library_path::{LibraryPath, OwnershipPolicy}; -pub use ingest_session::{IngestSession, IngestStatus}; -pub use quota::{QuotaDefinition, QuotaRule, TimePeriod, UsageLedgerEntry, UsageType}; pub use asset::{Asset, AssetType, SourceReference}; pub use asset_metadata::{AssetMetadata, MetadataSource}; pub use asset_stack::{AssetStack, AssetStackMember, StackMemberRole, StackType}; pub use derivative_asset::{DerivativeAsset, DerivativeProfile, GenerationStatus}; pub use duplicate::{DetectionMethod, DuplicateCandidate, DuplicateGroup, DuplicateStatus}; +// Organization (Task 10) mod album; mod tag; mod collection; @@ -34,6 +42,7 @@ pub use album::{Album, AlbumEntry}; pub use tag::{AssetTag, Tag, TagSource}; pub use collection::Collection; +// Sharing (Task 11) mod share_scope; mod share_target; mod share_link; @@ -46,8 +55,20 @@ pub use share_link::{LinkAccessLevel, ShareLink}; pub use invite_code::InviteCode; pub use visibility_filter::VisibilityFilter; +// Sidecar Sync (Task 12) mod sidecar_record; mod sidecar_config; pub use sidecar_record::{SidecarRecord, SyncStatus}; pub use sidecar_config::{ConflictPolicy, SidecarConfig, SyncMode}; + +// Processing (Task 13) +mod job; +mod job_batch; +mod plugin; +mod processing_pipeline; + +pub use job::{Job, JobStatus, JobType}; +pub use job_batch::{BatchStatus, JobBatch}; +pub use plugin::{Plugin, PluginType}; +pub use processing_pipeline::{PipelineStep, ProcessingPipeline}; diff --git a/crates/domain/src/entities/plugin.rs b/crates/domain/src/entities/plugin.rs new file mode 100644 index 0000000..0f14bf9 --- /dev/null +++ b/crates/domain/src/entities/plugin.rs @@ -0,0 +1,37 @@ +use crate::value_objects::{StructuredData, SystemId}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum PluginType { + MediaProcessor, + ScheduledTask, + SidecarWriter, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct Plugin { + pub plugin_id: SystemId, + pub name: String, + pub plugin_type: PluginType, + pub is_enabled: bool, + pub configuration: StructuredData, +} + +impl Plugin { + pub fn new(name: impl Into, plugin_type: PluginType) -> Self { + Self { + plugin_id: SystemId::new(), + name: name.into(), + plugin_type, + is_enabled: true, + configuration: StructuredData::new(), + } + } + + pub fn disable(&mut self) { + self.is_enabled = false; + } + + pub fn enable(&mut self) { + self.is_enabled = true; + } +} diff --git a/crates/domain/src/entities/processing_pipeline.rs b/crates/domain/src/entities/processing_pipeline.rs new file mode 100644 index 0000000..2fdaeda --- /dev/null +++ b/crates/domain/src/entities/processing_pipeline.rs @@ -0,0 +1,35 @@ +use crate::value_objects::{StructuredData, SystemId}; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct PipelineStep { + pub plugin_id: SystemId, + pub step_order: u32, + pub configuration: StructuredData, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct ProcessingPipeline { + pub pipeline_id: SystemId, + pub trigger_event: String, + pub steps: Vec, +} + +impl ProcessingPipeline { + pub fn new(trigger_event: impl Into) -> Self { + Self { + pipeline_id: SystemId::new(), + trigger_event: trigger_event.into(), + steps: Vec::new(), + } + } + + pub fn add_step(&mut self, plugin_id: SystemId, config: StructuredData) { + let next_order = self.steps.iter().map(|s| s.step_order).max().unwrap_or(0) + + if self.steps.is_empty() { 0 } else { 1 }; + self.steps.push(PipelineStep { + plugin_id, + step_order: next_order, + configuration: config, + }); + } +} diff --git a/crates/domain/src/ports/job_batch_repo.rs b/crates/domain/src/ports/job_batch_repo.rs new file mode 100644 index 0000000..66abc5d --- /dev/null +++ b/crates/domain/src/ports/job_batch_repo.rs @@ -0,0 +1,8 @@ +use async_trait::async_trait; +use crate::{entities::JobBatch, errors::DomainError, value_objects::SystemId}; + +#[async_trait] +pub trait JobBatchRepository: Send + Sync { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError>; + async fn save(&self, batch: &JobBatch) -> Result<(), DomainError>; +} diff --git a/crates/domain/src/ports/job_repo.rs b/crates/domain/src/ports/job_repo.rs new file mode 100644 index 0000000..a309170 --- /dev/null +++ b/crates/domain/src/ports/job_repo.rs @@ -0,0 +1,10 @@ +use async_trait::async_trait; +use crate::{entities::Job, errors::DomainError, value_objects::SystemId}; + +#[async_trait] +pub trait JobRepository: Send + Sync { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError>; + async fn find_next_queued(&self) -> Result, DomainError>; + async fn find_by_batch(&self, batch_id: &SystemId) -> Result, DomainError>; + async fn save(&self, job: &Job) -> Result<(), DomainError>; +} diff --git a/crates/domain/src/ports/mod.rs b/crates/domain/src/ports/mod.rs index 05375cb..d707384 100644 --- a/crates/domain/src/ports/mod.rs +++ b/crates/domain/src/ports/mod.rs @@ -1,19 +1,10 @@ +// Identity & Access (Tasks 4-5) mod auth; mod event_publisher; mod group_repo; mod role_repo; mod storage; mod user_repo; -mod storage_volume_repo; -mod library_path_repo; -mod ingest_session_repo; -mod quota_repo; -mod file_storage; -mod asset_repo; -mod asset_metadata_repo; -mod asset_stack_repo; -mod derivative_repo; -mod duplicate_repo; pub use auth::{PasswordHasher, TokenIssuer}; pub use event_publisher::EventPublisher; @@ -21,17 +12,34 @@ pub use group_repo::GroupRepository; pub use role_repo::RoleRepository; pub use storage::{DataStream, StoragePort, StorageReader, StorageWriter}; pub use user_repo::UserRepository; + +// Storage & Sources (Task 7) +mod storage_volume_repo; +mod library_path_repo; +mod ingest_session_repo; +mod quota_repo; +mod file_storage; + pub use storage_volume_repo::StorageVolumeRepository; pub use library_path_repo::LibraryPathRepository; pub use ingest_session_repo::IngestSessionRepository; pub use quota_repo::{QuotaRepository, UsageLedgerRepository}; pub use file_storage::{FileEntry, FileStoragePort}; + +// Media Catalog (Task 9) +mod asset_repo; +mod asset_metadata_repo; +mod asset_stack_repo; +mod derivative_repo; +mod duplicate_repo; + pub use asset_repo::AssetRepository; pub use asset_metadata_repo::AssetMetadataRepository; pub use asset_stack_repo::AssetStackRepository; pub use derivative_repo::DerivativeRepository; pub use duplicate_repo::DuplicateRepository; +// Organization (Task 10) mod album_repo; mod tag_repo; mod collection_repo; @@ -40,14 +48,27 @@ pub use album_repo::AlbumRepository; pub use tag_repo::TagRepository; pub use collection_repo::CollectionRepository; +// Sharing (Task 11) mod share_repo; mod visibility_filter_repo; pub use share_repo::ShareRepository; pub use visibility_filter_repo::VisibilityFilterRepository; +// Sidecar Sync (Task 12) mod sidecar_repo; mod sidecar_writer; pub use sidecar_repo::SidecarRepository; pub use sidecar_writer::SidecarWriterPort; + +// Processing (Task 13) +mod job_repo; +mod job_batch_repo; +mod plugin_repo; +mod pipeline_repo; + +pub use job_repo::JobRepository; +pub use job_batch_repo::JobBatchRepository; +pub use plugin_repo::PluginRepository; +pub use pipeline_repo::PipelineRepository; diff --git a/crates/domain/src/ports/pipeline_repo.rs b/crates/domain/src/ports/pipeline_repo.rs new file mode 100644 index 0000000..ee20d44 --- /dev/null +++ b/crates/domain/src/ports/pipeline_repo.rs @@ -0,0 +1,9 @@ +use async_trait::async_trait; +use crate::{entities::ProcessingPipeline, errors::DomainError, value_objects::SystemId}; + +#[async_trait] +pub trait PipelineRepository: Send + Sync { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError>; + async fn find_by_trigger(&self, event: &str) -> Result, DomainError>; + async fn save(&self, pipeline: &ProcessingPipeline) -> Result<(), DomainError>; +} diff --git a/crates/domain/src/ports/plugin_repo.rs b/crates/domain/src/ports/plugin_repo.rs new file mode 100644 index 0000000..9b84aef --- /dev/null +++ b/crates/domain/src/ports/plugin_repo.rs @@ -0,0 +1,9 @@ +use async_trait::async_trait; +use crate::{entities::Plugin, errors::DomainError, value_objects::SystemId}; + +#[async_trait] +pub trait PluginRepository: Send + Sync { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError>; + async fn find_enabled(&self) -> Result, DomainError>; + async fn save(&self, plugin: &Plugin) -> Result<(), DomainError>; +} diff --git a/crates/domain/tests/entities/job.rs b/crates/domain/tests/entities/job.rs new file mode 100644 index 0000000..31543ae --- /dev/null +++ b/crates/domain/tests/entities/job.rs @@ -0,0 +1,53 @@ +use domain::entities::{Job, JobStatus, JobType}; +use domain::errors::DomainError; +use domain::value_objects::StructuredData; + +#[test] +fn job_lifecycle_success() { + let mut job = Job::new(JobType::ExtractMetadata, 5, StructuredData::new()); + assert_eq!(job.status, JobStatus::Queued); + + job.start().unwrap(); + assert_eq!(job.status, JobStatus::Processing); + assert!(job.started_at.is_some()); + + job.complete(StructuredData::new()); + assert_eq!(job.status, JobStatus::Completed); + assert!(job.result_data.is_some()); + assert!(job.completed_at.is_some()); +} + +#[test] +fn retry_on_failure() { + let mut job = Job::new(JobType::ScanDirectory, 1, StructuredData::new()); + job.start().unwrap(); + + job.fail("timeout"); + assert_eq!(job.status, JobStatus::Queued); + assert_eq!(job.retry_count, 1); + assert!(job.can_retry()); + assert!(job.started_at.is_none()); +} + +#[test] +fn fails_after_max_retries() { + let mut job = Job::new(JobType::ScanDirectory, 1, StructuredData::new()); + job.max_retries = 2; + + job.start().unwrap(); + job.fail("err1"); + assert_eq!(job.status, JobStatus::Queued); + + job.start().unwrap(); + job.fail("err2"); + assert_eq!(job.status, JobStatus::Failed); + assert!(!job.can_retry()); +} + +#[test] +fn cannot_start_from_processing() { + let mut job = Job::new(JobType::ScanDirectory, 1, StructuredData::new()); + job.start().unwrap(); + let result = job.start(); + assert!(matches!(result, Err(DomainError::Conflict(_)))); +} diff --git a/crates/domain/tests/entities/job_batch.rs b/crates/domain/tests/entities/job_batch.rs new file mode 100644 index 0000000..e925ac2 --- /dev/null +++ b/crates/domain/tests/entities/job_batch.rs @@ -0,0 +1,31 @@ +use domain::entities::{BatchStatus, JobBatch}; + +#[test] +fn completes_when_all_done() { + let mut batch = JobBatch::new("scan", 3); + batch.record_completion(); + batch.record_completion(); + batch.record_completion(); + assert_eq!(batch.status, BatchStatus::Completed); +} + +#[test] +fn completes_with_errors() { + let mut batch = JobBatch::new("scan", 3); + batch.record_completion(); + batch.record_failure(); + batch.record_completion(); + assert_eq!(batch.status, BatchStatus::CompletedWithErrors); +} + +#[test] +fn progress_tracking() { + let mut batch = JobBatch::new("scan", 4); + assert_eq!(batch.progress_percent(), 0.0); + + batch.record_completion(); + assert_eq!(batch.progress_percent(), 25.0); + + batch.record_completion(); + assert_eq!(batch.progress_percent(), 50.0); +} diff --git a/crates/domain/tests/entities/mod.rs b/crates/domain/tests/entities/mod.rs index a37b7a1..9f12d10 100644 --- a/crates/domain/tests/entities/mod.rs +++ b/crates/domain/tests/entities/mod.rs @@ -16,3 +16,6 @@ mod tag; mod share_scope; mod share_link; mod sidecar_record; +mod job; +mod job_batch; +mod processing_pipeline; diff --git a/crates/domain/tests/entities/processing_pipeline.rs b/crates/domain/tests/entities/processing_pipeline.rs new file mode 100644 index 0000000..a664a4b --- /dev/null +++ b/crates/domain/tests/entities/processing_pipeline.rs @@ -0,0 +1,17 @@ +use domain::entities::ProcessingPipeline; +use domain::value_objects::{StructuredData, SystemId}; + +#[test] +fn steps_ordered() { + let mut pipeline = ProcessingPipeline::new("asset.created"); + assert!(pipeline.steps.is_empty()); + + pipeline.add_step(SystemId::new(), StructuredData::new()); + pipeline.add_step(SystemId::new(), StructuredData::new()); + pipeline.add_step(SystemId::new(), StructuredData::new()); + + assert_eq!(pipeline.steps.len(), 3); + assert_eq!(pipeline.steps[0].step_order, 0); + assert_eq!(pipeline.steps[1].step_order, 1); + assert_eq!(pipeline.steps[2].step_order, 2); +}