From d379f3d3c86004cd32c426cce00c4d95f2f0694d Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Sun, 31 May 2026 21:00:50 +0200 Subject: [PATCH] =?UTF-8?q?refactor:=20code=20smell=20fixes=20=E2=80=94=20?= =?UTF-8?q?tests,=20events,=20naming?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Tests for ExecutePipelineHandler (happy path, fallback, disabled skip, failure retry, not found) - Tests for ProcessNextJobHandler (empty queue, process, drain multiple) - DerivativeGenerated domain event + event-payload mapping + event_store aggregate - Renamed event-payload → adapters-event-payload, event-transport → adapters-event-transport --- Cargo.lock | 56 ++-- Cargo.toml | 4 +- crates/adapters/event-payload/Cargo.toml | 2 +- crates/adapters/event-payload/src/lib.rs | 29 ++ crates/adapters/event-transport/Cargo.toml | 4 +- crates/adapters/event-transport/src/lib.rs | 2 +- crates/adapters/event-transport/src/tests.rs | 2 +- crates/adapters/nats/Cargo.toml | 2 +- crates/adapters/nats/src/lib.rs | 2 +- crates/adapters/postgres/Cargo.toml | 2 +- crates/adapters/postgres/src/event_store.rs | 5 +- .../processing/commands/execute_pipeline.rs | 274 ++++++++++++++++++ .../tests/processing/commands/mod.rs | 2 + .../processing/commands/process_next_job.rs | 118 ++++++++ crates/bootstrap/Cargo.toml | 2 +- crates/bootstrap/src/factory.rs | 7 +- crates/domain/src/common/events.rs | 6 + crates/worker/Cargo.toml | 2 +- crates/worker/src/main.rs | 9 +- 19 files changed, 481 insertions(+), 49 deletions(-) create mode 100644 crates/application/tests/processing/commands/execute_pipeline.rs create mode 100644 crates/application/tests/processing/commands/process_next_job.rs diff --git a/Cargo.lock b/Cargo.lock index ee61680..6985fb2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,30 @@ dependencies = [ "uuid", ] +[[package]] +name = "adapters-event-payload" +version = "0.1.0" +dependencies = [ + "chrono", + "domain", + "serde", + "serde_json", + "uuid", +] + +[[package]] +name = "adapters-event-transport" +version = "0.1.0" +dependencies = [ + "adapters-event-payload", + "async-trait", + "domain", + "futures", + "serde_json", + "tokio", + "tracing", +] + [[package]] name = "adapters-exif" version = "0.1.0" @@ -30,10 +54,10 @@ dependencies = [ name = "adapters-nats" version = "0.1.0" dependencies = [ + "adapters-event-transport", "async-nats", "async-trait", "domain", - "event-transport", "futures", "tokio", "tracing", @@ -43,11 +67,11 @@ dependencies = [ name = "adapters-postgres" version = "0.1.0" dependencies = [ + "adapters-event-payload", "anyhow", "async-trait", "chrono", "domain", - "event-payload", "serde", "serde_json", "sqlx", @@ -451,6 +475,7 @@ name = "bootstrap" version = "0.1.0" dependencies = [ "adapters-auth", + "adapters-event-transport", "adapters-nats", "adapters-postgres", "adapters-storage", @@ -461,7 +486,6 @@ dependencies = [ "axum", "domain", "dotenvy", - "event-transport", "presentation", "tokio", "tower-http", @@ -880,30 +904,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "event-payload" -version = "0.1.0" -dependencies = [ - "chrono", - "domain", - "serde", - "serde_json", - "uuid", -] - -[[package]] -name = "event-transport" -version = "0.1.0" -dependencies = [ - "async-trait", - "domain", - "event-payload", - "futures", - "serde_json", - "tokio", - "tracing", -] - [[package]] name = "exr" version = "1.74.0" @@ -4262,6 +4262,7 @@ dependencies = [ name = "worker" version = "0.1.0" dependencies = [ + "adapters-event-transport", "adapters-exif", "adapters-nats", "adapters-postgres", @@ -4273,7 +4274,6 @@ dependencies = [ "async-trait", "domain", "dotenvy", - "event-transport", "futures", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 912dee0..21f8a33 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,8 +45,8 @@ application = { path = "crates/application" } api-types = { path = "crates/api-types" } adapters-auth = { path = "crates/adapters/auth" } adapters-storage = { path = "crates/adapters/storage" } -event-payload = { path = "crates/adapters/event-payload" } -event-transport = { path = "crates/adapters/event-transport" } +adapters-event-payload = { path = "crates/adapters/event-payload" } +adapters-event-transport = { path = "crates/adapters/event-transport" } adapters-nats = { path = "crates/adapters/nats" } adapters-exif = { path = "crates/adapters/exif" } adapters-thumbnail = { path = "crates/adapters/thumbnail" } diff --git a/crates/adapters/event-payload/Cargo.toml b/crates/adapters/event-payload/Cargo.toml index acd7a9e..ad053ef 100644 --- a/crates/adapters/event-payload/Cargo.toml +++ b/crates/adapters/event-payload/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "event-payload" +name = "adapters-event-payload" version = "0.1.0" edition = "2024" diff --git a/crates/adapters/event-payload/src/lib.rs b/crates/adapters/event-payload/src/lib.rs index 2e0c523..9e884b9 100644 --- a/crates/adapters/event-payload/src/lib.rs +++ b/crates/adapters/event-payload/src/lib.rs @@ -34,6 +34,12 @@ pub enum EventPayload { asset_id: String, timestamp: String, }, + DerivativeGenerated { + asset_id: String, + derivative_id: String, + profile: String, + timestamp: String, + }, JobEnqueued { job_id: String, job_type: String, @@ -59,6 +65,7 @@ impl EventPayload { Self::ShareCreated { .. } => "shares.created", Self::ShareRevoked { .. } => "shares.revoked", Self::SidecarSyncRequested { .. } => "sidecars.sync_requested", + Self::DerivativeGenerated { .. } => "derivatives.generated", Self::JobEnqueued { .. } => "jobs.enqueued", Self::JobCompleted { .. } => "jobs.completed", Self::JobFailed { .. } => "jobs.failed", @@ -123,6 +130,17 @@ impl From<&DomainEvent> for EventPayload { asset_id: asset_id.to_string(), timestamp: timestamp.to_string(), }, + DomainEvent::DerivativeGenerated { + asset_id, + derivative_id, + profile, + timestamp, + } => Self::DerivativeGenerated { + asset_id: asset_id.to_string(), + derivative_id: derivative_id.to_string(), + profile: profile.clone(), + timestamp: timestamp.to_string(), + }, DomainEvent::JobEnqueued { job_id, job_type, @@ -222,6 +240,17 @@ impl TryFrom for DomainEvent { asset_id: SystemId::from_uuid(parse_uuid(&asset_id, "asset_id")?), timestamp: parse_timestamp(×tamp)?, }, + EventPayload::DerivativeGenerated { + asset_id, + derivative_id, + profile, + timestamp, + } => DomainEvent::DerivativeGenerated { + asset_id: SystemId::from_uuid(parse_uuid(&asset_id, "asset_id")?), + derivative_id: SystemId::from_uuid(parse_uuid(&derivative_id, "derivative_id")?), + profile, + timestamp: parse_timestamp(×tamp)?, + }, EventPayload::JobEnqueued { job_id, job_type, diff --git a/crates/adapters/event-transport/Cargo.toml b/crates/adapters/event-transport/Cargo.toml index 6fcc3cb..7f69ad8 100644 --- a/crates/adapters/event-transport/Cargo.toml +++ b/crates/adapters/event-transport/Cargo.toml @@ -1,11 +1,11 @@ [package] -name = "event-transport" +name = "adapters-event-transport" version = "0.1.0" edition = "2024" [dependencies] domain = { workspace = true } -event-payload = { workspace = true } +adapters-event-payload = { workspace = true } serde_json = { workspace = true } async-trait = { workspace = true } tracing = { workspace = true } diff --git a/crates/adapters/event-transport/src/lib.rs b/crates/adapters/event-transport/src/lib.rs index 46a6521..53b0fad 100644 --- a/crates/adapters/event-transport/src/lib.rs +++ b/crates/adapters/event-transport/src/lib.rs @@ -1,13 +1,13 @@ pub mod composite; pub use composite::CompositeEventPublisher; +use adapters_event_payload::EventPayload; use async_trait::async_trait; use domain::{ errors::DomainError, events::{DomainEvent, EventEnvelope}, ports::{EventConsumer, EventPublisher}, }; -use event_payload::EventPayload; use futures::stream::BoxStream; #[async_trait] diff --git a/crates/adapters/event-transport/src/tests.rs b/crates/adapters/event-transport/src/tests.rs index 0c1a453..825d068 100644 --- a/crates/adapters/event-transport/src/tests.rs +++ b/crates/adapters/event-transport/src/tests.rs @@ -55,7 +55,7 @@ async fn published_bytes_are_valid_json() { adapter.publish(&event).await.unwrap(); let recorded = messages.lock().unwrap(); - let payload: event_payload::EventPayload = + let payload: adapters_event_payload::EventPayload = serde_json::from_slice(&recorded[0].1).expect("should be valid JSON"); assert_eq!(payload.subject(), "assets.ingested"); } diff --git a/crates/adapters/nats/Cargo.toml b/crates/adapters/nats/Cargo.toml index 55c62cd..c0d4691 100644 --- a/crates/adapters/nats/Cargo.toml +++ b/crates/adapters/nats/Cargo.toml @@ -5,7 +5,7 @@ edition = "2024" [dependencies] domain = { workspace = true } -event-transport = { workspace = true } +adapters-event-transport = { workspace = true } async-nats = { workspace = true } async-trait = { workspace = true } futures = { workspace = true } diff --git a/crates/adapters/nats/src/lib.rs b/crates/adapters/nats/src/lib.rs index 9d8d70d..e7c2abb 100644 --- a/crates/adapters/nats/src/lib.rs +++ b/crates/adapters/nats/src/lib.rs @@ -1,7 +1,7 @@ +use adapters_event_transport::{MessageSource, RawMessage, Transport}; use async_nats::jetstream::{self, AckKind, stream::Config as StreamConfig}; use async_trait::async_trait; use domain::errors::DomainError; -use event_transport::{MessageSource, RawMessage, Transport}; use futures::stream::BoxStream; use std::sync::Arc; diff --git a/crates/adapters/postgres/Cargo.toml b/crates/adapters/postgres/Cargo.toml index 022cc1f..4d9f155 100644 --- a/crates/adapters/postgres/Cargo.toml +++ b/crates/adapters/postgres/Cargo.toml @@ -5,7 +5,7 @@ edition = "2024" [dependencies] domain = { workspace = true } -event-payload = { workspace = true } +adapters-event-payload = { workspace = true } sqlx = { workspace = true, features = ["postgres", "runtime-tokio", "migrate", "uuid", "chrono", "json"] } uuid = { workspace = true } chrono = { workspace = true } diff --git a/crates/adapters/postgres/src/event_store.rs b/crates/adapters/postgres/src/event_store.rs index a084476..229417a 100644 --- a/crates/adapters/postgres/src/event_store.rs +++ b/crates/adapters/postgres/src/event_store.rs @@ -1,9 +1,9 @@ use crate::helpers::{MapDomainError, pg_repo}; +use adapters_event_payload::EventPayload; use async_trait::async_trait; use domain::{ errors::DomainError, events::DomainEvent, ports::EventStore, value_objects::SystemId, }; -use event_payload::EventPayload; use uuid::Uuid; pg_repo!(PostgresEventStore); @@ -14,7 +14,8 @@ fn aggregate_id(event: &DomainEvent) -> Uuid { DomainEvent::AssetIngested { asset_id, .. } | DomainEvent::MetadataUpdated { asset_id, .. } | DomainEvent::AssetDeleted { asset_id, .. } - | DomainEvent::SidecarSyncRequested { asset_id, .. } => *asset_id.as_uuid(), + | DomainEvent::SidecarSyncRequested { asset_id, .. } + | DomainEvent::DerivativeGenerated { asset_id, .. } => *asset_id.as_uuid(), DomainEvent::ShareCreated { scope_id, .. } | DomainEvent::ShareRevoked { scope_id, .. } => { *scope_id.as_uuid() diff --git a/crates/application/tests/processing/commands/execute_pipeline.rs b/crates/application/tests/processing/commands/execute_pipeline.rs new file mode 100644 index 0000000..a8c439d --- /dev/null +++ b/crates/application/tests/processing/commands/execute_pipeline.rs @@ -0,0 +1,274 @@ +use application::processing::{ExecutePipelineCommand, ExecutePipelineHandler}; +use application::testing::{ + InMemoryJobBatchRepository, InMemoryJobRepository, InMemoryPipelineRepository, + InMemoryPluginRepository, StubEventPublisher, +}; +use async_trait::async_trait; +use domain::{ + entities::{Job, JobStatus, JobType, Plugin, PluginType, ProcessingPipeline}, + errors::DomainError, + ports::{JobRepository, PipelineRepository, PluginExecutor, PluginRegistry, PluginRepository}, + value_objects::{MetadataValue, StructuredData, SystemId}, +}; +use std::collections::HashMap; +use std::sync::Arc; + +struct StubPluginExecutor { + name: String, + result: StructuredData, +} + +#[async_trait] +impl PluginExecutor for StubPluginExecutor { + fn plugin_name(&self) -> &str { + &self.name + } + + async fn execute( + &self, + _asset_id: Option, + _payload: &StructuredData, + _config: &StructuredData, + ) -> Result { + Ok(self.result.clone()) + } +} + +struct FailingPluginExecutor; + +#[async_trait] +impl PluginExecutor for FailingPluginExecutor { + fn plugin_name(&self) -> &str { + "failing" + } + + async fn execute( + &self, + _asset_id: Option, + _payload: &StructuredData, + _config: &StructuredData, + ) -> Result { + Err(DomainError::Internal("plugin crashed".into())) + } +} + +struct TestPluginRegistry { + executors: HashMap>, +} + +impl TestPluginRegistry { + fn new() -> Self { + Self { + executors: HashMap::new(), + } + } + + fn with(mut self, executor: Arc) -> Self { + self.executors + .insert(executor.plugin_name().to_string(), executor); + self + } +} + +impl PluginRegistry for TestPluginRegistry { + fn get_executor(&self, plugin_name: &str) -> Option> { + self.executors.get(plugin_name).cloned() + } + + fn registered_plugins(&self) -> Vec { + self.executors.keys().cloned().collect() + } +} + +fn make_handler( + job_repo: Arc, + batch_repo: Arc, + pipeline_repo: Arc, + plugin_repo: Arc, + registry: Arc, +) -> ExecutePipelineHandler { + let event_pub = Arc::new(StubEventPublisher::new()); + ExecutePipelineHandler::new( + job_repo, + batch_repo, + pipeline_repo, + plugin_repo, + registry, + event_pub, + ) +} + +async fn seed_plugin(repo: &InMemoryPluginRepository, name: &str) -> Plugin { + let plugin = Plugin::new(name, PluginType::MediaProcessor); + repo.save(&plugin).await.unwrap(); + plugin +} + +async fn seed_pipeline( + repo: &InMemoryPipelineRepository, + trigger: &str, + plugin: &Plugin, +) -> ProcessingPipeline { + let mut pipeline = ProcessingPipeline::new(trigger); + pipeline.add_step(plugin.plugin_id, StructuredData::new()); + repo.save(&pipeline).await.unwrap(); + pipeline +} + +async fn seed_job(repo: &InMemoryJobRepository, job_type: JobType) -> Job { + let job = Job::new(job_type, 10, StructuredData::new()); + repo.save(&job).await.unwrap(); + job +} + +#[tokio::test] +async fn executes_pipeline_with_one_step() { + let job_repo = Arc::new(InMemoryJobRepository::new()); + let batch_repo = Arc::new(InMemoryJobBatchRepository::new()); + let pipeline_repo = Arc::new(InMemoryPipelineRepository::new()); + let plugin_repo = Arc::new(InMemoryPluginRepository::new()); + + let plugin = seed_plugin(&plugin_repo, "test_plugin").await; + seed_pipeline(&pipeline_repo, "extract_metadata", &plugin).await; + let job = seed_job(&job_repo, JobType::ExtractMetadata).await; + + let mut result_data = StructuredData::new(); + result_data.insert("key", MetadataValue::String("value".into())); + let executor = Arc::new(StubPluginExecutor { + name: "test_plugin".into(), + result: result_data, + }); + let registry = Arc::new(TestPluginRegistry::new().with(executor)); + + let handler = make_handler( + job_repo.clone(), + batch_repo, + pipeline_repo, + plugin_repo, + registry, + ); + + let result = handler + .execute(ExecutePipelineCommand { job_id: job.job_id }) + .await + .unwrap(); + + assert_eq!(result.status, JobStatus::Completed); + assert_eq!(result.result_data.unwrap().get_string("key"), Some("value")); +} + +#[tokio::test] +async fn falls_back_to_direct_executor_when_no_pipeline() { + let job_repo = Arc::new(InMemoryJobRepository::new()); + let batch_repo = Arc::new(InMemoryJobBatchRepository::new()); + let pipeline_repo = Arc::new(InMemoryPipelineRepository::new()); + let plugin_repo = Arc::new(InMemoryPluginRepository::new()); + + let job = seed_job(&job_repo, JobType::ExtractMetadata).await; + + let executor = Arc::new(StubPluginExecutor { + name: "extract_metadata".into(), + result: StructuredData::new(), + }); + let registry = Arc::new(TestPluginRegistry::new().with(executor)); + + let handler = make_handler( + job_repo.clone(), + batch_repo, + pipeline_repo, + plugin_repo, + registry, + ); + + let result = handler + .execute(ExecutePipelineCommand { job_id: job.job_id }) + .await + .unwrap(); + + assert_eq!(result.status, JobStatus::Completed); +} + +#[tokio::test] +async fn skips_disabled_plugin() { + let job_repo = Arc::new(InMemoryJobRepository::new()); + let batch_repo = Arc::new(InMemoryJobBatchRepository::new()); + let pipeline_repo = Arc::new(InMemoryPipelineRepository::new()); + let plugin_repo = Arc::new(InMemoryPluginRepository::new()); + + let mut plugin = Plugin::new("disabled_one", PluginType::MediaProcessor); + plugin.disable(); + plugin_repo.save(&plugin).await.unwrap(); + + seed_pipeline(&pipeline_repo, "extract_metadata", &plugin).await; + let job = seed_job(&job_repo, JobType::ExtractMetadata).await; + + let registry = Arc::new(TestPluginRegistry::new()); + + let handler = make_handler( + job_repo.clone(), + batch_repo, + pipeline_repo, + plugin_repo, + registry, + ); + + let result = handler + .execute(ExecutePipelineCommand { job_id: job.job_id }) + .await + .unwrap(); + + assert_eq!(result.status, JobStatus::Completed); +} + +#[tokio::test] +async fn step_failure_fails_job_with_retry() { + let job_repo = Arc::new(InMemoryJobRepository::new()); + let batch_repo = Arc::new(InMemoryJobBatchRepository::new()); + let pipeline_repo = Arc::new(InMemoryPipelineRepository::new()); + let plugin_repo = Arc::new(InMemoryPluginRepository::new()); + + let plugin = seed_plugin(&plugin_repo, "failing").await; + seed_pipeline(&pipeline_repo, "extract_metadata", &plugin).await; + let job = seed_job(&job_repo, JobType::ExtractMetadata).await; + + let executor: Arc = Arc::new(FailingPluginExecutor); + let registry = Arc::new(TestPluginRegistry::new().with(executor)); + + let handler = make_handler( + job_repo.clone(), + batch_repo, + pipeline_repo, + plugin_repo, + registry, + ); + + let result = handler + .execute(ExecutePipelineCommand { job_id: job.job_id }) + .await + .unwrap(); + + // First failure → retry_count=1, back to Queued (max_retries=3) + assert_eq!(result.status, JobStatus::Queued); + assert_eq!(result.retry_count, 1); + assert!(result.error_message.unwrap().contains("plugin crashed")); +} + +#[tokio::test] +async fn missing_job_returns_not_found() { + let job_repo = Arc::new(InMemoryJobRepository::new()); + let batch_repo = Arc::new(InMemoryJobBatchRepository::new()); + let pipeline_repo = Arc::new(InMemoryPipelineRepository::new()); + let plugin_repo = Arc::new(InMemoryPluginRepository::new()); + let registry = Arc::new(TestPluginRegistry::new()); + + let handler = make_handler(job_repo, batch_repo, pipeline_repo, plugin_repo, registry); + + let err = handler + .execute(ExecutePipelineCommand { + job_id: SystemId::new(), + }) + .await + .unwrap_err(); + + assert!(matches!(err, DomainError::NotFound(_))); +} diff --git a/crates/application/tests/processing/commands/mod.rs b/crates/application/tests/processing/commands/mod.rs index a701da5..9bb8e1d 100644 --- a/crates/application/tests/processing/commands/mod.rs +++ b/crates/application/tests/processing/commands/mod.rs @@ -1,6 +1,8 @@ mod complete_job; mod configure_pipeline; mod enqueue_job; +mod execute_pipeline; mod fail_job; mod manage_plugin; +mod process_next_job; mod start_job; diff --git a/crates/application/tests/processing/commands/process_next_job.rs b/crates/application/tests/processing/commands/process_next_job.rs new file mode 100644 index 0000000..fb80642 --- /dev/null +++ b/crates/application/tests/processing/commands/process_next_job.rs @@ -0,0 +1,118 @@ +use application::processing::{ + ExecutePipelineHandler, ProcessNextJobCommand, ProcessNextJobHandler, +}; +use application::testing::{ + InMemoryJobBatchRepository, InMemoryJobRepository, InMemoryPipelineRepository, + InMemoryPluginRepository, StubEventPublisher, +}; +use async_trait::async_trait; +use domain::{ + entities::{Job, JobStatus, JobType}, + errors::DomainError, + ports::{JobRepository, PluginExecutor, PluginRegistry}, + value_objects::{StructuredData, SystemId}, +}; +use std::collections::HashMap; +use std::sync::Arc; + +struct NoOpExecutor; + +#[async_trait] +impl PluginExecutor for NoOpExecutor { + fn plugin_name(&self) -> &str { + "extract_metadata" + } + + async fn execute( + &self, + _asset_id: Option, + _payload: &StructuredData, + _config: &StructuredData, + ) -> Result { + Ok(StructuredData::new()) + } +} + +struct SimpleRegistry { + executors: HashMap>, +} + +impl PluginRegistry for SimpleRegistry { + fn get_executor(&self, name: &str) -> Option> { + self.executors.get(name).cloned() + } + + fn registered_plugins(&self) -> Vec { + self.executors.keys().cloned().collect() + } +} + +fn build_handler(job_repo: Arc) -> ProcessNextJobHandler { + let batch_repo = Arc::new(InMemoryJobBatchRepository::new()); + let pipeline_repo = Arc::new(InMemoryPipelineRepository::new()); + let plugin_repo = Arc::new(InMemoryPluginRepository::new()); + let event_pub = Arc::new(StubEventPublisher::new()); + + let mut executors = HashMap::new(); + executors.insert( + "extract_metadata".to_string(), + Arc::new(NoOpExecutor) as Arc, + ); + let registry = Arc::new(SimpleRegistry { executors }); + + let execute_pipeline = Arc::new(ExecutePipelineHandler::new( + job_repo.clone(), + batch_repo, + pipeline_repo, + plugin_repo, + registry, + event_pub, + )); + + ProcessNextJobHandler::new(job_repo, execute_pipeline) +} + +#[tokio::test] +async fn returns_none_when_queue_empty() { + let job_repo = Arc::new(InMemoryJobRepository::new()); + let handler = build_handler(job_repo); + + let result = handler.execute(ProcessNextJobCommand).await.unwrap(); + assert!(result.is_none()); +} + +#[tokio::test] +async fn processes_queued_job() { + let job_repo = Arc::new(InMemoryJobRepository::new()); + let job = Job::new(JobType::ExtractMetadata, 5, StructuredData::new()); + job_repo.save(&job).await.unwrap(); + + let handler = build_handler(job_repo); + + let result = handler.execute(ProcessNextJobCommand).await.unwrap(); + assert!(result.is_some()); + + let processed = result.unwrap(); + assert_eq!(processed.job_id, job.job_id); + assert_eq!(processed.status, JobStatus::Completed); +} + +#[tokio::test] +async fn drains_multiple_jobs_sequentially() { + let job_repo = Arc::new(InMemoryJobRepository::new()); + let job1 = Job::new(JobType::ExtractMetadata, 5, StructuredData::new()); + let job2 = Job::new(JobType::ExtractMetadata, 3, StructuredData::new()); + job_repo.save(&job1).await.unwrap(); + job_repo.save(&job2).await.unwrap(); + + let handler = build_handler(job_repo); + + let r1 = handler.execute(ProcessNextJobCommand).await.unwrap(); + assert!(r1.is_some()); + + let r2 = handler.execute(ProcessNextJobCommand).await.unwrap(); + assert!(r2.is_some()); + + let r3 = handler.execute(ProcessNextJobCommand).await.unwrap(); + assert!(r3.is_none()); +} diff --git a/crates/bootstrap/Cargo.toml b/crates/bootstrap/Cargo.toml index 60928d5..41c7c5e 100644 --- a/crates/bootstrap/Cargo.toml +++ b/crates/bootstrap/Cargo.toml @@ -14,7 +14,7 @@ adapters-auth = { workspace = true } adapters-storage = { workspace = true, features = ["s3"] } adapters-nats = { workspace = true } -event-transport = { workspace = true } +adapters-event-transport = { workspace = true } async-nats = { workspace = true } presentation = { workspace = true } diff --git a/crates/bootstrap/src/factory.rs b/crates/bootstrap/src/factory.rs index 2396346..466e6b5 100644 --- a/crates/bootstrap/src/factory.rs +++ b/crates/bootstrap/src/factory.rs @@ -22,12 +22,13 @@ pub async fn build_app(config: &Config) -> Result { adapters_nats::ensure_stream(&nats_client).await?; let transport = adapters_nats::NatsTransport::new(nats_client); - let nats_publisher: Arc = - Arc::new(event_transport::EventPublisherAdapter::new(transport)); + let nats_publisher: Arc = Arc::new( + adapters_event_transport::EventPublisherAdapter::new(transport), + ); let event_store: Arc = Arc::new(adapters_postgres::PostgresEventStore::new(pool.clone())); let event_publisher: Arc = Arc::new( - event_transport::CompositeEventPublisher::new(nats_publisher, event_store), + adapters_event_transport::CompositeEventPublisher::new(nats_publisher, event_store), ); let storage_path = std::env::var("STORAGE_PATH").unwrap_or_else(|_| "./data/media".to_string()); diff --git a/crates/domain/src/common/events.rs b/crates/domain/src/common/events.rs index 9f7aa3e..84c8b00 100644 --- a/crates/domain/src/common/events.rs +++ b/crates/domain/src/common/events.rs @@ -39,6 +39,12 @@ pub enum DomainEvent { asset_id: SystemId, timestamp: DateTimeStamp, }, + DerivativeGenerated { + asset_id: SystemId, + derivative_id: SystemId, + profile: String, + timestamp: DateTimeStamp, + }, JobEnqueued { job_id: SystemId, job_type: String, diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 828e9b0..891709f 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -14,7 +14,7 @@ application = { workspace = true } adapters-postgres = { workspace = true } adapters-storage = { workspace = true } adapters-nats = { workspace = true } -event-transport = { workspace = true } +adapters-event-transport = { workspace = true } adapters-exif = { workspace = true } adapters-thumbnail = { workspace = true } async-nats = { workspace = true } diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 30ab958..0df14de 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -45,10 +45,11 @@ async fn main() -> anyhow::Result<()> { // Publisher transport consumes a client clone; the consumer gets another. let pub_transport = adapters_nats::NatsTransport::new(nats_client.clone()); - let nats_publisher: Arc = - Arc::new(event_transport::EventPublisherAdapter::new(pub_transport)); + let nats_publisher: Arc = Arc::new( + adapters_event_transport::EventPublisherAdapter::new(pub_transport), + ); let event_pub: Arc = Arc::new( - event_transport::CompositeEventPublisher::new(nats_publisher, event_store), + adapters_event_transport::CompositeEventPublisher::new(nats_publisher, event_store), ); let extractor: Arc = @@ -97,7 +98,7 @@ async fn main() -> anyhow::Result<()> { // ── Event-driven loop via NATS ───────────────────────────────────── let consumer_source = adapters_nats::NatsMessageSource::new(nats_client); - let event_consumer = event_transport::EventConsumerAdapter::new(consumer_source); + let event_consumer = adapters_event_transport::EventConsumerAdapter::new(consumer_source); info!("event loop: listening for NATS events"); let mut stream = event_consumer.consume();