- Tests for ExecutePipelineHandler (happy path, fallback, disabled skip, failure retry, not found)
- Tests for ProcessNextJobHandler (empty queue, process, drain multiple)
- DerivativeGenerated domain event + event-payload mapping + event_store aggregate
- Renamed event-payload → adapters-event-payload, event-transport → adapters-event-transport
119 lines
3.5 KiB
Rust
119 lines
3.5 KiB
Rust
use application::processing::{
|
|
ExecutePipelineHandler, ProcessNextJobCommand, ProcessNextJobHandler,
|
|
};
|
|
use application::testing::{
|
|
InMemoryJobBatchRepository, InMemoryJobRepository, InMemoryPipelineRepository,
|
|
InMemoryPluginRepository, StubEventPublisher,
|
|
};
|
|
use async_trait::async_trait;
|
|
use domain::{
|
|
entities::{Job, JobStatus, JobType},
|
|
errors::DomainError,
|
|
ports::{JobRepository, PluginExecutor, PluginRegistry},
|
|
value_objects::{StructuredData, SystemId},
|
|
};
|
|
use std::collections::HashMap;
|
|
use std::sync::Arc;
|
|
|
|
/// Stateless plugin-executor stub used by the tests in this file.
///
/// `build_handler` registers it under the `"extract_metadata"` name.
struct NoOpExecutor;
|
|
|
|
#[async_trait]
|
|
impl PluginExecutor for NoOpExecutor {
|
|
fn plugin_name(&self) -> &str {
|
|
"extract_metadata"
|
|
}
|
|
|
|
async fn execute(
|
|
&self,
|
|
_asset_id: Option<SystemId>,
|
|
_payload: &StructuredData,
|
|
_config: &StructuredData,
|
|
) -> Result<StructuredData, DomainError> {
|
|
Ok(StructuredData::new())
|
|
}
|
|
}
|
|
|
|
struct SimpleRegistry {
|
|
executors: HashMap<String, Arc<dyn PluginExecutor>>,
|
|
}
|
|
|
|
impl PluginRegistry for SimpleRegistry {
|
|
fn get_executor(&self, name: &str) -> Option<Arc<dyn PluginExecutor>> {
|
|
self.executors.get(name).cloned()
|
|
}
|
|
|
|
fn registered_plugins(&self) -> Vec<String> {
|
|
self.executors.keys().cloned().collect()
|
|
}
|
|
}
|
|
|
|
fn build_handler(job_repo: Arc<InMemoryJobRepository>) -> ProcessNextJobHandler {
|
|
let batch_repo = Arc::new(InMemoryJobBatchRepository::new());
|
|
let pipeline_repo = Arc::new(InMemoryPipelineRepository::new());
|
|
let plugin_repo = Arc::new(InMemoryPluginRepository::new());
|
|
let event_pub = Arc::new(StubEventPublisher::new());
|
|
|
|
let mut executors = HashMap::new();
|
|
executors.insert(
|
|
"extract_metadata".to_string(),
|
|
Arc::new(NoOpExecutor) as Arc<dyn PluginExecutor>,
|
|
);
|
|
let registry = Arc::new(SimpleRegistry { executors });
|
|
|
|
let execute_pipeline = Arc::new(ExecutePipelineHandler::new(
|
|
job_repo.clone(),
|
|
batch_repo,
|
|
pipeline_repo,
|
|
plugin_repo,
|
|
registry,
|
|
event_pub,
|
|
));
|
|
|
|
ProcessNextJobHandler::new(job_repo, execute_pipeline)
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn returns_none_when_queue_empty() {
|
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
|
let handler = build_handler(job_repo);
|
|
|
|
let result = handler.execute(ProcessNextJobCommand).await.unwrap();
|
|
assert!(result.is_none());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn processes_queued_job() {
|
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
|
let job = Job::new(JobType::ExtractMetadata, 5, StructuredData::new());
|
|
job_repo.save(&job).await.unwrap();
|
|
|
|
let handler = build_handler(job_repo);
|
|
|
|
let result = handler.execute(ProcessNextJobCommand).await.unwrap();
|
|
assert!(result.is_some());
|
|
|
|
let processed = result.unwrap();
|
|
assert_eq!(processed.job_id, job.job_id);
|
|
assert_eq!(processed.status, JobStatus::Completed);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn drains_multiple_jobs_sequentially() {
|
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
|
let job1 = Job::new(JobType::ExtractMetadata, 5, StructuredData::new());
|
|
let job2 = Job::new(JobType::ExtractMetadata, 3, StructuredData::new());
|
|
job_repo.save(&job1).await.unwrap();
|
|
job_repo.save(&job2).await.unwrap();
|
|
|
|
let handler = build_handler(job_repo);
|
|
|
|
let r1 = handler.execute(ProcessNextJobCommand).await.unwrap();
|
|
assert!(r1.is_some());
|
|
|
|
let r2 = handler.execute(ProcessNextJobCommand).await.unwrap();
|
|
assert!(r2.is_some());
|
|
|
|
let r3 = handler.execute(ProcessNextJobCommand).await.unwrap();
|
|
assert!(r3.is_none());
|
|
}
|