use application::processing::{ExecutePipelineCommand, ExecutePipelineHandler}; use application::testing::{ InMemoryJobBatchRepository, InMemoryJobRepository, InMemoryPipelineRepository, InMemoryPluginRepository, StubEventPublisher, }; use async_trait::async_trait; use domain::{ entities::{Job, JobStatus, JobType, Plugin, PluginType, ProcessingPipeline}, errors::DomainError, ports::{JobRepository, PipelineRepository, PluginExecutor, PluginRegistry, PluginRepository}, value_objects::{MetadataValue, StructuredData, SystemId}, }; use std::collections::HashMap; use std::sync::Arc; struct StubPluginExecutor { name: String, result: StructuredData, } #[async_trait] impl PluginExecutor for StubPluginExecutor { fn plugin_name(&self) -> &str { &self.name } async fn execute( &self, _asset_id: Option, _payload: &StructuredData, _config: &StructuredData, ) -> Result { Ok(self.result.clone()) } } struct FailingPluginExecutor; #[async_trait] impl PluginExecutor for FailingPluginExecutor { fn plugin_name(&self) -> &str { "failing" } async fn execute( &self, _asset_id: Option, _payload: &StructuredData, _config: &StructuredData, ) -> Result { Err(DomainError::Internal("plugin crashed".into())) } } struct TestPluginRegistry { executors: HashMap>, } impl TestPluginRegistry { fn new() -> Self { Self { executors: HashMap::new(), } } fn with(mut self, executor: Arc) -> Self { self.executors .insert(executor.plugin_name().to_string(), executor); self } } impl PluginRegistry for TestPluginRegistry { fn get_executor(&self, plugin_name: &str) -> Option> { self.executors.get(plugin_name).cloned() } fn registered_plugins(&self) -> Vec { self.executors.keys().cloned().collect() } } fn make_handler( job_repo: Arc, batch_repo: Arc, pipeline_repo: Arc, plugin_repo: Arc, registry: Arc, ) -> ExecutePipelineHandler { let event_pub = Arc::new(StubEventPublisher::new()); ExecutePipelineHandler::new( job_repo, batch_repo, pipeline_repo, plugin_repo, registry, event_pub, ) } async fn seed_plugin(repo: &InMemoryPluginRepository, name: &str) -> Plugin { let plugin = Plugin::new(name, PluginType::MediaProcessor); repo.save(&plugin).await.unwrap(); plugin } async fn seed_pipeline( repo: &InMemoryPipelineRepository, trigger: &str, plugin: &Plugin, ) -> ProcessingPipeline { let mut pipeline = ProcessingPipeline::new(trigger); pipeline.add_step(plugin.plugin_id, StructuredData::new()); repo.save(&pipeline).await.unwrap(); pipeline } async fn seed_job(repo: &InMemoryJobRepository, job_type: JobType) -> Job { let job = Job::new(job_type, 10, StructuredData::new()); repo.save(&job).await.unwrap(); job } #[tokio::test] async fn executes_pipeline_with_one_step() { let job_repo = Arc::new(InMemoryJobRepository::new()); let batch_repo = Arc::new(InMemoryJobBatchRepository::new()); let pipeline_repo = Arc::new(InMemoryPipelineRepository::new()); let plugin_repo = Arc::new(InMemoryPluginRepository::new()); let plugin = seed_plugin(&plugin_repo, "test_plugin").await; seed_pipeline(&pipeline_repo, "extract_metadata", &plugin).await; let job = seed_job(&job_repo, JobType::ExtractMetadata).await; let mut result_data = StructuredData::new(); result_data.insert("key", MetadataValue::String("value".into())); let executor = Arc::new(StubPluginExecutor { name: "test_plugin".into(), result: result_data, }); let registry = Arc::new(TestPluginRegistry::new().with(executor)); let handler = make_handler( job_repo.clone(), batch_repo, pipeline_repo, plugin_repo, registry, ); let result = handler .execute(ExecutePipelineCommand { job_id: job.job_id }) .await .unwrap(); assert_eq!(result.status, JobStatus::Completed); assert_eq!(result.result_data.unwrap().get_string("key"), Some("value")); } #[tokio::test] async fn falls_back_to_direct_executor_when_no_pipeline() { let job_repo = Arc::new(InMemoryJobRepository::new()); let batch_repo = Arc::new(InMemoryJobBatchRepository::new()); let pipeline_repo = Arc::new(InMemoryPipelineRepository::new()); let plugin_repo = Arc::new(InMemoryPluginRepository::new()); let job = seed_job(&job_repo, JobType::ExtractMetadata).await; let executor = Arc::new(StubPluginExecutor { name: "extract_metadata".into(), result: StructuredData::new(), }); let registry = Arc::new(TestPluginRegistry::new().with(executor)); let handler = make_handler( job_repo.clone(), batch_repo, pipeline_repo, plugin_repo, registry, ); let result = handler .execute(ExecutePipelineCommand { job_id: job.job_id }) .await .unwrap(); assert_eq!(result.status, JobStatus::Completed); } #[tokio::test] async fn skips_disabled_plugin() { let job_repo = Arc::new(InMemoryJobRepository::new()); let batch_repo = Arc::new(InMemoryJobBatchRepository::new()); let pipeline_repo = Arc::new(InMemoryPipelineRepository::new()); let plugin_repo = Arc::new(InMemoryPluginRepository::new()); let mut plugin = Plugin::new("disabled_one", PluginType::MediaProcessor); plugin.disable(); plugin_repo.save(&plugin).await.unwrap(); seed_pipeline(&pipeline_repo, "extract_metadata", &plugin).await; let job = seed_job(&job_repo, JobType::ExtractMetadata).await; let registry = Arc::new(TestPluginRegistry::new()); let handler = make_handler( job_repo.clone(), batch_repo, pipeline_repo, plugin_repo, registry, ); let result = handler .execute(ExecutePipelineCommand { job_id: job.job_id }) .await .unwrap(); assert_eq!(result.status, JobStatus::Completed); } #[tokio::test] async fn step_failure_fails_job_with_retry() { let job_repo = Arc::new(InMemoryJobRepository::new()); let batch_repo = Arc::new(InMemoryJobBatchRepository::new()); let pipeline_repo = Arc::new(InMemoryPipelineRepository::new()); let plugin_repo = Arc::new(InMemoryPluginRepository::new()); let plugin = seed_plugin(&plugin_repo, "failing").await; seed_pipeline(&pipeline_repo, "extract_metadata", &plugin).await; let job = seed_job(&job_repo, JobType::ExtractMetadata).await; let executor: Arc = Arc::new(FailingPluginExecutor); let registry = Arc::new(TestPluginRegistry::new().with(executor)); let handler = make_handler( job_repo.clone(), batch_repo, pipeline_repo, plugin_repo, registry, ); let result = handler .execute(ExecutePipelineCommand { job_id: job.job_id }) .await .unwrap(); // First failure → retry_count=1, back to Queued (max_retries=3) assert_eq!(result.status, JobStatus::Queued); assert_eq!(result.retry_count, 1); assert!(result.error_message.unwrap().contains("plugin crashed")); } #[tokio::test] async fn missing_job_returns_not_found() { let job_repo = Arc::new(InMemoryJobRepository::new()); let batch_repo = Arc::new(InMemoryJobBatchRepository::new()); let pipeline_repo = Arc::new(InMemoryPipelineRepository::new()); let plugin_repo = Arc::new(InMemoryPluginRepository::new()); let registry = Arc::new(TestPluginRegistry::new()); let handler = make_handler(job_repo, batch_repo, pipeline_repo, plugin_repo, registry); let err = handler .execute(ExecutePipelineCommand { job_id: SystemId::new(), }) .await .unwrap_err(); assert!(matches!(err, DomainError::NotFound(_))); }