use application::processing::{ ExecutePipelineHandler, ProcessNextJobCommand, ProcessNextJobHandler, }; use application::testing::{ InMemoryJobBatchRepository, InMemoryJobRepository, InMemoryPipelineRepository, InMemoryPluginRepository, StubEventPublisher, }; use async_trait::async_trait; use domain::{ entities::{Job, JobStatus, JobType}, errors::DomainError, ports::{JobRepository, PluginExecutor, PluginRegistry}, value_objects::{StructuredData, SystemId}, }; use std::collections::HashMap; use std::sync::Arc; struct NoOpExecutor; #[async_trait] impl PluginExecutor for NoOpExecutor { fn plugin_name(&self) -> &str { "extract_metadata" } async fn execute( &self, _asset_id: Option, _payload: &StructuredData, _config: &StructuredData, ) -> Result { Ok(StructuredData::new()) } } struct SimpleRegistry { executors: HashMap>, } impl PluginRegistry for SimpleRegistry { fn get_executor(&self, name: &str) -> Option> { self.executors.get(name).cloned() } fn registered_plugins(&self) -> Vec { self.executors.keys().cloned().collect() } } fn build_handler(job_repo: Arc) -> ProcessNextJobHandler { let batch_repo = Arc::new(InMemoryJobBatchRepository::new()); let pipeline_repo = Arc::new(InMemoryPipelineRepository::new()); let plugin_repo = Arc::new(InMemoryPluginRepository::new()); let event_pub = Arc::new(StubEventPublisher::new()); let mut executors = HashMap::new(); executors.insert( "extract_metadata".to_string(), Arc::new(NoOpExecutor) as Arc, ); let registry = Arc::new(SimpleRegistry { executors }); let execute_pipeline = Arc::new(ExecutePipelineHandler::new( job_repo.clone(), batch_repo, pipeline_repo, plugin_repo, registry, event_pub, )); ProcessNextJobHandler::new(job_repo, execute_pipeline) } #[tokio::test] async fn returns_none_when_queue_empty() { let job_repo = Arc::new(InMemoryJobRepository::new()); let handler = build_handler(job_repo); let result = handler.execute(ProcessNextJobCommand).await.unwrap(); assert!(result.is_none()); } #[tokio::test] async fn processes_queued_job() { let job_repo = Arc::new(InMemoryJobRepository::new()); let job = Job::new(JobType::ExtractMetadata, 5, StructuredData::new()); job_repo.save(&job).await.unwrap(); let handler = build_handler(job_repo); let result = handler.execute(ProcessNextJobCommand).await.unwrap(); assert!(result.is_some()); let processed = result.unwrap(); assert_eq!(processed.job_id, job.job_id); assert_eq!(processed.status, JobStatus::Completed); } #[tokio::test] async fn drains_multiple_jobs_sequentially() { let job_repo = Arc::new(InMemoryJobRepository::new()); let job1 = Job::new(JobType::ExtractMetadata, 5, StructuredData::new()); let job2 = Job::new(JobType::ExtractMetadata, 3, StructuredData::new()); job_repo.save(&job1).await.unwrap(); job_repo.save(&job2).await.unwrap(); let handler = build_handler(job_repo); let r1 = handler.execute(ProcessNextJobCommand).await.unwrap(); assert!(r1.is_some()); let r2 = handler.execute(ProcessNextJobCommand).await.unwrap(); assert!(r2.is_some()); let r3 = handler.execute(ProcessNextJobCommand).await.unwrap(); assert!(r3.is_none()); }