diff --git a/Cargo.lock b/Cargo.lock index 3dbd97d..a6f97a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -94,6 +94,8 @@ dependencies = [ "chrono", "domain", "serde", + "serde_json", + "sha2", "thiserror", "tokio", "uuid", diff --git a/Cargo.toml b/Cargo.toml index f231fc8..cf98bcb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ anyhow = "1.0" thiserror = "2.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +sha2 = "0.10" uuid = { version = "1.0", features = ["v4", "serde"] } chrono = { version = "0.4", features = ["serde"] } dotenvy = "0.15" diff --git a/crates/application/Cargo.toml b/crates/application/Cargo.toml index b5f9132..e787932 100644 --- a/crates/application/Cargo.toml +++ b/crates/application/Cargo.toml @@ -12,6 +12,8 @@ uuid = { workspace = true } tokio = { workspace = true } bytes = { workspace = true } serde = { workspace = true } +serde_json = { workspace = true } +sha2 = { workspace = true } [dev-dependencies] chrono = { workspace = true } diff --git a/crates/application/src/processing/commands/complete_job.rs b/crates/application/src/processing/commands/complete_job.rs new file mode 100644 index 0000000..141dd94 --- /dev/null +++ b/crates/application/src/processing/commands/complete_job.rs @@ -0,0 +1,48 @@ +use std::sync::Arc; +use domain::{ + entities::Job, + errors::DomainError, + events::DomainEvent, + ports::{EventPublisher, JobBatchRepository, JobRepository}, + value_objects::{DateTimeStamp, StructuredData, SystemId}, +}; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct CompleteJobCommand { + pub job_id: SystemId, + pub result: StructuredData, +} + +pub struct CompleteJobHandler { + job_repo: Arc, + batch_repo: Arc, + event_pub: Arc, +} + +impl CompleteJobHandler { + pub fn new( + job_repo: Arc, + batch_repo: Arc, + event_pub: Arc, + ) -> Self { + Self { job_repo, batch_repo, event_pub } + } + + pub async fn execute(&self, cmd: CompleteJobCommand) -> Result { + let mut job = self.job_repo.find_by_id(&cmd.job_id).await? + .ok_or_else(|| DomainError::NotFound(format!("Job {} not found", cmd.job_id)))?; + job.complete(cmd.result); + self.job_repo.save(&job).await?; + if let Some(ref batch_id) = job.batch_id { + let mut batch = self.batch_repo.find_by_id(batch_id).await? + .ok_or_else(|| DomainError::NotFound(format!("Batch {} not found", batch_id)))?; + batch.record_completion(); + self.batch_repo.save(&batch).await?; + } + self.event_pub.publish(DomainEvent::JobCompleted { + job_id: job.job_id, + timestamp: DateTimeStamp::now(), + }).await?; + Ok(job) + } +} diff --git a/crates/application/src/processing/commands/configure_pipeline.rs b/crates/application/src/processing/commands/configure_pipeline.rs new file mode 100644 index 0000000..b81bf8c --- /dev/null +++ b/crates/application/src/processing/commands/configure_pipeline.rs @@ -0,0 +1,46 @@ +use std::sync::Arc; +use domain::{ + entities::ProcessingPipeline, + errors::DomainError, + ports::{PipelineRepository, PluginRepository}, + value_objects::{StructuredData, SystemId}, +}; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct PipelineStepConfig { + pub plugin_id: SystemId, + pub config: StructuredData, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct ConfigurePipelineCommand { + pub trigger_event: String, + pub steps: Vec, +} + +pub struct ConfigurePipelineHandler { + pipeline_repo: Arc, + plugin_repo: Arc, +} + +impl ConfigurePipelineHandler { + pub fn new( + pipeline_repo: Arc, + plugin_repo: Arc, + ) -> Self { + Self { pipeline_repo, plugin_repo } + } + + pub async fn execute(&self, cmd: ConfigurePipelineCommand) -> Result { + for step in &cmd.steps { + self.plugin_repo.find_by_id(&step.plugin_id).await? + .ok_or_else(|| DomainError::NotFound(format!("Plugin {} not found", step.plugin_id)))?; + } + let mut pipeline = ProcessingPipeline::new(cmd.trigger_event); + for step in cmd.steps { + pipeline.add_step(step.plugin_id, step.config); + } + self.pipeline_repo.save(&pipeline).await?; + Ok(pipeline) + } +} diff --git a/crates/application/src/processing/commands/enqueue_job.rs b/crates/application/src/processing/commands/enqueue_job.rs new file mode 100644 index 0000000..4d957d9 --- /dev/null +++ b/crates/application/src/processing/commands/enqueue_job.rs @@ -0,0 +1,45 @@ +use std::sync::Arc; +use domain::{ + entities::{Job, JobType}, + errors::DomainError, + events::DomainEvent, + ports::{EventPublisher, JobRepository}, + value_objects::{DateTimeStamp, StructuredData, SystemId}, +}; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct EnqueueJobCommand { + pub job_type: JobType, + pub priority: u32, + pub payload: StructuredData, + pub target_asset_id: Option, + pub batch_id: Option, +} + +pub struct EnqueueJobHandler { + job_repo: Arc, + event_pub: Arc, +} + +impl EnqueueJobHandler { + pub fn new(job_repo: Arc, event_pub: Arc) -> Self { + Self { job_repo, event_pub } + } + + pub async fn execute(&self, cmd: EnqueueJobCommand) -> Result { + let mut job = Job::new(cmd.job_type.clone(), cmd.priority, cmd.payload); + if let Some(id) = cmd.target_asset_id { + job = job.with_target(id); + } + if let Some(id) = cmd.batch_id { + job = job.with_batch(id); + } + self.job_repo.save(&job).await?; + self.event_pub.publish(DomainEvent::JobEnqueued { + job_id: job.job_id, + job_type: format!("{:?}", cmd.job_type), + timestamp: DateTimeStamp::now(), + }).await?; + Ok(job) + } +} diff --git a/crates/application/src/processing/commands/fail_job.rs b/crates/application/src/processing/commands/fail_job.rs new file mode 100644 index 0000000..a1d3875 --- /dev/null +++ b/crates/application/src/processing/commands/fail_job.rs @@ -0,0 +1,57 @@ +use std::sync::Arc; +use domain::{ + entities::{Job, JobStatus}, + errors::DomainError, + events::DomainEvent, + ports::{EventPublisher, JobBatchRepository, JobRepository}, + value_objects::{DateTimeStamp, SystemId}, +}; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct FailJobCommand { + pub job_id: SystemId, + pub error: String, +} + +pub struct FailJobHandler { + job_repo: Arc, + batch_repo: Arc, + event_pub: Arc, +} + +impl FailJobHandler { + pub fn new( + job_repo: Arc, + batch_repo: Arc, + event_pub: Arc, + ) -> Self { + Self { job_repo, batch_repo, event_pub } + } + + pub async fn execute(&self, cmd: FailJobCommand) -> Result { + let mut job = self.job_repo.find_by_id(&cmd.job_id).await? + .ok_or_else(|| DomainError::NotFound(format!("Job {} not found", cmd.job_id)))?; + job.fail(&cmd.error); + self.job_repo.save(&job).await?; + if job.status == JobStatus::Failed { + if let Some(ref batch_id) = job.batch_id { + let mut batch = self.batch_repo.find_by_id(batch_id).await? + .ok_or_else(|| DomainError::NotFound(format!("Batch {} not found", batch_id)))?; + batch.record_failure(); + self.batch_repo.save(&batch).await?; + } + self.event_pub.publish(DomainEvent::JobFailed { + job_id: job.job_id, + error: cmd.error, + timestamp: DateTimeStamp::now(), + }).await?; + } else if job.status == JobStatus::Queued { + self.event_pub.publish(DomainEvent::JobEnqueued { + job_id: job.job_id, + job_type: format!("{:?}", job.job_type), + timestamp: DateTimeStamp::now(), + }).await?; + } + Ok(job) + } +} diff --git a/crates/application/src/processing/commands/manage_plugin.rs b/crates/application/src/processing/commands/manage_plugin.rs new file mode 100644 index 0000000..60dd21e --- /dev/null +++ b/crates/application/src/processing/commands/manage_plugin.rs @@ -0,0 +1,63 @@ +use std::sync::Arc; +use domain::{ + entities::{Plugin, PluginType}, + errors::DomainError, + ports::PluginRepository, + value_objects::{StructuredData, SystemId}, +}; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum PluginAction { + Create { + name: String, + plugin_type: PluginType, + config: StructuredData, + }, + Enable, + Disable, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct ManagePluginCommand { + pub plugin_id: Option, + pub action: PluginAction, +} + +pub struct ManagePluginHandler { + plugin_repo: Arc, +} + +impl ManagePluginHandler { + pub fn new(plugin_repo: Arc) -> Self { + Self { plugin_repo } + } + + pub async fn execute(&self, cmd: ManagePluginCommand) -> Result { + match cmd.action { + PluginAction::Create { name, plugin_type, config } => { + let mut plugin = Plugin::new(name, plugin_type); + plugin.configuration = config; + self.plugin_repo.save(&plugin).await?; + Ok(plugin) + } + PluginAction::Enable => { + let id = cmd.plugin_id + .ok_or_else(|| DomainError::Validation("plugin_id required for Enable".into()))?; + let mut plugin = self.plugin_repo.find_by_id(&id).await? + .ok_or_else(|| DomainError::NotFound(format!("Plugin {} not found", id)))?; + plugin.enable(); + self.plugin_repo.save(&plugin).await?; + Ok(plugin) + } + PluginAction::Disable => { + let id = cmd.plugin_id + .ok_or_else(|| DomainError::Validation("plugin_id required for Disable".into()))?; + let mut plugin = self.plugin_repo.find_by_id(&id).await? + .ok_or_else(|| DomainError::NotFound(format!("Plugin {} not found", id)))?; + plugin.disable(); + self.plugin_repo.save(&plugin).await?; + Ok(plugin) + } + } + } +} diff --git a/crates/application/src/processing/commands/mod.rs b/crates/application/src/processing/commands/mod.rs new file mode 100644 index 0000000..9c2ae73 --- /dev/null +++ b/crates/application/src/processing/commands/mod.rs @@ -0,0 +1,6 @@ +pub mod enqueue_job; +pub mod start_job; +pub mod complete_job; +pub mod fail_job; +pub mod manage_plugin; +pub mod configure_pipeline; diff --git a/crates/application/src/processing/commands/start_job.rs b/crates/application/src/processing/commands/start_job.rs new file mode 100644 index 0000000..8b8dffa --- /dev/null +++ b/crates/application/src/processing/commands/start_job.rs @@ -0,0 +1,30 @@ +use std::sync::Arc; +use domain::{ + entities::Job, + errors::DomainError, + ports::JobRepository, + value_objects::SystemId, +}; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct StartJobCommand { + pub job_id: SystemId, +} + +pub struct StartJobHandler { + job_repo: Arc, +} + +impl StartJobHandler { + pub fn new(job_repo: Arc) -> Self { + Self { job_repo } + } + + pub async fn execute(&self, cmd: StartJobCommand) -> Result { + let mut job = self.job_repo.find_by_id(&cmd.job_id).await? + .ok_or_else(|| DomainError::NotFound(format!("Job {} not found", cmd.job_id)))?; + job.start()?; + self.job_repo.save(&job).await?; + Ok(job) + } +} diff --git a/crates/application/src/processing/mod.rs b/crates/application/src/processing/mod.rs index eec5f1e..0c8495a 100644 --- a/crates/application/src/processing/mod.rs +++ b/crates/application/src/processing/mod.rs @@ -1 +1,10 @@ -// Processing commands/queries (future: EnqueueJob, ProcessBatch, etc.) +pub mod commands; +pub mod queries; + +pub use commands::enqueue_job::{EnqueueJobCommand, EnqueueJobHandler}; +pub use commands::start_job::{StartJobCommand, StartJobHandler}; +pub use commands::complete_job::{CompleteJobCommand, CompleteJobHandler}; +pub use commands::fail_job::{FailJobCommand, FailJobHandler}; +pub use commands::manage_plugin::{ManagePluginCommand, ManagePluginHandler, PluginAction}; +pub use commands::configure_pipeline::{ConfigurePipelineCommand, ConfigurePipelineHandler, PipelineStepConfig}; +pub use queries::report_batch_progress::{ReportBatchProgressQuery, ReportBatchProgressHandler, BatchProgress}; diff --git a/crates/application/src/processing/queries/mod.rs b/crates/application/src/processing/queries/mod.rs new file mode 100644 index 0000000..affaceb --- /dev/null +++ b/crates/application/src/processing/queries/mod.rs @@ -0,0 +1 @@ +pub mod report_batch_progress; diff --git a/crates/application/src/processing/queries/report_batch_progress.rs b/crates/application/src/processing/queries/report_batch_progress.rs new file mode 100644 index 0000000..4b22924 --- /dev/null +++ b/crates/application/src/processing/queries/report_batch_progress.rs @@ -0,0 +1,36 @@ +use std::sync::Arc; +use domain::{ + entities::{Job, JobBatch}, + errors::DomainError, + ports::{JobBatchRepository, JobRepository}, + value_objects::SystemId, +}; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct ReportBatchProgressQuery { + pub batch_id: SystemId, +} + +#[derive(Debug, Clone)] +pub struct BatchProgress { + pub batch: JobBatch, + pub jobs: Vec, +} + +pub struct ReportBatchProgressHandler { + batch_repo: Arc, + job_repo: Arc, +} + +impl ReportBatchProgressHandler { + pub fn new(batch_repo: Arc, job_repo: Arc) -> Self { + Self { batch_repo, job_repo } + } + + pub async fn execute(&self, query: ReportBatchProgressQuery) -> Result { + let batch = self.batch_repo.find_by_id(&query.batch_id).await? + .ok_or_else(|| DomainError::NotFound(format!("Batch {} not found", query.batch_id)))?; + let jobs = self.job_repo.find_by_batch(&query.batch_id).await?; + Ok(BatchProgress { batch, jobs }) + } +} diff --git a/crates/application/src/sidecar/commands/detect_external_changes.rs b/crates/application/src/sidecar/commands/detect_external_changes.rs new file mode 100644 index 0000000..50e9990 --- /dev/null +++ b/crates/application/src/sidecar/commands/detect_external_changes.rs @@ -0,0 +1,49 @@ +use std::sync::Arc; +use domain::{ + entities::SyncStatus, + errors::DomainError, + ports::{SidecarRepository, SidecarWriterPort}, +}; +use crate::sidecar::hash_helper::hash_structured_data; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DetectExternalChangesCommand; + +pub struct DetectExternalChangesHandler { + sidecar_repo: Arc, + writer: Arc, +} + +impl DetectExternalChangesHandler { + pub fn new( + sidecar_repo: Arc, + writer: Arc, + ) -> Self { + Self { sidecar_repo, writer } + } + + pub async fn execute(&self, _cmd: DetectExternalChangesCommand) -> Result { + let records = self.sidecar_repo.find_by_status(SyncStatus::InSync).await?; + let mut changed = 0u32; + + for mut record in records { + match self.writer.read_sidecar(&record.sidecar_storage_path).await { + Ok(data) => { + let hash = hash_structured_data(&data); + let differs = record.last_known_file_hash.as_ref() != Some(&hash); + if differs { + record.mark_pending_read(); + self.sidecar_repo.save(&record).await?; + changed += 1; + } + } + Err(_) => { + record.mark_error("Sidecar file not found"); + self.sidecar_repo.save(&record).await?; + } + } + } + + Ok(changed) + } +} diff --git a/crates/application/src/sidecar/commands/export_sidecar.rs b/crates/application/src/sidecar/commands/export_sidecar.rs new file mode 100644 index 0000000..a6c9d92 --- /dev/null +++ b/crates/application/src/sidecar/commands/export_sidecar.rs @@ -0,0 +1,48 @@ +use std::sync::Arc; +use domain::{ + catalog::services::resolve_metadata, + entities::SidecarRecord, + errors::DomainError, + ports::{AssetMetadataRepository, SidecarRepository, SidecarWriterPort}, + value_objects::SystemId, +}; +use crate::sidecar::hash_helper::hash_structured_data; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct ExportSidecarCommand { + pub asset_id: SystemId, +} + +pub struct ExportSidecarHandler { + metadata_repo: Arc, + sidecar_repo: Arc, + writer: Arc, +} + +impl ExportSidecarHandler { + pub fn new( + metadata_repo: Arc, + sidecar_repo: Arc, + writer: Arc, + ) -> Self { + Self { metadata_repo, sidecar_repo, writer } + } + + pub async fn execute(&self, cmd: ExportSidecarCommand) -> Result { + let layers = self.metadata_repo.find_by_asset(&cmd.asset_id).await?; + let resolved = resolve_metadata(&layers); + + let mut record = match self.sidecar_repo.find_by_asset(&cmd.asset_id).await? { + Some(r) => r, + None => SidecarRecord::new(cmd.asset_id, format!("sidecars/{}.xmp", cmd.asset_id)), + }; + + self.writer.write_sidecar(&resolved, &record.sidecar_storage_path).await?; + + let hash = hash_structured_data(&resolved); + record.mark_synced(hash); + self.sidecar_repo.save(&record).await?; + + Ok(record) + } +} diff --git a/crates/application/src/sidecar/commands/full_export.rs b/crates/application/src/sidecar/commands/full_export.rs new file mode 100644 index 0000000..cae8718 --- /dev/null +++ b/crates/application/src/sidecar/commands/full_export.rs @@ -0,0 +1,55 @@ +use std::sync::Arc; +use domain::{ + catalog::services::resolve_metadata, + entities::SidecarRecord, + errors::DomainError, + ports::{AssetRepository, AssetMetadataRepository, SidecarRepository, SidecarWriterPort}, + value_objects::SystemId, +}; +use crate::sidecar::hash_helper::hash_structured_data; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct FullExportCommand { + pub owner_id: SystemId, +} + +pub struct FullExportHandler { + asset_repo: Arc, + metadata_repo: Arc, + sidecar_repo: Arc, + writer: Arc, +} + +impl FullExportHandler { + pub fn new( + asset_repo: Arc, + metadata_repo: Arc, + sidecar_repo: Arc, + writer: Arc, + ) -> Self { + Self { asset_repo, metadata_repo, sidecar_repo, writer } + } + + pub async fn execute(&self, cmd: FullExportCommand) -> Result { + let assets = self.asset_repo.find_by_owner(&cmd.owner_id, u32::MAX, 0).await?; + let mut count = 0u32; + + for asset in &assets { + let layers = self.metadata_repo.find_by_asset(&asset.asset_id).await?; + let resolved = resolve_metadata(&layers); + + let mut record = match self.sidecar_repo.find_by_asset(&asset.asset_id).await? { + Some(r) => r, + None => SidecarRecord::new(asset.asset_id, format!("sidecars/{}.xmp", asset.asset_id)), + }; + + self.writer.write_sidecar(&resolved, &record.sidecar_storage_path).await?; + let hash = hash_structured_data(&resolved); + record.mark_synced(hash); + self.sidecar_repo.save(&record).await?; + count += 1; + } + + Ok(count) + } +} diff --git a/crates/application/src/sidecar/commands/full_import.rs b/crates/application/src/sidecar/commands/full_import.rs new file mode 100644 index 0000000..31ead80 --- /dev/null +++ b/crates/application/src/sidecar/commands/full_import.rs @@ -0,0 +1,64 @@ +use std::sync::Arc; +use domain::{ + catalog::entities::{AssetMetadata, MetadataSource}, + entities::SidecarRecord, + errors::DomainError, + ports::{AssetRepository, AssetMetadataRepository, SidecarRepository, SidecarWriterPort}, + value_objects::SystemId, +}; +use crate::sidecar::hash_helper::hash_structured_data; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct FullImportCommand { + pub owner_id: SystemId, +} + +pub struct FullImportHandler { + asset_repo: Arc, + metadata_repo: Arc, + sidecar_repo: Arc, + writer: Arc, +} + +impl FullImportHandler { + pub fn new( + asset_repo: Arc, + metadata_repo: Arc, + sidecar_repo: Arc, + writer: Arc, + ) -> Self { + Self { asset_repo, metadata_repo, sidecar_repo, writer } + } + + pub async fn execute(&self, cmd: FullImportCommand) -> Result { + let assets = self.asset_repo.find_by_owner(&cmd.owner_id, u32::MAX, 0).await?; + let mut count = 0u32; + + for asset in &assets { + let record = match self.sidecar_repo.find_by_asset(&asset.asset_id).await? { + Some(r) => r, + None => { + // No sidecar record — try creating one to read from + SidecarRecord::new(asset.asset_id, format!("sidecars/{}.xmp", asset.asset_id)) + } + }; + + match self.writer.read_sidecar(&record.sidecar_storage_path).await { + Ok(data) => { + let metadata = AssetMetadata::new(asset.asset_id, MetadataSource::ExifExtracted, data.clone()); + self.metadata_repo.save(&metadata).await?; + let hash = hash_structured_data(&data); + let mut record = record; + record.mark_synced(hash); + self.sidecar_repo.save(&record).await?; + count += 1; + } + Err(_) => { + // Sidecar file missing — skip + } + } + } + + Ok(count) + } +} diff --git a/crates/application/src/sidecar/commands/import_sidecar.rs b/crates/application/src/sidecar/commands/import_sidecar.rs new file mode 100644 index 0000000..2d66ca7 --- /dev/null +++ b/crates/application/src/sidecar/commands/import_sidecar.rs @@ -0,0 +1,51 @@ +use std::sync::Arc; +use domain::{ + catalog::entities::{AssetMetadata, MetadataSource}, + entities::SyncStatus, + errors::DomainError, + ports::{AssetMetadataRepository, SidecarRepository, SidecarWriterPort}, + value_objects::SystemId, +}; +use crate::sidecar::hash_helper::hash_structured_data; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct ImportSidecarCommand { + pub asset_id: SystemId, +} + +pub struct ImportSidecarHandler { + sidecar_repo: Arc, + writer: Arc, + metadata_repo: Arc, +} + +impl ImportSidecarHandler { + pub fn new( + sidecar_repo: Arc, + writer: Arc, + metadata_repo: Arc, + ) -> Self { + Self { sidecar_repo, writer, metadata_repo } + } + + pub async fn execute(&self, cmd: ImportSidecarCommand) -> Result { + let mut record = self.sidecar_repo.find_by_asset(&cmd.asset_id).await? + .ok_or_else(|| DomainError::NotFound(format!("Sidecar record for {} not found", cmd.asset_id)))?; + + if record.sync_status != SyncStatus::PendingRead { + return Err(DomainError::Validation( + format!("Sidecar is not pending read (status: {:?})", record.sync_status), + )); + } + + let data = self.writer.read_sidecar(&record.sidecar_storage_path).await?; + let metadata = AssetMetadata::new(cmd.asset_id, MetadataSource::ExifExtracted, data.clone()); + self.metadata_repo.save(&metadata).await?; + + let hash = hash_structured_data(&data); + record.mark_synced(hash); + self.sidecar_repo.save(&record).await?; + + Ok(metadata) + } +} diff --git a/crates/application/src/sidecar/commands/mod.rs b/crates/application/src/sidecar/commands/mod.rs new file mode 100644 index 0000000..f468171 --- /dev/null +++ b/crates/application/src/sidecar/commands/mod.rs @@ -0,0 +1,6 @@ +pub mod export_sidecar; +pub mod detect_external_changes; +pub mod import_sidecar; +pub mod resolve_conflict; +pub mod full_export; +pub mod full_import; diff --git a/crates/application/src/sidecar/commands/resolve_conflict.rs b/crates/application/src/sidecar/commands/resolve_conflict.rs new file mode 100644 index 0000000..fe2f8ef --- /dev/null +++ b/crates/application/src/sidecar/commands/resolve_conflict.rs @@ -0,0 +1,66 @@ +use std::sync::Arc; +use domain::{ + catalog::entities::{AssetMetadata, MetadataSource}, + catalog::services::resolve_metadata, + entities::{ConflictPolicy, SidecarRecord, SyncStatus}, + errors::DomainError, + ports::{AssetMetadataRepository, SidecarRepository, SidecarWriterPort}, + value_objects::SystemId, +}; +use crate::sidecar::hash_helper::hash_structured_data; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct ResolveConflictCommand { + pub asset_id: SystemId, + pub policy: ConflictPolicy, +} + +pub struct ResolveConflictHandler { + sidecar_repo: Arc, + writer: Arc, + metadata_repo: Arc, +} + +impl ResolveConflictHandler { + pub fn new( + sidecar_repo: Arc, + writer: Arc, + metadata_repo: Arc, + ) -> Self { + Self { sidecar_repo, writer, metadata_repo } + } + + pub async fn execute(&self, cmd: ResolveConflictCommand) -> Result { + let mut record = self.sidecar_repo.find_by_asset(&cmd.asset_id).await? + .ok_or_else(|| DomainError::NotFound(format!("Sidecar record for {} not found", cmd.asset_id)))?; + + if record.sync_status != SyncStatus::Conflict { + return Err(DomainError::Validation( + format!("Sidecar is not in conflict (status: {:?})", record.sync_status), + )); + } + + match cmd.policy { + ConflictPolicy::DbWins => { + let layers = self.metadata_repo.find_by_asset(&cmd.asset_id).await?; + let resolved = resolve_metadata(&layers); + self.writer.write_sidecar(&resolved, &record.sidecar_storage_path).await?; + let hash = hash_structured_data(&resolved); + record.mark_synced(hash); + } + ConflictPolicy::FileWins => { + let data = self.writer.read_sidecar(&record.sidecar_storage_path).await?; + let metadata = AssetMetadata::new(cmd.asset_id, MetadataSource::ExifExtracted, data.clone()); + self.metadata_repo.save(&metadata).await?; + let hash = hash_structured_data(&data); + record.mark_synced(hash); + } + ConflictPolicy::RequireUserDecision => { + return Err(DomainError::Validation("Manual resolution required".to_string())); + } + } + + self.sidecar_repo.save(&record).await?; + Ok(record) + } +} diff --git a/crates/application/src/sidecar/hash_helper.rs b/crates/application/src/sidecar/hash_helper.rs new file mode 100644 index 0000000..ac2847b --- /dev/null +++ b/crates/application/src/sidecar/hash_helper.rs @@ -0,0 +1,11 @@ +use domain::value_objects::{Checksum, StructuredData}; +use sha2::{Sha256, Digest}; + +pub fn hash_structured_data(data: &StructuredData) -> Checksum { + let json = serde_json::to_string(data).unwrap_or_default(); + let mut hasher = Sha256::new(); + hasher.update(json.as_bytes()); + let result = hasher.finalize(); + let hex = format!("{:x}", result); + Checksum::new(hex).expect("SHA-256 always produces valid 64-char hex") +} diff --git a/crates/application/src/sidecar/mod.rs b/crates/application/src/sidecar/mod.rs index f2c54f4..444b792 100644 --- a/crates/application/src/sidecar/mod.rs +++ b/crates/application/src/sidecar/mod.rs @@ -1 +1,9 @@ -// Sidecar commands/queries (future: SyncSidecar, ExportMetadata, etc.) +pub mod commands; +pub mod hash_helper; + +pub use commands::export_sidecar::{ExportSidecarCommand, ExportSidecarHandler}; +pub use commands::detect_external_changes::{DetectExternalChangesCommand, DetectExternalChangesHandler}; +pub use commands::import_sidecar::{ImportSidecarCommand, ImportSidecarHandler}; +pub use commands::resolve_conflict::{ResolveConflictCommand, ResolveConflictHandler}; +pub use commands::full_export::{FullExportCommand, FullExportHandler}; +pub use commands::full_import::{FullImportCommand, FullImportHandler}; diff --git a/crates/application/src/testing/fakes.rs b/crates/application/src/testing/fakes.rs index 57bd33f..3dea0c0 100644 --- a/crates/application/src/testing/fakes.rs +++ b/crates/application/src/testing/fakes.rs @@ -109,6 +109,41 @@ impl SidecarWriterPort for StubSidecarWriter { } } +// --- InMemorySidecarWriter --- + +pub struct InMemorySidecarWriter { + data: Mutex>, +} + +impl InMemorySidecarWriter { + pub fn new() -> Self { + Self { data: Mutex::new(HashMap::new()) } + } + + pub async fn get(&self, path: &str) -> Option { + self.data.lock().await.get(path).cloned() + } +} + +impl Default for InMemorySidecarWriter { + fn default() -> Self { Self::new() } +} + +#[async_trait] +impl SidecarWriterPort for InMemorySidecarWriter { + fn format_name(&self) -> &str { "in-memory" } + + async fn write_sidecar(&self, data: &StructuredData, path: &str) -> Result<(), DomainError> { + self.data.lock().await.insert(path.to_string(), data.clone()); + Ok(()) + } + + async fn read_sidecar(&self, path: &str) -> Result { + self.data.lock().await.get(path).cloned() + .ok_or_else(|| DomainError::NotFound(format!("Sidecar not found: {path}"))) + } +} + // --- StubPasswordHasher --- pub struct StubPasswordHasher; diff --git a/crates/application/src/testing/repositories.rs b/crates/application/src/testing/repositories.rs index 66edd6a..289d00b 100644 --- a/crates/application/src/testing/repositories.rs +++ b/crates/application/src/testing/repositories.rs @@ -4,16 +4,18 @@ use tokio::sync::Mutex; use domain::{ entities::{ Album, Asset, AssetMetadata, AssetTag, DuplicateGroup, DuplicateStatus, - Group, IngestSession, InviteCode, Job, JobStatus, LibraryPath, - MetadataSource, QuotaDefinition, Role, ShareLink, ShareScope, ShareTarget, + Group, IngestSession, InviteCode, Job, JobBatch, JobStatus, LibraryPath, + MetadataSource, Plugin, ProcessingPipeline, QuotaDefinition, Role, + ShareLink, ShareScope, ShareTarget, SidecarRecord, SyncStatus, StorageVolume, Tag, UsageLedgerEntry, UsageType, User, }, errors::DomainError, ports::{ AlbumRepository, AssetMetadataRepository, AssetRepository, DuplicateRepository, GroupRepository, IngestSessionRepository, - JobRepository, LibraryPathRepository, QuotaRepository, - RoleRepository, ShareRepository, StorageVolumeRepository, + JobBatchRepository, JobRepository, LibraryPathRepository, + PipelineRepository, PluginRepository, QuotaRepository, + RoleRepository, ShareRepository, SidecarRepository, StorageVolumeRepository, TagRepository, UsageLedgerRepository, UserRepository, }, value_objects::{Checksum, DateTimeStamp, Email, SystemId}, @@ -716,3 +718,141 @@ impl DuplicateRepository for InMemoryDuplicateRepository { Ok(()) } } + +// --- InMemorySidecarRepository --- + +pub struct InMemorySidecarRepository { + data: Mutex>, +} + +impl InMemorySidecarRepository { + pub fn new() -> Self { + Self { data: Mutex::new(HashMap::new()) } + } +} + +impl Default for InMemorySidecarRepository { + fn default() -> Self { Self::new() } +} + +#[async_trait] +impl SidecarRepository for InMemorySidecarRepository { + async fn find_by_asset(&self, asset_id: &SystemId) -> Result, DomainError> { + Ok(self.data.lock().await.get(&asset_id.to_string()).cloned()) + } + + async fn find_by_status(&self, status: SyncStatus) -> Result, DomainError> { + Ok(self.data.lock().await.values() + .filter(|r| r.sync_status == status) + .cloned() + .collect()) + } + + async fn save(&self, record: &SidecarRecord) -> Result<(), DomainError> { + self.data.lock().await.insert(record.asset_id.to_string(), record.clone()); + Ok(()) + } + + async fn delete(&self, asset_id: &SystemId) -> Result<(), DomainError> { + self.data.lock().await.remove(&asset_id.to_string()); + Ok(()) + } +} + +// --- InMemoryJobBatchRepository --- + +pub struct InMemoryJobBatchRepository { + data: Mutex>, +} + +impl InMemoryJobBatchRepository { + pub fn new() -> Self { + Self { data: Mutex::new(HashMap::new()) } + } +} + +impl Default for InMemoryJobBatchRepository { + fn default() -> Self { Self::new() } +} + +#[async_trait] +impl JobBatchRepository for InMemoryJobBatchRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + Ok(self.data.lock().await.get(&id.to_string()).cloned()) + } + + async fn save(&self, batch: &JobBatch) -> Result<(), DomainError> { + self.data.lock().await.insert(batch.batch_id.to_string(), batch.clone()); + Ok(()) + } +} + +// --- InMemoryPluginRepository --- + +pub struct InMemoryPluginRepository { + data: Mutex>, +} + +impl InMemoryPluginRepository { + pub fn new() -> Self { + Self { data: Mutex::new(HashMap::new()) } + } +} + +impl Default for InMemoryPluginRepository { + fn default() -> Self { Self::new() } +} + +#[async_trait] +impl PluginRepository for InMemoryPluginRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + Ok(self.data.lock().await.get(&id.to_string()).cloned()) + } + + async fn find_enabled(&self) -> Result, DomainError> { + Ok(self.data.lock().await.values() + .filter(|p| p.is_enabled) + .cloned() + .collect()) + } + + async fn save(&self, plugin: &Plugin) -> Result<(), DomainError> { + self.data.lock().await.insert(plugin.plugin_id.to_string(), plugin.clone()); + Ok(()) + } +} + +// --- InMemoryPipelineRepository --- + +pub struct InMemoryPipelineRepository { + data: Mutex>, +} + +impl InMemoryPipelineRepository { + pub fn new() -> Self { + Self { data: Mutex::new(HashMap::new()) } + } +} + +impl Default for InMemoryPipelineRepository { + fn default() -> Self { Self::new() } +} + +#[async_trait] +impl PipelineRepository for InMemoryPipelineRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + Ok(self.data.lock().await.get(&id.to_string()).cloned()) + } + + async fn find_by_trigger(&self, event: &str) -> Result, DomainError> { + Ok(self.data.lock().await.values() + .filter(|p| p.trigger_event == event) + .cloned() + .collect()) + } + + async fn save(&self, pipeline: &ProcessingPipeline) -> Result<(), DomainError> { + self.data.lock().await.insert(pipeline.pipeline_id.to_string(), pipeline.clone()); + Ok(()) + } +} diff --git a/crates/application/tests/app_tests.rs b/crates/application/tests/app_tests.rs index bbbcaef..a98bce2 100644 --- a/crates/application/tests/app_tests.rs +++ b/crates/application/tests/app_tests.rs @@ -3,3 +3,5 @@ mod organization; mod storage; mod catalog; mod sharing; +mod sidecar; +mod processing; diff --git a/crates/application/tests/processing/commands/complete_job.rs b/crates/application/tests/processing/commands/complete_job.rs new file mode 100644 index 0000000..46bc060 --- /dev/null +++ b/crates/application/tests/processing/commands/complete_job.rs @@ -0,0 +1,76 @@ +use std::sync::Arc; +use application::testing::{InMemoryJobBatchRepository, InMemoryJobRepository, StubEventPublisher}; +use application::processing::{CompleteJobCommand, CompleteJobHandler}; +use domain::entities::{Job, JobBatch, JobStatus, JobType}; +use domain::events::DomainEvent; +use domain::ports::{JobBatchRepository, JobRepository}; +use domain::value_objects::StructuredData; + +#[tokio::test] +async fn completes_job() { + let job_repo = Arc::new(InMemoryJobRepository::new()); + let batch_repo = Arc::new(InMemoryJobBatchRepository::new()); + let event_pub = Arc::new(StubEventPublisher::new()); + + let mut job = Job::new(JobType::ExtractMetadata, 5, StructuredData::new()); + job.start().unwrap(); + let job_id = job.job_id; + job_repo.save(&job).await.unwrap(); + + let handler = CompleteJobHandler::new(job_repo.clone(), batch_repo.clone(), event_pub.clone()); + let result = handler.execute(CompleteJobCommand { + job_id, + result: StructuredData::new(), + }).await.unwrap(); + + assert_eq!(result.status, JobStatus::Completed); + assert!(result.result_data.is_some()); +} + +#[tokio::test] +async fn completes_job_and_updates_batch() { + let job_repo = Arc::new(InMemoryJobRepository::new()); + let batch_repo = Arc::new(InMemoryJobBatchRepository::new()); + let event_pub = Arc::new(StubEventPublisher::new()); + + let batch = JobBatch::new("test-batch", 2); + let batch_id = batch.batch_id; + batch_repo.save(&batch).await.unwrap(); + + let mut job = Job::new(JobType::ExtractMetadata, 5, StructuredData::new()) + .with_batch(batch_id); + job.start().unwrap(); + let job_id = job.job_id; + job_repo.save(&job).await.unwrap(); + + let handler = CompleteJobHandler::new(job_repo.clone(), batch_repo.clone(), event_pub.clone()); + handler.execute(CompleteJobCommand { + job_id, + result: StructuredData::new(), + }).await.unwrap(); + + let updated_batch = batch_repo.find_by_id(&batch_id).await.unwrap().unwrap(); + assert_eq!(updated_batch.completed_count, 1); +} + +#[tokio::test] +async fn publishes_event() { + let job_repo = Arc::new(InMemoryJobRepository::new()); + let batch_repo = Arc::new(InMemoryJobBatchRepository::new()); + let event_pub = Arc::new(StubEventPublisher::new()); + + let mut job = Job::new(JobType::ExtractMetadata, 5, StructuredData::new()); + job.start().unwrap(); + let job_id = job.job_id; + job_repo.save(&job).await.unwrap(); + + let handler = CompleteJobHandler::new(job_repo.clone(), batch_repo.clone(), event_pub.clone()); + handler.execute(CompleteJobCommand { + job_id, + result: StructuredData::new(), + }).await.unwrap(); + + let events = event_pub.published().await; + assert_eq!(events.len(), 1); + assert!(matches!(&events[0], DomainEvent::JobCompleted { job_id: id, .. } if *id == job_id)); +} diff --git a/crates/application/tests/processing/commands/configure_pipeline.rs b/crates/application/tests/processing/commands/configure_pipeline.rs new file mode 100644 index 0000000..ad6f580 --- /dev/null +++ b/crates/application/tests/processing/commands/configure_pipeline.rs @@ -0,0 +1,48 @@ +use std::sync::Arc; +use application::testing::{InMemoryPipelineRepository, InMemoryPluginRepository}; +use application::processing::{ConfigurePipelineCommand, ConfigurePipelineHandler, PipelineStepConfig}; +use domain::entities::{Plugin, PluginType}; +use domain::errors::DomainError; +use domain::ports::PluginRepository; +use domain::value_objects::{StructuredData, SystemId}; + +#[tokio::test] +async fn creates_pipeline() { + let pipeline_repo = Arc::new(InMemoryPipelineRepository::new()); + let plugin_repo = Arc::new(InMemoryPluginRepository::new()); + + let p1 = Plugin::new("EXIF", PluginType::MediaProcessor); + let p2 = Plugin::new("Thumb", PluginType::MediaProcessor); + let p1_id = p1.plugin_id; + let p2_id = p2.plugin_id; + plugin_repo.save(&p1).await.unwrap(); + plugin_repo.save(&p2).await.unwrap(); + + let handler = ConfigurePipelineHandler::new(pipeline_repo.clone(), plugin_repo.clone()); + let pipeline = handler.execute(ConfigurePipelineCommand { + trigger_event: "asset.ingested".into(), + steps: vec![ + PipelineStepConfig { plugin_id: p1_id, config: StructuredData::new() }, + PipelineStepConfig { plugin_id: p2_id, config: StructuredData::new() }, + ], + }).await.unwrap(); + + assert_eq!(pipeline.trigger_event, "asset.ingested"); + assert_eq!(pipeline.steps.len(), 2); +} + +#[tokio::test] +async fn rejects_nonexistent_plugin() { + let pipeline_repo = Arc::new(InMemoryPipelineRepository::new()); + let plugin_repo = Arc::new(InMemoryPluginRepository::new()); + + let handler = ConfigurePipelineHandler::new(pipeline_repo.clone(), plugin_repo.clone()); + let result = handler.execute(ConfigurePipelineCommand { + trigger_event: "asset.ingested".into(), + steps: vec![ + PipelineStepConfig { plugin_id: SystemId::new(), config: StructuredData::new() }, + ], + }).await; + + assert!(matches!(result, Err(DomainError::NotFound(_)))); +} diff --git a/crates/application/tests/processing/commands/enqueue_job.rs b/crates/application/tests/processing/commands/enqueue_job.rs new file mode 100644 index 0000000..a3607f5 --- /dev/null +++ b/crates/application/tests/processing/commands/enqueue_job.rs @@ -0,0 +1,65 @@ +use std::sync::Arc; +use application::testing::{InMemoryJobRepository, StubEventPublisher}; +use application::processing::{EnqueueJobCommand, EnqueueJobHandler}; +use domain::entities::{JobStatus, JobType}; +use domain::events::DomainEvent; +use domain::value_objects::{StructuredData, SystemId}; + +#[tokio::test] +async fn enqueues_job() { + let job_repo = Arc::new(InMemoryJobRepository::new()); + let event_pub = Arc::new(StubEventPublisher::new()); + let handler = EnqueueJobHandler::new(job_repo.clone(), event_pub.clone()); + + let job = handler.execute(EnqueueJobCommand { + job_type: JobType::ExtractMetadata, + priority: 5, + payload: StructuredData::new(), + target_asset_id: None, + batch_id: None, + }).await.unwrap(); + + assert_eq!(job.status, JobStatus::Queued); + assert_eq!(job.priority, 5); + assert!(job.target_asset_id.is_none()); + assert!(job.batch_id.is_none()); +} + +#[tokio::test] +async fn enqueues_with_target_and_batch() { + let job_repo = Arc::new(InMemoryJobRepository::new()); + let event_pub = Arc::new(StubEventPublisher::new()); + let handler = EnqueueJobHandler::new(job_repo.clone(), event_pub.clone()); + + let target = SystemId::new(); + let batch = SystemId::new(); + let job = handler.execute(EnqueueJobCommand { + job_type: JobType::GenerateDerivative, + priority: 10, + payload: StructuredData::new(), + target_asset_id: Some(target), + batch_id: Some(batch), + }).await.unwrap(); + + assert_eq!(job.target_asset_id, Some(target)); + assert_eq!(job.batch_id, Some(batch)); +} + +#[tokio::test] +async fn publishes_event() { + let job_repo = Arc::new(InMemoryJobRepository::new()); + let event_pub = Arc::new(StubEventPublisher::new()); + let handler = EnqueueJobHandler::new(job_repo.clone(), event_pub.clone()); + + let job = handler.execute(EnqueueJobCommand { + job_type: JobType::ScanDirectory, + priority: 1, + payload: StructuredData::new(), + target_asset_id: None, + batch_id: None, + }).await.unwrap(); + + let events = event_pub.published().await; + assert_eq!(events.len(), 1); + assert!(matches!(&events[0], DomainEvent::JobEnqueued { job_id, .. } if *job_id == job.job_id)); +} diff --git a/crates/application/tests/processing/commands/fail_job.rs b/crates/application/tests/processing/commands/fail_job.rs new file mode 100644 index 0000000..9670c27 --- /dev/null +++ b/crates/application/tests/processing/commands/fail_job.rs @@ -0,0 +1,95 @@ +use std::sync::Arc; +use application::testing::{InMemoryJobBatchRepository, InMemoryJobRepository, StubEventPublisher}; +use application::processing::{FailJobCommand, FailJobHandler}; +use domain::entities::{Job, JobBatch, JobStatus, JobType}; +use domain::events::DomainEvent; +use domain::ports::{JobBatchRepository, JobRepository}; +use domain::value_objects::StructuredData; + +fn make_handler( + job_repo: Arc, + batch_repo: Arc, + event_pub: Arc, +) -> FailJobHandler { + FailJobHandler::new(job_repo, batch_repo, event_pub) +} + +#[tokio::test] +async fn retries_on_failure() { + let job_repo = Arc::new(InMemoryJobRepository::new()); + let batch_repo = Arc::new(InMemoryJobBatchRepository::new()); + let event_pub = Arc::new(StubEventPublisher::new()); + + let job = Job::new(JobType::ExtractMetadata, 5, StructuredData::new()); + let job_id = job.job_id; + assert_eq!(job.retry_count, 0); + job_repo.save(&job).await.unwrap(); + + let handler = make_handler(job_repo.clone(), batch_repo.clone(), event_pub.clone()); + let result = handler.execute(FailJobCommand { + job_id, + error: "transient error".into(), + }).await.unwrap(); + + assert_eq!(result.status, JobStatus::Queued); + assert_eq!(result.retry_count, 1); + + let events = event_pub.published().await; + assert!(matches!(&events[0], DomainEvent::JobEnqueued { .. })); +} + +#[tokio::test] +async fn fails_permanently_after_max_retries() { + let job_repo = Arc::new(InMemoryJobRepository::new()); + let batch_repo = Arc::new(InMemoryJobBatchRepository::new()); + let event_pub = Arc::new(StubEventPublisher::new()); + + let mut job = Job::new(JobType::ExtractMetadata, 5, StructuredData::new()); + // Exhaust retries (max_retries=3, so fail 3 times) + job.fail("err1"); + job.fail("err2"); + assert_eq!(job.retry_count, 2); + assert_eq!(job.status, JobStatus::Queued); // still retryable + let job_id = job.job_id; + job_repo.save(&job).await.unwrap(); + + let handler = make_handler(job_repo.clone(), batch_repo.clone(), event_pub.clone()); + let result = handler.execute(FailJobCommand { + job_id, + error: "fatal".into(), + }).await.unwrap(); + + assert_eq!(result.status, JobStatus::Failed); + assert_eq!(result.retry_count, 3); + + let events = event_pub.published().await; + assert!(matches!(&events[0], DomainEvent::JobFailed { .. })); +} + +#[tokio::test] +async fn updates_batch_on_permanent_failure() { + let job_repo = Arc::new(InMemoryJobRepository::new()); + let batch_repo = Arc::new(InMemoryJobBatchRepository::new()); + let event_pub = Arc::new(StubEventPublisher::new()); + + let batch = JobBatch::new("test-batch", 2); + let batch_id = batch.batch_id; + batch_repo.save(&batch).await.unwrap(); + + let mut job = Job::new(JobType::ExtractMetadata, 5, StructuredData::new()) + .with_batch(batch_id); + // Exhaust retries + job.fail("err1"); + job.fail("err2"); + let job_id = job.job_id; + job_repo.save(&job).await.unwrap(); + + let handler = make_handler(job_repo.clone(), batch_repo.clone(), event_pub.clone()); + handler.execute(FailJobCommand { + job_id, + error: "permanent failure".into(), + }).await.unwrap(); + + let updated_batch = batch_repo.find_by_id(&batch_id).await.unwrap().unwrap(); + assert_eq!(updated_batch.failed_count, 1); +} diff --git a/crates/application/tests/processing/commands/manage_plugin.rs b/crates/application/tests/processing/commands/manage_plugin.rs new file mode 100644 index 0000000..98ada9a --- /dev/null +++ b/crates/application/tests/processing/commands/manage_plugin.rs @@ -0,0 +1,61 @@ +use std::sync::Arc; +use application::testing::InMemoryPluginRepository; +use application::processing::{ManagePluginCommand, ManagePluginHandler, PluginAction}; +use domain::entities::{Plugin, PluginType}; +use domain::ports::PluginRepository; +use domain::value_objects::StructuredData; + +#[tokio::test] +async fn creates_plugin() { + let plugin_repo = Arc::new(InMemoryPluginRepository::new()); + let handler = ManagePluginHandler::new(plugin_repo.clone()); + + let plugin = handler.execute(ManagePluginCommand { + plugin_id: None, + action: PluginAction::Create { + name: "EXIF Extractor".into(), + plugin_type: PluginType::MediaProcessor, + config: StructuredData::new(), + }, + }).await.unwrap(); + + assert_eq!(plugin.name, "EXIF Extractor"); + assert_eq!(plugin.plugin_type, PluginType::MediaProcessor); + assert!(plugin.is_enabled); + + let saved = plugin_repo.find_by_id(&plugin.plugin_id).await.unwrap(); + assert!(saved.is_some()); +} + +#[tokio::test] +async fn enables_plugin() { + let plugin_repo = Arc::new(InMemoryPluginRepository::new()); + let mut plugin = Plugin::new("Test", PluginType::ScheduledTask); + plugin.disable(); + let plugin_id = plugin.plugin_id; + plugin_repo.save(&plugin).await.unwrap(); + + let handler = ManagePluginHandler::new(plugin_repo.clone()); + let result = handler.execute(ManagePluginCommand { + plugin_id: Some(plugin_id), + action: PluginAction::Enable, + }).await.unwrap(); + + assert!(result.is_enabled); +} + +#[tokio::test] +async fn disables_plugin() { + let plugin_repo = Arc::new(InMemoryPluginRepository::new()); + let plugin = Plugin::new("Test", PluginType::SidecarWriter); + let plugin_id = plugin.plugin_id; + plugin_repo.save(&plugin).await.unwrap(); + + let handler = ManagePluginHandler::new(plugin_repo.clone()); + let result = handler.execute(ManagePluginCommand { + plugin_id: Some(plugin_id), + action: PluginAction::Disable, + }).await.unwrap(); + + assert!(!result.is_enabled); +} diff --git a/crates/application/tests/processing/commands/mod.rs b/crates/application/tests/processing/commands/mod.rs new file mode 100644 index 0000000..c0ca8cf --- /dev/null +++ b/crates/application/tests/processing/commands/mod.rs @@ -0,0 +1,6 @@ +mod enqueue_job; +mod start_job; +mod complete_job; +mod fail_job; +mod manage_plugin; +mod configure_pipeline; diff --git a/crates/application/tests/processing/commands/start_job.rs b/crates/application/tests/processing/commands/start_job.rs new file mode 100644 index 0000000..3ef12a3 --- /dev/null +++ b/crates/application/tests/processing/commands/start_job.rs @@ -0,0 +1,35 @@ +use std::sync::Arc; +use application::testing::InMemoryJobRepository; +use application::processing::{StartJobCommand, StartJobHandler}; +use domain::entities::{Job, JobStatus, JobType}; +use domain::errors::DomainError; +use domain::ports::JobRepository; +use domain::value_objects::StructuredData; + +#[tokio::test] +async fn starts_queued_job() { + let job_repo = Arc::new(InMemoryJobRepository::new()); + let job = Job::new(JobType::ExtractMetadata, 5, StructuredData::new()); + let job_id = job.job_id; + job_repo.save(&job).await.unwrap(); + + let handler = StartJobHandler::new(job_repo.clone()); + let result = handler.execute(StartJobCommand { job_id }).await.unwrap(); + + assert_eq!(result.status, JobStatus::Processing); + assert!(result.started_at.is_some()); +} + +#[tokio::test] +async fn rejects_non_queued_job() { + let job_repo = Arc::new(InMemoryJobRepository::new()); + let mut job = Job::new(JobType::ExtractMetadata, 5, StructuredData::new()); + job.start().unwrap(); // now Processing + let job_id = job.job_id; + job_repo.save(&job).await.unwrap(); + + let handler = StartJobHandler::new(job_repo.clone()); + let result = handler.execute(StartJobCommand { job_id }).await; + + assert!(matches!(result, Err(DomainError::Conflict(_)))); +} diff --git a/crates/application/tests/processing/mod.rs b/crates/application/tests/processing/mod.rs new file mode 100644 index 0000000..2406e7d --- /dev/null +++ b/crates/application/tests/processing/mod.rs @@ -0,0 +1,2 @@ +mod commands; +mod queries; diff --git a/crates/application/tests/processing/queries/mod.rs b/crates/application/tests/processing/queries/mod.rs new file mode 100644 index 0000000..7924ba3 --- /dev/null +++ b/crates/application/tests/processing/queries/mod.rs @@ -0,0 +1 @@ +mod report_batch_progress; diff --git a/crates/application/tests/processing/queries/report_batch_progress.rs b/crates/application/tests/processing/queries/report_batch_progress.rs new file mode 100644 index 0000000..01ce26c --- /dev/null +++ b/crates/application/tests/processing/queries/report_batch_progress.rs @@ -0,0 +1,41 @@ +use std::sync::Arc; +use application::testing::{InMemoryJobBatchRepository, InMemoryJobRepository}; +use application::processing::{ReportBatchProgressQuery, ReportBatchProgressHandler}; +use domain::entities::{Job, JobBatch, JobType}; +use domain::errors::DomainError; +use domain::ports::{JobBatchRepository, JobRepository}; +use domain::value_objects::{StructuredData, SystemId}; + +#[tokio::test] +async fn returns_progress() { + let batch_repo = Arc::new(InMemoryJobBatchRepository::new()); + let job_repo = Arc::new(InMemoryJobRepository::new()); + + let batch = JobBatch::new("import", 3); + let batch_id = batch.batch_id; + batch_repo.save(&batch).await.unwrap(); + + let j1 = Job::new(JobType::ExtractMetadata, 5, StructuredData::new()).with_batch(batch_id); + let j2 = Job::new(JobType::GenerateDerivative, 3, StructuredData::new()).with_batch(batch_id); + job_repo.save(&j1).await.unwrap(); + job_repo.save(&j2).await.unwrap(); + + let handler = ReportBatchProgressHandler::new(batch_repo.clone(), job_repo.clone()); + let progress = handler.execute(ReportBatchProgressQuery { batch_id }).await.unwrap(); + + assert_eq!(progress.batch.batch_id, batch_id); + assert_eq!(progress.jobs.len(), 2); +} + +#[tokio::test] +async fn rejects_nonexistent_batch() { + let batch_repo = Arc::new(InMemoryJobBatchRepository::new()); + let job_repo = Arc::new(InMemoryJobRepository::new()); + + let handler = ReportBatchProgressHandler::new(batch_repo.clone(), job_repo.clone()); + let result = handler.execute(ReportBatchProgressQuery { + batch_id: SystemId::new(), + }).await; + + assert!(matches!(result, Err(DomainError::NotFound(_)))); +} diff --git a/crates/application/tests/sidecar/commands/detect_external_changes.rs b/crates/application/tests/sidecar/commands/detect_external_changes.rs new file mode 100644 index 0000000..7b752b6 --- /dev/null +++ b/crates/application/tests/sidecar/commands/detect_external_changes.rs @@ -0,0 +1,63 @@ +use std::sync::Arc; +use application::sidecar::{DetectExternalChangesCommand, DetectExternalChangesHandler}; +use application::sidecar::hash_helper::hash_structured_data; +use application::testing::{InMemorySidecarRepository, InMemorySidecarWriter}; +use domain::entities::{SidecarRecord, SyncStatus}; +use domain::ports::{SidecarRepository, SidecarWriterPort}; +use domain::value_objects::{MetadataValue, StructuredData, SystemId}; + +#[tokio::test] +async fn detects_changed_sidecar() { + let sidecar_repo = Arc::new(InMemorySidecarRepository::new()); + let writer = Arc::new(InMemorySidecarWriter::new()); + + let asset_id = SystemId::new(); + let path = format!("sidecars/{}.xmp", asset_id); + + // Write original data and create InSync record with its hash + let mut original = StructuredData::new(); + original.insert("title", MetadataValue::String("Old".into())); + let hash = hash_structured_data(&original); + let mut record = SidecarRecord::new(asset_id, &path); + record.mark_synced(hash); + sidecar_repo.save(&record).await.unwrap(); + + // Simulate external edit: write different data to sidecar file + let mut modified = StructuredData::new(); + modified.insert("title", MetadataValue::String("New".into())); + writer.write_sidecar(&modified, &path).await.unwrap(); + + let handler = DetectExternalChangesHandler::new(sidecar_repo.clone(), writer); + let changed = handler.execute(DetectExternalChangesCommand).await.unwrap(); + + assert_eq!(changed, 1); + let updated = sidecar_repo.find_by_asset(&asset_id).await.unwrap().unwrap(); + assert_eq!(updated.sync_status, SyncStatus::PendingRead); +} + +#[tokio::test] +async fn ignores_unchanged_sidecar() { + let sidecar_repo = Arc::new(InMemorySidecarRepository::new()); + let writer = Arc::new(InMemorySidecarWriter::new()); + + let asset_id = SystemId::new(); + let path = format!("sidecars/{}.xmp", asset_id); + + let mut data = StructuredData::new(); + data.insert("title", MetadataValue::String("Same".into())); + let hash = hash_structured_data(&data); + + let mut record = SidecarRecord::new(asset_id, &path); + record.mark_synced(hash); + sidecar_repo.save(&record).await.unwrap(); + + // Write identical data to sidecar file + writer.write_sidecar(&data, &path).await.unwrap(); + + let handler = DetectExternalChangesHandler::new(sidecar_repo.clone(), writer); + let changed = handler.execute(DetectExternalChangesCommand).await.unwrap(); + + assert_eq!(changed, 0); + let updated = sidecar_repo.find_by_asset(&asset_id).await.unwrap().unwrap(); + assert_eq!(updated.sync_status, SyncStatus::InSync); +} diff --git a/crates/application/tests/sidecar/commands/export_sidecar.rs b/crates/application/tests/sidecar/commands/export_sidecar.rs new file mode 100644 index 0000000..62c760b --- /dev/null +++ b/crates/application/tests/sidecar/commands/export_sidecar.rs @@ -0,0 +1,49 @@ +use std::sync::Arc; +use application::sidecar::{ExportSidecarCommand, ExportSidecarHandler}; +use application::testing::{InMemoryAssetMetadataRepository, InMemorySidecarRepository, InMemorySidecarWriter}; +use domain::catalog::entities::{AssetMetadata, MetadataSource}; +use domain::entities::SyncStatus; +use domain::ports::SidecarRepository; +use domain::value_objects::{MetadataValue, StructuredData, SystemId}; + +#[tokio::test] +async fn exports_sidecar_marks_in_sync() { + let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new()); + let sidecar_repo = Arc::new(InMemorySidecarRepository::new()); + let writer = Arc::new(InMemorySidecarWriter::new()); + + let asset_id = SystemId::new(); + let mut data = StructuredData::new(); + data.insert("title", MetadataValue::String("Beach".into())); + let metadata = AssetMetadata::new(asset_id, MetadataSource::UserEdited, data); + use domain::ports::AssetMetadataRepository; + meta_repo.save(&metadata).await.unwrap(); + + let handler = ExportSidecarHandler::new(meta_repo, sidecar_repo.clone(), writer.clone()); + let record = handler.execute(ExportSidecarCommand { asset_id }).await.unwrap(); + + assert_eq!(record.sync_status, SyncStatus::InSync); + assert!(record.last_known_file_hash.is_some()); + + let written = writer.get(&record.sidecar_storage_path).await; + assert!(written.is_some()); + assert_eq!(written.unwrap().get_string("title"), Some("Beach")); +} + +#[tokio::test] +async fn creates_new_record_if_none_exists() { + let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new()); + let sidecar_repo = Arc::new(InMemorySidecarRepository::new()); + let writer = Arc::new(InMemorySidecarWriter::new()); + + let asset_id = SystemId::new(); + + let handler = ExportSidecarHandler::new(meta_repo, sidecar_repo.clone(), writer); + let record = handler.execute(ExportSidecarCommand { asset_id }).await.unwrap(); + + assert_eq!(record.asset_id, asset_id); + assert_eq!(record.sync_status, SyncStatus::InSync); + + let saved = sidecar_repo.find_by_asset(&asset_id).await.unwrap(); + assert!(saved.is_some()); +} diff --git a/crates/application/tests/sidecar/commands/full_export.rs b/crates/application/tests/sidecar/commands/full_export.rs new file mode 100644 index 0000000..136e51d --- /dev/null +++ b/crates/application/tests/sidecar/commands/full_export.rs @@ -0,0 +1,45 @@ +use std::sync::Arc; +use application::sidecar::{FullExportCommand, FullExportHandler}; +use application::testing::{ + InMemoryAssetRepository, InMemoryAssetMetadataRepository, + InMemorySidecarRepository, InMemorySidecarWriter, +}; +use domain::catalog::entities::{Asset, AssetMetadata, AssetType, MetadataSource, SourceReference}; +use domain::ports::AssetRepository; +use domain::value_objects::{Checksum, MetadataValue, StructuredData, SystemId}; + +fn make_asset(owner: SystemId) -> Asset { + let source = SourceReference { + volume_id: SystemId::new(), + relative_path: "photos/img.jpg".into(), + checksum: Checksum::new("a".repeat(64)).unwrap(), + }; + Asset::new(source, AssetType::Image, "image/jpeg", 1024, owner) +} + +#[tokio::test] +async fn exports_all_user_assets() { + let asset_repo = Arc::new(InMemoryAssetRepository::new()); + let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new()); + let sidecar_repo = Arc::new(InMemorySidecarRepository::new()); + let writer = Arc::new(InMemorySidecarWriter::new()); + + let owner = SystemId::new(); + let a1 = make_asset(owner); + let a2 = make_asset(owner); + asset_repo.save(&a1).await.unwrap(); + asset_repo.save(&a2).await.unwrap(); + + let mut data = StructuredData::new(); + data.insert("title", MetadataValue::String("Sunset".into())); + use domain::ports::AssetMetadataRepository; + meta_repo.save(&AssetMetadata::new(a1.asset_id, MetadataSource::UserEdited, data)).await.unwrap(); + + let handler = FullExportHandler::new(asset_repo, meta_repo, sidecar_repo, writer.clone()); + let count = handler.execute(FullExportCommand { owner_id: owner }).await.unwrap(); + + assert_eq!(count, 2); + + let written = writer.get(&format!("sidecars/{}.xmp", a1.asset_id)).await; + assert!(written.is_some()); +} diff --git a/crates/application/tests/sidecar/commands/full_import.rs b/crates/application/tests/sidecar/commands/full_import.rs new file mode 100644 index 0000000..9ff0a0f --- /dev/null +++ b/crates/application/tests/sidecar/commands/full_import.rs @@ -0,0 +1,65 @@ +use std::sync::Arc; +use application::sidecar::{FullImportCommand, FullImportHandler}; +use application::testing::{ + InMemoryAssetRepository, InMemoryAssetMetadataRepository, + InMemorySidecarRepository, InMemorySidecarWriter, +}; +use domain::catalog::entities::{Asset, AssetType, MetadataSource, SourceReference}; +use domain::entities::SidecarRecord; +use domain::ports::{AssetMetadataRepository, AssetRepository, SidecarRepository, SidecarWriterPort}; +use domain::value_objects::{Checksum, MetadataValue, StructuredData, SystemId}; + +fn make_asset(owner: SystemId) -> Asset { + let source = SourceReference { + volume_id: SystemId::new(), + relative_path: "photos/img.jpg".into(), + checksum: Checksum::new("b".repeat(64)).unwrap(), + }; + Asset::new(source, AssetType::Image, "image/jpeg", 2048, owner) +} + +#[tokio::test] +async fn imports_from_existing_sidecars() { + let asset_repo = Arc::new(InMemoryAssetRepository::new()); + let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new()); + let sidecar_repo = Arc::new(InMemorySidecarRepository::new()); + let writer = Arc::new(InMemorySidecarWriter::new()); + + let owner = SystemId::new(); + let asset = make_asset(owner); + asset_repo.save(&asset).await.unwrap(); + + let path = format!("sidecars/{}.xmp", asset.asset_id); + let record = SidecarRecord::new(asset.asset_id, &path); + sidecar_repo.save(&record).await.unwrap(); + + let mut data = StructuredData::new(); + data.insert("lens", MetadataValue::String("50mm".into())); + writer.write_sidecar(&data, &path).await.unwrap(); + + let handler = FullImportHandler::new(asset_repo, meta_repo.clone(), sidecar_repo, writer); + let count = handler.execute(FullImportCommand { owner_id: owner }).await.unwrap(); + + assert_eq!(count, 1); + let imported = meta_repo.find_by_asset_and_source(&asset.asset_id, MetadataSource::ExifExtracted).await.unwrap(); + assert!(imported.is_some()); + assert_eq!(imported.unwrap().data.get_string("lens"), Some("50mm")); +} + +#[tokio::test] +async fn skips_missing_sidecars() { + let asset_repo = Arc::new(InMemoryAssetRepository::new()); + let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new()); + let sidecar_repo = Arc::new(InMemorySidecarRepository::new()); + let writer = Arc::new(InMemorySidecarWriter::new()); + + let owner = SystemId::new(); + let asset = make_asset(owner); + asset_repo.save(&asset).await.unwrap(); + // No sidecar record, no sidecar file + + let handler = FullImportHandler::new(asset_repo, meta_repo, sidecar_repo, writer); + let count = handler.execute(FullImportCommand { owner_id: owner }).await.unwrap(); + + assert_eq!(count, 0); +} diff --git a/crates/application/tests/sidecar/commands/import_sidecar.rs b/crates/application/tests/sidecar/commands/import_sidecar.rs new file mode 100644 index 0000000..dc2e608 --- /dev/null +++ b/crates/application/tests/sidecar/commands/import_sidecar.rs @@ -0,0 +1,54 @@ +use std::sync::Arc; +use application::sidecar::{ImportSidecarCommand, ImportSidecarHandler}; +use application::testing::{InMemoryAssetMetadataRepository, InMemorySidecarRepository, InMemorySidecarWriter}; +use domain::catalog::entities::MetadataSource; +use domain::entities::{SidecarRecord, SyncStatus}; +use domain::errors::DomainError; +use domain::ports::{SidecarRepository, SidecarWriterPort}; +use domain::value_objects::{MetadataValue, StructuredData, SystemId}; + +#[tokio::test] +async fn imports_pending_read_sidecar() { + let sidecar_repo = Arc::new(InMemorySidecarRepository::new()); + let writer = Arc::new(InMemorySidecarWriter::new()); + let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new()); + + let asset_id = SystemId::new(); + let path = format!("sidecars/{}.xmp", asset_id); + + // Create PendingRead record + let mut record = SidecarRecord::new(asset_id, &path); + record.mark_pending_read(); + sidecar_repo.save(&record).await.unwrap(); + + // Write sidecar file data + let mut data = StructuredData::new(); + data.insert("camera", MetadataValue::String("Canon".into())); + writer.write_sidecar(&data, &path).await.unwrap(); + + let handler = ImportSidecarHandler::new(sidecar_repo.clone(), writer, meta_repo); + let metadata = handler.execute(ImportSidecarCommand { asset_id }).await.unwrap(); + + assert_eq!(metadata.metadata_source, MetadataSource::ExifExtracted); + assert_eq!(metadata.data.get_string("camera"), Some("Canon")); + + let updated = sidecar_repo.find_by_asset(&asset_id).await.unwrap().unwrap(); + assert_eq!(updated.sync_status, SyncStatus::InSync); +} + +#[tokio::test] +async fn rejects_non_pending_read() { + let sidecar_repo = Arc::new(InMemorySidecarRepository::new()); + let writer = Arc::new(InMemorySidecarWriter::new()); + let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new()); + + let asset_id = SystemId::new(); + let record = SidecarRecord::new(asset_id, "sidecars/x.xmp"); + // Default status is PendingWrite, not PendingRead + sidecar_repo.save(&record).await.unwrap(); + + let handler = ImportSidecarHandler::new(sidecar_repo, writer, meta_repo); + let result = handler.execute(ImportSidecarCommand { asset_id }).await; + + assert!(matches!(result, Err(DomainError::Validation(_)))); +} diff --git a/crates/application/tests/sidecar/commands/mod.rs b/crates/application/tests/sidecar/commands/mod.rs new file mode 100644 index 0000000..c403682 --- /dev/null +++ b/crates/application/tests/sidecar/commands/mod.rs @@ -0,0 +1,6 @@ +mod export_sidecar; +mod detect_external_changes; +mod import_sidecar; +mod resolve_conflict; +mod full_export; +mod full_import; diff --git a/crates/application/tests/sidecar/commands/resolve_conflict.rs b/crates/application/tests/sidecar/commands/resolve_conflict.rs new file mode 100644 index 0000000..733dd7d --- /dev/null +++ b/crates/application/tests/sidecar/commands/resolve_conflict.rs @@ -0,0 +1,85 @@ +use std::sync::Arc; +use application::sidecar::{ResolveConflictCommand, ResolveConflictHandler}; +use application::testing::{InMemoryAssetMetadataRepository, InMemorySidecarRepository, InMemorySidecarWriter}; +use domain::catalog::entities::{AssetMetadata, MetadataSource}; +use domain::entities::{ConflictPolicy, SidecarRecord, SyncStatus}; +use domain::errors::DomainError; +use domain::ports::{AssetMetadataRepository, SidecarRepository, SidecarWriterPort}; +use domain::value_objects::{MetadataValue, StructuredData, SystemId}; + +fn conflict_record(asset_id: SystemId, path: &str) -> SidecarRecord { + let mut r = SidecarRecord::new(asset_id, path); + r.mark_conflict(); + r +} + +#[tokio::test] +async fn db_wins_re_exports() { + let sidecar_repo = Arc::new(InMemorySidecarRepository::new()); + let writer = Arc::new(InMemorySidecarWriter::new()); + let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new()); + + let asset_id = SystemId::new(); + let path = format!("sidecars/{}.xmp", asset_id); + + sidecar_repo.save(&conflict_record(asset_id, &path)).await.unwrap(); + + let mut data = StructuredData::new(); + data.insert("title", MetadataValue::String("DB Value".into())); + meta_repo.save(&AssetMetadata::new(asset_id, MetadataSource::UserEdited, data)).await.unwrap(); + + let handler = ResolveConflictHandler::new(sidecar_repo.clone(), writer.clone(), meta_repo); + let record = handler.execute(ResolveConflictCommand { + asset_id, + policy: ConflictPolicy::DbWins, + }).await.unwrap(); + + assert_eq!(record.sync_status, SyncStatus::InSync); + let written = writer.get(&path).await.unwrap(); + assert_eq!(written.get_string("title"), Some("DB Value")); +} + +#[tokio::test] +async fn file_wins_re_imports() { + let sidecar_repo = Arc::new(InMemorySidecarRepository::new()); + let writer = Arc::new(InMemorySidecarWriter::new()); + let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new()); + + let asset_id = SystemId::new(); + let path = format!("sidecars/{}.xmp", asset_id); + + sidecar_repo.save(&conflict_record(asset_id, &path)).await.unwrap(); + + let mut file_data = StructuredData::new(); + file_data.insert("title", MetadataValue::String("File Value".into())); + writer.write_sidecar(&file_data, &path).await.unwrap(); + + let handler = ResolveConflictHandler::new(sidecar_repo.clone(), writer, meta_repo.clone()); + let record = handler.execute(ResolveConflictCommand { + asset_id, + policy: ConflictPolicy::FileWins, + }).await.unwrap(); + + assert_eq!(record.sync_status, SyncStatus::InSync); + let imported = meta_repo.find_by_asset_and_source(&asset_id, MetadataSource::ExifExtracted).await.unwrap(); + assert!(imported.is_some()); + assert_eq!(imported.unwrap().data.get_string("title"), Some("File Value")); +} + +#[tokio::test] +async fn user_decision_returns_error() { + let sidecar_repo = Arc::new(InMemorySidecarRepository::new()); + let writer = Arc::new(InMemorySidecarWriter::new()); + let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new()); + + let asset_id = SystemId::new(); + sidecar_repo.save(&conflict_record(asset_id, "sidecars/x.xmp")).await.unwrap(); + + let handler = ResolveConflictHandler::new(sidecar_repo, writer, meta_repo); + let result = handler.execute(ResolveConflictCommand { + asset_id, + policy: ConflictPolicy::RequireUserDecision, + }).await; + + assert!(matches!(result, Err(DomainError::Validation(msg)) if msg.contains("Manual"))); +} diff --git a/crates/application/tests/sidecar/mod.rs b/crates/application/tests/sidecar/mod.rs new file mode 100644 index 0000000..f3d4468 --- /dev/null +++ b/crates/application/tests/sidecar/mod.rs @@ -0,0 +1 @@ +mod commands;