domain: add Processing entities and ports (Job, JobBatch, Plugin, Pipeline)

This commit is contained in:
2026-05-31 03:35:41 +02:00
parent ee79be0351
commit b67e595280
14 changed files with 454 additions and 18 deletions

View File

@@ -0,0 +1,123 @@
use crate::errors::DomainError;
use crate::value_objects::{DateTimeStamp, StructuredData, SystemId};
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum JobType {
ScanDirectory,
ExtractMetadata,
GenerateDerivative,
SyncSidecar,
DetectDuplicates,
Custom(String),
}
impl PartialEq for JobType {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::ScanDirectory, Self::ScanDirectory) => true,
(Self::ExtractMetadata, Self::ExtractMetadata) => true,
(Self::GenerateDerivative, Self::GenerateDerivative) => true,
(Self::SyncSidecar, Self::SyncSidecar) => true,
(Self::DetectDuplicates, Self::DetectDuplicates) => true,
(Self::Custom(a), Self::Custom(b)) => a == b,
_ => false,
}
}
}
impl Eq for JobType {}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum JobStatus {
Queued,
Processing,
Completed,
Failed,
Cancelled,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Job {
pub job_id: SystemId,
pub job_type: JobType,
pub target_asset_id: Option<SystemId>,
pub batch_id: Option<SystemId>,
pub status: JobStatus,
pub priority: u32,
pub payload: StructuredData,
pub result_data: Option<StructuredData>,
pub retry_count: u32,
pub max_retries: u32,
pub created_at: DateTimeStamp,
pub started_at: Option<DateTimeStamp>,
pub completed_at: Option<DateTimeStamp>,
pub error_message: Option<String>,
}
impl Job {
pub fn new(job_type: JobType, priority: u32, payload: StructuredData) -> Self {
Self {
job_id: SystemId::new(),
job_type,
target_asset_id: None,
batch_id: None,
status: JobStatus::Queued,
priority,
payload,
result_data: None,
retry_count: 0,
max_retries: 3,
created_at: DateTimeStamp::now(),
started_at: None,
completed_at: None,
error_message: None,
}
}
pub fn with_target(mut self, asset_id: SystemId) -> Self {
self.target_asset_id = Some(asset_id);
self
}
pub fn with_batch(mut self, batch_id: SystemId) -> Self {
self.batch_id = Some(batch_id);
self
}
pub fn start(&mut self) -> Result<(), DomainError> {
if self.status != JobStatus::Queued {
return Err(DomainError::Conflict(
format!("Job can only start from Queued, currently {:?}", self.status),
));
}
self.status = JobStatus::Processing;
self.started_at = Some(DateTimeStamp::now());
Ok(())
}
pub fn complete(&mut self, result: StructuredData) {
self.status = JobStatus::Completed;
self.result_data = Some(result);
self.completed_at = Some(DateTimeStamp::now());
}
pub fn fail(&mut self, error: impl Into<String>) {
self.retry_count += 1;
self.error_message = Some(error.into());
self.started_at = None;
if self.retry_count >= self.max_retries {
self.status = JobStatus::Failed;
} else {
self.status = JobStatus::Queued;
}
}
pub fn cancel(&mut self) {
self.status = JobStatus::Cancelled;
self.completed_at = Some(DateTimeStamp::now());
}
pub fn can_retry(&self) -> bool {
self.retry_count < self.max_retries
}
}

View File

@@ -0,0 +1,59 @@
use crate::value_objects::SystemId;
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum BatchStatus {
InProgress,
CompletedWithErrors,
Completed,
Cancelled,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct JobBatch {
pub batch_id: SystemId,
pub batch_type: String,
pub total_jobs: u32,
pub completed_count: u32,
pub failed_count: u32,
pub status: BatchStatus,
}
impl JobBatch {
pub fn new(batch_type: impl Into<String>, total_jobs: u32) -> Self {
Self {
batch_id: SystemId::new(),
batch_type: batch_type.into(),
total_jobs,
completed_count: 0,
failed_count: 0,
status: BatchStatus::InProgress,
}
}
pub fn record_completion(&mut self) {
self.completed_count += 1;
self.check_finished();
}
pub fn record_failure(&mut self) {
self.failed_count += 1;
self.check_finished();
}
pub fn progress_percent(&self) -> f64 {
if self.total_jobs == 0 {
return 100.0;
}
((self.completed_count + self.failed_count) as f64 / self.total_jobs as f64) * 100.0
}
fn check_finished(&mut self) {
if self.completed_count + self.failed_count >= self.total_jobs {
self.status = if self.failed_count > 0 {
BatchStatus::CompletedWithErrors
} else {
BatchStatus::Completed
};
}
}
}

View File

@@ -1,31 +1,39 @@
// Identity & Access (Tasks 3-4)
pub mod permission;
pub mod role;
mod user;
mod group;
pub use permission::{Permission, PermissionAction, ResourceType};
pub use role::Role;
pub use user::User;
pub use group::Group;
// Storage & Sources (Task 6)
mod storage_volume;
mod library_path;
mod ingest_session;
mod quota;
pub use storage_volume::StorageVolume;
pub use library_path::{LibraryPath, OwnershipPolicy};
pub use ingest_session::{IngestSession, IngestStatus};
pub use quota::{QuotaDefinition, QuotaRule, TimePeriod, UsageLedgerEntry, UsageType};
// Media Catalog (Task 8)
mod asset;
mod asset_metadata;
mod asset_stack;
mod derivative_asset;
mod duplicate;
pub use permission::{Permission, PermissionAction, ResourceType};
pub use role::Role;
pub use user::User;
pub use group::Group;
pub use storage_volume::StorageVolume;
pub use library_path::{LibraryPath, OwnershipPolicy};
pub use ingest_session::{IngestSession, IngestStatus};
pub use quota::{QuotaDefinition, QuotaRule, TimePeriod, UsageLedgerEntry, UsageType};
pub use asset::{Asset, AssetType, SourceReference};
pub use asset_metadata::{AssetMetadata, MetadataSource};
pub use asset_stack::{AssetStack, AssetStackMember, StackMemberRole, StackType};
pub use derivative_asset::{DerivativeAsset, DerivativeProfile, GenerationStatus};
pub use duplicate::{DetectionMethod, DuplicateCandidate, DuplicateGroup, DuplicateStatus};
// Organization (Task 10)
mod album;
mod tag;
mod collection;
@@ -34,6 +42,7 @@ pub use album::{Album, AlbumEntry};
pub use tag::{AssetTag, Tag, TagSource};
pub use collection::Collection;
// Sharing (Task 11)
mod share_scope;
mod share_target;
mod share_link;
@@ -46,8 +55,20 @@ pub use share_link::{LinkAccessLevel, ShareLink};
pub use invite_code::InviteCode;
pub use visibility_filter::VisibilityFilter;
// Sidecar Sync (Task 12)
mod sidecar_record;
mod sidecar_config;
pub use sidecar_record::{SidecarRecord, SyncStatus};
pub use sidecar_config::{ConflictPolicy, SidecarConfig, SyncMode};
// Processing (Task 13)
mod job;
mod job_batch;
mod plugin;
mod processing_pipeline;
pub use job::{Job, JobStatus, JobType};
pub use job_batch::{BatchStatus, JobBatch};
pub use plugin::{Plugin, PluginType};
pub use processing_pipeline::{PipelineStep, ProcessingPipeline};

View File

@@ -0,0 +1,37 @@
use crate::value_objects::{StructuredData, SystemId};
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum PluginType {
MediaProcessor,
ScheduledTask,
SidecarWriter,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Plugin {
pub plugin_id: SystemId,
pub name: String,
pub plugin_type: PluginType,
pub is_enabled: bool,
pub configuration: StructuredData,
}
impl Plugin {
pub fn new(name: impl Into<String>, plugin_type: PluginType) -> Self {
Self {
plugin_id: SystemId::new(),
name: name.into(),
plugin_type,
is_enabled: true,
configuration: StructuredData::new(),
}
}
pub fn disable(&mut self) {
self.is_enabled = false;
}
pub fn enable(&mut self) {
self.is_enabled = true;
}
}

View File

@@ -0,0 +1,35 @@
use crate::value_objects::{StructuredData, SystemId};
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PipelineStep {
pub plugin_id: SystemId,
pub step_order: u32,
pub configuration: StructuredData,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ProcessingPipeline {
pub pipeline_id: SystemId,
pub trigger_event: String,
pub steps: Vec<PipelineStep>,
}
impl ProcessingPipeline {
pub fn new(trigger_event: impl Into<String>) -> Self {
Self {
pipeline_id: SystemId::new(),
trigger_event: trigger_event.into(),
steps: Vec::new(),
}
}
pub fn add_step(&mut self, plugin_id: SystemId, config: StructuredData) {
let next_order = self.steps.iter().map(|s| s.step_order).max().unwrap_or(0)
+ if self.steps.is_empty() { 0 } else { 1 };
self.steps.push(PipelineStep {
plugin_id,
step_order: next_order,
configuration: config,
});
}
}

View File

@@ -0,0 +1,8 @@
use async_trait::async_trait;
use crate::{entities::JobBatch, errors::DomainError, value_objects::SystemId};
#[async_trait]
pub trait JobBatchRepository: Send + Sync {
async fn find_by_id(&self, id: &SystemId) -> Result<Option<JobBatch>, DomainError>;
async fn save(&self, batch: &JobBatch) -> Result<(), DomainError>;
}

View File

@@ -0,0 +1,10 @@
use async_trait::async_trait;
use crate::{entities::Job, errors::DomainError, value_objects::SystemId};
#[async_trait]
pub trait JobRepository: Send + Sync {
async fn find_by_id(&self, id: &SystemId) -> Result<Option<Job>, DomainError>;
async fn find_next_queued(&self) -> Result<Option<Job>, DomainError>;
async fn find_by_batch(&self, batch_id: &SystemId) -> Result<Vec<Job>, DomainError>;
async fn save(&self, job: &Job) -> Result<(), DomainError>;
}

View File

@@ -1,19 +1,10 @@
// Identity & Access (Tasks 4-5)
mod auth;
mod event_publisher;
mod group_repo;
mod role_repo;
mod storage;
mod user_repo;
mod storage_volume_repo;
mod library_path_repo;
mod ingest_session_repo;
mod quota_repo;
mod file_storage;
mod asset_repo;
mod asset_metadata_repo;
mod asset_stack_repo;
mod derivative_repo;
mod duplicate_repo;
pub use auth::{PasswordHasher, TokenIssuer};
pub use event_publisher::EventPublisher;
@@ -21,17 +12,34 @@ pub use group_repo::GroupRepository;
pub use role_repo::RoleRepository;
pub use storage::{DataStream, StoragePort, StorageReader, StorageWriter};
pub use user_repo::UserRepository;
// Storage & Sources (Task 7)
mod storage_volume_repo;
mod library_path_repo;
mod ingest_session_repo;
mod quota_repo;
mod file_storage;
pub use storage_volume_repo::StorageVolumeRepository;
pub use library_path_repo::LibraryPathRepository;
pub use ingest_session_repo::IngestSessionRepository;
pub use quota_repo::{QuotaRepository, UsageLedgerRepository};
pub use file_storage::{FileEntry, FileStoragePort};
// Media Catalog (Task 9)
mod asset_repo;
mod asset_metadata_repo;
mod asset_stack_repo;
mod derivative_repo;
mod duplicate_repo;
pub use asset_repo::AssetRepository;
pub use asset_metadata_repo::AssetMetadataRepository;
pub use asset_stack_repo::AssetStackRepository;
pub use derivative_repo::DerivativeRepository;
pub use duplicate_repo::DuplicateRepository;
// Organization (Task 10)
mod album_repo;
mod tag_repo;
mod collection_repo;
@@ -40,14 +48,27 @@ pub use album_repo::AlbumRepository;
pub use tag_repo::TagRepository;
pub use collection_repo::CollectionRepository;
// Sharing (Task 11)
mod share_repo;
mod visibility_filter_repo;
pub use share_repo::ShareRepository;
pub use visibility_filter_repo::VisibilityFilterRepository;
// Sidecar Sync (Task 12)
mod sidecar_repo;
mod sidecar_writer;
pub use sidecar_repo::SidecarRepository;
pub use sidecar_writer::SidecarWriterPort;
// Processing (Task 13)
mod job_repo;
mod job_batch_repo;
mod plugin_repo;
mod pipeline_repo;
pub use job_repo::JobRepository;
pub use job_batch_repo::JobBatchRepository;
pub use plugin_repo::PluginRepository;
pub use pipeline_repo::PipelineRepository;

View File

@@ -0,0 +1,9 @@
use async_trait::async_trait;
use crate::{entities::ProcessingPipeline, errors::DomainError, value_objects::SystemId};
#[async_trait]
pub trait PipelineRepository: Send + Sync {
async fn find_by_id(&self, id: &SystemId) -> Result<Option<ProcessingPipeline>, DomainError>;
async fn find_by_trigger(&self, event: &str) -> Result<Vec<ProcessingPipeline>, DomainError>;
async fn save(&self, pipeline: &ProcessingPipeline) -> Result<(), DomainError>;
}

View File

@@ -0,0 +1,9 @@
use async_trait::async_trait;
use crate::{entities::Plugin, errors::DomainError, value_objects::SystemId};
#[async_trait]
pub trait PluginRepository: Send + Sync {
async fn find_by_id(&self, id: &SystemId) -> Result<Option<Plugin>, DomainError>;
async fn find_enabled(&self) -> Result<Vec<Plugin>, DomainError>;
async fn save(&self, plugin: &Plugin) -> Result<(), DomainError>;
}

View File

@@ -0,0 +1,53 @@
use domain::entities::{Job, JobStatus, JobType};
use domain::errors::DomainError;
use domain::value_objects::StructuredData;
#[test]
fn job_lifecycle_success() {
let mut job = Job::new(JobType::ExtractMetadata, 5, StructuredData::new());
assert_eq!(job.status, JobStatus::Queued);
job.start().unwrap();
assert_eq!(job.status, JobStatus::Processing);
assert!(job.started_at.is_some());
job.complete(StructuredData::new());
assert_eq!(job.status, JobStatus::Completed);
assert!(job.result_data.is_some());
assert!(job.completed_at.is_some());
}
#[test]
fn retry_on_failure() {
let mut job = Job::new(JobType::ScanDirectory, 1, StructuredData::new());
job.start().unwrap();
job.fail("timeout");
assert_eq!(job.status, JobStatus::Queued);
assert_eq!(job.retry_count, 1);
assert!(job.can_retry());
assert!(job.started_at.is_none());
}
#[test]
fn fails_after_max_retries() {
let mut job = Job::new(JobType::ScanDirectory, 1, StructuredData::new());
job.max_retries = 2;
job.start().unwrap();
job.fail("err1");
assert_eq!(job.status, JobStatus::Queued);
job.start().unwrap();
job.fail("err2");
assert_eq!(job.status, JobStatus::Failed);
assert!(!job.can_retry());
}
#[test]
fn cannot_start_from_processing() {
let mut job = Job::new(JobType::ScanDirectory, 1, StructuredData::new());
job.start().unwrap();
let result = job.start();
assert!(matches!(result, Err(DomainError::Conflict(_))));
}

View File

@@ -0,0 +1,31 @@
use domain::entities::{BatchStatus, JobBatch};
#[test]
fn completes_when_all_done() {
let mut batch = JobBatch::new("scan", 3);
batch.record_completion();
batch.record_completion();
batch.record_completion();
assert_eq!(batch.status, BatchStatus::Completed);
}
#[test]
fn completes_with_errors() {
let mut batch = JobBatch::new("scan", 3);
batch.record_completion();
batch.record_failure();
batch.record_completion();
assert_eq!(batch.status, BatchStatus::CompletedWithErrors);
}
#[test]
fn progress_tracking() {
let mut batch = JobBatch::new("scan", 4);
assert_eq!(batch.progress_percent(), 0.0);
batch.record_completion();
assert_eq!(batch.progress_percent(), 25.0);
batch.record_completion();
assert_eq!(batch.progress_percent(), 50.0);
}

View File

@@ -16,3 +16,6 @@ mod tag;
mod share_scope;
mod share_link;
mod sidecar_record;
mod job;
mod job_batch;
mod processing_pipeline;

View File

@@ -0,0 +1,17 @@
use domain::entities::ProcessingPipeline;
use domain::value_objects::{StructuredData, SystemId};
#[test]
fn steps_ordered() {
let mut pipeline = ProcessingPipeline::new("asset.created");
assert!(pipeline.steps.is_empty());
pipeline.add_step(SystemId::new(), StructuredData::new());
pipeline.add_step(SystemId::new(), StructuredData::new());
pipeline.add_step(SystemId::new(), StructuredData::new());
assert_eq!(pipeline.steps.len(), 3);
assert_eq!(pipeline.steps[0].step_order, 0);
assert_eq!(pipeline.steps[1].step_order, 1);
assert_eq!(pipeline.steps[2].step_order, 2);
}