refactor: code smell fixes — tests, events, naming
- Tests for ExecutePipelineHandler (happy path, fallback, disabled skip, failure retry, not found) - Tests for ProcessNextJobHandler (empty queue, process, drain multiple) - DerivativeGenerated domain event + event-payload mapping + event_store aggregate - Renamed event-payload → adapters-event-payload, event-transport → adapters-event-transport
This commit is contained in:
56
Cargo.lock
generated
56
Cargo.lock
generated
@@ -17,6 +17,30 @@ dependencies = [
|
|||||||
"uuid",
|
"uuid",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "adapters-event-payload"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"chrono",
|
||||||
|
"domain",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"uuid",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "adapters-event-transport"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"adapters-event-payload",
|
||||||
|
"async-trait",
|
||||||
|
"domain",
|
||||||
|
"futures",
|
||||||
|
"serde_json",
|
||||||
|
"tokio",
|
||||||
|
"tracing",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "adapters-exif"
|
name = "adapters-exif"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
@@ -30,10 +54,10 @@ dependencies = [
|
|||||||
name = "adapters-nats"
|
name = "adapters-nats"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"adapters-event-transport",
|
||||||
"async-nats",
|
"async-nats",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"domain",
|
"domain",
|
||||||
"event-transport",
|
|
||||||
"futures",
|
"futures",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tracing",
|
"tracing",
|
||||||
@@ -43,11 +67,11 @@ dependencies = [
|
|||||||
name = "adapters-postgres"
|
name = "adapters-postgres"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"adapters-event-payload",
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"chrono",
|
"chrono",
|
||||||
"domain",
|
"domain",
|
||||||
"event-payload",
|
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"sqlx",
|
"sqlx",
|
||||||
@@ -451,6 +475,7 @@ name = "bootstrap"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"adapters-auth",
|
"adapters-auth",
|
||||||
|
"adapters-event-transport",
|
||||||
"adapters-nats",
|
"adapters-nats",
|
||||||
"adapters-postgres",
|
"adapters-postgres",
|
||||||
"adapters-storage",
|
"adapters-storage",
|
||||||
@@ -461,7 +486,6 @@ dependencies = [
|
|||||||
"axum",
|
"axum",
|
||||||
"domain",
|
"domain",
|
||||||
"dotenvy",
|
"dotenvy",
|
||||||
"event-transport",
|
|
||||||
"presentation",
|
"presentation",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tower-http",
|
"tower-http",
|
||||||
@@ -880,30 +904,6 @@ dependencies = [
|
|||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "event-payload"
|
|
||||||
version = "0.1.0"
|
|
||||||
dependencies = [
|
|
||||||
"chrono",
|
|
||||||
"domain",
|
|
||||||
"serde",
|
|
||||||
"serde_json",
|
|
||||||
"uuid",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "event-transport"
|
|
||||||
version = "0.1.0"
|
|
||||||
dependencies = [
|
|
||||||
"async-trait",
|
|
||||||
"domain",
|
|
||||||
"event-payload",
|
|
||||||
"futures",
|
|
||||||
"serde_json",
|
|
||||||
"tokio",
|
|
||||||
"tracing",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "exr"
|
name = "exr"
|
||||||
version = "1.74.0"
|
version = "1.74.0"
|
||||||
@@ -4262,6 +4262,7 @@ dependencies = [
|
|||||||
name = "worker"
|
name = "worker"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"adapters-event-transport",
|
||||||
"adapters-exif",
|
"adapters-exif",
|
||||||
"adapters-nats",
|
"adapters-nats",
|
||||||
"adapters-postgres",
|
"adapters-postgres",
|
||||||
@@ -4273,7 +4274,6 @@ dependencies = [
|
|||||||
"async-trait",
|
"async-trait",
|
||||||
"domain",
|
"domain",
|
||||||
"dotenvy",
|
"dotenvy",
|
||||||
"event-transport",
|
|
||||||
"futures",
|
"futures",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tracing",
|
"tracing",
|
||||||
|
|||||||
@@ -45,8 +45,8 @@ application = { path = "crates/application" }
|
|||||||
api-types = { path = "crates/api-types" }
|
api-types = { path = "crates/api-types" }
|
||||||
adapters-auth = { path = "crates/adapters/auth" }
|
adapters-auth = { path = "crates/adapters/auth" }
|
||||||
adapters-storage = { path = "crates/adapters/storage" }
|
adapters-storage = { path = "crates/adapters/storage" }
|
||||||
event-payload = { path = "crates/adapters/event-payload" }
|
adapters-event-payload = { path = "crates/adapters/event-payload" }
|
||||||
event-transport = { path = "crates/adapters/event-transport" }
|
adapters-event-transport = { path = "crates/adapters/event-transport" }
|
||||||
adapters-nats = { path = "crates/adapters/nats" }
|
adapters-nats = { path = "crates/adapters/nats" }
|
||||||
adapters-exif = { path = "crates/adapters/exif" }
|
adapters-exif = { path = "crates/adapters/exif" }
|
||||||
adapters-thumbnail = { path = "crates/adapters/thumbnail" }
|
adapters-thumbnail = { path = "crates/adapters/thumbnail" }
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "event-payload"
|
name = "adapters-event-payload"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
|
|||||||
@@ -34,6 +34,12 @@ pub enum EventPayload {
|
|||||||
asset_id: String,
|
asset_id: String,
|
||||||
timestamp: String,
|
timestamp: String,
|
||||||
},
|
},
|
||||||
|
DerivativeGenerated {
|
||||||
|
asset_id: String,
|
||||||
|
derivative_id: String,
|
||||||
|
profile: String,
|
||||||
|
timestamp: String,
|
||||||
|
},
|
||||||
JobEnqueued {
|
JobEnqueued {
|
||||||
job_id: String,
|
job_id: String,
|
||||||
job_type: String,
|
job_type: String,
|
||||||
@@ -59,6 +65,7 @@ impl EventPayload {
|
|||||||
Self::ShareCreated { .. } => "shares.created",
|
Self::ShareCreated { .. } => "shares.created",
|
||||||
Self::ShareRevoked { .. } => "shares.revoked",
|
Self::ShareRevoked { .. } => "shares.revoked",
|
||||||
Self::SidecarSyncRequested { .. } => "sidecars.sync_requested",
|
Self::SidecarSyncRequested { .. } => "sidecars.sync_requested",
|
||||||
|
Self::DerivativeGenerated { .. } => "derivatives.generated",
|
||||||
Self::JobEnqueued { .. } => "jobs.enqueued",
|
Self::JobEnqueued { .. } => "jobs.enqueued",
|
||||||
Self::JobCompleted { .. } => "jobs.completed",
|
Self::JobCompleted { .. } => "jobs.completed",
|
||||||
Self::JobFailed { .. } => "jobs.failed",
|
Self::JobFailed { .. } => "jobs.failed",
|
||||||
@@ -123,6 +130,17 @@ impl From<&DomainEvent> for EventPayload {
|
|||||||
asset_id: asset_id.to_string(),
|
asset_id: asset_id.to_string(),
|
||||||
timestamp: timestamp.to_string(),
|
timestamp: timestamp.to_string(),
|
||||||
},
|
},
|
||||||
|
DomainEvent::DerivativeGenerated {
|
||||||
|
asset_id,
|
||||||
|
derivative_id,
|
||||||
|
profile,
|
||||||
|
timestamp,
|
||||||
|
} => Self::DerivativeGenerated {
|
||||||
|
asset_id: asset_id.to_string(),
|
||||||
|
derivative_id: derivative_id.to_string(),
|
||||||
|
profile: profile.clone(),
|
||||||
|
timestamp: timestamp.to_string(),
|
||||||
|
},
|
||||||
DomainEvent::JobEnqueued {
|
DomainEvent::JobEnqueued {
|
||||||
job_id,
|
job_id,
|
||||||
job_type,
|
job_type,
|
||||||
@@ -222,6 +240,17 @@ impl TryFrom<EventPayload> for DomainEvent {
|
|||||||
asset_id: SystemId::from_uuid(parse_uuid(&asset_id, "asset_id")?),
|
asset_id: SystemId::from_uuid(parse_uuid(&asset_id, "asset_id")?),
|
||||||
timestamp: parse_timestamp(×tamp)?,
|
timestamp: parse_timestamp(×tamp)?,
|
||||||
},
|
},
|
||||||
|
EventPayload::DerivativeGenerated {
|
||||||
|
asset_id,
|
||||||
|
derivative_id,
|
||||||
|
profile,
|
||||||
|
timestamp,
|
||||||
|
} => DomainEvent::DerivativeGenerated {
|
||||||
|
asset_id: SystemId::from_uuid(parse_uuid(&asset_id, "asset_id")?),
|
||||||
|
derivative_id: SystemId::from_uuid(parse_uuid(&derivative_id, "derivative_id")?),
|
||||||
|
profile,
|
||||||
|
timestamp: parse_timestamp(×tamp)?,
|
||||||
|
},
|
||||||
EventPayload::JobEnqueued {
|
EventPayload::JobEnqueued {
|
||||||
job_id,
|
job_id,
|
||||||
job_type,
|
job_type,
|
||||||
|
|||||||
@@ -1,11 +1,11 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "event-transport"
|
name = "adapters-event-transport"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
domain = { workspace = true }
|
domain = { workspace = true }
|
||||||
event-payload = { workspace = true }
|
adapters-event-payload = { workspace = true }
|
||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
async-trait = { workspace = true }
|
async-trait = { workspace = true }
|
||||||
tracing = { workspace = true }
|
tracing = { workspace = true }
|
||||||
|
|||||||
@@ -1,13 +1,13 @@
|
|||||||
pub mod composite;
|
pub mod composite;
|
||||||
pub use composite::CompositeEventPublisher;
|
pub use composite::CompositeEventPublisher;
|
||||||
|
|
||||||
|
use adapters_event_payload::EventPayload;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use domain::{
|
use domain::{
|
||||||
errors::DomainError,
|
errors::DomainError,
|
||||||
events::{DomainEvent, EventEnvelope},
|
events::{DomainEvent, EventEnvelope},
|
||||||
ports::{EventConsumer, EventPublisher},
|
ports::{EventConsumer, EventPublisher},
|
||||||
};
|
};
|
||||||
use event_payload::EventPayload;
|
|
||||||
use futures::stream::BoxStream;
|
use futures::stream::BoxStream;
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
|||||||
@@ -55,7 +55,7 @@ async fn published_bytes_are_valid_json() {
|
|||||||
adapter.publish(&event).await.unwrap();
|
adapter.publish(&event).await.unwrap();
|
||||||
|
|
||||||
let recorded = messages.lock().unwrap();
|
let recorded = messages.lock().unwrap();
|
||||||
let payload: event_payload::EventPayload =
|
let payload: adapters_event_payload::EventPayload =
|
||||||
serde_json::from_slice(&recorded[0].1).expect("should be valid JSON");
|
serde_json::from_slice(&recorded[0].1).expect("should be valid JSON");
|
||||||
assert_eq!(payload.subject(), "assets.ingested");
|
assert_eq!(payload.subject(), "assets.ingested");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ edition = "2024"
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
domain = { workspace = true }
|
domain = { workspace = true }
|
||||||
event-transport = { workspace = true }
|
adapters-event-transport = { workspace = true }
|
||||||
async-nats = { workspace = true }
|
async-nats = { workspace = true }
|
||||||
async-trait = { workspace = true }
|
async-trait = { workspace = true }
|
||||||
futures = { workspace = true }
|
futures = { workspace = true }
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
|
use adapters_event_transport::{MessageSource, RawMessage, Transport};
|
||||||
use async_nats::jetstream::{self, AckKind, stream::Config as StreamConfig};
|
use async_nats::jetstream::{self, AckKind, stream::Config as StreamConfig};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use domain::errors::DomainError;
|
use domain::errors::DomainError;
|
||||||
use event_transport::{MessageSource, RawMessage, Transport};
|
|
||||||
use futures::stream::BoxStream;
|
use futures::stream::BoxStream;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ edition = "2024"
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
domain = { workspace = true }
|
domain = { workspace = true }
|
||||||
event-payload = { workspace = true }
|
adapters-event-payload = { workspace = true }
|
||||||
sqlx = { workspace = true, features = ["postgres", "runtime-tokio", "migrate", "uuid", "chrono", "json"] }
|
sqlx = { workspace = true, features = ["postgres", "runtime-tokio", "migrate", "uuid", "chrono", "json"] }
|
||||||
uuid = { workspace = true }
|
uuid = { workspace = true }
|
||||||
chrono = { workspace = true }
|
chrono = { workspace = true }
|
||||||
|
|||||||
@@ -1,9 +1,9 @@
|
|||||||
use crate::helpers::{MapDomainError, pg_repo};
|
use crate::helpers::{MapDomainError, pg_repo};
|
||||||
|
use adapters_event_payload::EventPayload;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use domain::{
|
use domain::{
|
||||||
errors::DomainError, events::DomainEvent, ports::EventStore, value_objects::SystemId,
|
errors::DomainError, events::DomainEvent, ports::EventStore, value_objects::SystemId,
|
||||||
};
|
};
|
||||||
use event_payload::EventPayload;
|
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
pg_repo!(PostgresEventStore);
|
pg_repo!(PostgresEventStore);
|
||||||
@@ -14,7 +14,8 @@ fn aggregate_id(event: &DomainEvent) -> Uuid {
|
|||||||
DomainEvent::AssetIngested { asset_id, .. }
|
DomainEvent::AssetIngested { asset_id, .. }
|
||||||
| DomainEvent::MetadataUpdated { asset_id, .. }
|
| DomainEvent::MetadataUpdated { asset_id, .. }
|
||||||
| DomainEvent::AssetDeleted { asset_id, .. }
|
| DomainEvent::AssetDeleted { asset_id, .. }
|
||||||
| DomainEvent::SidecarSyncRequested { asset_id, .. } => *asset_id.as_uuid(),
|
| DomainEvent::SidecarSyncRequested { asset_id, .. }
|
||||||
|
| DomainEvent::DerivativeGenerated { asset_id, .. } => *asset_id.as_uuid(),
|
||||||
|
|
||||||
DomainEvent::ShareCreated { scope_id, .. } | DomainEvent::ShareRevoked { scope_id, .. } => {
|
DomainEvent::ShareCreated { scope_id, .. } | DomainEvent::ShareRevoked { scope_id, .. } => {
|
||||||
*scope_id.as_uuid()
|
*scope_id.as_uuid()
|
||||||
|
|||||||
274
crates/application/tests/processing/commands/execute_pipeline.rs
Normal file
274
crates/application/tests/processing/commands/execute_pipeline.rs
Normal file
@@ -0,0 +1,274 @@
|
|||||||
|
use application::processing::{ExecutePipelineCommand, ExecutePipelineHandler};
|
||||||
|
use application::testing::{
|
||||||
|
InMemoryJobBatchRepository, InMemoryJobRepository, InMemoryPipelineRepository,
|
||||||
|
InMemoryPluginRepository, StubEventPublisher,
|
||||||
|
};
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use domain::{
|
||||||
|
entities::{Job, JobStatus, JobType, Plugin, PluginType, ProcessingPipeline},
|
||||||
|
errors::DomainError,
|
||||||
|
ports::{JobRepository, PipelineRepository, PluginExecutor, PluginRegistry, PluginRepository},
|
||||||
|
value_objects::{MetadataValue, StructuredData, SystemId},
|
||||||
|
};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
struct StubPluginExecutor {
|
||||||
|
name: String,
|
||||||
|
result: StructuredData,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl PluginExecutor for StubPluginExecutor {
|
||||||
|
fn plugin_name(&self) -> &str {
|
||||||
|
&self.name
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn execute(
|
||||||
|
&self,
|
||||||
|
_asset_id: Option<SystemId>,
|
||||||
|
_payload: &StructuredData,
|
||||||
|
_config: &StructuredData,
|
||||||
|
) -> Result<StructuredData, DomainError> {
|
||||||
|
Ok(self.result.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct FailingPluginExecutor;
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl PluginExecutor for FailingPluginExecutor {
|
||||||
|
fn plugin_name(&self) -> &str {
|
||||||
|
"failing"
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn execute(
|
||||||
|
&self,
|
||||||
|
_asset_id: Option<SystemId>,
|
||||||
|
_payload: &StructuredData,
|
||||||
|
_config: &StructuredData,
|
||||||
|
) -> Result<StructuredData, DomainError> {
|
||||||
|
Err(DomainError::Internal("plugin crashed".into()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct TestPluginRegistry {
|
||||||
|
executors: HashMap<String, Arc<dyn PluginExecutor>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TestPluginRegistry {
|
||||||
|
fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
executors: HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn with(mut self, executor: Arc<dyn PluginExecutor>) -> Self {
|
||||||
|
self.executors
|
||||||
|
.insert(executor.plugin_name().to_string(), executor);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PluginRegistry for TestPluginRegistry {
|
||||||
|
fn get_executor(&self, plugin_name: &str) -> Option<Arc<dyn PluginExecutor>> {
|
||||||
|
self.executors.get(plugin_name).cloned()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn registered_plugins(&self) -> Vec<String> {
|
||||||
|
self.executors.keys().cloned().collect()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn make_handler(
|
||||||
|
job_repo: Arc<InMemoryJobRepository>,
|
||||||
|
batch_repo: Arc<InMemoryJobBatchRepository>,
|
||||||
|
pipeline_repo: Arc<InMemoryPipelineRepository>,
|
||||||
|
plugin_repo: Arc<InMemoryPluginRepository>,
|
||||||
|
registry: Arc<dyn PluginRegistry>,
|
||||||
|
) -> ExecutePipelineHandler {
|
||||||
|
let event_pub = Arc::new(StubEventPublisher::new());
|
||||||
|
ExecutePipelineHandler::new(
|
||||||
|
job_repo,
|
||||||
|
batch_repo,
|
||||||
|
pipeline_repo,
|
||||||
|
plugin_repo,
|
||||||
|
registry,
|
||||||
|
event_pub,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn seed_plugin(repo: &InMemoryPluginRepository, name: &str) -> Plugin {
|
||||||
|
let plugin = Plugin::new(name, PluginType::MediaProcessor);
|
||||||
|
repo.save(&plugin).await.unwrap();
|
||||||
|
plugin
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn seed_pipeline(
|
||||||
|
repo: &InMemoryPipelineRepository,
|
||||||
|
trigger: &str,
|
||||||
|
plugin: &Plugin,
|
||||||
|
) -> ProcessingPipeline {
|
||||||
|
let mut pipeline = ProcessingPipeline::new(trigger);
|
||||||
|
pipeline.add_step(plugin.plugin_id, StructuredData::new());
|
||||||
|
repo.save(&pipeline).await.unwrap();
|
||||||
|
pipeline
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn seed_job(repo: &InMemoryJobRepository, job_type: JobType) -> Job {
|
||||||
|
let job = Job::new(job_type, 10, StructuredData::new());
|
||||||
|
repo.save(&job).await.unwrap();
|
||||||
|
job
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn executes_pipeline_with_one_step() {
|
||||||
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
||||||
|
let batch_repo = Arc::new(InMemoryJobBatchRepository::new());
|
||||||
|
let pipeline_repo = Arc::new(InMemoryPipelineRepository::new());
|
||||||
|
let plugin_repo = Arc::new(InMemoryPluginRepository::new());
|
||||||
|
|
||||||
|
let plugin = seed_plugin(&plugin_repo, "test_plugin").await;
|
||||||
|
seed_pipeline(&pipeline_repo, "extract_metadata", &plugin).await;
|
||||||
|
let job = seed_job(&job_repo, JobType::ExtractMetadata).await;
|
||||||
|
|
||||||
|
let mut result_data = StructuredData::new();
|
||||||
|
result_data.insert("key", MetadataValue::String("value".into()));
|
||||||
|
let executor = Arc::new(StubPluginExecutor {
|
||||||
|
name: "test_plugin".into(),
|
||||||
|
result: result_data,
|
||||||
|
});
|
||||||
|
let registry = Arc::new(TestPluginRegistry::new().with(executor));
|
||||||
|
|
||||||
|
let handler = make_handler(
|
||||||
|
job_repo.clone(),
|
||||||
|
batch_repo,
|
||||||
|
pipeline_repo,
|
||||||
|
plugin_repo,
|
||||||
|
registry,
|
||||||
|
);
|
||||||
|
|
||||||
|
let result = handler
|
||||||
|
.execute(ExecutePipelineCommand { job_id: job.job_id })
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(result.status, JobStatus::Completed);
|
||||||
|
assert_eq!(result.result_data.unwrap().get_string("key"), Some("value"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn falls_back_to_direct_executor_when_no_pipeline() {
|
||||||
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
||||||
|
let batch_repo = Arc::new(InMemoryJobBatchRepository::new());
|
||||||
|
let pipeline_repo = Arc::new(InMemoryPipelineRepository::new());
|
||||||
|
let plugin_repo = Arc::new(InMemoryPluginRepository::new());
|
||||||
|
|
||||||
|
let job = seed_job(&job_repo, JobType::ExtractMetadata).await;
|
||||||
|
|
||||||
|
let executor = Arc::new(StubPluginExecutor {
|
||||||
|
name: "extract_metadata".into(),
|
||||||
|
result: StructuredData::new(),
|
||||||
|
});
|
||||||
|
let registry = Arc::new(TestPluginRegistry::new().with(executor));
|
||||||
|
|
||||||
|
let handler = make_handler(
|
||||||
|
job_repo.clone(),
|
||||||
|
batch_repo,
|
||||||
|
pipeline_repo,
|
||||||
|
plugin_repo,
|
||||||
|
registry,
|
||||||
|
);
|
||||||
|
|
||||||
|
let result = handler
|
||||||
|
.execute(ExecutePipelineCommand { job_id: job.job_id })
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(result.status, JobStatus::Completed);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn skips_disabled_plugin() {
|
||||||
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
||||||
|
let batch_repo = Arc::new(InMemoryJobBatchRepository::new());
|
||||||
|
let pipeline_repo = Arc::new(InMemoryPipelineRepository::new());
|
||||||
|
let plugin_repo = Arc::new(InMemoryPluginRepository::new());
|
||||||
|
|
||||||
|
let mut plugin = Plugin::new("disabled_one", PluginType::MediaProcessor);
|
||||||
|
plugin.disable();
|
||||||
|
plugin_repo.save(&plugin).await.unwrap();
|
||||||
|
|
||||||
|
seed_pipeline(&pipeline_repo, "extract_metadata", &plugin).await;
|
||||||
|
let job = seed_job(&job_repo, JobType::ExtractMetadata).await;
|
||||||
|
|
||||||
|
let registry = Arc::new(TestPluginRegistry::new());
|
||||||
|
|
||||||
|
let handler = make_handler(
|
||||||
|
job_repo.clone(),
|
||||||
|
batch_repo,
|
||||||
|
pipeline_repo,
|
||||||
|
plugin_repo,
|
||||||
|
registry,
|
||||||
|
);
|
||||||
|
|
||||||
|
let result = handler
|
||||||
|
.execute(ExecutePipelineCommand { job_id: job.job_id })
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(result.status, JobStatus::Completed);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn step_failure_fails_job_with_retry() {
|
||||||
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
||||||
|
let batch_repo = Arc::new(InMemoryJobBatchRepository::new());
|
||||||
|
let pipeline_repo = Arc::new(InMemoryPipelineRepository::new());
|
||||||
|
let plugin_repo = Arc::new(InMemoryPluginRepository::new());
|
||||||
|
|
||||||
|
let plugin = seed_plugin(&plugin_repo, "failing").await;
|
||||||
|
seed_pipeline(&pipeline_repo, "extract_metadata", &plugin).await;
|
||||||
|
let job = seed_job(&job_repo, JobType::ExtractMetadata).await;
|
||||||
|
|
||||||
|
let executor: Arc<dyn PluginExecutor> = Arc::new(FailingPluginExecutor);
|
||||||
|
let registry = Arc::new(TestPluginRegistry::new().with(executor));
|
||||||
|
|
||||||
|
let handler = make_handler(
|
||||||
|
job_repo.clone(),
|
||||||
|
batch_repo,
|
||||||
|
pipeline_repo,
|
||||||
|
plugin_repo,
|
||||||
|
registry,
|
||||||
|
);
|
||||||
|
|
||||||
|
let result = handler
|
||||||
|
.execute(ExecutePipelineCommand { job_id: job.job_id })
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// First failure → retry_count=1, back to Queued (max_retries=3)
|
||||||
|
assert_eq!(result.status, JobStatus::Queued);
|
||||||
|
assert_eq!(result.retry_count, 1);
|
||||||
|
assert!(result.error_message.unwrap().contains("plugin crashed"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn missing_job_returns_not_found() {
|
||||||
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
||||||
|
let batch_repo = Arc::new(InMemoryJobBatchRepository::new());
|
||||||
|
let pipeline_repo = Arc::new(InMemoryPipelineRepository::new());
|
||||||
|
let plugin_repo = Arc::new(InMemoryPluginRepository::new());
|
||||||
|
let registry = Arc::new(TestPluginRegistry::new());
|
||||||
|
|
||||||
|
let handler = make_handler(job_repo, batch_repo, pipeline_repo, plugin_repo, registry);
|
||||||
|
|
||||||
|
let err = handler
|
||||||
|
.execute(ExecutePipelineCommand {
|
||||||
|
job_id: SystemId::new(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap_err();
|
||||||
|
|
||||||
|
assert!(matches!(err, DomainError::NotFound(_)));
|
||||||
|
}
|
||||||
@@ -1,6 +1,8 @@
|
|||||||
mod complete_job;
|
mod complete_job;
|
||||||
mod configure_pipeline;
|
mod configure_pipeline;
|
||||||
mod enqueue_job;
|
mod enqueue_job;
|
||||||
|
mod execute_pipeline;
|
||||||
mod fail_job;
|
mod fail_job;
|
||||||
mod manage_plugin;
|
mod manage_plugin;
|
||||||
|
mod process_next_job;
|
||||||
mod start_job;
|
mod start_job;
|
||||||
|
|||||||
118
crates/application/tests/processing/commands/process_next_job.rs
Normal file
118
crates/application/tests/processing/commands/process_next_job.rs
Normal file
@@ -0,0 +1,118 @@
|
|||||||
|
use application::processing::{
|
||||||
|
ExecutePipelineHandler, ProcessNextJobCommand, ProcessNextJobHandler,
|
||||||
|
};
|
||||||
|
use application::testing::{
|
||||||
|
InMemoryJobBatchRepository, InMemoryJobRepository, InMemoryPipelineRepository,
|
||||||
|
InMemoryPluginRepository, StubEventPublisher,
|
||||||
|
};
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use domain::{
|
||||||
|
entities::{Job, JobStatus, JobType},
|
||||||
|
errors::DomainError,
|
||||||
|
ports::{JobRepository, PluginExecutor, PluginRegistry},
|
||||||
|
value_objects::{StructuredData, SystemId},
|
||||||
|
};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
struct NoOpExecutor;
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl PluginExecutor for NoOpExecutor {
|
||||||
|
fn plugin_name(&self) -> &str {
|
||||||
|
"extract_metadata"
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn execute(
|
||||||
|
&self,
|
||||||
|
_asset_id: Option<SystemId>,
|
||||||
|
_payload: &StructuredData,
|
||||||
|
_config: &StructuredData,
|
||||||
|
) -> Result<StructuredData, DomainError> {
|
||||||
|
Ok(StructuredData::new())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct SimpleRegistry {
|
||||||
|
executors: HashMap<String, Arc<dyn PluginExecutor>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PluginRegistry for SimpleRegistry {
|
||||||
|
fn get_executor(&self, name: &str) -> Option<Arc<dyn PluginExecutor>> {
|
||||||
|
self.executors.get(name).cloned()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn registered_plugins(&self) -> Vec<String> {
|
||||||
|
self.executors.keys().cloned().collect()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn build_handler(job_repo: Arc<InMemoryJobRepository>) -> ProcessNextJobHandler {
|
||||||
|
let batch_repo = Arc::new(InMemoryJobBatchRepository::new());
|
||||||
|
let pipeline_repo = Arc::new(InMemoryPipelineRepository::new());
|
||||||
|
let plugin_repo = Arc::new(InMemoryPluginRepository::new());
|
||||||
|
let event_pub = Arc::new(StubEventPublisher::new());
|
||||||
|
|
||||||
|
let mut executors = HashMap::new();
|
||||||
|
executors.insert(
|
||||||
|
"extract_metadata".to_string(),
|
||||||
|
Arc::new(NoOpExecutor) as Arc<dyn PluginExecutor>,
|
||||||
|
);
|
||||||
|
let registry = Arc::new(SimpleRegistry { executors });
|
||||||
|
|
||||||
|
let execute_pipeline = Arc::new(ExecutePipelineHandler::new(
|
||||||
|
job_repo.clone(),
|
||||||
|
batch_repo,
|
||||||
|
pipeline_repo,
|
||||||
|
plugin_repo,
|
||||||
|
registry,
|
||||||
|
event_pub,
|
||||||
|
));
|
||||||
|
|
||||||
|
ProcessNextJobHandler::new(job_repo, execute_pipeline)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn returns_none_when_queue_empty() {
|
||||||
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
||||||
|
let handler = build_handler(job_repo);
|
||||||
|
|
||||||
|
let result = handler.execute(ProcessNextJobCommand).await.unwrap();
|
||||||
|
assert!(result.is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn processes_queued_job() {
|
||||||
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
||||||
|
let job = Job::new(JobType::ExtractMetadata, 5, StructuredData::new());
|
||||||
|
job_repo.save(&job).await.unwrap();
|
||||||
|
|
||||||
|
let handler = build_handler(job_repo);
|
||||||
|
|
||||||
|
let result = handler.execute(ProcessNextJobCommand).await.unwrap();
|
||||||
|
assert!(result.is_some());
|
||||||
|
|
||||||
|
let processed = result.unwrap();
|
||||||
|
assert_eq!(processed.job_id, job.job_id);
|
||||||
|
assert_eq!(processed.status, JobStatus::Completed);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn drains_multiple_jobs_sequentially() {
|
||||||
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
||||||
|
let job1 = Job::new(JobType::ExtractMetadata, 5, StructuredData::new());
|
||||||
|
let job2 = Job::new(JobType::ExtractMetadata, 3, StructuredData::new());
|
||||||
|
job_repo.save(&job1).await.unwrap();
|
||||||
|
job_repo.save(&job2).await.unwrap();
|
||||||
|
|
||||||
|
let handler = build_handler(job_repo);
|
||||||
|
|
||||||
|
let r1 = handler.execute(ProcessNextJobCommand).await.unwrap();
|
||||||
|
assert!(r1.is_some());
|
||||||
|
|
||||||
|
let r2 = handler.execute(ProcessNextJobCommand).await.unwrap();
|
||||||
|
assert!(r2.is_some());
|
||||||
|
|
||||||
|
let r3 = handler.execute(ProcessNextJobCommand).await.unwrap();
|
||||||
|
assert!(r3.is_none());
|
||||||
|
}
|
||||||
@@ -14,7 +14,7 @@ adapters-auth = { workspace = true }
|
|||||||
|
|
||||||
adapters-storage = { workspace = true, features = ["s3"] }
|
adapters-storage = { workspace = true, features = ["s3"] }
|
||||||
adapters-nats = { workspace = true }
|
adapters-nats = { workspace = true }
|
||||||
event-transport = { workspace = true }
|
adapters-event-transport = { workspace = true }
|
||||||
async-nats = { workspace = true }
|
async-nats = { workspace = true }
|
||||||
|
|
||||||
presentation = { workspace = true }
|
presentation = { workspace = true }
|
||||||
|
|||||||
@@ -22,12 +22,13 @@ pub async fn build_app(config: &Config) -> Result<Router> {
|
|||||||
adapters_nats::ensure_stream(&nats_client).await?;
|
adapters_nats::ensure_stream(&nats_client).await?;
|
||||||
|
|
||||||
let transport = adapters_nats::NatsTransport::new(nats_client);
|
let transport = adapters_nats::NatsTransport::new(nats_client);
|
||||||
let nats_publisher: Arc<dyn domain::ports::EventPublisher> =
|
let nats_publisher: Arc<dyn domain::ports::EventPublisher> = Arc::new(
|
||||||
Arc::new(event_transport::EventPublisherAdapter::new(transport));
|
adapters_event_transport::EventPublisherAdapter::new(transport),
|
||||||
|
);
|
||||||
let event_store: Arc<dyn domain::ports::EventStore> =
|
let event_store: Arc<dyn domain::ports::EventStore> =
|
||||||
Arc::new(adapters_postgres::PostgresEventStore::new(pool.clone()));
|
Arc::new(adapters_postgres::PostgresEventStore::new(pool.clone()));
|
||||||
let event_publisher: Arc<dyn domain::ports::EventPublisher> = Arc::new(
|
let event_publisher: Arc<dyn domain::ports::EventPublisher> = Arc::new(
|
||||||
event_transport::CompositeEventPublisher::new(nats_publisher, event_store),
|
adapters_event_transport::CompositeEventPublisher::new(nats_publisher, event_store),
|
||||||
);
|
);
|
||||||
|
|
||||||
let storage_path = std::env::var("STORAGE_PATH").unwrap_or_else(|_| "./data/media".to_string());
|
let storage_path = std::env::var("STORAGE_PATH").unwrap_or_else(|_| "./data/media".to_string());
|
||||||
|
|||||||
@@ -39,6 +39,12 @@ pub enum DomainEvent {
|
|||||||
asset_id: SystemId,
|
asset_id: SystemId,
|
||||||
timestamp: DateTimeStamp,
|
timestamp: DateTimeStamp,
|
||||||
},
|
},
|
||||||
|
DerivativeGenerated {
|
||||||
|
asset_id: SystemId,
|
||||||
|
derivative_id: SystemId,
|
||||||
|
profile: String,
|
||||||
|
timestamp: DateTimeStamp,
|
||||||
|
},
|
||||||
JobEnqueued {
|
JobEnqueued {
|
||||||
job_id: SystemId,
|
job_id: SystemId,
|
||||||
job_type: String,
|
job_type: String,
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ application = { workspace = true }
|
|||||||
adapters-postgres = { workspace = true }
|
adapters-postgres = { workspace = true }
|
||||||
adapters-storage = { workspace = true }
|
adapters-storage = { workspace = true }
|
||||||
adapters-nats = { workspace = true }
|
adapters-nats = { workspace = true }
|
||||||
event-transport = { workspace = true }
|
adapters-event-transport = { workspace = true }
|
||||||
adapters-exif = { workspace = true }
|
adapters-exif = { workspace = true }
|
||||||
adapters-thumbnail = { workspace = true }
|
adapters-thumbnail = { workspace = true }
|
||||||
async-nats = { workspace = true }
|
async-nats = { workspace = true }
|
||||||
|
|||||||
@@ -45,10 +45,11 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
|
|
||||||
// Publisher transport consumes a client clone; the consumer gets another.
|
// Publisher transport consumes a client clone; the consumer gets another.
|
||||||
let pub_transport = adapters_nats::NatsTransport::new(nats_client.clone());
|
let pub_transport = adapters_nats::NatsTransport::new(nats_client.clone());
|
||||||
let nats_publisher: Arc<dyn domain::ports::EventPublisher> =
|
let nats_publisher: Arc<dyn domain::ports::EventPublisher> = Arc::new(
|
||||||
Arc::new(event_transport::EventPublisherAdapter::new(pub_transport));
|
adapters_event_transport::EventPublisherAdapter::new(pub_transport),
|
||||||
|
);
|
||||||
let event_pub: Arc<dyn domain::ports::EventPublisher> = Arc::new(
|
let event_pub: Arc<dyn domain::ports::EventPublisher> = Arc::new(
|
||||||
event_transport::CompositeEventPublisher::new(nats_publisher, event_store),
|
adapters_event_transport::CompositeEventPublisher::new(nats_publisher, event_store),
|
||||||
);
|
);
|
||||||
|
|
||||||
let extractor: Arc<dyn domain::ports::MetadataExtractorPort> =
|
let extractor: Arc<dyn domain::ports::MetadataExtractorPort> =
|
||||||
@@ -97,7 +98,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
|
|
||||||
// ── Event-driven loop via NATS ─────────────────────────────────────
|
// ── Event-driven loop via NATS ─────────────────────────────────────
|
||||||
let consumer_source = adapters_nats::NatsMessageSource::new(nats_client);
|
let consumer_source = adapters_nats::NatsMessageSource::new(nats_client);
|
||||||
let event_consumer = event_transport::EventConsumerAdapter::new(consumer_source);
|
let event_consumer = adapters_event_transport::EventConsumerAdapter::new(consumer_source);
|
||||||
|
|
||||||
info!("event loop: listening for NATS events");
|
info!("event loop: listening for NATS events");
|
||||||
let mut stream = event_consumer.consume();
|
let mut stream = event_consumer.consume();
|
||||||
|
|||||||
Reference in New Issue
Block a user