refactor: restructure domain crate by bounded context
This commit is contained in:
259
crates/domain/src/processing/entities.rs
Normal file
259
crates/domain/src/processing/entities.rs
Normal file
@@ -0,0 +1,259 @@
|
||||
use crate::common::errors::DomainError;
|
||||
use crate::common::value_objects::{DateTimeStamp, StructuredData, SystemId};
|
||||
|
||||
// --- Job ---
|
||||
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub enum JobType {
|
||||
ScanDirectory,
|
||||
ExtractMetadata,
|
||||
GenerateDerivative,
|
||||
SyncSidecar,
|
||||
DetectDuplicates,
|
||||
Custom(String),
|
||||
}
|
||||
|
||||
impl PartialEq for JobType {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
match (self, other) {
|
||||
(Self::ScanDirectory, Self::ScanDirectory) => true,
|
||||
(Self::ExtractMetadata, Self::ExtractMetadata) => true,
|
||||
(Self::GenerateDerivative, Self::GenerateDerivative) => true,
|
||||
(Self::SyncSidecar, Self::SyncSidecar) => true,
|
||||
(Self::DetectDuplicates, Self::DetectDuplicates) => true,
|
||||
(Self::Custom(a), Self::Custom(b)) => a == b,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for JobType {}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub enum JobStatus {
|
||||
Queued,
|
||||
Processing,
|
||||
Completed,
|
||||
Failed,
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub struct Job {
|
||||
pub job_id: SystemId,
|
||||
pub job_type: JobType,
|
||||
pub target_asset_id: Option<SystemId>,
|
||||
pub batch_id: Option<SystemId>,
|
||||
pub status: JobStatus,
|
||||
pub priority: u32,
|
||||
pub payload: StructuredData,
|
||||
pub result_data: Option<StructuredData>,
|
||||
pub retry_count: u32,
|
||||
pub max_retries: u32,
|
||||
pub created_at: DateTimeStamp,
|
||||
pub started_at: Option<DateTimeStamp>,
|
||||
pub completed_at: Option<DateTimeStamp>,
|
||||
pub error_message: Option<String>,
|
||||
}
|
||||
|
||||
impl Job {
|
||||
pub fn new(job_type: JobType, priority: u32, payload: StructuredData) -> Self {
|
||||
Self {
|
||||
job_id: SystemId::new(),
|
||||
job_type,
|
||||
target_asset_id: None,
|
||||
batch_id: None,
|
||||
status: JobStatus::Queued,
|
||||
priority,
|
||||
payload,
|
||||
result_data: None,
|
||||
retry_count: 0,
|
||||
max_retries: 3,
|
||||
created_at: DateTimeStamp::now(),
|
||||
started_at: None,
|
||||
completed_at: None,
|
||||
error_message: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_target(mut self, asset_id: SystemId) -> Self {
|
||||
self.target_asset_id = Some(asset_id);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_batch(mut self, batch_id: SystemId) -> Self {
|
||||
self.batch_id = Some(batch_id);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn start(&mut self) -> Result<(), DomainError> {
|
||||
if self.status != JobStatus::Queued {
|
||||
return Err(DomainError::Conflict(
|
||||
format!("Job can only start from Queued, currently {:?}", self.status),
|
||||
));
|
||||
}
|
||||
self.status = JobStatus::Processing;
|
||||
self.started_at = Some(DateTimeStamp::now());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn complete(&mut self, result: StructuredData) {
|
||||
self.status = JobStatus::Completed;
|
||||
self.result_data = Some(result);
|
||||
self.completed_at = Some(DateTimeStamp::now());
|
||||
}
|
||||
|
||||
pub fn fail(&mut self, error: impl Into<String>) {
|
||||
self.retry_count += 1;
|
||||
self.error_message = Some(error.into());
|
||||
self.started_at = None;
|
||||
if self.retry_count >= self.max_retries {
|
||||
self.status = JobStatus::Failed;
|
||||
} else {
|
||||
self.status = JobStatus::Queued;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn cancel(&mut self) {
|
||||
self.status = JobStatus::Cancelled;
|
||||
self.completed_at = Some(DateTimeStamp::now());
|
||||
}
|
||||
|
||||
pub fn can_retry(&self) -> bool {
|
||||
self.retry_count < self.max_retries
|
||||
}
|
||||
}
|
||||
|
||||
// --- JobBatch ---
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub enum BatchStatus {
|
||||
InProgress,
|
||||
CompletedWithErrors,
|
||||
Completed,
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub struct JobBatch {
|
||||
pub batch_id: SystemId,
|
||||
pub batch_type: String,
|
||||
pub total_jobs: u32,
|
||||
pub completed_count: u32,
|
||||
pub failed_count: u32,
|
||||
pub status: BatchStatus,
|
||||
}
|
||||
|
||||
impl JobBatch {
|
||||
pub fn new(batch_type: impl Into<String>, total_jobs: u32) -> Self {
|
||||
Self {
|
||||
batch_id: SystemId::new(),
|
||||
batch_type: batch_type.into(),
|
||||
total_jobs,
|
||||
completed_count: 0,
|
||||
failed_count: 0,
|
||||
status: BatchStatus::InProgress,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn record_completion(&mut self) {
|
||||
self.completed_count += 1;
|
||||
self.check_finished();
|
||||
}
|
||||
|
||||
pub fn record_failure(&mut self) {
|
||||
self.failed_count += 1;
|
||||
self.check_finished();
|
||||
}
|
||||
|
||||
pub fn progress_percent(&self) -> f64 {
|
||||
if self.total_jobs == 0 {
|
||||
return 100.0;
|
||||
}
|
||||
((self.completed_count + self.failed_count) as f64 / self.total_jobs as f64) * 100.0
|
||||
}
|
||||
|
||||
fn check_finished(&mut self) {
|
||||
if self.completed_count + self.failed_count >= self.total_jobs {
|
||||
self.status = if self.failed_count > 0 {
|
||||
BatchStatus::CompletedWithErrors
|
||||
} else {
|
||||
BatchStatus::Completed
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// --- Plugin ---
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub enum PluginType {
|
||||
MediaProcessor,
|
||||
ScheduledTask,
|
||||
SidecarWriter,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub struct Plugin {
|
||||
pub plugin_id: SystemId,
|
||||
pub name: String,
|
||||
pub plugin_type: PluginType,
|
||||
pub is_enabled: bool,
|
||||
pub configuration: StructuredData,
|
||||
}
|
||||
|
||||
impl Plugin {
|
||||
pub fn new(name: impl Into<String>, plugin_type: PluginType) -> Self {
|
||||
Self {
|
||||
plugin_id: SystemId::new(),
|
||||
name: name.into(),
|
||||
plugin_type,
|
||||
is_enabled: true,
|
||||
configuration: StructuredData::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn disable(&mut self) {
|
||||
self.is_enabled = false;
|
||||
}
|
||||
|
||||
pub fn enable(&mut self) {
|
||||
self.is_enabled = true;
|
||||
}
|
||||
}
|
||||
|
||||
// --- ProcessingPipeline ---
|
||||
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub struct PipelineStep {
|
||||
pub plugin_id: SystemId,
|
||||
pub step_order: u32,
|
||||
pub configuration: StructuredData,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub struct ProcessingPipeline {
|
||||
pub pipeline_id: SystemId,
|
||||
pub trigger_event: String,
|
||||
pub steps: Vec<PipelineStep>,
|
||||
}
|
||||
|
||||
impl ProcessingPipeline {
|
||||
pub fn new(trigger_event: impl Into<String>) -> Self {
|
||||
Self {
|
||||
pipeline_id: SystemId::new(),
|
||||
trigger_event: trigger_event.into(),
|
||||
steps: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_step(&mut self, plugin_id: SystemId, config: StructuredData) {
|
||||
let next_order = self.steps.iter().map(|s| s.step_order).max().unwrap_or(0)
|
||||
+ if self.steps.is_empty() { 0 } else { 1 };
|
||||
self.steps.push(PipelineStep {
|
||||
plugin_id,
|
||||
step_order: next_order,
|
||||
configuration: config,
|
||||
});
|
||||
}
|
||||
}
|
||||
5
crates/domain/src/processing/mod.rs
Normal file
5
crates/domain/src/processing/mod.rs
Normal file
@@ -0,0 +1,5 @@
|
||||
pub mod entities;
|
||||
pub mod ports;
|
||||
|
||||
pub use entities::*;
|
||||
pub use ports::*;
|
||||
40
crates/domain/src/processing/ports.rs
Normal file
40
crates/domain/src/processing/ports.rs
Normal file
@@ -0,0 +1,40 @@
|
||||
use async_trait::async_trait;
|
||||
use crate::common::errors::DomainError;
|
||||
use crate::common::value_objects::SystemId;
|
||||
use super::entities::{Job, JobBatch, Plugin, ProcessingPipeline};
|
||||
|
||||
// --- JobRepository ---
|
||||
|
||||
#[async_trait]
|
||||
pub trait JobRepository: Send + Sync {
|
||||
async fn find_by_id(&self, id: &SystemId) -> Result<Option<Job>, DomainError>;
|
||||
async fn find_next_queued(&self) -> Result<Option<Job>, DomainError>;
|
||||
async fn find_by_batch(&self, batch_id: &SystemId) -> Result<Vec<Job>, DomainError>;
|
||||
async fn save(&self, job: &Job) -> Result<(), DomainError>;
|
||||
}
|
||||
|
||||
// --- JobBatchRepository ---
|
||||
|
||||
#[async_trait]
|
||||
pub trait JobBatchRepository: Send + Sync {
|
||||
async fn find_by_id(&self, id: &SystemId) -> Result<Option<JobBatch>, DomainError>;
|
||||
async fn save(&self, batch: &JobBatch) -> Result<(), DomainError>;
|
||||
}
|
||||
|
||||
// --- PluginRepository ---
|
||||
|
||||
#[async_trait]
|
||||
pub trait PluginRepository: Send + Sync {
|
||||
async fn find_by_id(&self, id: &SystemId) -> Result<Option<Plugin>, DomainError>;
|
||||
async fn find_enabled(&self) -> Result<Vec<Plugin>, DomainError>;
|
||||
async fn save(&self, plugin: &Plugin) -> Result<(), DomainError>;
|
||||
}
|
||||
|
||||
// --- PipelineRepository ---
|
||||
|
||||
#[async_trait]
|
||||
pub trait PipelineRepository: Send + Sync {
|
||||
async fn find_by_id(&self, id: &SystemId) -> Result<Option<ProcessingPipeline>, DomainError>;
|
||||
async fn find_by_trigger(&self, event: &str) -> Result<Vec<ProcessingPipeline>, DomainError>;
|
||||
async fn save(&self, pipeline: &ProcessingPipeline) -> Result<(), DomainError>;
|
||||
}
|
||||
Reference in New Issue
Block a user