diff --git a/crates/worker/src/bootstrap.rs b/crates/worker/src/bootstrap.rs
new file mode 100644
index 0000000..dbb740e
--- /dev/null
+++ b/crates/worker/src/bootstrap.rs
@@ -0,0 +1,89 @@
+//! Builds the long-lived services the worker runs on.
+
+use std::sync::Arc;
+
+use application::processing::{EnqueueJobHandler, ProcessNextJobHandler};
+use domain::ports::JobRepository;
+
+use crate::config::WorkerConfig;
+use crate::factories::{
+    Repos, build_enqueue_handler, build_plugin_registry, build_process_next_handler,
+};
+
+/// Handles the worker's tasks share once start-up has finished.
+pub struct WorkerServices {
+    /// Runs the next queued job.
+    pub process_next: Arc,
+    /// Enqueues a new job.
+    pub enqueue: Arc,
+    /// Job lookup, used when reacting to completed jobs.
+    pub job_repo: Arc,
+    /// Stream of domain events read from NATS.
+    pub event_consumer:
+        adapters_event_transport::EventConsumerAdapter,
+}
+
+/// Connects to Postgres and NATS and assembles [`WorkerServices`].
+///
+/// Migrations are run and the NATS stream is ensured before any handler is
+/// constructed.
+///
+/// # Errors
+///
+/// Returns the first error from connecting, migrating, or ensuring the stream.
+pub async fn build(config: &WorkerConfig) -> anyhow::Result {
+    let pool = adapters_postgres::connect(&config.database_url).await?;
+    adapters_postgres::run_migrations(&pool).await?;
+
+    let nats_client = async_nats::connect(&config.nats_url).await?;
+    adapters_nats::ensure_stream(&nats_client).await?;
+    tracing::info!(nats_url = %config.nats_url, "NATS connected");
+
+    let event_store: Arc =
+        Arc::new(adapters_postgres::PostgresEventStore::new(pool.clone()));
+    let repos = Repos::new(pool);
+    let file_storage = Arc::new(adapters_storage::LocalFileStorage::new(
+        &config.storage_path,
+    ));
+    let sidecar_writer: Arc =
+        Arc::new(adapters_sidecar::XmpSidecarWriter);
+
+    let pub_transport = adapters_nats::NatsTransport::new(nats_client.clone());
+    let nats_publisher: Arc = Arc::new(
+        adapters_event_transport::EventPublisherAdapter::new(pub_transport),
+    );
+    let event_pub: Arc = Arc::new(
+        adapters_event_transport::CompositeEventPublisher::new(nats_publisher, event_store),
+    );
+
+    let extractor: Arc =
+        Arc::new(adapters_exif::NomExifExtractor);
+    let thumbnail_gen: Arc =
+        Arc::new(adapters_thumbnail::ImageThumbnailGenerator);
+    let registry = Arc::new(build_plugin_registry(
+        &repos,
+        file_storage,
+        sidecar_writer,
+        extractor,
+        thumbnail_gen,
+        event_pub.clone(),
+    ));
+
+    let process_next = Arc::new(build_process_next_handler(
+        &repos,
+        registry,
+        event_pub.clone(),
+    ));
+    let job_repo: Arc = repos.job.clone();
+    let enqueue = Arc::new(build_enqueue_handler(&repos, event_pub));
+
+    let consumer_source = adapters_nats::NatsMessageSource::new(nats_client);
+    let event_consumer = adapters_event_transport::EventConsumerAdapter::new(consumer_source);
+
+    Ok(WorkerServices {
+        process_next,
+        enqueue,
+        job_repo,
+        event_consumer,
+    })
+}
diff --git a/crates/worker/src/event_loop.rs b/crates/worker/src/event_loop.rs
new file mode 100644
index 0000000..9eaebf7
--- /dev/null
+++ b/crates/worker/src/event_loop.rs
@@ -0,0 +1,141 @@
+//! Event-driven half of the worker: reacts to domain events from NATS.
+
+use std::sync::Arc;
+
+use futures::StreamExt;
+use tokio::sync::watch;
+use tracing::{error, info, warn};
+
+use application::processing::{EnqueueJobCommand, ProcessNextJobCommand, ProcessNextJobHandler};
+use domain::entities::JobType;
+use domain::events::DomainEvent;
+use domain::ports::{EventConsumer, JobRepository};
+use domain::value_objects::{StructuredData, SystemId};
+
+use crate::bootstrap::WorkerServices;
+
+/// Builds an enqueue command targeting `asset_id`, with an empty payload and no batch.
+fn enqueue_cmd(job_type: JobType, priority: u32, asset_id: SystemId) -> EnqueueJobCommand {
+    EnqueueJobCommand {
+        job_type,
+        priority,
+        payload: StructuredData::new(),
+        target_asset_id: Some(asset_id),
+        batch_id: None,
+    }
+}
+
+/// Consumes domain events until `shutdown` changes or the consumer stream ends.
+///
+/// `AssetIngested` and `SidecarSyncRequested` enqueue a job, `JobCompleted` may
+/// enqueue a follow-up job, and `JobEnqueued` processes one queued job. Every
+/// other event is only logged at debug level.
+pub async fn run(services: WorkerServices, mut shutdown: watch::Receiver) {
+    info!("event loop: listening for NATS events");
+    let mut stream = services.event_consumer.consume();
+
+    loop {
+        tokio::select! {
+            _ = shutdown.changed() => {
+                info!("event loop: shutting down");
+                break;
+            }
+            msg = stream.next() => {
+                let Some(result) = msg else {
+                    warn!("event loop: consumer stream ended");
+                    break;
+                };
+                let envelope = match result {
+                    Ok(env) => env,
+                    Err(e) => {
+                        error!(error = %e, "event loop: consumer error");
+                        continue;
+                    }
+                };
+
+                // Acked before handling, so a handler failure below is logged
+                // but never retried by this loop.
+                (envelope.ack)();
+
+                match &envelope.event {
+                    DomainEvent::AssetIngested { asset_id, .. } => {
+                        info!(asset_id = %asset_id, "AssetIngested → ExtractMetadata");
+                        let cmd = enqueue_cmd(JobType::ExtractMetadata, 10, *asset_id);
+                        if let Err(e) = services.enqueue.execute(cmd).await {
+                            error!(error = %e, "failed to enqueue ExtractMetadata");
+                        }
+                    }
+                    DomainEvent::SidecarSyncRequested { asset_id, .. } => {
+                        info!(asset_id = %asset_id, "SidecarSyncRequested → SyncSidecar");
+                        let cmd = enqueue_cmd(JobType::SyncSidecar, 5, *asset_id);
+                        if let Err(e) = services.enqueue.execute(cmd).await {
+                            error!(error = %e, "failed to enqueue SyncSidecar");
+                        }
+                    }
+                    DomainEvent::JobCompleted { job_id, .. } => {
+                        handle_job_completed(
+                            job_id,
+                            &services.job_repo,
+                            &services.enqueue,
+                        )
+                        .await;
+                    }
+                    DomainEvent::JobEnqueued { job_id, job_type, .. } => {
+                        info!(job_id = %job_id, job_type = %job_type, "JobEnqueued → process");
+                        drain_one(&services.process_next).await;
+                    }
+                    other => {
+                        tracing::debug!(event = ?other, "unhandled event, acked");
+                    }
+                }
+            }
+        }
+    }
+}
+
+/// Enqueues `GenerateDerivative` when the completed job was an `ExtractMetadata`
+/// job with a target asset.
+///
+/// A failed or empty job lookup ends the follow-up; lookup errors are logged
+/// rather than silently dropped.
+async fn handle_job_completed(
+    job_id: &SystemId,
+    job_repo: &Arc,
+    enqueue: &application::processing::EnqueueJobHandler,
+) {
+    let job = match job_repo.find_by_id(job_id).await {
+        Ok(Some(job)) => job,
+        Ok(None) => return,
+        Err(e) => {
+            error!(job_id = %job_id, error = %e, "failed to look up completed job");
+            return;
+        }
+    };
+    if job.job_type != JobType::ExtractMetadata {
+        return;
+    }
+    let Some(asset_id) = job.target_asset_id else {
+        return;
+    };
+
+    info!(asset_id = %asset_id, "ExtractMetadata done → GenerateDerivative");
+    let cmd = enqueue_cmd(JobType::GenerateDerivative, 5, asset_id);
+    if let Err(e) = enqueue.execute(cmd).await {
+        error!(error = %e, "failed to enqueue GenerateDerivative");
+    }
+}
+
+/// Processes at most one queued job and logs the outcome.
+async fn drain_one(handler: &Arc) {
+    match handler.execute(ProcessNextJobCommand).await {
+        Ok(Some(job)) => {
+            info!(job_id = %job.job_id, status = ?job.status, "processed job");
+        }
+        Ok(None) => {
+            warn!("JobEnqueued but no queued job found");
+        }
+        Err(e) => {
+            error!(error = %e, "error processing job");
+        }
+    }
+}
diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs
index 12787a2..f5c340e 100644
--- a/crates/worker/src/main.rs
+++ b/crates/worker/src/main.rs
@@ -1,22 +1,15 @@
-use std::sync::Arc;
 use std::time::Duration;
 
-use futures::StreamExt;
 use tokio::sync::watch;
-use tracing::{error, info, warn};
-
-use application::processing::{EnqueueJobCommand, ProcessNextJobCommand};
-use domain::entities::JobType;
-use domain::events::DomainEvent;
-use domain::ports::{EventConsumer, JobRepository};
-use domain::value_objects::StructuredData;
+use tracing::info;
 
+mod bootstrap;
 mod config;
+mod event_loop;
 mod factories;
 mod plugin_registry;
 mod plugins;
-
-use factories::{Repos, build_enqueue_handler, build_plugin_registry, build_process_next_handler};
+mod sweep;
 
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
@@ -29,51 +22,8 @@ async fn main() -> anyhow::Result<()> {
     let config = config::WorkerConfig::from_env();
     info!("Worker starting");
 
-    let pool = adapters_postgres::connect(&config.database_url).await?;
-    adapters_postgres::run_migrations(&pool).await?;
+    let services = bootstrap::build(&config).await?;
 
-    let nats_client = async_nats::connect(&config.nats_url).await?;
-    adapters_nats::ensure_stream(&nats_client).await?;
-    info!(nats_url = %config.nats_url, "NATS connected");
-
-    let event_store: Arc =
-        Arc::new(adapters_postgres::PostgresEventStore::new(pool.clone()));
-    let repos = Repos::new(pool);
-    let file_storage = Arc::new(adapters_storage::LocalFileStorage::new(
-        &config.storage_path,
-    ));
-    let sidecar_writer: Arc =
-        Arc::new(adapters_sidecar::XmpSidecarWriter);
-
-    let pub_transport = adapters_nats::NatsTransport::new(nats_client.clone());
-    let nats_publisher: Arc = Arc::new(
-        adapters_event_transport::EventPublisherAdapter::new(pub_transport),
-    );
-    let event_pub: Arc = Arc::new(
-        adapters_event_transport::CompositeEventPublisher::new(nats_publisher, event_store),
-    );
-
-    let extractor: Arc =
-        Arc::new(adapters_exif::NomExifExtractor);
-    let thumbnail_gen: Arc =
-        Arc::new(adapters_thumbnail::ImageThumbnailGenerator);
-    let registry = Arc::new(build_plugin_registry(
-        &repos,
-        file_storage,
-        sidecar_writer,
-        extractor,
-        thumbnail_gen,
-        event_pub.clone(),
-    ));
-    let process_next = Arc::new(build_process_next_handler(
-        &repos,
-        registry,
-        event_pub.clone(),
-    ));
-    let job_repo: Arc = repos.job.clone();
-    let enqueue = Arc::new(build_enqueue_handler(&repos, event_pub));
-
-    // ── Shutdown signal ───────────────────────────────────────────────
     let (shutdown_tx, shutdown_rx) = watch::channel(false);
 
     tokio::spawn(async move {
@@ -95,138 +45,14 @@ async fn main() -> anyhow::Result<()> {
         shutdown_tx.send(true).ok();
     });
 
-    // ── Fallback sweep task ────────────────────────────────────────────
     let sweep_interval = Duration::from_secs(config.fallback_sweep_secs);
-    let sweep_handler = Arc::clone(&process_next);
-    let mut sweep_shutdown = shutdown_rx.clone();
-    tokio::spawn(async move {
-        info!(
-            every_secs = config.fallback_sweep_secs,
-            "fallback sweep task started"
-        );
-        loop {
-            tokio::select! {
-                _ = sweep_shutdown.changed() => {
-                    info!("sweep task: shutting down");
-                    break;
-                }
-                _ = tokio::time::sleep(sweep_interval) => {}
-            }
-            info!("fallback sweep: draining queued jobs");
-            loop {
-                match sweep_handler.execute(ProcessNextJobCommand).await {
-                    Ok(Some(job)) => {
-                        info!(job_id = %job.job_id, status = ?job.status, "sweep: processed job");
-                    }
-                    Ok(None) => break,
-                    Err(e) => {
-                        error!(error = %e, "sweep: error processing job");
-                        break;
-                    }
-                }
-            }
-        }
-    });
+    tokio::spawn(sweep::run(
+        services.process_next.clone(),
+        sweep_interval,
+        shutdown_rx.clone(),
+    ));
 
-    // ── Event-driven loop via NATS ─────────────────────────────────────
-    let consumer_source = adapters_nats::NatsMessageSource::new(nats_client);
-    let event_consumer = adapters_event_transport::EventConsumerAdapter::new(consumer_source);
-
-    info!("event loop: listening for NATS events");
-    let mut stream = event_consumer.consume();
-    let mut event_shutdown = shutdown_rx.clone();
-
-    loop {
-        tokio::select! {
-            _ = event_shutdown.changed() => {
-                info!("event loop: shutting down");
-                break;
-            }
-            msg = stream.next() => {
-                let Some(result) = msg else { break };
-                let envelope = match result {
-                    Ok(env) => env,
-                    Err(e) => {
-                        error!(error = %e, "event loop: consumer error");
-                        continue;
-                    }
-                };
-
-                match &envelope.event {
-                    DomainEvent::AssetIngested { asset_id, .. } => {
-                        info!(asset_id = %asset_id, "event loop: AssetIngested → enqueue ExtractMetadata");
-                        (envelope.ack)();
-                        let cmd = EnqueueJobCommand {
-                            job_type: JobType::ExtractMetadata,
-                            priority: 10,
-                            payload: StructuredData::new(),
-                            target_asset_id: Some(*asset_id),
-                            batch_id: None,
-                        };
-                        if let Err(e) = enqueue.execute(cmd).await {
-                            error!(error = %e, "event loop: failed to enqueue ExtractMetadata");
-                        }
-                    }
-                    DomainEvent::SidecarSyncRequested { asset_id, .. } => {
-                        info!(asset_id = %asset_id, "event loop: SidecarSyncRequested → enqueue SyncSidecar");
-                        (envelope.ack)();
-                        let cmd = EnqueueJobCommand {
-                            job_type: JobType::SyncSidecar,
-                            priority: 5,
-                            payload: StructuredData::new(),
-                            target_asset_id: Some(*asset_id),
-                            batch_id: None,
-                        };
-                        if let Err(e) = enqueue.execute(cmd).await {
-                            error!(error = %e, "event loop: failed to enqueue SyncSidecar");
-                        }
-                    }
-                    DomainEvent::JobCompleted { job_id, .. } => {
-                        info!(job_id = %job_id, "event loop: JobCompleted → check derivative generation");
-                        (envelope.ack)();
-                        // Look up the job to see if it was ExtractMetadata
-                        if let Ok(Some(job)) = job_repo.find_by_id(job_id).await
-                            && job.job_type == JobType::ExtractMetadata
-                            && let Some(asset_id) = job.target_asset_id
-                        {
-                            info!(asset_id = %asset_id, "event loop: ExtractMetadata done → enqueue GenerateDerivative");
-                            let cmd = EnqueueJobCommand {
-                                job_type: JobType::GenerateDerivative,
-                                priority: 5,
-                                payload: StructuredData::new(),
-                                target_asset_id: Some(asset_id),
-                                batch_id: None,
-                            };
-                            if let Err(e) = enqueue.execute(cmd).await {
-                                error!(error = %e, "event loop: failed to enqueue GenerateDerivative");
-                            }
-                        }
-                    }
-                    DomainEvent::JobEnqueued {
-                        job_id, job_type, ..
-                    } => {
-                        info!(job_id = %job_id, job_type = %job_type, "event loop: JobEnqueued → process");
-                        (envelope.ack)();
-                        match process_next.execute(ProcessNextJobCommand).await {
-                            Ok(Some(job)) => {
-                                info!(job_id = %job.job_id, status = ?job.status, "event loop: processed job");
-                            }
-                            Ok(None) => {
-                                warn!("event loop: JobEnqueued but no queued job found");
-                            }
-                            Err(e) => {
-                                error!(error = %e, "event loop: error processing job");
-                            }
-                        }
-                    }
-                    other => {
-                        (envelope.ack)();
-                        tracing::debug!(event = ?other, "event loop: unhandled event, acked");
-                    }
-                }
-            }
-        }
-    }
+    event_loop::run(services, shutdown_rx).await;
 
     info!("worker shutdown complete");
     Ok(())
diff --git a/crates/worker/src/sweep.rs b/crates/worker/src/sweep.rs
new file mode 100644
index 0000000..f295eb1
--- /dev/null
+++ b/crates/worker/src/sweep.rs
@@ -0,0 +1,43 @@
+//! Periodic fallback that drains queued jobs.
+
+use std::sync::Arc;
+use std::time::Duration;
+
+use tokio::sync::watch;
+use tracing::{error, info};
+
+use application::processing::{ProcessNextJobCommand, ProcessNextJobHandler};
+
+/// Every `interval`, processes queued jobs until none remain or one fails.
+///
+/// Stops when `shutdown` changes; the signal is only checked between sweeps,
+/// not while a drain is in progress.
+pub async fn run(
+    handler: Arc,
+    interval: Duration,
+    mut shutdown: watch::Receiver,
+) {
+    info!(every_secs = interval.as_secs(), "sweep task started");
+    loop {
+        tokio::select! {
+            _ = shutdown.changed() => {
+                info!("sweep task: shutting down");
+                break;
+            }
+            _ = tokio::time::sleep(interval) => {}
+        }
+        info!("sweep: draining queued jobs");
+        loop {
+            match handler.execute(ProcessNextJobCommand).await {
+                Ok(Some(job)) => {
+                    info!(job_id = %job.job_id, status = ?job.status, "sweep: processed job");
+                }
+                Ok(None) => break,
+                Err(e) => {
+                    error!(error = %e, "sweep: error processing job");
+                    break;
+                }
+            }
+        }
+    }
+}