feat: worker plugin system — domain ports, pipeline executor, built-in plugins
- PluginExecutor + PluginRegistry ports in domain - ExecutePipelineCommand orchestrates job→pipeline→plugin steps - ProcessNextJobCommand polls + executes next queued job - InMemoryPluginRegistry, NoOp/MetadataExtractor/SidecarSync plugins - Worker main rewritten with poll loop, factories module for DI - Deleted template job/runner/jobs remnants
This commit is contained in:
176
crates/application/src/processing/commands/execute_pipeline.rs
Normal file
176
crates/application/src/processing/commands/execute_pipeline.rs
Normal file
@@ -0,0 +1,176 @@
|
||||
use domain::{
|
||||
entities::{Job, JobType},
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
ports::{
|
||||
EventPublisher, JobBatchRepository, JobRepository, PipelineRepository, PluginRegistry,
|
||||
PluginRepository,
|
||||
},
|
||||
value_objects::{DateTimeStamp, StructuredData, SystemId},
|
||||
};
|
||||
use std::sync::Arc;
|
||||
|
||||
pub struct ExecutePipelineCommand {
|
||||
pub job_id: SystemId,
|
||||
}
|
||||
|
||||
pub struct ExecutePipelineHandler {
|
||||
job_repo: Arc<dyn JobRepository>,
|
||||
batch_repo: Arc<dyn JobBatchRepository>,
|
||||
pipeline_repo: Arc<dyn PipelineRepository>,
|
||||
plugin_repo: Arc<dyn PluginRepository>,
|
||||
plugin_registry: Arc<dyn PluginRegistry>,
|
||||
event_pub: Arc<dyn EventPublisher>,
|
||||
}
|
||||
|
||||
fn job_type_to_trigger(job_type: &JobType) -> &str {
|
||||
match job_type {
|
||||
JobType::ScanDirectory => "scan_directory",
|
||||
JobType::ExtractMetadata => "extract_metadata",
|
||||
JobType::GenerateDerivative => "generate_derivative",
|
||||
JobType::SyncSidecar => "sync_sidecar",
|
||||
JobType::DetectDuplicates => "detect_duplicates",
|
||||
JobType::Custom(s) => s.as_str(),
|
||||
}
|
||||
}
|
||||
|
||||
impl ExecutePipelineHandler {
|
||||
pub fn new(
|
||||
job_repo: Arc<dyn JobRepository>,
|
||||
batch_repo: Arc<dyn JobBatchRepository>,
|
||||
pipeline_repo: Arc<dyn PipelineRepository>,
|
||||
plugin_repo: Arc<dyn PluginRepository>,
|
||||
plugin_registry: Arc<dyn PluginRegistry>,
|
||||
event_pub: Arc<dyn EventPublisher>,
|
||||
) -> Self {
|
||||
Self {
|
||||
job_repo,
|
||||
batch_repo,
|
||||
pipeline_repo,
|
||||
plugin_repo,
|
||||
plugin_registry,
|
||||
event_pub,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn execute(&self, cmd: ExecutePipelineCommand) -> Result<Job, DomainError> {
|
||||
let mut job = self
|
||||
.job_repo
|
||||
.find_by_id(&cmd.job_id)
|
||||
.await?
|
||||
.ok_or_else(|| DomainError::NotFound(format!("Job {} not found", cmd.job_id)))?;
|
||||
|
||||
job.start()?;
|
||||
self.job_repo.save(&job).await?;
|
||||
|
||||
let trigger = job_type_to_trigger(&job.job_type);
|
||||
let pipelines = self.pipeline_repo.find_by_trigger(trigger).await?;
|
||||
|
||||
let result = match pipelines.first() {
|
||||
Some(pipeline) => self.run_pipeline_steps(&job, pipeline).await,
|
||||
None => self.run_direct(&job).await,
|
||||
};
|
||||
|
||||
match result {
|
||||
Ok(result_data) => {
|
||||
job.complete(result_data);
|
||||
self.job_repo.save(&job).await?;
|
||||
self.update_batch_on_complete(&job).await?;
|
||||
self.event_pub
|
||||
.publish(DomainEvent::JobCompleted {
|
||||
job_id: job.job_id,
|
||||
timestamp: DateTimeStamp::now(),
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
Err(e) => {
|
||||
let error_msg = e.to_string();
|
||||
job.fail(&error_msg);
|
||||
self.job_repo.save(&job).await?;
|
||||
self.update_batch_on_fail(&job).await?;
|
||||
self.event_pub
|
||||
.publish(DomainEvent::JobFailed {
|
||||
job_id: job.job_id,
|
||||
error: error_msg,
|
||||
timestamp: DateTimeStamp::now(),
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(job)
|
||||
}
|
||||
|
||||
async fn run_pipeline_steps(
|
||||
&self,
|
||||
job: &Job,
|
||||
pipeline: &domain::entities::ProcessingPipeline,
|
||||
) -> Result<StructuredData, DomainError> {
|
||||
let mut accumulated = StructuredData::new();
|
||||
let mut sorted_steps = pipeline.steps.clone();
|
||||
sorted_steps.sort_by_key(|s| s.step_order);
|
||||
|
||||
for step in &sorted_steps {
|
||||
let plugin = self
|
||||
.plugin_repo
|
||||
.find_by_id(&step.plugin_id)
|
||||
.await?
|
||||
.ok_or_else(|| {
|
||||
DomainError::NotFound(format!("Plugin {} not found", step.plugin_id))
|
||||
})?;
|
||||
|
||||
if !plugin.is_enabled {
|
||||
continue;
|
||||
}
|
||||
|
||||
let executor = self
|
||||
.plugin_registry
|
||||
.get_executor(&plugin.name)
|
||||
.ok_or_else(|| {
|
||||
DomainError::NotFound(format!(
|
||||
"No executor registered for plugin '{}'",
|
||||
plugin.name
|
||||
))
|
||||
})?;
|
||||
|
||||
let step_result = executor
|
||||
.execute(job.target_asset_id, &job.payload, &step.configuration)
|
||||
.await?;
|
||||
|
||||
accumulated.merge_from(step_result);
|
||||
}
|
||||
|
||||
Ok(accumulated)
|
||||
}
|
||||
|
||||
async fn run_direct(&self, job: &Job) -> Result<StructuredData, DomainError> {
|
||||
let trigger = job_type_to_trigger(&job.job_type);
|
||||
let executor = self.plugin_registry.get_executor(trigger).ok_or_else(|| {
|
||||
DomainError::NotFound(format!("No pipeline or executor found for '{}'", trigger))
|
||||
})?;
|
||||
|
||||
executor
|
||||
.execute(job.target_asset_id, &job.payload, &StructuredData::new())
|
||||
.await
|
||||
}
|
||||
|
||||
async fn update_batch_on_complete(&self, job: &Job) -> Result<(), DomainError> {
|
||||
if let Some(ref batch_id) = job.batch_id
|
||||
&& let Some(mut batch) = self.batch_repo.find_by_id(batch_id).await?
|
||||
{
|
||||
batch.record_completion();
|
||||
self.batch_repo.save(&batch).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn update_batch_on_fail(&self, job: &Job) -> Result<(), DomainError> {
|
||||
if let Some(ref batch_id) = job.batch_id
|
||||
&& let Some(mut batch) = self.batch_repo.find_by_id(batch_id).await?
|
||||
{
|
||||
batch.record_failure();
|
||||
self.batch_repo.save(&batch).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user