From dacfc3d4534d0e4fb0aeaf80ef1d858dc2fbde2e Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Sun, 31 May 2026 11:35:05 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20worker=20plugin=20system=20=E2=80=94=20?= =?UTF-8?q?domain=20ports,=20pipeline=20executor,=20built-in=20plugins?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - PluginExecutor + PluginRegistry ports in domain - ExecutePipelineCommand orchestrates job→pipeline→plugin steps - ProcessNextJobCommand polls + executes next queued job - InMemoryPluginRegistry, NoOp/MetadataExtractor/SidecarSync plugins - Worker main rewritten with poll loop, factories module for DI - Deleted template job/runner/jobs remnants --- Cargo.lock | 2 + .../processing/commands/execute_pipeline.rs | 176 ++++++++++++++++++ .../src/processing/commands/mod.rs | 2 + .../processing/commands/process_next_job.rs | 36 ++++ crates/application/src/processing/mod.rs | 2 + crates/domain/src/processing/ports.rs | 23 ++- crates/worker/Cargo.toml | 5 +- crates/worker/src/config.rs | 8 +- crates/worker/src/factories/infra.rs | 30 +++ crates/worker/src/factories/mod.rs | 7 + crates/worker/src/factories/plugins.rs | 30 +++ crates/worker/src/factories/processing.rs | 22 +++ crates/worker/src/job.rs | 7 - crates/worker/src/jobs/example.rs | 16 -- crates/worker/src/jobs/mod.rs | 2 - crates/worker/src/main.rs | 82 ++++++-- crates/worker/src/plugin_registry.rs | 30 +++ .../worker/src/plugins/metadata_extractor.rs | 68 +++++++ crates/worker/src/plugins/mod.rs | 7 + crates/worker/src/plugins/no_op.rs | 26 +++ crates/worker/src/plugins/sidecar_sync.rs | 50 +++++ crates/worker/src/runner.rs | 46 ----- 22 files changed, 587 insertions(+), 90 deletions(-) create mode 100644 crates/application/src/processing/commands/execute_pipeline.rs create mode 100644 crates/application/src/processing/commands/process_next_job.rs create mode 100644 crates/worker/src/factories/infra.rs create mode 100644 crates/worker/src/factories/mod.rs create mode 100644 crates/worker/src/factories/plugins.rs create mode 100644 crates/worker/src/factories/processing.rs delete mode 100644 crates/worker/src/job.rs delete mode 100644 crates/worker/src/jobs/example.rs delete mode 100644 crates/worker/src/jobs/mod.rs create mode 100644 crates/worker/src/plugin_registry.rs create mode 100644 crates/worker/src/plugins/metadata_extractor.rs create mode 100644 crates/worker/src/plugins/mod.rs create mode 100644 crates/worker/src/plugins/no_op.rs create mode 100644 crates/worker/src/plugins/sidecar_sync.rs delete mode 100644 crates/worker/src/runner.rs diff --git a/Cargo.lock b/Cargo.lock index cf5675e..78fd7e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3061,7 +3061,9 @@ name = "worker" version = "0.1.0" dependencies = [ "adapters-postgres", + "adapters-storage", "anyhow", + "application", "async-trait", "domain", "dotenvy", diff --git a/crates/application/src/processing/commands/execute_pipeline.rs b/crates/application/src/processing/commands/execute_pipeline.rs new file mode 100644 index 0000000..b12b734 --- /dev/null +++ b/crates/application/src/processing/commands/execute_pipeline.rs @@ -0,0 +1,176 @@ +use domain::{ + entities::{Job, JobType}, + errors::DomainError, + events::DomainEvent, + ports::{ + EventPublisher, JobBatchRepository, JobRepository, PipelineRepository, PluginRegistry, + PluginRepository, + }, + value_objects::{DateTimeStamp, StructuredData, SystemId}, +}; +use std::sync::Arc; + +pub struct ExecutePipelineCommand { + pub job_id: SystemId, +} + +pub struct ExecutePipelineHandler { + job_repo: Arc, + batch_repo: Arc, + pipeline_repo: Arc, + plugin_repo: Arc, + plugin_registry: Arc, + event_pub: Arc, +} + +fn job_type_to_trigger(job_type: &JobType) -> &str { + match job_type { + JobType::ScanDirectory => "scan_directory", + JobType::ExtractMetadata => "extract_metadata", + JobType::GenerateDerivative => "generate_derivative", + JobType::SyncSidecar => "sync_sidecar", + JobType::DetectDuplicates => "detect_duplicates", + JobType::Custom(s) => s.as_str(), + } +} + +impl ExecutePipelineHandler { + pub fn new( + job_repo: Arc, + batch_repo: Arc, + pipeline_repo: Arc, + plugin_repo: Arc, + plugin_registry: Arc, + event_pub: Arc, + ) -> Self { + Self { + job_repo, + batch_repo, + pipeline_repo, + plugin_repo, + plugin_registry, + event_pub, + } + } + + pub async fn execute(&self, cmd: ExecutePipelineCommand) -> Result { + let mut job = self + .job_repo + .find_by_id(&cmd.job_id) + .await? + .ok_or_else(|| DomainError::NotFound(format!("Job {} not found", cmd.job_id)))?; + + job.start()?; + self.job_repo.save(&job).await?; + + let trigger = job_type_to_trigger(&job.job_type); + let pipelines = self.pipeline_repo.find_by_trigger(trigger).await?; + + let result = match pipelines.first() { + Some(pipeline) => self.run_pipeline_steps(&job, pipeline).await, + None => self.run_direct(&job).await, + }; + + match result { + Ok(result_data) => { + job.complete(result_data); + self.job_repo.save(&job).await?; + self.update_batch_on_complete(&job).await?; + self.event_pub + .publish(DomainEvent::JobCompleted { + job_id: job.job_id, + timestamp: DateTimeStamp::now(), + }) + .await?; + } + Err(e) => { + let error_msg = e.to_string(); + job.fail(&error_msg); + self.job_repo.save(&job).await?; + self.update_batch_on_fail(&job).await?; + self.event_pub + .publish(DomainEvent::JobFailed { + job_id: job.job_id, + error: error_msg, + timestamp: DateTimeStamp::now(), + }) + .await?; + } + } + + Ok(job) + } + + async fn run_pipeline_steps( + &self, + job: &Job, + pipeline: &domain::entities::ProcessingPipeline, + ) -> Result { + let mut accumulated = StructuredData::new(); + let mut sorted_steps = pipeline.steps.clone(); + sorted_steps.sort_by_key(|s| s.step_order); + + for step in &sorted_steps { + let plugin = self + .plugin_repo + .find_by_id(&step.plugin_id) + .await? + .ok_or_else(|| { + DomainError::NotFound(format!("Plugin {} not found", step.plugin_id)) + })?; + + if !plugin.is_enabled { + continue; + } + + let executor = self + .plugin_registry + .get_executor(&plugin.name) + .ok_or_else(|| { + DomainError::NotFound(format!( + "No executor registered for plugin '{}'", + plugin.name + )) + })?; + + let step_result = executor + .execute(job.target_asset_id, &job.payload, &step.configuration) + .await?; + + accumulated.merge_from(step_result); + } + + Ok(accumulated) + } + + async fn run_direct(&self, job: &Job) -> Result { + let trigger = job_type_to_trigger(&job.job_type); + let executor = self.plugin_registry.get_executor(trigger).ok_or_else(|| { + DomainError::NotFound(format!("No pipeline or executor found for '{}'", trigger)) + })?; + + executor + .execute(job.target_asset_id, &job.payload, &StructuredData::new()) + .await + } + + async fn update_batch_on_complete(&self, job: &Job) -> Result<(), DomainError> { + if let Some(ref batch_id) = job.batch_id + && let Some(mut batch) = self.batch_repo.find_by_id(batch_id).await? + { + batch.record_completion(); + self.batch_repo.save(&batch).await?; + } + Ok(()) + } + + async fn update_batch_on_fail(&self, job: &Job) -> Result<(), DomainError> { + if let Some(ref batch_id) = job.batch_id + && let Some(mut batch) = self.batch_repo.find_by_id(batch_id).await? + { + batch.record_failure(); + self.batch_repo.save(&batch).await?; + } + Ok(()) + } +} diff --git a/crates/application/src/processing/commands/mod.rs b/crates/application/src/processing/commands/mod.rs index 398ccc0..86fc7a4 100644 --- a/crates/application/src/processing/commands/mod.rs +++ b/crates/application/src/processing/commands/mod.rs @@ -1,6 +1,8 @@ pub mod complete_job; pub mod configure_pipeline; pub mod enqueue_job; +pub mod execute_pipeline; pub mod fail_job; pub mod manage_plugin; +pub mod process_next_job; pub mod start_job; diff --git a/crates/application/src/processing/commands/process_next_job.rs b/crates/application/src/processing/commands/process_next_job.rs new file mode 100644 index 0000000..14f15e0 --- /dev/null +++ b/crates/application/src/processing/commands/process_next_job.rs @@ -0,0 +1,36 @@ +use super::execute_pipeline::{ExecutePipelineCommand, ExecutePipelineHandler}; +use domain::{entities::Job, errors::DomainError, ports::JobRepository}; +use std::sync::Arc; + +pub struct ProcessNextJobCommand; + +pub struct ProcessNextJobHandler { + job_repo: Arc, + execute_pipeline: Arc, +} + +impl ProcessNextJobHandler { + pub fn new( + job_repo: Arc, + execute_pipeline: Arc, + ) -> Self { + Self { + job_repo, + execute_pipeline, + } + } + + pub async fn execute(&self, _cmd: ProcessNextJobCommand) -> Result, DomainError> { + let job = self.job_repo.find_next_queued().await?; + match job { + None => Ok(None), + Some(j) => { + let result = self + .execute_pipeline + .execute(ExecutePipelineCommand { job_id: j.job_id }) + .await?; + Ok(Some(result)) + } + } + } +} diff --git a/crates/application/src/processing/mod.rs b/crates/application/src/processing/mod.rs index 0372381..5cc1323 100644 --- a/crates/application/src/processing/mod.rs +++ b/crates/application/src/processing/mod.rs @@ -6,8 +6,10 @@ pub use commands::configure_pipeline::{ ConfigurePipelineCommand, ConfigurePipelineHandler, PipelineStepConfig, }; pub use commands::enqueue_job::{EnqueueJobCommand, EnqueueJobHandler}; +pub use commands::execute_pipeline::{ExecutePipelineCommand, ExecutePipelineHandler}; pub use commands::fail_job::{FailJobCommand, FailJobHandler}; pub use commands::manage_plugin::{ManagePluginCommand, ManagePluginHandler, PluginAction}; +pub use commands::process_next_job::{ProcessNextJobCommand, ProcessNextJobHandler}; pub use commands::start_job::{StartJobCommand, StartJobHandler}; pub use queries::report_batch_progress::{ BatchProgress, ReportBatchProgressHandler, ReportBatchProgressQuery, diff --git a/crates/domain/src/processing/ports.rs b/crates/domain/src/processing/ports.rs index 283e770..859334b 100644 --- a/crates/domain/src/processing/ports.rs +++ b/crates/domain/src/processing/ports.rs @@ -1,7 +1,8 @@ use super::entities::{Job, JobBatch, Plugin, ProcessingPipeline}; use crate::common::errors::DomainError; -use crate::common::value_objects::SystemId; +use crate::common::value_objects::{StructuredData, SystemId}; use async_trait::async_trait; +use std::sync::Arc; // --- JobRepository --- @@ -38,3 +39,23 @@ pub trait PipelineRepository: Send + Sync { async fn find_by_trigger(&self, event: &str) -> Result, DomainError>; async fn save(&self, pipeline: &ProcessingPipeline) -> Result<(), DomainError>; } + +// --- PluginExecutor --- + +#[async_trait] +pub trait PluginExecutor: Send + Sync { + fn plugin_name(&self) -> &str; + async fn execute( + &self, + asset_id: Option, + payload: &StructuredData, + config: &StructuredData, + ) -> Result; +} + +// --- PluginRegistry --- + +pub trait PluginRegistry: Send + Sync { + fn get_executor(&self, plugin_name: &str) -> Option>; + fn registered_plugins(&self) -> Vec; +} diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 9d686fa..2159a9f 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -8,10 +8,11 @@ name = "k_photos-worker" path = "src/main.rs" [dependencies] -domain = { workspace = true } - +domain = { workspace = true } +application = { workspace = true } adapters-postgres = { path = "../adapters/postgres" } +adapters-storage = { workspace = true } tokio = { workspace = true } anyhow = { workspace = true } diff --git a/crates/worker/src/config.rs b/crates/worker/src/config.rs index 4a176fe..34b3ea5 100644 --- a/crates/worker/src/config.rs +++ b/crates/worker/src/config.rs @@ -1,7 +1,8 @@ #[derive(Debug, Clone)] pub struct WorkerConfig { pub database_url: String, - pub example_job_interval_secs: u64, + pub poll_interval_secs: u64, + pub storage_path: String, } impl WorkerConfig { @@ -9,10 +10,11 @@ impl WorkerConfig { dotenvy::dotenv().ok(); Self { database_url: std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"), - example_job_interval_secs: std::env::var("EXAMPLE_JOB_INTERVAL_SECS") + poll_interval_secs: std::env::var("POLL_INTERVAL_SECS") .ok() .and_then(|v| v.parse().ok()) - .unwrap_or(60), + .unwrap_or(5), + storage_path: std::env::var("STORAGE_PATH").unwrap_or_else(|_| "./storage".into()), } } } diff --git a/crates/worker/src/factories/infra.rs b/crates/worker/src/factories/infra.rs new file mode 100644 index 0000000..73a956e --- /dev/null +++ b/crates/worker/src/factories/infra.rs @@ -0,0 +1,30 @@ +use adapters_postgres::{ + PostgresAssetMetadataRepository, PostgresAssetRepository, PostgresJobBatchRepository, + PostgresJobRepository, PostgresPipelineRepository, PostgresPluginRepository, + PostgresSidecarRepository, +}; +use std::sync::Arc; + +pub struct Repos { + pub job: Arc, + pub batch: Arc, + pub pipeline: Arc, + pub plugin: Arc, + pub asset: Arc, + pub metadata: Arc, + pub sidecar: Arc, +} + +impl Repos { + pub fn new(pool: adapters_postgres::PgPool) -> Self { + Self { + job: Arc::new(PostgresJobRepository::new(pool.clone())), + batch: Arc::new(PostgresJobBatchRepository::new(pool.clone())), + pipeline: Arc::new(PostgresPipelineRepository::new(pool.clone())), + plugin: Arc::new(PostgresPluginRepository::new(pool.clone())), + asset: Arc::new(PostgresAssetRepository::new(pool.clone())), + metadata: Arc::new(PostgresAssetMetadataRepository::new(pool.clone())), + sidecar: Arc::new(PostgresSidecarRepository::new(pool)), + } + } +} diff --git a/crates/worker/src/factories/mod.rs b/crates/worker/src/factories/mod.rs new file mode 100644 index 0000000..fbedfe3 --- /dev/null +++ b/crates/worker/src/factories/mod.rs @@ -0,0 +1,7 @@ +mod infra; +mod plugins; +mod processing; + +pub use infra::Repos; +pub use plugins::build_plugin_registry; +pub use processing::build_process_next_handler; diff --git a/crates/worker/src/factories/plugins.rs b/crates/worker/src/factories/plugins.rs new file mode 100644 index 0000000..88c46b6 --- /dev/null +++ b/crates/worker/src/factories/plugins.rs @@ -0,0 +1,30 @@ +use crate::plugin_registry::InMemoryPluginRegistry; +use crate::plugins::{MetadataExtractorPlugin, NoOpPlugin, SidecarSyncPlugin}; +use domain::ports::SidecarWriterPort; +use std::sync::Arc; + +use super::Repos; + +pub fn build_plugin_registry( + repos: &Repos, + file_storage: Arc, + sidecar_writer: Arc, +) -> InMemoryPluginRegistry { + let mut registry = InMemoryPluginRegistry::new(); + + registry.register(Arc::new(NoOpPlugin)); + registry.register(Arc::new(MetadataExtractorPlugin::new( + repos.asset.clone(), + file_storage, + repos.metadata.clone(), + ))); + + let export_handler = Arc::new(application::sidecar::ExportSidecarHandler::new( + repos.metadata.clone(), + repos.sidecar.clone(), + sidecar_writer, + )); + registry.register(Arc::new(SidecarSyncPlugin::new(export_handler))); + + registry +} diff --git a/crates/worker/src/factories/processing.rs b/crates/worker/src/factories/processing.rs new file mode 100644 index 0000000..ddaf152 --- /dev/null +++ b/crates/worker/src/factories/processing.rs @@ -0,0 +1,22 @@ +use application::processing::{ExecutePipelineHandler, ProcessNextJobHandler}; +use domain::ports::{EventPublisher, PluginRegistry}; +use std::sync::Arc; + +use super::Repos; + +pub fn build_process_next_handler( + repos: &Repos, + registry: Arc, + event_pub: Arc, +) -> ProcessNextJobHandler { + let execute_pipeline = Arc::new(ExecutePipelineHandler::new( + repos.job.clone(), + repos.batch.clone(), + repos.pipeline.clone(), + repos.plugin.clone(), + registry, + event_pub, + )); + + ProcessNextJobHandler::new(repos.job.clone(), execute_pipeline) +} diff --git a/crates/worker/src/job.rs b/crates/worker/src/job.rs deleted file mode 100644 index 19da8a8..0000000 --- a/crates/worker/src/job.rs +++ /dev/null @@ -1,7 +0,0 @@ -use async_trait::async_trait; - -#[async_trait] -pub trait Job: Send + Sync { - fn name(&self) -> &str; - async fn run(&self) -> anyhow::Result<()>; -} diff --git a/crates/worker/src/jobs/example.rs b/crates/worker/src/jobs/example.rs deleted file mode 100644 index 013a3b6..0000000 --- a/crates/worker/src/jobs/example.rs +++ /dev/null @@ -1,16 +0,0 @@ -use crate::job::Job; -use async_trait::async_trait; -use tracing::info; - -pub struct ExampleJob; - -#[async_trait] -impl Job for ExampleJob { - fn name(&self) -> &str { - "example" - } - async fn run(&self) -> anyhow::Result<()> { - info!("example job ran — replace with real work"); - Ok(()) - } -} diff --git a/crates/worker/src/jobs/mod.rs b/crates/worker/src/jobs/mod.rs deleted file mode 100644 index c03205c..0000000 --- a/crates/worker/src/jobs/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod example; -pub use example::ExampleJob; diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 458339f..42eb770 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -1,14 +1,14 @@ use std::sync::Arc; use std::time::Duration; -use tracing::info; +use tracing::{error, info}; mod config; -mod job; -mod jobs; -mod runner; +mod factories; +mod plugin_registry; +mod plugins; -use jobs::ExampleJob; -use runner::JobRunner; +use application::processing::ProcessNextJobCommand; +use factories::{Repos, build_plugin_registry, build_process_next_handler}; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -21,13 +21,69 @@ async fn main() -> anyhow::Result<()> { let config = config::WorkerConfig::from_env(); info!("Worker starting"); - let _pool = adapters_postgres::connect(&config.database_url).await?; - adapters_postgres::run_migrations(&_pool).await?; + let pool = adapters_postgres::connect(&config.database_url).await?; + adapters_postgres::run_migrations(&pool).await?; - let interval = Duration::from_secs(config.example_job_interval_secs); - let runner = JobRunner::new().register(Arc::new(ExampleJob), interval); + let repos = Repos::new(pool); + let file_storage = Arc::new(adapters_storage::LocalFileStorage::new( + &config.storage_path, + )); + let sidecar_writer: Arc = Arc::new(LogSidecarWriter); + let event_pub: Arc = Arc::new(LogEventPublisher); - info!("Worker running"); - runner.run().await; - Ok(()) + let registry = Arc::new(build_plugin_registry(&repos, file_storage, sidecar_writer)); + let process_next = build_process_next_handler(&repos, registry, event_pub); + + let poll_interval = Duration::from_secs(config.poll_interval_secs); + info!(poll_secs = config.poll_interval_secs, "Worker running"); + + loop { + match process_next.execute(ProcessNextJobCommand).await { + Ok(Some(job)) => info!(job_id = %job.job_id, status = ?job.status, "processed job"), + Ok(None) => tokio::time::sleep(poll_interval).await, + Err(e) => { + error!(error = %e, "worker error"); + tokio::time::sleep(poll_interval).await; + } + } + } +} + +struct LogEventPublisher; + +#[async_trait::async_trait] +impl domain::ports::EventPublisher for LogEventPublisher { + async fn publish( + &self, + event: domain::events::DomainEvent, + ) -> Result<(), domain::errors::DomainError> { + info!(event = ?event, "domain event"); + Ok(()) + } +} + +struct LogSidecarWriter; + +#[async_trait::async_trait] +impl domain::ports::SidecarWriterPort for LogSidecarWriter { + fn format_name(&self) -> &str { + "log_noop" + } + + async fn write_sidecar( + &self, + _data: &domain::value_objects::StructuredData, + path: &str, + ) -> Result<(), domain::errors::DomainError> { + info!(path, "sidecar write (no-op)"); + Ok(()) + } + + async fn read_sidecar( + &self, + path: &str, + ) -> Result { + info!(path, "sidecar read (no-op)"); + Ok(domain::value_objects::StructuredData::new()) + } } diff --git a/crates/worker/src/plugin_registry.rs b/crates/worker/src/plugin_registry.rs new file mode 100644 index 0000000..a033ca4 --- /dev/null +++ b/crates/worker/src/plugin_registry.rs @@ -0,0 +1,30 @@ +use domain::ports::{PluginExecutor, PluginRegistry}; +use std::collections::HashMap; +use std::sync::Arc; + +pub struct InMemoryPluginRegistry { + executors: HashMap>, +} + +impl InMemoryPluginRegistry { + pub fn new() -> Self { + Self { + executors: HashMap::new(), + } + } + + pub fn register(&mut self, executor: Arc) { + self.executors + .insert(executor.plugin_name().to_string(), executor); + } +} + +impl PluginRegistry for InMemoryPluginRegistry { + fn get_executor(&self, plugin_name: &str) -> Option> { + self.executors.get(plugin_name).cloned() + } + + fn registered_plugins(&self) -> Vec { + self.executors.keys().cloned().collect() + } +} diff --git a/crates/worker/src/plugins/metadata_extractor.rs b/crates/worker/src/plugins/metadata_extractor.rs new file mode 100644 index 0000000..0b3529e --- /dev/null +++ b/crates/worker/src/plugins/metadata_extractor.rs @@ -0,0 +1,68 @@ +use async_trait::async_trait; +use domain::{ + entities::{AssetMetadata, MetadataSource}, + errors::DomainError, + ports::{AssetMetadataRepository, AssetRepository, FileStoragePort, PluginExecutor}, + value_objects::{MetadataValue, StructuredData, SystemId}, +}; +use std::sync::Arc; +use tracing::info; + +pub struct MetadataExtractorPlugin { + asset_repo: Arc, + file_storage: Arc, + metadata_repo: Arc, +} + +impl MetadataExtractorPlugin { + pub fn new( + asset_repo: Arc, + file_storage: Arc, + metadata_repo: Arc, + ) -> Self { + Self { + asset_repo, + file_storage, + metadata_repo, + } + } +} + +#[async_trait] +impl PluginExecutor for MetadataExtractorPlugin { + fn plugin_name(&self) -> &str { + "metadata_extractor" + } + + async fn execute( + &self, + asset_id: Option, + _payload: &StructuredData, + _config: &StructuredData, + ) -> Result { + let asset_id = asset_id.ok_or_else(|| { + DomainError::Validation("metadata_extractor requires asset_id".into()) + })?; + + let asset = self + .asset_repo + .find_by_id(&asset_id) + .await? + .ok_or_else(|| DomainError::NotFound(format!("Asset {} not found", asset_id)))?; + + let path = &asset.source_reference.relative_path; + let data = self.file_storage.read_file(path).await?; + let file_size = data.len() as i64; + + let mut extracted = StructuredData::new(); + extracted.insert("file_size_bytes", MetadataValue::Integer(file_size)); + extracted.insert("mime_type", MetadataValue::String(asset.mime_type.clone())); + + let metadata = + AssetMetadata::new(asset_id, MetadataSource::ExifExtracted, extracted.clone()); + self.metadata_repo.save(&metadata).await?; + + info!(asset_id = %asset_id, file_size, "extracted basic metadata"); + Ok(extracted) + } +} diff --git a/crates/worker/src/plugins/mod.rs b/crates/worker/src/plugins/mod.rs new file mode 100644 index 0000000..047144f --- /dev/null +++ b/crates/worker/src/plugins/mod.rs @@ -0,0 +1,7 @@ +pub mod metadata_extractor; +pub mod no_op; +pub mod sidecar_sync; + +pub use metadata_extractor::MetadataExtractorPlugin; +pub use no_op::NoOpPlugin; +pub use sidecar_sync::SidecarSyncPlugin; diff --git a/crates/worker/src/plugins/no_op.rs b/crates/worker/src/plugins/no_op.rs new file mode 100644 index 0000000..1573c94 --- /dev/null +++ b/crates/worker/src/plugins/no_op.rs @@ -0,0 +1,26 @@ +use async_trait::async_trait; +use domain::{ + errors::DomainError, + ports::PluginExecutor, + value_objects::{StructuredData, SystemId}, +}; +use tracing::info; + +pub struct NoOpPlugin; + +#[async_trait] +impl PluginExecutor for NoOpPlugin { + fn plugin_name(&self) -> &str { + "no_op" + } + + async fn execute( + &self, + asset_id: Option, + _payload: &StructuredData, + _config: &StructuredData, + ) -> Result { + info!(asset_id = ?asset_id, "no_op plugin executed"); + Ok(StructuredData::new()) + } +} diff --git a/crates/worker/src/plugins/sidecar_sync.rs b/crates/worker/src/plugins/sidecar_sync.rs new file mode 100644 index 0000000..d9fe8ac --- /dev/null +++ b/crates/worker/src/plugins/sidecar_sync.rs @@ -0,0 +1,50 @@ +use application::sidecar::{ExportSidecarCommand, ExportSidecarHandler}; +use async_trait::async_trait; +use domain::{ + errors::DomainError, + ports::PluginExecutor, + value_objects::{MetadataValue, StructuredData, SystemId}, +}; +use std::sync::Arc; +use tracing::info; + +pub struct SidecarSyncPlugin { + export_handler: Arc, +} + +impl SidecarSyncPlugin { + pub fn new(export_handler: Arc) -> Self { + Self { export_handler } + } +} + +#[async_trait] +impl PluginExecutor for SidecarSyncPlugin { + fn plugin_name(&self) -> &str { + "sidecar_sync" + } + + async fn execute( + &self, + asset_id: Option, + _payload: &StructuredData, + _config: &StructuredData, + ) -> Result { + let asset_id = asset_id + .ok_or_else(|| DomainError::Validation("sidecar_sync requires asset_id".into()))?; + + let record = self + .export_handler + .execute(ExportSidecarCommand { asset_id }) + .await?; + + let mut result = StructuredData::new(); + result.insert( + "sidecar_path", + MetadataValue::String(record.sidecar_storage_path), + ); + + info!(asset_id = %asset_id, "sidecar synced"); + Ok(result) + } +} diff --git a/crates/worker/src/runner.rs b/crates/worker/src/runner.rs deleted file mode 100644 index 7f533bc..0000000 --- a/crates/worker/src/runner.rs +++ /dev/null @@ -1,46 +0,0 @@ -use crate::job::Job; -use std::sync::Arc; -use std::time::Duration; -use tracing::{error, info}; - -pub struct JobRunner { - jobs: Vec<(Arc, Duration)>, -} - -impl JobRunner { - pub fn new() -> Self { - Self { jobs: vec![] } - } - - pub fn register(mut self, job: Arc, interval: Duration) -> Self { - self.jobs.push((job, interval)); - self - } - - pub async fn run(self) { - let handles: Vec<_> = self - .jobs - .into_iter() - .map(|(job, interval)| { - tokio::spawn(async move { - loop { - info!(job = job.name(), "running job"); - if let Err(e) = job.run().await { - error!(job = job.name(), error = %e, "job failed"); - } - tokio::time::sleep(interval).await; - } - }) - }) - .collect(); - for handle in handles { - let _ = handle.await; - } - } -} - -impl Default for JobRunner { - fn default() -> Self { - Self::new() - } -}