refactor: split worker into bootstrap, event_loop, sweep modules

main.rs: 234 → 55 lines, just config + spawn + await.
bootstrap.rs: DI wiring, returns WorkerServices struct.
event_loop.rs: event dispatch with extracted helpers
  (enqueue_cmd, handle_job_completed, drain_one).
sweep.rs: fallback job drainer on interval.
This commit is contained in:
2026-05-31 23:09:21 +02:00
parent 7b5bb66b37
commit 6140ecd3ba
4 changed files with 234 additions and 185 deletions

View File

@@ -0,0 +1,74 @@
use std::sync::Arc;
use application::processing::{EnqueueJobHandler, ProcessNextJobHandler};
use domain::ports::JobRepository;
use crate::config::WorkerConfig;
use crate::factories::{
Repos, build_enqueue_handler, build_plugin_registry, build_process_next_handler,
};
pub struct WorkerServices {
pub process_next: Arc<ProcessNextJobHandler>,
pub enqueue: Arc<EnqueueJobHandler>,
pub job_repo: Arc<dyn JobRepository>,
pub event_consumer:
adapters_event_transport::EventConsumerAdapter<adapters_nats::NatsMessageSource>,
}
pub async fn build(config: &WorkerConfig) -> anyhow::Result<WorkerServices> {
let pool = adapters_postgres::connect(&config.database_url).await?;
adapters_postgres::run_migrations(&pool).await?;
let nats_client = async_nats::connect(&config.nats_url).await?;
adapters_nats::ensure_stream(&nats_client).await?;
tracing::info!(nats_url = %config.nats_url, "NATS connected");
let event_store: Arc<dyn domain::ports::EventStore> =
Arc::new(adapters_postgres::PostgresEventStore::new(pool.clone()));
let repos = Repos::new(pool);
let file_storage = Arc::new(adapters_storage::LocalFileStorage::new(
&config.storage_path,
));
let sidecar_writer: Arc<dyn domain::ports::SidecarWriterPort> =
Arc::new(adapters_sidecar::XmpSidecarWriter);
let pub_transport = adapters_nats::NatsTransport::new(nats_client.clone());
let nats_publisher: Arc<dyn domain::ports::EventPublisher> = Arc::new(
adapters_event_transport::EventPublisherAdapter::new(pub_transport),
);
let event_pub: Arc<dyn domain::ports::EventPublisher> = Arc::new(
adapters_event_transport::CompositeEventPublisher::new(nats_publisher, event_store),
);
let extractor: Arc<dyn domain::ports::MetadataExtractorPort> =
Arc::new(adapters_exif::NomExifExtractor);
let thumbnail_gen: Arc<dyn domain::ports::ThumbnailGeneratorPort> =
Arc::new(adapters_thumbnail::ImageThumbnailGenerator);
let registry = Arc::new(build_plugin_registry(
&repos,
file_storage,
sidecar_writer,
extractor,
thumbnail_gen,
event_pub.clone(),
));
let process_next = Arc::new(build_process_next_handler(
&repos,
registry,
event_pub.clone(),
));
let job_repo: Arc<dyn JobRepository> = repos.job.clone();
let enqueue = Arc::new(build_enqueue_handler(&repos, event_pub));
let consumer_source = adapters_nats::NatsMessageSource::new(nats_client);
let event_consumer = adapters_event_transport::EventConsumerAdapter::new(consumer_source);
Ok(WorkerServices {
process_next,
enqueue,
job_repo,
event_consumer,
})
}

View File

@@ -0,0 +1,112 @@
use std::sync::Arc;
use futures::StreamExt;
use tokio::sync::watch;
use tracing::{error, info, warn};
use application::processing::{EnqueueJobCommand, ProcessNextJobCommand, ProcessNextJobHandler};
use domain::entities::JobType;
use domain::events::DomainEvent;
use domain::ports::{EventConsumer, JobRepository};
use domain::value_objects::{StructuredData, SystemId};
use crate::bootstrap::WorkerServices;
fn enqueue_cmd(job_type: JobType, priority: u32, asset_id: SystemId) -> EnqueueJobCommand {
EnqueueJobCommand {
job_type,
priority,
payload: StructuredData::new(),
target_asset_id: Some(asset_id),
batch_id: None,
}
}
pub async fn run(services: WorkerServices, mut shutdown: watch::Receiver<bool>) {
info!("event loop: listening for NATS events");
let mut stream = services.event_consumer.consume();
loop {
tokio::select! {
_ = shutdown.changed() => {
info!("event loop: shutting down");
break;
}
msg = stream.next() => {
let Some(result) = msg else { break };
let envelope = match result {
Ok(env) => env,
Err(e) => {
error!(error = %e, "event loop: consumer error");
continue;
}
};
(envelope.ack)();
match &envelope.event {
DomainEvent::AssetIngested { asset_id, .. } => {
info!(asset_id = %asset_id, "AssetIngested → ExtractMetadata");
let cmd = enqueue_cmd(JobType::ExtractMetadata, 10, *asset_id);
if let Err(e) = services.enqueue.execute(cmd).await {
error!(error = %e, "failed to enqueue ExtractMetadata");
}
}
DomainEvent::SidecarSyncRequested { asset_id, .. } => {
info!(asset_id = %asset_id, "SidecarSyncRequested → SyncSidecar");
let cmd = enqueue_cmd(JobType::SyncSidecar, 5, *asset_id);
if let Err(e) = services.enqueue.execute(cmd).await {
error!(error = %e, "failed to enqueue SyncSidecar");
}
}
DomainEvent::JobCompleted { job_id, .. } => {
handle_job_completed(
job_id,
&services.job_repo,
&services.enqueue,
)
.await;
}
DomainEvent::JobEnqueued { job_id, job_type, .. } => {
info!(job_id = %job_id, job_type = %job_type, "JobEnqueued → process");
drain_one(&services.process_next).await;
}
other => {
tracing::debug!(event = ?other, "unhandled event, acked");
}
}
}
}
}
}
async fn handle_job_completed(
job_id: &SystemId,
job_repo: &Arc<dyn JobRepository>,
enqueue: &application::processing::EnqueueJobHandler,
) {
if let Ok(Some(job)) = job_repo.find_by_id(job_id).await
&& job.job_type == JobType::ExtractMetadata
&& let Some(asset_id) = job.target_asset_id
{
info!(asset_id = %asset_id, "ExtractMetadata done → GenerateDerivative");
let cmd = enqueue_cmd(JobType::GenerateDerivative, 5, asset_id);
if let Err(e) = enqueue.execute(cmd).await {
error!(error = %e, "failed to enqueue GenerateDerivative");
}
}
}
async fn drain_one(handler: &Arc<ProcessNextJobHandler>) {
match handler.execute(ProcessNextJobCommand).await {
Ok(Some(job)) => {
info!(job_id = %job.job_id, status = ?job.status, "processed job");
}
Ok(None) => {
warn!("JobEnqueued but no queued job found");
}
Err(e) => {
error!(error = %e, "error processing job");
}
}
}

View File

@@ -1,22 +1,15 @@
use std::sync::Arc;
use std::time::Duration;
use futures::StreamExt;
use tokio::sync::watch;
use tracing::{error, info, warn};
use application::processing::{EnqueueJobCommand, ProcessNextJobCommand};
use domain::entities::JobType;
use domain::events::DomainEvent;
use domain::ports::{EventConsumer, JobRepository};
use domain::value_objects::StructuredData;
use tracing::info;
mod bootstrap;
mod config;
mod event_loop;
mod factories;
mod plugin_registry;
mod plugins;
use factories::{Repos, build_enqueue_handler, build_plugin_registry, build_process_next_handler};
mod sweep;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
@@ -29,51 +22,8 @@ async fn main() -> anyhow::Result<()> {
let config = config::WorkerConfig::from_env();
info!("Worker starting");
let pool = adapters_postgres::connect(&config.database_url).await?;
adapters_postgres::run_migrations(&pool).await?;
let services = bootstrap::build(&config).await?;
let nats_client = async_nats::connect(&config.nats_url).await?;
adapters_nats::ensure_stream(&nats_client).await?;
info!(nats_url = %config.nats_url, "NATS connected");
let event_store: Arc<dyn domain::ports::EventStore> =
Arc::new(adapters_postgres::PostgresEventStore::new(pool.clone()));
let repos = Repos::new(pool);
let file_storage = Arc::new(adapters_storage::LocalFileStorage::new(
&config.storage_path,
));
let sidecar_writer: Arc<dyn domain::ports::SidecarWriterPort> =
Arc::new(adapters_sidecar::XmpSidecarWriter);
let pub_transport = adapters_nats::NatsTransport::new(nats_client.clone());
let nats_publisher: Arc<dyn domain::ports::EventPublisher> = Arc::new(
adapters_event_transport::EventPublisherAdapter::new(pub_transport),
);
let event_pub: Arc<dyn domain::ports::EventPublisher> = Arc::new(
adapters_event_transport::CompositeEventPublisher::new(nats_publisher, event_store),
);
let extractor: Arc<dyn domain::ports::MetadataExtractorPort> =
Arc::new(adapters_exif::NomExifExtractor);
let thumbnail_gen: Arc<dyn domain::ports::ThumbnailGeneratorPort> =
Arc::new(adapters_thumbnail::ImageThumbnailGenerator);
let registry = Arc::new(build_plugin_registry(
&repos,
file_storage,
sidecar_writer,
extractor,
thumbnail_gen,
event_pub.clone(),
));
let process_next = Arc::new(build_process_next_handler(
&repos,
registry,
event_pub.clone(),
));
let job_repo: Arc<dyn JobRepository> = repos.job.clone();
let enqueue = Arc::new(build_enqueue_handler(&repos, event_pub));
// ── Shutdown signal ───────────────────────────────────────────────
let (shutdown_tx, shutdown_rx) = watch::channel(false);
tokio::spawn(async move {
@@ -95,138 +45,14 @@ async fn main() -> anyhow::Result<()> {
shutdown_tx.send(true).ok();
});
// ── Fallback sweep task ────────────────────────────────────────────
let sweep_interval = Duration::from_secs(config.fallback_sweep_secs);
let sweep_handler = Arc::clone(&process_next);
let mut sweep_shutdown = shutdown_rx.clone();
tokio::spawn(async move {
info!(
every_secs = config.fallback_sweep_secs,
"fallback sweep task started"
);
loop {
tokio::select! {
_ = sweep_shutdown.changed() => {
info!("sweep task: shutting down");
break;
}
_ = tokio::time::sleep(sweep_interval) => {}
}
info!("fallback sweep: draining queued jobs");
loop {
match sweep_handler.execute(ProcessNextJobCommand).await {
Ok(Some(job)) => {
info!(job_id = %job.job_id, status = ?job.status, "sweep: processed job");
}
Ok(None) => break,
Err(e) => {
error!(error = %e, "sweep: error processing job");
break;
}
}
}
}
});
tokio::spawn(sweep::run(
services.process_next.clone(),
sweep_interval,
shutdown_rx.clone(),
));
// ── Event-driven loop via NATS ─────────────────────────────────────
let consumer_source = adapters_nats::NatsMessageSource::new(nats_client);
let event_consumer = adapters_event_transport::EventConsumerAdapter::new(consumer_source);
info!("event loop: listening for NATS events");
let mut stream = event_consumer.consume();
let mut event_shutdown = shutdown_rx.clone();
loop {
tokio::select! {
_ = event_shutdown.changed() => {
info!("event loop: shutting down");
break;
}
msg = stream.next() => {
let Some(result) = msg else { break };
let envelope = match result {
Ok(env) => env,
Err(e) => {
error!(error = %e, "event loop: consumer error");
continue;
}
};
match &envelope.event {
DomainEvent::AssetIngested { asset_id, .. } => {
info!(asset_id = %asset_id, "event loop: AssetIngested → enqueue ExtractMetadata");
(envelope.ack)();
let cmd = EnqueueJobCommand {
job_type: JobType::ExtractMetadata,
priority: 10,
payload: StructuredData::new(),
target_asset_id: Some(*asset_id),
batch_id: None,
};
if let Err(e) = enqueue.execute(cmd).await {
error!(error = %e, "event loop: failed to enqueue ExtractMetadata");
}
}
DomainEvent::SidecarSyncRequested { asset_id, .. } => {
info!(asset_id = %asset_id, "event loop: SidecarSyncRequested → enqueue SyncSidecar");
(envelope.ack)();
let cmd = EnqueueJobCommand {
job_type: JobType::SyncSidecar,
priority: 5,
payload: StructuredData::new(),
target_asset_id: Some(*asset_id),
batch_id: None,
};
if let Err(e) = enqueue.execute(cmd).await {
error!(error = %e, "event loop: failed to enqueue SyncSidecar");
}
}
DomainEvent::JobCompleted { job_id, .. } => {
info!(job_id = %job_id, "event loop: JobCompleted → check derivative generation");
(envelope.ack)();
// Look up the job to see if it was ExtractMetadata
if let Ok(Some(job)) = job_repo.find_by_id(job_id).await
&& job.job_type == JobType::ExtractMetadata
&& let Some(asset_id) = job.target_asset_id
{
info!(asset_id = %asset_id, "event loop: ExtractMetadata done → enqueue GenerateDerivative");
let cmd = EnqueueJobCommand {
job_type: JobType::GenerateDerivative,
priority: 5,
payload: StructuredData::new(),
target_asset_id: Some(asset_id),
batch_id: None,
};
if let Err(e) = enqueue.execute(cmd).await {
error!(error = %e, "event loop: failed to enqueue GenerateDerivative");
}
}
}
DomainEvent::JobEnqueued {
job_id, job_type, ..
} => {
info!(job_id = %job_id, job_type = %job_type, "event loop: JobEnqueued → process");
(envelope.ack)();
match process_next.execute(ProcessNextJobCommand).await {
Ok(Some(job)) => {
info!(job_id = %job.job_id, status = ?job.status, "event loop: processed job");
}
Ok(None) => {
warn!("event loop: JobEnqueued but no queued job found");
}
Err(e) => {
error!(error = %e, "event loop: error processing job");
}
}
}
other => {
(envelope.ack)();
tracing::debug!(event = ?other, "event loop: unhandled event, acked");
}
}
}
}
}
event_loop::run(services, shutdown_rx).await;
info!("worker shutdown complete");
Ok(())

View File

@@ -0,0 +1,37 @@
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tracing::{error, info};
use application::processing::{ProcessNextJobCommand, ProcessNextJobHandler};
pub async fn run(
handler: Arc<ProcessNextJobHandler>,
interval: Duration,
mut shutdown: watch::Receiver<bool>,
) {
info!(every_secs = interval.as_secs(), "sweep task started");
loop {
tokio::select! {
_ = shutdown.changed() => {
info!("sweep task: shutting down");
break;
}
_ = tokio::time::sleep(interval) => {}
}
info!("sweep: draining queued jobs");
loop {
match handler.execute(ProcessNextJobCommand).await {
Ok(Some(job)) => {
info!(job_id = %job.job_id, status = ?job.status, "sweep: processed job");
}
Ok(None) => break,
Err(e) => {
error!(error = %e, "sweep: error processing job");
break;
}
}
}
}
}