app: add sidecar sync commands (export, detect, import, resolve, full export/import)
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -94,6 +94,8 @@ dependencies = [
|
|||||||
"chrono",
|
"chrono",
|
||||||
"domain",
|
"domain",
|
||||||
"serde",
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"sha2",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio",
|
"tokio",
|
||||||
"uuid",
|
"uuid",
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ anyhow = "1.0"
|
|||||||
thiserror = "2.0"
|
thiserror = "2.0"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
|
sha2 = "0.10"
|
||||||
uuid = { version = "1.0", features = ["v4", "serde"] }
|
uuid = { version = "1.0", features = ["v4", "serde"] }
|
||||||
chrono = { version = "0.4", features = ["serde"] }
|
chrono = { version = "0.4", features = ["serde"] }
|
||||||
dotenvy = "0.15"
|
dotenvy = "0.15"
|
||||||
|
|||||||
@@ -12,6 +12,8 @@ uuid = { workspace = true }
|
|||||||
tokio = { workspace = true }
|
tokio = { workspace = true }
|
||||||
bytes = { workspace = true }
|
bytes = { workspace = true }
|
||||||
serde = { workspace = true }
|
serde = { workspace = true }
|
||||||
|
serde_json = { workspace = true }
|
||||||
|
sha2 = { workspace = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
chrono = { workspace = true }
|
chrono = { workspace = true }
|
||||||
|
|||||||
48
crates/application/src/processing/commands/complete_job.rs
Normal file
48
crates/application/src/processing/commands/complete_job.rs
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use domain::{
|
||||||
|
entities::Job,
|
||||||
|
errors::DomainError,
|
||||||
|
events::DomainEvent,
|
||||||
|
ports::{EventPublisher, JobBatchRepository, JobRepository},
|
||||||
|
value_objects::{DateTimeStamp, StructuredData, SystemId},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||||
|
pub struct CompleteJobCommand {
|
||||||
|
pub job_id: SystemId,
|
||||||
|
pub result: StructuredData,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct CompleteJobHandler {
|
||||||
|
job_repo: Arc<dyn JobRepository>,
|
||||||
|
batch_repo: Arc<dyn JobBatchRepository>,
|
||||||
|
event_pub: Arc<dyn EventPublisher>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CompleteJobHandler {
|
||||||
|
pub fn new(
|
||||||
|
job_repo: Arc<dyn JobRepository>,
|
||||||
|
batch_repo: Arc<dyn JobBatchRepository>,
|
||||||
|
event_pub: Arc<dyn EventPublisher>,
|
||||||
|
) -> Self {
|
||||||
|
Self { job_repo, batch_repo, event_pub }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn execute(&self, cmd: CompleteJobCommand) -> Result<Job, DomainError> {
|
||||||
|
let mut job = self.job_repo.find_by_id(&cmd.job_id).await?
|
||||||
|
.ok_or_else(|| DomainError::NotFound(format!("Job {} not found", cmd.job_id)))?;
|
||||||
|
job.complete(cmd.result);
|
||||||
|
self.job_repo.save(&job).await?;
|
||||||
|
if let Some(ref batch_id) = job.batch_id {
|
||||||
|
let mut batch = self.batch_repo.find_by_id(batch_id).await?
|
||||||
|
.ok_or_else(|| DomainError::NotFound(format!("Batch {} not found", batch_id)))?;
|
||||||
|
batch.record_completion();
|
||||||
|
self.batch_repo.save(&batch).await?;
|
||||||
|
}
|
||||||
|
self.event_pub.publish(DomainEvent::JobCompleted {
|
||||||
|
job_id: job.job_id,
|
||||||
|
timestamp: DateTimeStamp::now(),
|
||||||
|
}).await?;
|
||||||
|
Ok(job)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,46 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use domain::{
|
||||||
|
entities::ProcessingPipeline,
|
||||||
|
errors::DomainError,
|
||||||
|
ports::{PipelineRepository, PluginRepository},
|
||||||
|
value_objects::{StructuredData, SystemId},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||||
|
pub struct PipelineStepConfig {
|
||||||
|
pub plugin_id: SystemId,
|
||||||
|
pub config: StructuredData,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||||
|
pub struct ConfigurePipelineCommand {
|
||||||
|
pub trigger_event: String,
|
||||||
|
pub steps: Vec<PipelineStepConfig>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ConfigurePipelineHandler {
|
||||||
|
pipeline_repo: Arc<dyn PipelineRepository>,
|
||||||
|
plugin_repo: Arc<dyn PluginRepository>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConfigurePipelineHandler {
|
||||||
|
pub fn new(
|
||||||
|
pipeline_repo: Arc<dyn PipelineRepository>,
|
||||||
|
plugin_repo: Arc<dyn PluginRepository>,
|
||||||
|
) -> Self {
|
||||||
|
Self { pipeline_repo, plugin_repo }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn execute(&self, cmd: ConfigurePipelineCommand) -> Result<ProcessingPipeline, DomainError> {
|
||||||
|
for step in &cmd.steps {
|
||||||
|
self.plugin_repo.find_by_id(&step.plugin_id).await?
|
||||||
|
.ok_or_else(|| DomainError::NotFound(format!("Plugin {} not found", step.plugin_id)))?;
|
||||||
|
}
|
||||||
|
let mut pipeline = ProcessingPipeline::new(cmd.trigger_event);
|
||||||
|
for step in cmd.steps {
|
||||||
|
pipeline.add_step(step.plugin_id, step.config);
|
||||||
|
}
|
||||||
|
self.pipeline_repo.save(&pipeline).await?;
|
||||||
|
Ok(pipeline)
|
||||||
|
}
|
||||||
|
}
|
||||||
45
crates/application/src/processing/commands/enqueue_job.rs
Normal file
45
crates/application/src/processing/commands/enqueue_job.rs
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use domain::{
|
||||||
|
entities::{Job, JobType},
|
||||||
|
errors::DomainError,
|
||||||
|
events::DomainEvent,
|
||||||
|
ports::{EventPublisher, JobRepository},
|
||||||
|
value_objects::{DateTimeStamp, StructuredData, SystemId},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||||
|
pub struct EnqueueJobCommand {
|
||||||
|
pub job_type: JobType,
|
||||||
|
pub priority: u32,
|
||||||
|
pub payload: StructuredData,
|
||||||
|
pub target_asset_id: Option<SystemId>,
|
||||||
|
pub batch_id: Option<SystemId>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct EnqueueJobHandler {
|
||||||
|
job_repo: Arc<dyn JobRepository>,
|
||||||
|
event_pub: Arc<dyn EventPublisher>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EnqueueJobHandler {
|
||||||
|
pub fn new(job_repo: Arc<dyn JobRepository>, event_pub: Arc<dyn EventPublisher>) -> Self {
|
||||||
|
Self { job_repo, event_pub }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn execute(&self, cmd: EnqueueJobCommand) -> Result<Job, DomainError> {
|
||||||
|
let mut job = Job::new(cmd.job_type.clone(), cmd.priority, cmd.payload);
|
||||||
|
if let Some(id) = cmd.target_asset_id {
|
||||||
|
job = job.with_target(id);
|
||||||
|
}
|
||||||
|
if let Some(id) = cmd.batch_id {
|
||||||
|
job = job.with_batch(id);
|
||||||
|
}
|
||||||
|
self.job_repo.save(&job).await?;
|
||||||
|
self.event_pub.publish(DomainEvent::JobEnqueued {
|
||||||
|
job_id: job.job_id,
|
||||||
|
job_type: format!("{:?}", cmd.job_type),
|
||||||
|
timestamp: DateTimeStamp::now(),
|
||||||
|
}).await?;
|
||||||
|
Ok(job)
|
||||||
|
}
|
||||||
|
}
|
||||||
57
crates/application/src/processing/commands/fail_job.rs
Normal file
57
crates/application/src/processing/commands/fail_job.rs
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use domain::{
|
||||||
|
entities::{Job, JobStatus},
|
||||||
|
errors::DomainError,
|
||||||
|
events::DomainEvent,
|
||||||
|
ports::{EventPublisher, JobBatchRepository, JobRepository},
|
||||||
|
value_objects::{DateTimeStamp, SystemId},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||||
|
pub struct FailJobCommand {
|
||||||
|
pub job_id: SystemId,
|
||||||
|
pub error: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct FailJobHandler {
|
||||||
|
job_repo: Arc<dyn JobRepository>,
|
||||||
|
batch_repo: Arc<dyn JobBatchRepository>,
|
||||||
|
event_pub: Arc<dyn EventPublisher>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FailJobHandler {
|
||||||
|
pub fn new(
|
||||||
|
job_repo: Arc<dyn JobRepository>,
|
||||||
|
batch_repo: Arc<dyn JobBatchRepository>,
|
||||||
|
event_pub: Arc<dyn EventPublisher>,
|
||||||
|
) -> Self {
|
||||||
|
Self { job_repo, batch_repo, event_pub }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn execute(&self, cmd: FailJobCommand) -> Result<Job, DomainError> {
|
||||||
|
let mut job = self.job_repo.find_by_id(&cmd.job_id).await?
|
||||||
|
.ok_or_else(|| DomainError::NotFound(format!("Job {} not found", cmd.job_id)))?;
|
||||||
|
job.fail(&cmd.error);
|
||||||
|
self.job_repo.save(&job).await?;
|
||||||
|
if job.status == JobStatus::Failed {
|
||||||
|
if let Some(ref batch_id) = job.batch_id {
|
||||||
|
let mut batch = self.batch_repo.find_by_id(batch_id).await?
|
||||||
|
.ok_or_else(|| DomainError::NotFound(format!("Batch {} not found", batch_id)))?;
|
||||||
|
batch.record_failure();
|
||||||
|
self.batch_repo.save(&batch).await?;
|
||||||
|
}
|
||||||
|
self.event_pub.publish(DomainEvent::JobFailed {
|
||||||
|
job_id: job.job_id,
|
||||||
|
error: cmd.error,
|
||||||
|
timestamp: DateTimeStamp::now(),
|
||||||
|
}).await?;
|
||||||
|
} else if job.status == JobStatus::Queued {
|
||||||
|
self.event_pub.publish(DomainEvent::JobEnqueued {
|
||||||
|
job_id: job.job_id,
|
||||||
|
job_type: format!("{:?}", job.job_type),
|
||||||
|
timestamp: DateTimeStamp::now(),
|
||||||
|
}).await?;
|
||||||
|
}
|
||||||
|
Ok(job)
|
||||||
|
}
|
||||||
|
}
|
||||||
63
crates/application/src/processing/commands/manage_plugin.rs
Normal file
63
crates/application/src/processing/commands/manage_plugin.rs
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use domain::{
|
||||||
|
entities::{Plugin, PluginType},
|
||||||
|
errors::DomainError,
|
||||||
|
ports::PluginRepository,
|
||||||
|
value_objects::{StructuredData, SystemId},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||||
|
pub enum PluginAction {
|
||||||
|
Create {
|
||||||
|
name: String,
|
||||||
|
plugin_type: PluginType,
|
||||||
|
config: StructuredData,
|
||||||
|
},
|
||||||
|
Enable,
|
||||||
|
Disable,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||||
|
pub struct ManagePluginCommand {
|
||||||
|
pub plugin_id: Option<SystemId>,
|
||||||
|
pub action: PluginAction,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ManagePluginHandler {
|
||||||
|
plugin_repo: Arc<dyn PluginRepository>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ManagePluginHandler {
|
||||||
|
pub fn new(plugin_repo: Arc<dyn PluginRepository>) -> Self {
|
||||||
|
Self { plugin_repo }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn execute(&self, cmd: ManagePluginCommand) -> Result<Plugin, DomainError> {
|
||||||
|
match cmd.action {
|
||||||
|
PluginAction::Create { name, plugin_type, config } => {
|
||||||
|
let mut plugin = Plugin::new(name, plugin_type);
|
||||||
|
plugin.configuration = config;
|
||||||
|
self.plugin_repo.save(&plugin).await?;
|
||||||
|
Ok(plugin)
|
||||||
|
}
|
||||||
|
PluginAction::Enable => {
|
||||||
|
let id = cmd.plugin_id
|
||||||
|
.ok_or_else(|| DomainError::Validation("plugin_id required for Enable".into()))?;
|
||||||
|
let mut plugin = self.plugin_repo.find_by_id(&id).await?
|
||||||
|
.ok_or_else(|| DomainError::NotFound(format!("Plugin {} not found", id)))?;
|
||||||
|
plugin.enable();
|
||||||
|
self.plugin_repo.save(&plugin).await?;
|
||||||
|
Ok(plugin)
|
||||||
|
}
|
||||||
|
PluginAction::Disable => {
|
||||||
|
let id = cmd.plugin_id
|
||||||
|
.ok_or_else(|| DomainError::Validation("plugin_id required for Disable".into()))?;
|
||||||
|
let mut plugin = self.plugin_repo.find_by_id(&id).await?
|
||||||
|
.ok_or_else(|| DomainError::NotFound(format!("Plugin {} not found", id)))?;
|
||||||
|
plugin.disable();
|
||||||
|
self.plugin_repo.save(&plugin).await?;
|
||||||
|
Ok(plugin)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
6
crates/application/src/processing/commands/mod.rs
Normal file
6
crates/application/src/processing/commands/mod.rs
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
pub mod enqueue_job;
|
||||||
|
pub mod start_job;
|
||||||
|
pub mod complete_job;
|
||||||
|
pub mod fail_job;
|
||||||
|
pub mod manage_plugin;
|
||||||
|
pub mod configure_pipeline;
|
||||||
30
crates/application/src/processing/commands/start_job.rs
Normal file
30
crates/application/src/processing/commands/start_job.rs
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use domain::{
|
||||||
|
entities::Job,
|
||||||
|
errors::DomainError,
|
||||||
|
ports::JobRepository,
|
||||||
|
value_objects::SystemId,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||||
|
pub struct StartJobCommand {
|
||||||
|
pub job_id: SystemId,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct StartJobHandler {
|
||||||
|
job_repo: Arc<dyn JobRepository>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StartJobHandler {
|
||||||
|
pub fn new(job_repo: Arc<dyn JobRepository>) -> Self {
|
||||||
|
Self { job_repo }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn execute(&self, cmd: StartJobCommand) -> Result<Job, DomainError> {
|
||||||
|
let mut job = self.job_repo.find_by_id(&cmd.job_id).await?
|
||||||
|
.ok_or_else(|| DomainError::NotFound(format!("Job {} not found", cmd.job_id)))?;
|
||||||
|
job.start()?;
|
||||||
|
self.job_repo.save(&job).await?;
|
||||||
|
Ok(job)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1 +1,10 @@
|
|||||||
// Processing commands/queries (future: EnqueueJob, ProcessBatch, etc.)
|
pub mod commands;
|
||||||
|
pub mod queries;
|
||||||
|
|
||||||
|
pub use commands::enqueue_job::{EnqueueJobCommand, EnqueueJobHandler};
|
||||||
|
pub use commands::start_job::{StartJobCommand, StartJobHandler};
|
||||||
|
pub use commands::complete_job::{CompleteJobCommand, CompleteJobHandler};
|
||||||
|
pub use commands::fail_job::{FailJobCommand, FailJobHandler};
|
||||||
|
pub use commands::manage_plugin::{ManagePluginCommand, ManagePluginHandler, PluginAction};
|
||||||
|
pub use commands::configure_pipeline::{ConfigurePipelineCommand, ConfigurePipelineHandler, PipelineStepConfig};
|
||||||
|
pub use queries::report_batch_progress::{ReportBatchProgressQuery, ReportBatchProgressHandler, BatchProgress};
|
||||||
|
|||||||
1
crates/application/src/processing/queries/mod.rs
Normal file
1
crates/application/src/processing/queries/mod.rs
Normal file
@@ -0,0 +1 @@
|
|||||||
|
pub mod report_batch_progress;
|
||||||
@@ -0,0 +1,36 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use domain::{
|
||||||
|
entities::{Job, JobBatch},
|
||||||
|
errors::DomainError,
|
||||||
|
ports::{JobBatchRepository, JobRepository},
|
||||||
|
value_objects::SystemId,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||||
|
pub struct ReportBatchProgressQuery {
|
||||||
|
pub batch_id: SystemId,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct BatchProgress {
|
||||||
|
pub batch: JobBatch,
|
||||||
|
pub jobs: Vec<Job>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ReportBatchProgressHandler {
|
||||||
|
batch_repo: Arc<dyn JobBatchRepository>,
|
||||||
|
job_repo: Arc<dyn JobRepository>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ReportBatchProgressHandler {
|
||||||
|
pub fn new(batch_repo: Arc<dyn JobBatchRepository>, job_repo: Arc<dyn JobRepository>) -> Self {
|
||||||
|
Self { batch_repo, job_repo }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn execute(&self, query: ReportBatchProgressQuery) -> Result<BatchProgress, DomainError> {
|
||||||
|
let batch = self.batch_repo.find_by_id(&query.batch_id).await?
|
||||||
|
.ok_or_else(|| DomainError::NotFound(format!("Batch {} not found", query.batch_id)))?;
|
||||||
|
let jobs = self.job_repo.find_by_batch(&query.batch_id).await?;
|
||||||
|
Ok(BatchProgress { batch, jobs })
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,49 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use domain::{
|
||||||
|
entities::SyncStatus,
|
||||||
|
errors::DomainError,
|
||||||
|
ports::{SidecarRepository, SidecarWriterPort},
|
||||||
|
};
|
||||||
|
use crate::sidecar::hash_helper::hash_structured_data;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||||
|
pub struct DetectExternalChangesCommand;
|
||||||
|
|
||||||
|
pub struct DetectExternalChangesHandler {
|
||||||
|
sidecar_repo: Arc<dyn SidecarRepository>,
|
||||||
|
writer: Arc<dyn SidecarWriterPort>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DetectExternalChangesHandler {
|
||||||
|
pub fn new(
|
||||||
|
sidecar_repo: Arc<dyn SidecarRepository>,
|
||||||
|
writer: Arc<dyn SidecarWriterPort>,
|
||||||
|
) -> Self {
|
||||||
|
Self { sidecar_repo, writer }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn execute(&self, _cmd: DetectExternalChangesCommand) -> Result<u32, DomainError> {
|
||||||
|
let records = self.sidecar_repo.find_by_status(SyncStatus::InSync).await?;
|
||||||
|
let mut changed = 0u32;
|
||||||
|
|
||||||
|
for mut record in records {
|
||||||
|
match self.writer.read_sidecar(&record.sidecar_storage_path).await {
|
||||||
|
Ok(data) => {
|
||||||
|
let hash = hash_structured_data(&data);
|
||||||
|
let differs = record.last_known_file_hash.as_ref() != Some(&hash);
|
||||||
|
if differs {
|
||||||
|
record.mark_pending_read();
|
||||||
|
self.sidecar_repo.save(&record).await?;
|
||||||
|
changed += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
record.mark_error("Sidecar file not found");
|
||||||
|
self.sidecar_repo.save(&record).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(changed)
|
||||||
|
}
|
||||||
|
}
|
||||||
48
crates/application/src/sidecar/commands/export_sidecar.rs
Normal file
48
crates/application/src/sidecar/commands/export_sidecar.rs
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use domain::{
|
||||||
|
catalog::services::resolve_metadata,
|
||||||
|
entities::SidecarRecord,
|
||||||
|
errors::DomainError,
|
||||||
|
ports::{AssetMetadataRepository, SidecarRepository, SidecarWriterPort},
|
||||||
|
value_objects::SystemId,
|
||||||
|
};
|
||||||
|
use crate::sidecar::hash_helper::hash_structured_data;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||||
|
pub struct ExportSidecarCommand {
|
||||||
|
pub asset_id: SystemId,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ExportSidecarHandler {
|
||||||
|
metadata_repo: Arc<dyn AssetMetadataRepository>,
|
||||||
|
sidecar_repo: Arc<dyn SidecarRepository>,
|
||||||
|
writer: Arc<dyn SidecarWriterPort>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ExportSidecarHandler {
|
||||||
|
pub fn new(
|
||||||
|
metadata_repo: Arc<dyn AssetMetadataRepository>,
|
||||||
|
sidecar_repo: Arc<dyn SidecarRepository>,
|
||||||
|
writer: Arc<dyn SidecarWriterPort>,
|
||||||
|
) -> Self {
|
||||||
|
Self { metadata_repo, sidecar_repo, writer }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn execute(&self, cmd: ExportSidecarCommand) -> Result<SidecarRecord, DomainError> {
|
||||||
|
let layers = self.metadata_repo.find_by_asset(&cmd.asset_id).await?;
|
||||||
|
let resolved = resolve_metadata(&layers);
|
||||||
|
|
||||||
|
let mut record = match self.sidecar_repo.find_by_asset(&cmd.asset_id).await? {
|
||||||
|
Some(r) => r,
|
||||||
|
None => SidecarRecord::new(cmd.asset_id, format!("sidecars/{}.xmp", cmd.asset_id)),
|
||||||
|
};
|
||||||
|
|
||||||
|
self.writer.write_sidecar(&resolved, &record.sidecar_storage_path).await?;
|
||||||
|
|
||||||
|
let hash = hash_structured_data(&resolved);
|
||||||
|
record.mark_synced(hash);
|
||||||
|
self.sidecar_repo.save(&record).await?;
|
||||||
|
|
||||||
|
Ok(record)
|
||||||
|
}
|
||||||
|
}
|
||||||
55
crates/application/src/sidecar/commands/full_export.rs
Normal file
55
crates/application/src/sidecar/commands/full_export.rs
Normal file
@@ -0,0 +1,55 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use domain::{
|
||||||
|
catalog::services::resolve_metadata,
|
||||||
|
entities::SidecarRecord,
|
||||||
|
errors::DomainError,
|
||||||
|
ports::{AssetRepository, AssetMetadataRepository, SidecarRepository, SidecarWriterPort},
|
||||||
|
value_objects::SystemId,
|
||||||
|
};
|
||||||
|
use crate::sidecar::hash_helper::hash_structured_data;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||||
|
pub struct FullExportCommand {
|
||||||
|
pub owner_id: SystemId,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct FullExportHandler {
|
||||||
|
asset_repo: Arc<dyn AssetRepository>,
|
||||||
|
metadata_repo: Arc<dyn AssetMetadataRepository>,
|
||||||
|
sidecar_repo: Arc<dyn SidecarRepository>,
|
||||||
|
writer: Arc<dyn SidecarWriterPort>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FullExportHandler {
|
||||||
|
pub fn new(
|
||||||
|
asset_repo: Arc<dyn AssetRepository>,
|
||||||
|
metadata_repo: Arc<dyn AssetMetadataRepository>,
|
||||||
|
sidecar_repo: Arc<dyn SidecarRepository>,
|
||||||
|
writer: Arc<dyn SidecarWriterPort>,
|
||||||
|
) -> Self {
|
||||||
|
Self { asset_repo, metadata_repo, sidecar_repo, writer }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn execute(&self, cmd: FullExportCommand) -> Result<u32, DomainError> {
|
||||||
|
let assets = self.asset_repo.find_by_owner(&cmd.owner_id, u32::MAX, 0).await?;
|
||||||
|
let mut count = 0u32;
|
||||||
|
|
||||||
|
for asset in &assets {
|
||||||
|
let layers = self.metadata_repo.find_by_asset(&asset.asset_id).await?;
|
||||||
|
let resolved = resolve_metadata(&layers);
|
||||||
|
|
||||||
|
let mut record = match self.sidecar_repo.find_by_asset(&asset.asset_id).await? {
|
||||||
|
Some(r) => r,
|
||||||
|
None => SidecarRecord::new(asset.asset_id, format!("sidecars/{}.xmp", asset.asset_id)),
|
||||||
|
};
|
||||||
|
|
||||||
|
self.writer.write_sidecar(&resolved, &record.sidecar_storage_path).await?;
|
||||||
|
let hash = hash_structured_data(&resolved);
|
||||||
|
record.mark_synced(hash);
|
||||||
|
self.sidecar_repo.save(&record).await?;
|
||||||
|
count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(count)
|
||||||
|
}
|
||||||
|
}
|
||||||
64
crates/application/src/sidecar/commands/full_import.rs
Normal file
64
crates/application/src/sidecar/commands/full_import.rs
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use domain::{
|
||||||
|
catalog::entities::{AssetMetadata, MetadataSource},
|
||||||
|
entities::SidecarRecord,
|
||||||
|
errors::DomainError,
|
||||||
|
ports::{AssetRepository, AssetMetadataRepository, SidecarRepository, SidecarWriterPort},
|
||||||
|
value_objects::SystemId,
|
||||||
|
};
|
||||||
|
use crate::sidecar::hash_helper::hash_structured_data;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||||
|
pub struct FullImportCommand {
|
||||||
|
pub owner_id: SystemId,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct FullImportHandler {
|
||||||
|
asset_repo: Arc<dyn AssetRepository>,
|
||||||
|
metadata_repo: Arc<dyn AssetMetadataRepository>,
|
||||||
|
sidecar_repo: Arc<dyn SidecarRepository>,
|
||||||
|
writer: Arc<dyn SidecarWriterPort>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FullImportHandler {
|
||||||
|
pub fn new(
|
||||||
|
asset_repo: Arc<dyn AssetRepository>,
|
||||||
|
metadata_repo: Arc<dyn AssetMetadataRepository>,
|
||||||
|
sidecar_repo: Arc<dyn SidecarRepository>,
|
||||||
|
writer: Arc<dyn SidecarWriterPort>,
|
||||||
|
) -> Self {
|
||||||
|
Self { asset_repo, metadata_repo, sidecar_repo, writer }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn execute(&self, cmd: FullImportCommand) -> Result<u32, DomainError> {
|
||||||
|
let assets = self.asset_repo.find_by_owner(&cmd.owner_id, u32::MAX, 0).await?;
|
||||||
|
let mut count = 0u32;
|
||||||
|
|
||||||
|
for asset in &assets {
|
||||||
|
let record = match self.sidecar_repo.find_by_asset(&asset.asset_id).await? {
|
||||||
|
Some(r) => r,
|
||||||
|
None => {
|
||||||
|
// No sidecar record — try creating one to read from
|
||||||
|
SidecarRecord::new(asset.asset_id, format!("sidecars/{}.xmp", asset.asset_id))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match self.writer.read_sidecar(&record.sidecar_storage_path).await {
|
||||||
|
Ok(data) => {
|
||||||
|
let metadata = AssetMetadata::new(asset.asset_id, MetadataSource::ExifExtracted, data.clone());
|
||||||
|
self.metadata_repo.save(&metadata).await?;
|
||||||
|
let hash = hash_structured_data(&data);
|
||||||
|
let mut record = record;
|
||||||
|
record.mark_synced(hash);
|
||||||
|
self.sidecar_repo.save(&record).await?;
|
||||||
|
count += 1;
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
// Sidecar file missing — skip
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(count)
|
||||||
|
}
|
||||||
|
}
|
||||||
51
crates/application/src/sidecar/commands/import_sidecar.rs
Normal file
51
crates/application/src/sidecar/commands/import_sidecar.rs
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use domain::{
|
||||||
|
catalog::entities::{AssetMetadata, MetadataSource},
|
||||||
|
entities::SyncStatus,
|
||||||
|
errors::DomainError,
|
||||||
|
ports::{AssetMetadataRepository, SidecarRepository, SidecarWriterPort},
|
||||||
|
value_objects::SystemId,
|
||||||
|
};
|
||||||
|
use crate::sidecar::hash_helper::hash_structured_data;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||||
|
pub struct ImportSidecarCommand {
|
||||||
|
pub asset_id: SystemId,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ImportSidecarHandler {
|
||||||
|
sidecar_repo: Arc<dyn SidecarRepository>,
|
||||||
|
writer: Arc<dyn SidecarWriterPort>,
|
||||||
|
metadata_repo: Arc<dyn AssetMetadataRepository>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ImportSidecarHandler {
|
||||||
|
pub fn new(
|
||||||
|
sidecar_repo: Arc<dyn SidecarRepository>,
|
||||||
|
writer: Arc<dyn SidecarWriterPort>,
|
||||||
|
metadata_repo: Arc<dyn AssetMetadataRepository>,
|
||||||
|
) -> Self {
|
||||||
|
Self { sidecar_repo, writer, metadata_repo }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn execute(&self, cmd: ImportSidecarCommand) -> Result<AssetMetadata, DomainError> {
|
||||||
|
let mut record = self.sidecar_repo.find_by_asset(&cmd.asset_id).await?
|
||||||
|
.ok_or_else(|| DomainError::NotFound(format!("Sidecar record for {} not found", cmd.asset_id)))?;
|
||||||
|
|
||||||
|
if record.sync_status != SyncStatus::PendingRead {
|
||||||
|
return Err(DomainError::Validation(
|
||||||
|
format!("Sidecar is not pending read (status: {:?})", record.sync_status),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let data = self.writer.read_sidecar(&record.sidecar_storage_path).await?;
|
||||||
|
let metadata = AssetMetadata::new(cmd.asset_id, MetadataSource::ExifExtracted, data.clone());
|
||||||
|
self.metadata_repo.save(&metadata).await?;
|
||||||
|
|
||||||
|
let hash = hash_structured_data(&data);
|
||||||
|
record.mark_synced(hash);
|
||||||
|
self.sidecar_repo.save(&record).await?;
|
||||||
|
|
||||||
|
Ok(metadata)
|
||||||
|
}
|
||||||
|
}
|
||||||
6
crates/application/src/sidecar/commands/mod.rs
Normal file
6
crates/application/src/sidecar/commands/mod.rs
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
pub mod export_sidecar;
|
||||||
|
pub mod detect_external_changes;
|
||||||
|
pub mod import_sidecar;
|
||||||
|
pub mod resolve_conflict;
|
||||||
|
pub mod full_export;
|
||||||
|
pub mod full_import;
|
||||||
66
crates/application/src/sidecar/commands/resolve_conflict.rs
Normal file
66
crates/application/src/sidecar/commands/resolve_conflict.rs
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use domain::{
|
||||||
|
catalog::entities::{AssetMetadata, MetadataSource},
|
||||||
|
catalog::services::resolve_metadata,
|
||||||
|
entities::{ConflictPolicy, SidecarRecord, SyncStatus},
|
||||||
|
errors::DomainError,
|
||||||
|
ports::{AssetMetadataRepository, SidecarRepository, SidecarWriterPort},
|
||||||
|
value_objects::SystemId,
|
||||||
|
};
|
||||||
|
use crate::sidecar::hash_helper::hash_structured_data;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||||
|
pub struct ResolveConflictCommand {
|
||||||
|
pub asset_id: SystemId,
|
||||||
|
pub policy: ConflictPolicy,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ResolveConflictHandler {
|
||||||
|
sidecar_repo: Arc<dyn SidecarRepository>,
|
||||||
|
writer: Arc<dyn SidecarWriterPort>,
|
||||||
|
metadata_repo: Arc<dyn AssetMetadataRepository>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ResolveConflictHandler {
|
||||||
|
pub fn new(
|
||||||
|
sidecar_repo: Arc<dyn SidecarRepository>,
|
||||||
|
writer: Arc<dyn SidecarWriterPort>,
|
||||||
|
metadata_repo: Arc<dyn AssetMetadataRepository>,
|
||||||
|
) -> Self {
|
||||||
|
Self { sidecar_repo, writer, metadata_repo }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn execute(&self, cmd: ResolveConflictCommand) -> Result<SidecarRecord, DomainError> {
|
||||||
|
let mut record = self.sidecar_repo.find_by_asset(&cmd.asset_id).await?
|
||||||
|
.ok_or_else(|| DomainError::NotFound(format!("Sidecar record for {} not found", cmd.asset_id)))?;
|
||||||
|
|
||||||
|
if record.sync_status != SyncStatus::Conflict {
|
||||||
|
return Err(DomainError::Validation(
|
||||||
|
format!("Sidecar is not in conflict (status: {:?})", record.sync_status),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
match cmd.policy {
|
||||||
|
ConflictPolicy::DbWins => {
|
||||||
|
let layers = self.metadata_repo.find_by_asset(&cmd.asset_id).await?;
|
||||||
|
let resolved = resolve_metadata(&layers);
|
||||||
|
self.writer.write_sidecar(&resolved, &record.sidecar_storage_path).await?;
|
||||||
|
let hash = hash_structured_data(&resolved);
|
||||||
|
record.mark_synced(hash);
|
||||||
|
}
|
||||||
|
ConflictPolicy::FileWins => {
|
||||||
|
let data = self.writer.read_sidecar(&record.sidecar_storage_path).await?;
|
||||||
|
let metadata = AssetMetadata::new(cmd.asset_id, MetadataSource::ExifExtracted, data.clone());
|
||||||
|
self.metadata_repo.save(&metadata).await?;
|
||||||
|
let hash = hash_structured_data(&data);
|
||||||
|
record.mark_synced(hash);
|
||||||
|
}
|
||||||
|
ConflictPolicy::RequireUserDecision => {
|
||||||
|
return Err(DomainError::Validation("Manual resolution required".to_string()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.sidecar_repo.save(&record).await?;
|
||||||
|
Ok(record)
|
||||||
|
}
|
||||||
|
}
|
||||||
11
crates/application/src/sidecar/hash_helper.rs
Normal file
11
crates/application/src/sidecar/hash_helper.rs
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
use domain::value_objects::{Checksum, StructuredData};
|
||||||
|
use sha2::{Sha256, Digest};
|
||||||
|
|
||||||
|
pub fn hash_structured_data(data: &StructuredData) -> Checksum {
|
||||||
|
let json = serde_json::to_string(data).unwrap_or_default();
|
||||||
|
let mut hasher = Sha256::new();
|
||||||
|
hasher.update(json.as_bytes());
|
||||||
|
let result = hasher.finalize();
|
||||||
|
let hex = format!("{:x}", result);
|
||||||
|
Checksum::new(hex).expect("SHA-256 always produces valid 64-char hex")
|
||||||
|
}
|
||||||
@@ -1 +1,9 @@
|
|||||||
// Sidecar commands/queries (future: SyncSidecar, ExportMetadata, etc.)
|
pub mod commands;
|
||||||
|
pub mod hash_helper;
|
||||||
|
|
||||||
|
pub use commands::export_sidecar::{ExportSidecarCommand, ExportSidecarHandler};
|
||||||
|
pub use commands::detect_external_changes::{DetectExternalChangesCommand, DetectExternalChangesHandler};
|
||||||
|
pub use commands::import_sidecar::{ImportSidecarCommand, ImportSidecarHandler};
|
||||||
|
pub use commands::resolve_conflict::{ResolveConflictCommand, ResolveConflictHandler};
|
||||||
|
pub use commands::full_export::{FullExportCommand, FullExportHandler};
|
||||||
|
pub use commands::full_import::{FullImportCommand, FullImportHandler};
|
||||||
|
|||||||
@@ -109,6 +109,41 @@ impl SidecarWriterPort for StubSidecarWriter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- InMemorySidecarWriter ---
|
||||||
|
|
||||||
|
pub struct InMemorySidecarWriter {
|
||||||
|
data: Mutex<HashMap<String, StructuredData>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl InMemorySidecarWriter {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self { data: Mutex::new(HashMap::new()) }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get(&self, path: &str) -> Option<StructuredData> {
|
||||||
|
self.data.lock().await.get(path).cloned()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for InMemorySidecarWriter {
|
||||||
|
fn default() -> Self { Self::new() }
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl SidecarWriterPort for InMemorySidecarWriter {
|
||||||
|
fn format_name(&self) -> &str { "in-memory" }
|
||||||
|
|
||||||
|
async fn write_sidecar(&self, data: &StructuredData, path: &str) -> Result<(), DomainError> {
|
||||||
|
self.data.lock().await.insert(path.to_string(), data.clone());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read_sidecar(&self, path: &str) -> Result<StructuredData, DomainError> {
|
||||||
|
self.data.lock().await.get(path).cloned()
|
||||||
|
.ok_or_else(|| DomainError::NotFound(format!("Sidecar not found: {path}")))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// --- StubPasswordHasher ---
|
// --- StubPasswordHasher ---
|
||||||
|
|
||||||
pub struct StubPasswordHasher;
|
pub struct StubPasswordHasher;
|
||||||
|
|||||||
@@ -4,16 +4,18 @@ use tokio::sync::Mutex;
|
|||||||
use domain::{
|
use domain::{
|
||||||
entities::{
|
entities::{
|
||||||
Album, Asset, AssetMetadata, AssetTag, DuplicateGroup, DuplicateStatus,
|
Album, Asset, AssetMetadata, AssetTag, DuplicateGroup, DuplicateStatus,
|
||||||
Group, IngestSession, InviteCode, Job, JobStatus, LibraryPath,
|
Group, IngestSession, InviteCode, Job, JobBatch, JobStatus, LibraryPath,
|
||||||
MetadataSource, QuotaDefinition, Role, ShareLink, ShareScope, ShareTarget,
|
MetadataSource, Plugin, ProcessingPipeline, QuotaDefinition, Role,
|
||||||
|
ShareLink, ShareScope, ShareTarget, SidecarRecord, SyncStatus,
|
||||||
StorageVolume, Tag, UsageLedgerEntry, UsageType, User,
|
StorageVolume, Tag, UsageLedgerEntry, UsageType, User,
|
||||||
},
|
},
|
||||||
errors::DomainError,
|
errors::DomainError,
|
||||||
ports::{
|
ports::{
|
||||||
AlbumRepository, AssetMetadataRepository, AssetRepository,
|
AlbumRepository, AssetMetadataRepository, AssetRepository,
|
||||||
DuplicateRepository, GroupRepository, IngestSessionRepository,
|
DuplicateRepository, GroupRepository, IngestSessionRepository,
|
||||||
JobRepository, LibraryPathRepository, QuotaRepository,
|
JobBatchRepository, JobRepository, LibraryPathRepository,
|
||||||
RoleRepository, ShareRepository, StorageVolumeRepository,
|
PipelineRepository, PluginRepository, QuotaRepository,
|
||||||
|
RoleRepository, ShareRepository, SidecarRepository, StorageVolumeRepository,
|
||||||
TagRepository, UsageLedgerRepository, UserRepository,
|
TagRepository, UsageLedgerRepository, UserRepository,
|
||||||
},
|
},
|
||||||
value_objects::{Checksum, DateTimeStamp, Email, SystemId},
|
value_objects::{Checksum, DateTimeStamp, Email, SystemId},
|
||||||
@@ -716,3 +718,141 @@ impl DuplicateRepository for InMemoryDuplicateRepository {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- InMemorySidecarRepository ---
|
||||||
|
|
||||||
|
pub struct InMemorySidecarRepository {
|
||||||
|
data: Mutex<HashMap<String, SidecarRecord>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl InMemorySidecarRepository {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self { data: Mutex::new(HashMap::new()) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for InMemorySidecarRepository {
|
||||||
|
fn default() -> Self { Self::new() }
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl SidecarRepository for InMemorySidecarRepository {
|
||||||
|
async fn find_by_asset(&self, asset_id: &SystemId) -> Result<Option<SidecarRecord>, DomainError> {
|
||||||
|
Ok(self.data.lock().await.get(&asset_id.to_string()).cloned())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn find_by_status(&self, status: SyncStatus) -> Result<Vec<SidecarRecord>, DomainError> {
|
||||||
|
Ok(self.data.lock().await.values()
|
||||||
|
.filter(|r| r.sync_status == status)
|
||||||
|
.cloned()
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn save(&self, record: &SidecarRecord) -> Result<(), DomainError> {
|
||||||
|
self.data.lock().await.insert(record.asset_id.to_string(), record.clone());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn delete(&self, asset_id: &SystemId) -> Result<(), DomainError> {
|
||||||
|
self.data.lock().await.remove(&asset_id.to_string());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- InMemoryJobBatchRepository ---
|
||||||
|
|
||||||
|
pub struct InMemoryJobBatchRepository {
|
||||||
|
data: Mutex<HashMap<String, JobBatch>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl InMemoryJobBatchRepository {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self { data: Mutex::new(HashMap::new()) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for InMemoryJobBatchRepository {
|
||||||
|
fn default() -> Self { Self::new() }
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl JobBatchRepository for InMemoryJobBatchRepository {
|
||||||
|
async fn find_by_id(&self, id: &SystemId) -> Result<Option<JobBatch>, DomainError> {
|
||||||
|
Ok(self.data.lock().await.get(&id.to_string()).cloned())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn save(&self, batch: &JobBatch) -> Result<(), DomainError> {
|
||||||
|
self.data.lock().await.insert(batch.batch_id.to_string(), batch.clone());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- InMemoryPluginRepository ---
|
||||||
|
|
||||||
|
pub struct InMemoryPluginRepository {
|
||||||
|
data: Mutex<HashMap<String, Plugin>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl InMemoryPluginRepository {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self { data: Mutex::new(HashMap::new()) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for InMemoryPluginRepository {
|
||||||
|
fn default() -> Self { Self::new() }
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl PluginRepository for InMemoryPluginRepository {
|
||||||
|
async fn find_by_id(&self, id: &SystemId) -> Result<Option<Plugin>, DomainError> {
|
||||||
|
Ok(self.data.lock().await.get(&id.to_string()).cloned())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn find_enabled(&self) -> Result<Vec<Plugin>, DomainError> {
|
||||||
|
Ok(self.data.lock().await.values()
|
||||||
|
.filter(|p| p.is_enabled)
|
||||||
|
.cloned()
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn save(&self, plugin: &Plugin) -> Result<(), DomainError> {
|
||||||
|
self.data.lock().await.insert(plugin.plugin_id.to_string(), plugin.clone());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- InMemoryPipelineRepository ---
|
||||||
|
|
||||||
|
pub struct InMemoryPipelineRepository {
|
||||||
|
data: Mutex<HashMap<String, ProcessingPipeline>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl InMemoryPipelineRepository {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self { data: Mutex::new(HashMap::new()) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for InMemoryPipelineRepository {
|
||||||
|
fn default() -> Self { Self::new() }
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl PipelineRepository for InMemoryPipelineRepository {
|
||||||
|
async fn find_by_id(&self, id: &SystemId) -> Result<Option<ProcessingPipeline>, DomainError> {
|
||||||
|
Ok(self.data.lock().await.get(&id.to_string()).cloned())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn find_by_trigger(&self, event: &str) -> Result<Vec<ProcessingPipeline>, DomainError> {
|
||||||
|
Ok(self.data.lock().await.values()
|
||||||
|
.filter(|p| p.trigger_event == event)
|
||||||
|
.cloned()
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn save(&self, pipeline: &ProcessingPipeline) -> Result<(), DomainError> {
|
||||||
|
self.data.lock().await.insert(pipeline.pipeline_id.to_string(), pipeline.clone());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -3,3 +3,5 @@ mod organization;
|
|||||||
mod storage;
|
mod storage;
|
||||||
mod catalog;
|
mod catalog;
|
||||||
mod sharing;
|
mod sharing;
|
||||||
|
mod sidecar;
|
||||||
|
mod processing;
|
||||||
|
|||||||
76
crates/application/tests/processing/commands/complete_job.rs
Normal file
76
crates/application/tests/processing/commands/complete_job.rs
Normal file
@@ -0,0 +1,76 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use application::testing::{InMemoryJobBatchRepository, InMemoryJobRepository, StubEventPublisher};
|
||||||
|
use application::processing::{CompleteJobCommand, CompleteJobHandler};
|
||||||
|
use domain::entities::{Job, JobBatch, JobStatus, JobType};
|
||||||
|
use domain::events::DomainEvent;
|
||||||
|
use domain::ports::{JobBatchRepository, JobRepository};
|
||||||
|
use domain::value_objects::StructuredData;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn completes_job() {
|
||||||
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
||||||
|
let batch_repo = Arc::new(InMemoryJobBatchRepository::new());
|
||||||
|
let event_pub = Arc::new(StubEventPublisher::new());
|
||||||
|
|
||||||
|
let mut job = Job::new(JobType::ExtractMetadata, 5, StructuredData::new());
|
||||||
|
job.start().unwrap();
|
||||||
|
let job_id = job.job_id;
|
||||||
|
job_repo.save(&job).await.unwrap();
|
||||||
|
|
||||||
|
let handler = CompleteJobHandler::new(job_repo.clone(), batch_repo.clone(), event_pub.clone());
|
||||||
|
let result = handler.execute(CompleteJobCommand {
|
||||||
|
job_id,
|
||||||
|
result: StructuredData::new(),
|
||||||
|
}).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(result.status, JobStatus::Completed);
|
||||||
|
assert!(result.result_data.is_some());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn completes_job_and_updates_batch() {
|
||||||
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
||||||
|
let batch_repo = Arc::new(InMemoryJobBatchRepository::new());
|
||||||
|
let event_pub = Arc::new(StubEventPublisher::new());
|
||||||
|
|
||||||
|
let batch = JobBatch::new("test-batch", 2);
|
||||||
|
let batch_id = batch.batch_id;
|
||||||
|
batch_repo.save(&batch).await.unwrap();
|
||||||
|
|
||||||
|
let mut job = Job::new(JobType::ExtractMetadata, 5, StructuredData::new())
|
||||||
|
.with_batch(batch_id);
|
||||||
|
job.start().unwrap();
|
||||||
|
let job_id = job.job_id;
|
||||||
|
job_repo.save(&job).await.unwrap();
|
||||||
|
|
||||||
|
let handler = CompleteJobHandler::new(job_repo.clone(), batch_repo.clone(), event_pub.clone());
|
||||||
|
handler.execute(CompleteJobCommand {
|
||||||
|
job_id,
|
||||||
|
result: StructuredData::new(),
|
||||||
|
}).await.unwrap();
|
||||||
|
|
||||||
|
let updated_batch = batch_repo.find_by_id(&batch_id).await.unwrap().unwrap();
|
||||||
|
assert_eq!(updated_batch.completed_count, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn publishes_event() {
|
||||||
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
||||||
|
let batch_repo = Arc::new(InMemoryJobBatchRepository::new());
|
||||||
|
let event_pub = Arc::new(StubEventPublisher::new());
|
||||||
|
|
||||||
|
let mut job = Job::new(JobType::ExtractMetadata, 5, StructuredData::new());
|
||||||
|
job.start().unwrap();
|
||||||
|
let job_id = job.job_id;
|
||||||
|
job_repo.save(&job).await.unwrap();
|
||||||
|
|
||||||
|
let handler = CompleteJobHandler::new(job_repo.clone(), batch_repo.clone(), event_pub.clone());
|
||||||
|
handler.execute(CompleteJobCommand {
|
||||||
|
job_id,
|
||||||
|
result: StructuredData::new(),
|
||||||
|
}).await.unwrap();
|
||||||
|
|
||||||
|
let events = event_pub.published().await;
|
||||||
|
assert_eq!(events.len(), 1);
|
||||||
|
assert!(matches!(&events[0], DomainEvent::JobCompleted { job_id: id, .. } if *id == job_id));
|
||||||
|
}
|
||||||
@@ -0,0 +1,48 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use application::testing::{InMemoryPipelineRepository, InMemoryPluginRepository};
|
||||||
|
use application::processing::{ConfigurePipelineCommand, ConfigurePipelineHandler, PipelineStepConfig};
|
||||||
|
use domain::entities::{Plugin, PluginType};
|
||||||
|
use domain::errors::DomainError;
|
||||||
|
use domain::ports::PluginRepository;
|
||||||
|
use domain::value_objects::{StructuredData, SystemId};
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn creates_pipeline() {
|
||||||
|
let pipeline_repo = Arc::new(InMemoryPipelineRepository::new());
|
||||||
|
let plugin_repo = Arc::new(InMemoryPluginRepository::new());
|
||||||
|
|
||||||
|
let p1 = Plugin::new("EXIF", PluginType::MediaProcessor);
|
||||||
|
let p2 = Plugin::new("Thumb", PluginType::MediaProcessor);
|
||||||
|
let p1_id = p1.plugin_id;
|
||||||
|
let p2_id = p2.plugin_id;
|
||||||
|
plugin_repo.save(&p1).await.unwrap();
|
||||||
|
plugin_repo.save(&p2).await.unwrap();
|
||||||
|
|
||||||
|
let handler = ConfigurePipelineHandler::new(pipeline_repo.clone(), plugin_repo.clone());
|
||||||
|
let pipeline = handler.execute(ConfigurePipelineCommand {
|
||||||
|
trigger_event: "asset.ingested".into(),
|
||||||
|
steps: vec![
|
||||||
|
PipelineStepConfig { plugin_id: p1_id, config: StructuredData::new() },
|
||||||
|
PipelineStepConfig { plugin_id: p2_id, config: StructuredData::new() },
|
||||||
|
],
|
||||||
|
}).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(pipeline.trigger_event, "asset.ingested");
|
||||||
|
assert_eq!(pipeline.steps.len(), 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn rejects_nonexistent_plugin() {
|
||||||
|
let pipeline_repo = Arc::new(InMemoryPipelineRepository::new());
|
||||||
|
let plugin_repo = Arc::new(InMemoryPluginRepository::new());
|
||||||
|
|
||||||
|
let handler = ConfigurePipelineHandler::new(pipeline_repo.clone(), plugin_repo.clone());
|
||||||
|
let result = handler.execute(ConfigurePipelineCommand {
|
||||||
|
trigger_event: "asset.ingested".into(),
|
||||||
|
steps: vec![
|
||||||
|
PipelineStepConfig { plugin_id: SystemId::new(), config: StructuredData::new() },
|
||||||
|
],
|
||||||
|
}).await;
|
||||||
|
|
||||||
|
assert!(matches!(result, Err(DomainError::NotFound(_))));
|
||||||
|
}
|
||||||
65
crates/application/tests/processing/commands/enqueue_job.rs
Normal file
65
crates/application/tests/processing/commands/enqueue_job.rs
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use application::testing::{InMemoryJobRepository, StubEventPublisher};
|
||||||
|
use application::processing::{EnqueueJobCommand, EnqueueJobHandler};
|
||||||
|
use domain::entities::{JobStatus, JobType};
|
||||||
|
use domain::events::DomainEvent;
|
||||||
|
use domain::value_objects::{StructuredData, SystemId};
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn enqueues_job() {
|
||||||
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
||||||
|
let event_pub = Arc::new(StubEventPublisher::new());
|
||||||
|
let handler = EnqueueJobHandler::new(job_repo.clone(), event_pub.clone());
|
||||||
|
|
||||||
|
let job = handler.execute(EnqueueJobCommand {
|
||||||
|
job_type: JobType::ExtractMetadata,
|
||||||
|
priority: 5,
|
||||||
|
payload: StructuredData::new(),
|
||||||
|
target_asset_id: None,
|
||||||
|
batch_id: None,
|
||||||
|
}).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(job.status, JobStatus::Queued);
|
||||||
|
assert_eq!(job.priority, 5);
|
||||||
|
assert!(job.target_asset_id.is_none());
|
||||||
|
assert!(job.batch_id.is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn enqueues_with_target_and_batch() {
|
||||||
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
||||||
|
let event_pub = Arc::new(StubEventPublisher::new());
|
||||||
|
let handler = EnqueueJobHandler::new(job_repo.clone(), event_pub.clone());
|
||||||
|
|
||||||
|
let target = SystemId::new();
|
||||||
|
let batch = SystemId::new();
|
||||||
|
let job = handler.execute(EnqueueJobCommand {
|
||||||
|
job_type: JobType::GenerateDerivative,
|
||||||
|
priority: 10,
|
||||||
|
payload: StructuredData::new(),
|
||||||
|
target_asset_id: Some(target),
|
||||||
|
batch_id: Some(batch),
|
||||||
|
}).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(job.target_asset_id, Some(target));
|
||||||
|
assert_eq!(job.batch_id, Some(batch));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn publishes_event() {
|
||||||
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
||||||
|
let event_pub = Arc::new(StubEventPublisher::new());
|
||||||
|
let handler = EnqueueJobHandler::new(job_repo.clone(), event_pub.clone());
|
||||||
|
|
||||||
|
let job = handler.execute(EnqueueJobCommand {
|
||||||
|
job_type: JobType::ScanDirectory,
|
||||||
|
priority: 1,
|
||||||
|
payload: StructuredData::new(),
|
||||||
|
target_asset_id: None,
|
||||||
|
batch_id: None,
|
||||||
|
}).await.unwrap();
|
||||||
|
|
||||||
|
let events = event_pub.published().await;
|
||||||
|
assert_eq!(events.len(), 1);
|
||||||
|
assert!(matches!(&events[0], DomainEvent::JobEnqueued { job_id, .. } if *job_id == job.job_id));
|
||||||
|
}
|
||||||
95
crates/application/tests/processing/commands/fail_job.rs
Normal file
95
crates/application/tests/processing/commands/fail_job.rs
Normal file
@@ -0,0 +1,95 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use application::testing::{InMemoryJobBatchRepository, InMemoryJobRepository, StubEventPublisher};
|
||||||
|
use application::processing::{FailJobCommand, FailJobHandler};
|
||||||
|
use domain::entities::{Job, JobBatch, JobStatus, JobType};
|
||||||
|
use domain::events::DomainEvent;
|
||||||
|
use domain::ports::{JobBatchRepository, JobRepository};
|
||||||
|
use domain::value_objects::StructuredData;
|
||||||
|
|
||||||
|
fn make_handler(
|
||||||
|
job_repo: Arc<InMemoryJobRepository>,
|
||||||
|
batch_repo: Arc<InMemoryJobBatchRepository>,
|
||||||
|
event_pub: Arc<StubEventPublisher>,
|
||||||
|
) -> FailJobHandler {
|
||||||
|
FailJobHandler::new(job_repo, batch_repo, event_pub)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn retries_on_failure() {
|
||||||
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
||||||
|
let batch_repo = Arc::new(InMemoryJobBatchRepository::new());
|
||||||
|
let event_pub = Arc::new(StubEventPublisher::new());
|
||||||
|
|
||||||
|
let job = Job::new(JobType::ExtractMetadata, 5, StructuredData::new());
|
||||||
|
let job_id = job.job_id;
|
||||||
|
assert_eq!(job.retry_count, 0);
|
||||||
|
job_repo.save(&job).await.unwrap();
|
||||||
|
|
||||||
|
let handler = make_handler(job_repo.clone(), batch_repo.clone(), event_pub.clone());
|
||||||
|
let result = handler.execute(FailJobCommand {
|
||||||
|
job_id,
|
||||||
|
error: "transient error".into(),
|
||||||
|
}).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(result.status, JobStatus::Queued);
|
||||||
|
assert_eq!(result.retry_count, 1);
|
||||||
|
|
||||||
|
let events = event_pub.published().await;
|
||||||
|
assert!(matches!(&events[0], DomainEvent::JobEnqueued { .. }));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn fails_permanently_after_max_retries() {
|
||||||
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
||||||
|
let batch_repo = Arc::new(InMemoryJobBatchRepository::new());
|
||||||
|
let event_pub = Arc::new(StubEventPublisher::new());
|
||||||
|
|
||||||
|
let mut job = Job::new(JobType::ExtractMetadata, 5, StructuredData::new());
|
||||||
|
// Exhaust retries (max_retries=3, so fail 3 times)
|
||||||
|
job.fail("err1");
|
||||||
|
job.fail("err2");
|
||||||
|
assert_eq!(job.retry_count, 2);
|
||||||
|
assert_eq!(job.status, JobStatus::Queued); // still retryable
|
||||||
|
let job_id = job.job_id;
|
||||||
|
job_repo.save(&job).await.unwrap();
|
||||||
|
|
||||||
|
let handler = make_handler(job_repo.clone(), batch_repo.clone(), event_pub.clone());
|
||||||
|
let result = handler.execute(FailJobCommand {
|
||||||
|
job_id,
|
||||||
|
error: "fatal".into(),
|
||||||
|
}).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(result.status, JobStatus::Failed);
|
||||||
|
assert_eq!(result.retry_count, 3);
|
||||||
|
|
||||||
|
let events = event_pub.published().await;
|
||||||
|
assert!(matches!(&events[0], DomainEvent::JobFailed { .. }));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn updates_batch_on_permanent_failure() {
|
||||||
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
||||||
|
let batch_repo = Arc::new(InMemoryJobBatchRepository::new());
|
||||||
|
let event_pub = Arc::new(StubEventPublisher::new());
|
||||||
|
|
||||||
|
let batch = JobBatch::new("test-batch", 2);
|
||||||
|
let batch_id = batch.batch_id;
|
||||||
|
batch_repo.save(&batch).await.unwrap();
|
||||||
|
|
||||||
|
let mut job = Job::new(JobType::ExtractMetadata, 5, StructuredData::new())
|
||||||
|
.with_batch(batch_id);
|
||||||
|
// Exhaust retries
|
||||||
|
job.fail("err1");
|
||||||
|
job.fail("err2");
|
||||||
|
let job_id = job.job_id;
|
||||||
|
job_repo.save(&job).await.unwrap();
|
||||||
|
|
||||||
|
let handler = make_handler(job_repo.clone(), batch_repo.clone(), event_pub.clone());
|
||||||
|
handler.execute(FailJobCommand {
|
||||||
|
job_id,
|
||||||
|
error: "permanent failure".into(),
|
||||||
|
}).await.unwrap();
|
||||||
|
|
||||||
|
let updated_batch = batch_repo.find_by_id(&batch_id).await.unwrap().unwrap();
|
||||||
|
assert_eq!(updated_batch.failed_count, 1);
|
||||||
|
}
|
||||||
@@ -0,0 +1,61 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use application::testing::InMemoryPluginRepository;
|
||||||
|
use application::processing::{ManagePluginCommand, ManagePluginHandler, PluginAction};
|
||||||
|
use domain::entities::{Plugin, PluginType};
|
||||||
|
use domain::ports::PluginRepository;
|
||||||
|
use domain::value_objects::StructuredData;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn creates_plugin() {
|
||||||
|
let plugin_repo = Arc::new(InMemoryPluginRepository::new());
|
||||||
|
let handler = ManagePluginHandler::new(plugin_repo.clone());
|
||||||
|
|
||||||
|
let plugin = handler.execute(ManagePluginCommand {
|
||||||
|
plugin_id: None,
|
||||||
|
action: PluginAction::Create {
|
||||||
|
name: "EXIF Extractor".into(),
|
||||||
|
plugin_type: PluginType::MediaProcessor,
|
||||||
|
config: StructuredData::new(),
|
||||||
|
},
|
||||||
|
}).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(plugin.name, "EXIF Extractor");
|
||||||
|
assert_eq!(plugin.plugin_type, PluginType::MediaProcessor);
|
||||||
|
assert!(plugin.is_enabled);
|
||||||
|
|
||||||
|
let saved = plugin_repo.find_by_id(&plugin.plugin_id).await.unwrap();
|
||||||
|
assert!(saved.is_some());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn enables_plugin() {
|
||||||
|
let plugin_repo = Arc::new(InMemoryPluginRepository::new());
|
||||||
|
let mut plugin = Plugin::new("Test", PluginType::ScheduledTask);
|
||||||
|
plugin.disable();
|
||||||
|
let plugin_id = plugin.plugin_id;
|
||||||
|
plugin_repo.save(&plugin).await.unwrap();
|
||||||
|
|
||||||
|
let handler = ManagePluginHandler::new(plugin_repo.clone());
|
||||||
|
let result = handler.execute(ManagePluginCommand {
|
||||||
|
plugin_id: Some(plugin_id),
|
||||||
|
action: PluginAction::Enable,
|
||||||
|
}).await.unwrap();
|
||||||
|
|
||||||
|
assert!(result.is_enabled);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn disables_plugin() {
|
||||||
|
let plugin_repo = Arc::new(InMemoryPluginRepository::new());
|
||||||
|
let plugin = Plugin::new("Test", PluginType::SidecarWriter);
|
||||||
|
let plugin_id = plugin.plugin_id;
|
||||||
|
plugin_repo.save(&plugin).await.unwrap();
|
||||||
|
|
||||||
|
let handler = ManagePluginHandler::new(plugin_repo.clone());
|
||||||
|
let result = handler.execute(ManagePluginCommand {
|
||||||
|
plugin_id: Some(plugin_id),
|
||||||
|
action: PluginAction::Disable,
|
||||||
|
}).await.unwrap();
|
||||||
|
|
||||||
|
assert!(!result.is_enabled);
|
||||||
|
}
|
||||||
6
crates/application/tests/processing/commands/mod.rs
Normal file
6
crates/application/tests/processing/commands/mod.rs
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
mod enqueue_job;
|
||||||
|
mod start_job;
|
||||||
|
mod complete_job;
|
||||||
|
mod fail_job;
|
||||||
|
mod manage_plugin;
|
||||||
|
mod configure_pipeline;
|
||||||
35
crates/application/tests/processing/commands/start_job.rs
Normal file
35
crates/application/tests/processing/commands/start_job.rs
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use application::testing::InMemoryJobRepository;
|
||||||
|
use application::processing::{StartJobCommand, StartJobHandler};
|
||||||
|
use domain::entities::{Job, JobStatus, JobType};
|
||||||
|
use domain::errors::DomainError;
|
||||||
|
use domain::ports::JobRepository;
|
||||||
|
use domain::value_objects::StructuredData;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn starts_queued_job() {
|
||||||
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
||||||
|
let job = Job::new(JobType::ExtractMetadata, 5, StructuredData::new());
|
||||||
|
let job_id = job.job_id;
|
||||||
|
job_repo.save(&job).await.unwrap();
|
||||||
|
|
||||||
|
let handler = StartJobHandler::new(job_repo.clone());
|
||||||
|
let result = handler.execute(StartJobCommand { job_id }).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(result.status, JobStatus::Processing);
|
||||||
|
assert!(result.started_at.is_some());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn rejects_non_queued_job() {
|
||||||
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
||||||
|
let mut job = Job::new(JobType::ExtractMetadata, 5, StructuredData::new());
|
||||||
|
job.start().unwrap(); // now Processing
|
||||||
|
let job_id = job.job_id;
|
||||||
|
job_repo.save(&job).await.unwrap();
|
||||||
|
|
||||||
|
let handler = StartJobHandler::new(job_repo.clone());
|
||||||
|
let result = handler.execute(StartJobCommand { job_id }).await;
|
||||||
|
|
||||||
|
assert!(matches!(result, Err(DomainError::Conflict(_))));
|
||||||
|
}
|
||||||
2
crates/application/tests/processing/mod.rs
Normal file
2
crates/application/tests/processing/mod.rs
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
mod commands;
|
||||||
|
mod queries;
|
||||||
1
crates/application/tests/processing/queries/mod.rs
Normal file
1
crates/application/tests/processing/queries/mod.rs
Normal file
@@ -0,0 +1 @@
|
|||||||
|
mod report_batch_progress;
|
||||||
@@ -0,0 +1,41 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use application::testing::{InMemoryJobBatchRepository, InMemoryJobRepository};
|
||||||
|
use application::processing::{ReportBatchProgressQuery, ReportBatchProgressHandler};
|
||||||
|
use domain::entities::{Job, JobBatch, JobType};
|
||||||
|
use domain::errors::DomainError;
|
||||||
|
use domain::ports::{JobBatchRepository, JobRepository};
|
||||||
|
use domain::value_objects::{StructuredData, SystemId};
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn returns_progress() {
|
||||||
|
let batch_repo = Arc::new(InMemoryJobBatchRepository::new());
|
||||||
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
||||||
|
|
||||||
|
let batch = JobBatch::new("import", 3);
|
||||||
|
let batch_id = batch.batch_id;
|
||||||
|
batch_repo.save(&batch).await.unwrap();
|
||||||
|
|
||||||
|
let j1 = Job::new(JobType::ExtractMetadata, 5, StructuredData::new()).with_batch(batch_id);
|
||||||
|
let j2 = Job::new(JobType::GenerateDerivative, 3, StructuredData::new()).with_batch(batch_id);
|
||||||
|
job_repo.save(&j1).await.unwrap();
|
||||||
|
job_repo.save(&j2).await.unwrap();
|
||||||
|
|
||||||
|
let handler = ReportBatchProgressHandler::new(batch_repo.clone(), job_repo.clone());
|
||||||
|
let progress = handler.execute(ReportBatchProgressQuery { batch_id }).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(progress.batch.batch_id, batch_id);
|
||||||
|
assert_eq!(progress.jobs.len(), 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn rejects_nonexistent_batch() {
|
||||||
|
let batch_repo = Arc::new(InMemoryJobBatchRepository::new());
|
||||||
|
let job_repo = Arc::new(InMemoryJobRepository::new());
|
||||||
|
|
||||||
|
let handler = ReportBatchProgressHandler::new(batch_repo.clone(), job_repo.clone());
|
||||||
|
let result = handler.execute(ReportBatchProgressQuery {
|
||||||
|
batch_id: SystemId::new(),
|
||||||
|
}).await;
|
||||||
|
|
||||||
|
assert!(matches!(result, Err(DomainError::NotFound(_))));
|
||||||
|
}
|
||||||
@@ -0,0 +1,63 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use application::sidecar::{DetectExternalChangesCommand, DetectExternalChangesHandler};
|
||||||
|
use application::sidecar::hash_helper::hash_structured_data;
|
||||||
|
use application::testing::{InMemorySidecarRepository, InMemorySidecarWriter};
|
||||||
|
use domain::entities::{SidecarRecord, SyncStatus};
|
||||||
|
use domain::ports::{SidecarRepository, SidecarWriterPort};
|
||||||
|
use domain::value_objects::{MetadataValue, StructuredData, SystemId};
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn detects_changed_sidecar() {
|
||||||
|
let sidecar_repo = Arc::new(InMemorySidecarRepository::new());
|
||||||
|
let writer = Arc::new(InMemorySidecarWriter::new());
|
||||||
|
|
||||||
|
let asset_id = SystemId::new();
|
||||||
|
let path = format!("sidecars/{}.xmp", asset_id);
|
||||||
|
|
||||||
|
// Write original data and create InSync record with its hash
|
||||||
|
let mut original = StructuredData::new();
|
||||||
|
original.insert("title", MetadataValue::String("Old".into()));
|
||||||
|
let hash = hash_structured_data(&original);
|
||||||
|
let mut record = SidecarRecord::new(asset_id, &path);
|
||||||
|
record.mark_synced(hash);
|
||||||
|
sidecar_repo.save(&record).await.unwrap();
|
||||||
|
|
||||||
|
// Simulate external edit: write different data to sidecar file
|
||||||
|
let mut modified = StructuredData::new();
|
||||||
|
modified.insert("title", MetadataValue::String("New".into()));
|
||||||
|
writer.write_sidecar(&modified, &path).await.unwrap();
|
||||||
|
|
||||||
|
let handler = DetectExternalChangesHandler::new(sidecar_repo.clone(), writer);
|
||||||
|
let changed = handler.execute(DetectExternalChangesCommand).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(changed, 1);
|
||||||
|
let updated = sidecar_repo.find_by_asset(&asset_id).await.unwrap().unwrap();
|
||||||
|
assert_eq!(updated.sync_status, SyncStatus::PendingRead);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn ignores_unchanged_sidecar() {
|
||||||
|
let sidecar_repo = Arc::new(InMemorySidecarRepository::new());
|
||||||
|
let writer = Arc::new(InMemorySidecarWriter::new());
|
||||||
|
|
||||||
|
let asset_id = SystemId::new();
|
||||||
|
let path = format!("sidecars/{}.xmp", asset_id);
|
||||||
|
|
||||||
|
let mut data = StructuredData::new();
|
||||||
|
data.insert("title", MetadataValue::String("Same".into()));
|
||||||
|
let hash = hash_structured_data(&data);
|
||||||
|
|
||||||
|
let mut record = SidecarRecord::new(asset_id, &path);
|
||||||
|
record.mark_synced(hash);
|
||||||
|
sidecar_repo.save(&record).await.unwrap();
|
||||||
|
|
||||||
|
// Write identical data to sidecar file
|
||||||
|
writer.write_sidecar(&data, &path).await.unwrap();
|
||||||
|
|
||||||
|
let handler = DetectExternalChangesHandler::new(sidecar_repo.clone(), writer);
|
||||||
|
let changed = handler.execute(DetectExternalChangesCommand).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(changed, 0);
|
||||||
|
let updated = sidecar_repo.find_by_asset(&asset_id).await.unwrap().unwrap();
|
||||||
|
assert_eq!(updated.sync_status, SyncStatus::InSync);
|
||||||
|
}
|
||||||
49
crates/application/tests/sidecar/commands/export_sidecar.rs
Normal file
49
crates/application/tests/sidecar/commands/export_sidecar.rs
Normal file
@@ -0,0 +1,49 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use application::sidecar::{ExportSidecarCommand, ExportSidecarHandler};
|
||||||
|
use application::testing::{InMemoryAssetMetadataRepository, InMemorySidecarRepository, InMemorySidecarWriter};
|
||||||
|
use domain::catalog::entities::{AssetMetadata, MetadataSource};
|
||||||
|
use domain::entities::SyncStatus;
|
||||||
|
use domain::ports::SidecarRepository;
|
||||||
|
use domain::value_objects::{MetadataValue, StructuredData, SystemId};
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn exports_sidecar_marks_in_sync() {
|
||||||
|
let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new());
|
||||||
|
let sidecar_repo = Arc::new(InMemorySidecarRepository::new());
|
||||||
|
let writer = Arc::new(InMemorySidecarWriter::new());
|
||||||
|
|
||||||
|
let asset_id = SystemId::new();
|
||||||
|
let mut data = StructuredData::new();
|
||||||
|
data.insert("title", MetadataValue::String("Beach".into()));
|
||||||
|
let metadata = AssetMetadata::new(asset_id, MetadataSource::UserEdited, data);
|
||||||
|
use domain::ports::AssetMetadataRepository;
|
||||||
|
meta_repo.save(&metadata).await.unwrap();
|
||||||
|
|
||||||
|
let handler = ExportSidecarHandler::new(meta_repo, sidecar_repo.clone(), writer.clone());
|
||||||
|
let record = handler.execute(ExportSidecarCommand { asset_id }).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(record.sync_status, SyncStatus::InSync);
|
||||||
|
assert!(record.last_known_file_hash.is_some());
|
||||||
|
|
||||||
|
let written = writer.get(&record.sidecar_storage_path).await;
|
||||||
|
assert!(written.is_some());
|
||||||
|
assert_eq!(written.unwrap().get_string("title"), Some("Beach"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn creates_new_record_if_none_exists() {
|
||||||
|
let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new());
|
||||||
|
let sidecar_repo = Arc::new(InMemorySidecarRepository::new());
|
||||||
|
let writer = Arc::new(InMemorySidecarWriter::new());
|
||||||
|
|
||||||
|
let asset_id = SystemId::new();
|
||||||
|
|
||||||
|
let handler = ExportSidecarHandler::new(meta_repo, sidecar_repo.clone(), writer);
|
||||||
|
let record = handler.execute(ExportSidecarCommand { asset_id }).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(record.asset_id, asset_id);
|
||||||
|
assert_eq!(record.sync_status, SyncStatus::InSync);
|
||||||
|
|
||||||
|
let saved = sidecar_repo.find_by_asset(&asset_id).await.unwrap();
|
||||||
|
assert!(saved.is_some());
|
||||||
|
}
|
||||||
45
crates/application/tests/sidecar/commands/full_export.rs
Normal file
45
crates/application/tests/sidecar/commands/full_export.rs
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use application::sidecar::{FullExportCommand, FullExportHandler};
|
||||||
|
use application::testing::{
|
||||||
|
InMemoryAssetRepository, InMemoryAssetMetadataRepository,
|
||||||
|
InMemorySidecarRepository, InMemorySidecarWriter,
|
||||||
|
};
|
||||||
|
use domain::catalog::entities::{Asset, AssetMetadata, AssetType, MetadataSource, SourceReference};
|
||||||
|
use domain::ports::AssetRepository;
|
||||||
|
use domain::value_objects::{Checksum, MetadataValue, StructuredData, SystemId};
|
||||||
|
|
||||||
|
fn make_asset(owner: SystemId) -> Asset {
|
||||||
|
let source = SourceReference {
|
||||||
|
volume_id: SystemId::new(),
|
||||||
|
relative_path: "photos/img.jpg".into(),
|
||||||
|
checksum: Checksum::new("a".repeat(64)).unwrap(),
|
||||||
|
};
|
||||||
|
Asset::new(source, AssetType::Image, "image/jpeg", 1024, owner)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn exports_all_user_assets() {
|
||||||
|
let asset_repo = Arc::new(InMemoryAssetRepository::new());
|
||||||
|
let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new());
|
||||||
|
let sidecar_repo = Arc::new(InMemorySidecarRepository::new());
|
||||||
|
let writer = Arc::new(InMemorySidecarWriter::new());
|
||||||
|
|
||||||
|
let owner = SystemId::new();
|
||||||
|
let a1 = make_asset(owner);
|
||||||
|
let a2 = make_asset(owner);
|
||||||
|
asset_repo.save(&a1).await.unwrap();
|
||||||
|
asset_repo.save(&a2).await.unwrap();
|
||||||
|
|
||||||
|
let mut data = StructuredData::new();
|
||||||
|
data.insert("title", MetadataValue::String("Sunset".into()));
|
||||||
|
use domain::ports::AssetMetadataRepository;
|
||||||
|
meta_repo.save(&AssetMetadata::new(a1.asset_id, MetadataSource::UserEdited, data)).await.unwrap();
|
||||||
|
|
||||||
|
let handler = FullExportHandler::new(asset_repo, meta_repo, sidecar_repo, writer.clone());
|
||||||
|
let count = handler.execute(FullExportCommand { owner_id: owner }).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(count, 2);
|
||||||
|
|
||||||
|
let written = writer.get(&format!("sidecars/{}.xmp", a1.asset_id)).await;
|
||||||
|
assert!(written.is_some());
|
||||||
|
}
|
||||||
65
crates/application/tests/sidecar/commands/full_import.rs
Normal file
65
crates/application/tests/sidecar/commands/full_import.rs
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use application::sidecar::{FullImportCommand, FullImportHandler};
|
||||||
|
use application::testing::{
|
||||||
|
InMemoryAssetRepository, InMemoryAssetMetadataRepository,
|
||||||
|
InMemorySidecarRepository, InMemorySidecarWriter,
|
||||||
|
};
|
||||||
|
use domain::catalog::entities::{Asset, AssetType, MetadataSource, SourceReference};
|
||||||
|
use domain::entities::SidecarRecord;
|
||||||
|
use domain::ports::{AssetMetadataRepository, AssetRepository, SidecarRepository, SidecarWriterPort};
|
||||||
|
use domain::value_objects::{Checksum, MetadataValue, StructuredData, SystemId};
|
||||||
|
|
||||||
|
fn make_asset(owner: SystemId) -> Asset {
|
||||||
|
let source = SourceReference {
|
||||||
|
volume_id: SystemId::new(),
|
||||||
|
relative_path: "photos/img.jpg".into(),
|
||||||
|
checksum: Checksum::new("b".repeat(64)).unwrap(),
|
||||||
|
};
|
||||||
|
Asset::new(source, AssetType::Image, "image/jpeg", 2048, owner)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn imports_from_existing_sidecars() {
|
||||||
|
let asset_repo = Arc::new(InMemoryAssetRepository::new());
|
||||||
|
let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new());
|
||||||
|
let sidecar_repo = Arc::new(InMemorySidecarRepository::new());
|
||||||
|
let writer = Arc::new(InMemorySidecarWriter::new());
|
||||||
|
|
||||||
|
let owner = SystemId::new();
|
||||||
|
let asset = make_asset(owner);
|
||||||
|
asset_repo.save(&asset).await.unwrap();
|
||||||
|
|
||||||
|
let path = format!("sidecars/{}.xmp", asset.asset_id);
|
||||||
|
let record = SidecarRecord::new(asset.asset_id, &path);
|
||||||
|
sidecar_repo.save(&record).await.unwrap();
|
||||||
|
|
||||||
|
let mut data = StructuredData::new();
|
||||||
|
data.insert("lens", MetadataValue::String("50mm".into()));
|
||||||
|
writer.write_sidecar(&data, &path).await.unwrap();
|
||||||
|
|
||||||
|
let handler = FullImportHandler::new(asset_repo, meta_repo.clone(), sidecar_repo, writer);
|
||||||
|
let count = handler.execute(FullImportCommand { owner_id: owner }).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(count, 1);
|
||||||
|
let imported = meta_repo.find_by_asset_and_source(&asset.asset_id, MetadataSource::ExifExtracted).await.unwrap();
|
||||||
|
assert!(imported.is_some());
|
||||||
|
assert_eq!(imported.unwrap().data.get_string("lens"), Some("50mm"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn skips_missing_sidecars() {
|
||||||
|
let asset_repo = Arc::new(InMemoryAssetRepository::new());
|
||||||
|
let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new());
|
||||||
|
let sidecar_repo = Arc::new(InMemorySidecarRepository::new());
|
||||||
|
let writer = Arc::new(InMemorySidecarWriter::new());
|
||||||
|
|
||||||
|
let owner = SystemId::new();
|
||||||
|
let asset = make_asset(owner);
|
||||||
|
asset_repo.save(&asset).await.unwrap();
|
||||||
|
// No sidecar record, no sidecar file
|
||||||
|
|
||||||
|
let handler = FullImportHandler::new(asset_repo, meta_repo, sidecar_repo, writer);
|
||||||
|
let count = handler.execute(FullImportCommand { owner_id: owner }).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(count, 0);
|
||||||
|
}
|
||||||
54
crates/application/tests/sidecar/commands/import_sidecar.rs
Normal file
54
crates/application/tests/sidecar/commands/import_sidecar.rs
Normal file
@@ -0,0 +1,54 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use application::sidecar::{ImportSidecarCommand, ImportSidecarHandler};
|
||||||
|
use application::testing::{InMemoryAssetMetadataRepository, InMemorySidecarRepository, InMemorySidecarWriter};
|
||||||
|
use domain::catalog::entities::MetadataSource;
|
||||||
|
use domain::entities::{SidecarRecord, SyncStatus};
|
||||||
|
use domain::errors::DomainError;
|
||||||
|
use domain::ports::{SidecarRepository, SidecarWriterPort};
|
||||||
|
use domain::value_objects::{MetadataValue, StructuredData, SystemId};
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn imports_pending_read_sidecar() {
|
||||||
|
let sidecar_repo = Arc::new(InMemorySidecarRepository::new());
|
||||||
|
let writer = Arc::new(InMemorySidecarWriter::new());
|
||||||
|
let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new());
|
||||||
|
|
||||||
|
let asset_id = SystemId::new();
|
||||||
|
let path = format!("sidecars/{}.xmp", asset_id);
|
||||||
|
|
||||||
|
// Create PendingRead record
|
||||||
|
let mut record = SidecarRecord::new(asset_id, &path);
|
||||||
|
record.mark_pending_read();
|
||||||
|
sidecar_repo.save(&record).await.unwrap();
|
||||||
|
|
||||||
|
// Write sidecar file data
|
||||||
|
let mut data = StructuredData::new();
|
||||||
|
data.insert("camera", MetadataValue::String("Canon".into()));
|
||||||
|
writer.write_sidecar(&data, &path).await.unwrap();
|
||||||
|
|
||||||
|
let handler = ImportSidecarHandler::new(sidecar_repo.clone(), writer, meta_repo);
|
||||||
|
let metadata = handler.execute(ImportSidecarCommand { asset_id }).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(metadata.metadata_source, MetadataSource::ExifExtracted);
|
||||||
|
assert_eq!(metadata.data.get_string("camera"), Some("Canon"));
|
||||||
|
|
||||||
|
let updated = sidecar_repo.find_by_asset(&asset_id).await.unwrap().unwrap();
|
||||||
|
assert_eq!(updated.sync_status, SyncStatus::InSync);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn rejects_non_pending_read() {
|
||||||
|
let sidecar_repo = Arc::new(InMemorySidecarRepository::new());
|
||||||
|
let writer = Arc::new(InMemorySidecarWriter::new());
|
||||||
|
let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new());
|
||||||
|
|
||||||
|
let asset_id = SystemId::new();
|
||||||
|
let record = SidecarRecord::new(asset_id, "sidecars/x.xmp");
|
||||||
|
// Default status is PendingWrite, not PendingRead
|
||||||
|
sidecar_repo.save(&record).await.unwrap();
|
||||||
|
|
||||||
|
let handler = ImportSidecarHandler::new(sidecar_repo, writer, meta_repo);
|
||||||
|
let result = handler.execute(ImportSidecarCommand { asset_id }).await;
|
||||||
|
|
||||||
|
assert!(matches!(result, Err(DomainError::Validation(_))));
|
||||||
|
}
|
||||||
6
crates/application/tests/sidecar/commands/mod.rs
Normal file
6
crates/application/tests/sidecar/commands/mod.rs
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
mod export_sidecar;
|
||||||
|
mod detect_external_changes;
|
||||||
|
mod import_sidecar;
|
||||||
|
mod resolve_conflict;
|
||||||
|
mod full_export;
|
||||||
|
mod full_import;
|
||||||
@@ -0,0 +1,85 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use application::sidecar::{ResolveConflictCommand, ResolveConflictHandler};
|
||||||
|
use application::testing::{InMemoryAssetMetadataRepository, InMemorySidecarRepository, InMemorySidecarWriter};
|
||||||
|
use domain::catalog::entities::{AssetMetadata, MetadataSource};
|
||||||
|
use domain::entities::{ConflictPolicy, SidecarRecord, SyncStatus};
|
||||||
|
use domain::errors::DomainError;
|
||||||
|
use domain::ports::{AssetMetadataRepository, SidecarRepository, SidecarWriterPort};
|
||||||
|
use domain::value_objects::{MetadataValue, StructuredData, SystemId};
|
||||||
|
|
||||||
|
fn conflict_record(asset_id: SystemId, path: &str) -> SidecarRecord {
|
||||||
|
let mut r = SidecarRecord::new(asset_id, path);
|
||||||
|
r.mark_conflict();
|
||||||
|
r
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn db_wins_re_exports() {
|
||||||
|
let sidecar_repo = Arc::new(InMemorySidecarRepository::new());
|
||||||
|
let writer = Arc::new(InMemorySidecarWriter::new());
|
||||||
|
let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new());
|
||||||
|
|
||||||
|
let asset_id = SystemId::new();
|
||||||
|
let path = format!("sidecars/{}.xmp", asset_id);
|
||||||
|
|
||||||
|
sidecar_repo.save(&conflict_record(asset_id, &path)).await.unwrap();
|
||||||
|
|
||||||
|
let mut data = StructuredData::new();
|
||||||
|
data.insert("title", MetadataValue::String("DB Value".into()));
|
||||||
|
meta_repo.save(&AssetMetadata::new(asset_id, MetadataSource::UserEdited, data)).await.unwrap();
|
||||||
|
|
||||||
|
let handler = ResolveConflictHandler::new(sidecar_repo.clone(), writer.clone(), meta_repo);
|
||||||
|
let record = handler.execute(ResolveConflictCommand {
|
||||||
|
asset_id,
|
||||||
|
policy: ConflictPolicy::DbWins,
|
||||||
|
}).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(record.sync_status, SyncStatus::InSync);
|
||||||
|
let written = writer.get(&path).await.unwrap();
|
||||||
|
assert_eq!(written.get_string("title"), Some("DB Value"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn file_wins_re_imports() {
|
||||||
|
let sidecar_repo = Arc::new(InMemorySidecarRepository::new());
|
||||||
|
let writer = Arc::new(InMemorySidecarWriter::new());
|
||||||
|
let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new());
|
||||||
|
|
||||||
|
let asset_id = SystemId::new();
|
||||||
|
let path = format!("sidecars/{}.xmp", asset_id);
|
||||||
|
|
||||||
|
sidecar_repo.save(&conflict_record(asset_id, &path)).await.unwrap();
|
||||||
|
|
||||||
|
let mut file_data = StructuredData::new();
|
||||||
|
file_data.insert("title", MetadataValue::String("File Value".into()));
|
||||||
|
writer.write_sidecar(&file_data, &path).await.unwrap();
|
||||||
|
|
||||||
|
let handler = ResolveConflictHandler::new(sidecar_repo.clone(), writer, meta_repo.clone());
|
||||||
|
let record = handler.execute(ResolveConflictCommand {
|
||||||
|
asset_id,
|
||||||
|
policy: ConflictPolicy::FileWins,
|
||||||
|
}).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(record.sync_status, SyncStatus::InSync);
|
||||||
|
let imported = meta_repo.find_by_asset_and_source(&asset_id, MetadataSource::ExifExtracted).await.unwrap();
|
||||||
|
assert!(imported.is_some());
|
||||||
|
assert_eq!(imported.unwrap().data.get_string("title"), Some("File Value"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn user_decision_returns_error() {
|
||||||
|
let sidecar_repo = Arc::new(InMemorySidecarRepository::new());
|
||||||
|
let writer = Arc::new(InMemorySidecarWriter::new());
|
||||||
|
let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new());
|
||||||
|
|
||||||
|
let asset_id = SystemId::new();
|
||||||
|
sidecar_repo.save(&conflict_record(asset_id, "sidecars/x.xmp")).await.unwrap();
|
||||||
|
|
||||||
|
let handler = ResolveConflictHandler::new(sidecar_repo, writer, meta_repo);
|
||||||
|
let result = handler.execute(ResolveConflictCommand {
|
||||||
|
asset_id,
|
||||||
|
policy: ConflictPolicy::RequireUserDecision,
|
||||||
|
}).await;
|
||||||
|
|
||||||
|
assert!(matches!(result, Err(DomainError::Validation(msg)) if msg.contains("Manual")));
|
||||||
|
}
|
||||||
1
crates/application/tests/sidecar/mod.rs
Normal file
1
crates/application/tests/sidecar/mod.rs
Normal file
@@ -0,0 +1 @@
|
|||||||
|
mod commands;
|
||||||
Reference in New Issue
Block a user