From 536bf3463a69fb7c18fbf40ce1992922133cbdaf Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Sun, 31 May 2026 05:13:47 +0200 Subject: [PATCH] app: add catalog commands/queries (RegisterAsset, UpdateMetadata, GetTimeline, GetAsset) --- .../application/src/catalog/commands/mod.rs | 2 + .../src/catalog/commands/register_asset.rs | 66 ++++++++++++++ .../src/catalog/commands/update_metadata.rs | 47 ++++++++++ crates/application/src/catalog/mod.rs | 8 +- .../src/catalog/queries/get_asset.rs | 37 ++++++++ .../src/catalog/queries/get_timeline.rs | 42 +++++++++ crates/application/src/catalog/queries/mod.rs | 2 + crates/application/tests/app_tests.rs | 1 + .../application/tests/catalog/commands/mod.rs | 2 + .../tests/catalog/commands/register_asset.rs | 85 +++++++++++++++++++ .../tests/catalog/commands/update_metadata.rs | 64 ++++++++++++++ crates/application/tests/catalog/mod.rs | 2 + .../tests/catalog/queries/get_asset.rs | 58 +++++++++++++ .../tests/catalog/queries/get_timeline.rs | 54 ++++++++++++ .../application/tests/catalog/queries/mod.rs | 2 + 15 files changed, 471 insertions(+), 1 deletion(-) create mode 100644 crates/application/src/catalog/commands/mod.rs create mode 100644 crates/application/src/catalog/commands/register_asset.rs create mode 100644 crates/application/src/catalog/commands/update_metadata.rs create mode 100644 crates/application/src/catalog/queries/get_asset.rs create mode 100644 crates/application/src/catalog/queries/get_timeline.rs create mode 100644 crates/application/src/catalog/queries/mod.rs create mode 100644 crates/application/tests/catalog/commands/mod.rs create mode 100644 crates/application/tests/catalog/commands/register_asset.rs create mode 100644 crates/application/tests/catalog/commands/update_metadata.rs create mode 100644 crates/application/tests/catalog/mod.rs create mode 100644 crates/application/tests/catalog/queries/get_asset.rs create mode 100644 crates/application/tests/catalog/queries/get_timeline.rs create mode 100644 crates/application/tests/catalog/queries/mod.rs diff --git a/crates/application/src/catalog/commands/mod.rs b/crates/application/src/catalog/commands/mod.rs new file mode 100644 index 0000000..62db3fd --- /dev/null +++ b/crates/application/src/catalog/commands/mod.rs @@ -0,0 +1,2 @@ +pub mod register_asset; +pub mod update_metadata; diff --git a/crates/application/src/catalog/commands/register_asset.rs b/crates/application/src/catalog/commands/register_asset.rs new file mode 100644 index 0000000..c74e587 --- /dev/null +++ b/crates/application/src/catalog/commands/register_asset.rs @@ -0,0 +1,66 @@ +use std::sync::Arc; +use domain::{ + catalog::entities::{Asset, AssetType, DuplicateGroup, SourceReference}, + errors::DomainError, + events::DomainEvent, + ports::{AssetRepository, DuplicateRepository, EventPublisher}, + value_objects::{Checksum, DateTimeStamp, SystemId}, +}; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct RegisterAssetCommand { + pub volume_id: SystemId, + pub relative_path: String, + pub checksum: String, + pub asset_type: AssetType, + pub mime_type: String, + pub file_size: u64, + pub owner_id: SystemId, +} + +pub struct RegisterAssetHandler { + asset_repo: Arc, + duplicate_repo: Arc, + event_pub: Arc, +} + +impl RegisterAssetHandler { + pub fn new( + asset_repo: Arc, + duplicate_repo: Arc, + event_pub: Arc, + ) -> Self { + Self { asset_repo, duplicate_repo, event_pub } + } + + pub async fn execute(&self, cmd: RegisterAssetCommand) -> Result<(Asset, Option), DomainError> { + let checksum = Checksum::new(&cmd.checksum)?; + + let existing = self.asset_repo.find_by_checksum(&checksum).await?; + + let source_ref = SourceReference { + volume_id: cmd.volume_id, + relative_path: cmd.relative_path, + checksum, + }; + + let asset = Asset::new(source_ref, cmd.asset_type, cmd.mime_type, cmd.file_size, cmd.owner_id); + self.asset_repo.save(&asset).await?; + + let dup_group = if let Some(first) = existing.first() { + let group = DuplicateGroup::new_exact(first.asset_id, asset.asset_id); + self.duplicate_repo.save(&group).await?; + Some(group) + } else { + None + }; + + self.event_pub.publish(DomainEvent::AssetIngested { + asset_id: asset.asset_id, + owner_user_id: asset.owner_user_id, + timestamp: DateTimeStamp::now(), + }).await?; + + Ok((asset, dup_group)) + } +} diff --git a/crates/application/src/catalog/commands/update_metadata.rs b/crates/application/src/catalog/commands/update_metadata.rs new file mode 100644 index 0000000..584e435 --- /dev/null +++ b/crates/application/src/catalog/commands/update_metadata.rs @@ -0,0 +1,47 @@ +use std::sync::Arc; +use domain::{ + catalog::entities::{AssetMetadata, MetadataSource}, + errors::DomainError, + events::DomainEvent, + ports::{AssetRepository, AssetMetadataRepository, EventPublisher}, + value_objects::{DateTimeStamp, StructuredData, SystemId}, +}; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct UpdateMetadataCommand { + pub asset_id: SystemId, + pub user_id: SystemId, + pub data: StructuredData, +} + +pub struct UpdateMetadataHandler { + asset_repo: Arc, + metadata_repo: Arc, + event_pub: Arc, +} + +impl UpdateMetadataHandler { + pub fn new( + asset_repo: Arc, + metadata_repo: Arc, + event_pub: Arc, + ) -> Self { + Self { asset_repo, metadata_repo, event_pub } + } + + pub async fn execute(&self, cmd: UpdateMetadataCommand) -> Result { + self.asset_repo.find_by_id(&cmd.asset_id).await? + .ok_or_else(|| DomainError::NotFound(format!("Asset {} not found", cmd.asset_id)))?; + + let metadata = AssetMetadata::new(cmd.asset_id, MetadataSource::UserEdited, cmd.data); + self.metadata_repo.save(&metadata).await?; + + self.event_pub.publish(DomainEvent::MetadataUpdated { + asset_id: cmd.asset_id, + updated_by: cmd.user_id, + timestamp: DateTimeStamp::now(), + }).await?; + + Ok(metadata) + } +} diff --git a/crates/application/src/catalog/mod.rs b/crates/application/src/catalog/mod.rs index 2c9940a..3c2dbf2 100644 --- a/crates/application/src/catalog/mod.rs +++ b/crates/application/src/catalog/mod.rs @@ -1 +1,7 @@ -// Catalog commands/queries (future: SearchAssets, UpdateMetadata, etc.) +pub mod commands; +pub mod queries; + +pub use commands::register_asset::{RegisterAssetCommand, RegisterAssetHandler}; +pub use commands::update_metadata::{UpdateMetadataCommand, UpdateMetadataHandler}; +pub use queries::get_timeline::{GetTimelineQuery, GetTimelineHandler}; +pub use queries::get_asset::{GetAssetQuery, GetAssetHandler}; diff --git a/crates/application/src/catalog/queries/get_asset.rs b/crates/application/src/catalog/queries/get_asset.rs new file mode 100644 index 0000000..8dfb29d --- /dev/null +++ b/crates/application/src/catalog/queries/get_asset.rs @@ -0,0 +1,37 @@ +use std::sync::Arc; +use domain::{ + catalog::entities::Asset, + catalog::services::resolve_metadata, + errors::DomainError, + ports::{AssetRepository, AssetMetadataRepository}, + value_objects::{StructuredData, SystemId}, +}; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct GetAssetQuery { + pub asset_id: SystemId, +} + +pub struct GetAssetHandler { + asset_repo: Arc, + metadata_repo: Arc, +} + +impl GetAssetHandler { + pub fn new( + asset_repo: Arc, + metadata_repo: Arc, + ) -> Self { + Self { asset_repo, metadata_repo } + } + + pub async fn execute(&self, query: GetAssetQuery) -> Result<(Asset, StructuredData), DomainError> { + let asset = self.asset_repo.find_by_id(&query.asset_id).await? + .ok_or_else(|| DomainError::NotFound(format!("Asset {} not found", query.asset_id)))?; + + let layers = self.metadata_repo.find_by_asset(&asset.asset_id).await?; + let resolved = resolve_metadata(&layers); + + Ok((asset, resolved)) + } +} diff --git a/crates/application/src/catalog/queries/get_timeline.rs b/crates/application/src/catalog/queries/get_timeline.rs new file mode 100644 index 0000000..35872b8 --- /dev/null +++ b/crates/application/src/catalog/queries/get_timeline.rs @@ -0,0 +1,42 @@ +use std::sync::Arc; +use domain::{ + catalog::entities::Asset, + catalog::services::resolve_metadata, + errors::DomainError, + ports::{AssetRepository, AssetMetadataRepository}, + value_objects::{StructuredData, SystemId}, +}; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct GetTimelineQuery { + pub owner_id: SystemId, + pub limit: u32, + pub offset: u32, +} + +pub struct GetTimelineHandler { + asset_repo: Arc, + metadata_repo: Arc, +} + +impl GetTimelineHandler { + pub fn new( + asset_repo: Arc, + metadata_repo: Arc, + ) -> Self { + Self { asset_repo, metadata_repo } + } + + pub async fn execute(&self, query: GetTimelineQuery) -> Result, DomainError> { + let assets = self.asset_repo.find_by_owner(&query.owner_id, query.limit, query.offset).await?; + + let mut results = Vec::with_capacity(assets.len()); + for asset in assets { + let layers = self.metadata_repo.find_by_asset(&asset.asset_id).await?; + let resolved = resolve_metadata(&layers); + results.push((asset, resolved)); + } + + Ok(results) + } +} diff --git a/crates/application/src/catalog/queries/mod.rs b/crates/application/src/catalog/queries/mod.rs new file mode 100644 index 0000000..ebcc54e --- /dev/null +++ b/crates/application/src/catalog/queries/mod.rs @@ -0,0 +1,2 @@ +pub mod get_timeline; +pub mod get_asset; diff --git a/crates/application/tests/app_tests.rs b/crates/application/tests/app_tests.rs index 5310b62..06b39a9 100644 --- a/crates/application/tests/app_tests.rs +++ b/crates/application/tests/app_tests.rs @@ -1,3 +1,4 @@ mod identity; mod organization; mod storage; +mod catalog; diff --git a/crates/application/tests/catalog/commands/mod.rs b/crates/application/tests/catalog/commands/mod.rs new file mode 100644 index 0000000..89d66ad --- /dev/null +++ b/crates/application/tests/catalog/commands/mod.rs @@ -0,0 +1,2 @@ +mod register_asset; +mod update_metadata; diff --git a/crates/application/tests/catalog/commands/register_asset.rs b/crates/application/tests/catalog/commands/register_asset.rs new file mode 100644 index 0000000..26d2698 --- /dev/null +++ b/crates/application/tests/catalog/commands/register_asset.rs @@ -0,0 +1,85 @@ +use std::sync::Arc; +use application::catalog::{RegisterAssetCommand, RegisterAssetHandler}; +use application::testing::{InMemoryAssetRepository, InMemoryDuplicateRepository, StubEventPublisher}; +use domain::catalog::entities::AssetType; +use domain::value_objects::SystemId; + +fn valid_checksum() -> String { + "a".repeat(64) +} + +#[tokio::test] +async fn registers_asset() { + let asset_repo = Arc::new(InMemoryAssetRepository::new()); + let dup_repo = Arc::new(InMemoryDuplicateRepository::new()); + let events = Arc::new(StubEventPublisher::new()); + + let handler = RegisterAssetHandler::new( + asset_repo.clone(), + dup_repo.clone(), + events.clone(), + ); + + let owner = SystemId::new(); + let volume = SystemId::new(); + + let (asset, dup) = handler.execute(RegisterAssetCommand { + volume_id: volume, + relative_path: "photos/img.jpg".into(), + checksum: valid_checksum(), + asset_type: AssetType::Image, + mime_type: "image/jpeg".into(), + file_size: 1024, + owner_id: owner, + }).await.unwrap(); + + assert_eq!(asset.mime_type, "image/jpeg"); + assert_eq!(asset.file_size, 1024); + assert_eq!(asset.owner_user_id, owner); + assert!(dup.is_none()); + assert_eq!(events.published().await.len(), 1); +} + +#[tokio::test] +async fn flags_duplicate_when_checksum_exists() { + let asset_repo = Arc::new(InMemoryAssetRepository::new()); + let dup_repo = Arc::new(InMemoryDuplicateRepository::new()); + let events = Arc::new(StubEventPublisher::new()); + + let handler = RegisterAssetHandler::new( + asset_repo.clone(), + dup_repo.clone(), + events.clone(), + ); + + let owner = SystemId::new(); + let volume = SystemId::new(); + let checksum = valid_checksum(); + + // First asset + let (first, _) = handler.execute(RegisterAssetCommand { + volume_id: volume, + relative_path: "photos/img1.jpg".into(), + checksum: checksum.clone(), + asset_type: AssetType::Image, + mime_type: "image/jpeg".into(), + file_size: 1024, + owner_id: owner, + }).await.unwrap(); + + // Second asset with same checksum + let (second, dup) = handler.execute(RegisterAssetCommand { + volume_id: volume, + relative_path: "photos/img2.jpg".into(), + checksum, + asset_type: AssetType::Image, + mime_type: "image/jpeg".into(), + file_size: 1024, + owner_id: owner, + }).await.unwrap(); + + let group = dup.expect("should flag duplicate"); + let candidate_ids: Vec<_> = group.candidates.iter().map(|c| c.asset_id).collect(); + assert!(candidate_ids.contains(&first.asset_id)); + assert!(candidate_ids.contains(&second.asset_id)); +} diff --git a/crates/application/tests/catalog/commands/update_metadata.rs b/crates/application/tests/catalog/commands/update_metadata.rs new file mode 100644 index 0000000..f1e9dcd --- /dev/null +++ b/crates/application/tests/catalog/commands/update_metadata.rs @@ -0,0 +1,64 @@ +use std::sync::Arc; +use application::catalog::{UpdateMetadataCommand, UpdateMetadataHandler}; +use application::testing::{InMemoryAssetRepository, InMemoryAssetMetadataRepository, StubEventPublisher}; +use domain::catalog::entities::{Asset, AssetType, SourceReference, MetadataSource}; +use domain::errors::DomainError; +use domain::value_objects::{Checksum, MetadataValue, StructuredData, SystemId}; + +async fn seed_asset(repo: &InMemoryAssetRepository) -> Asset { + let source = SourceReference { + volume_id: SystemId::new(), + relative_path: "photos/img.jpg".into(), + checksum: Checksum::new("a".repeat(64)).unwrap(), + }; + let asset = Asset::new(source, AssetType::Image, "image/jpeg", 1024, SystemId::new()); + repo.save(&asset).await.unwrap(); + asset +} + +use domain::ports::AssetRepository; + +#[tokio::test] +async fn updates_metadata() { + let asset_repo = Arc::new(InMemoryAssetRepository::new()); + let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new()); + let events = Arc::new(StubEventPublisher::new()); + + let asset = seed_asset(&asset_repo).await; + + let handler = UpdateMetadataHandler::new( + asset_repo.clone(), + meta_repo.clone(), + events.clone(), + ); + + let mut data = StructuredData::new(); + data.insert("title", MetadataValue::String("Sunset".into())); + + let result = handler.execute(UpdateMetadataCommand { + asset_id: asset.asset_id, + user_id: SystemId::new(), + data, + }).await.unwrap(); + + assert_eq!(result.metadata_source, MetadataSource::UserEdited); + assert_eq!(result.data.get_string("title"), Some("Sunset")); + assert_eq!(events.published().await.len(), 1); +} + +#[tokio::test] +async fn rejects_nonexistent_asset() { + let asset_repo = Arc::new(InMemoryAssetRepository::new()); + let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new()); + let events = Arc::new(StubEventPublisher::new()); + + let handler = UpdateMetadataHandler::new(asset_repo, meta_repo, events); + + let result = handler.execute(UpdateMetadataCommand { + asset_id: SystemId::new(), + user_id: SystemId::new(), + data: StructuredData::new(), + }).await; + + assert!(matches!(result, Err(DomainError::NotFound(_)))); +} diff --git a/crates/application/tests/catalog/mod.rs b/crates/application/tests/catalog/mod.rs new file mode 100644 index 0000000..2406e7d --- /dev/null +++ b/crates/application/tests/catalog/mod.rs @@ -0,0 +1,2 @@ +mod commands; +mod queries; diff --git a/crates/application/tests/catalog/queries/get_asset.rs b/crates/application/tests/catalog/queries/get_asset.rs new file mode 100644 index 0000000..e705b76 --- /dev/null +++ b/crates/application/tests/catalog/queries/get_asset.rs @@ -0,0 +1,58 @@ +use std::sync::Arc; +use application::catalog::{GetAssetQuery, GetAssetHandler}; +use application::testing::{InMemoryAssetRepository, InMemoryAssetMetadataRepository}; +use domain::catalog::entities::{Asset, AssetMetadata, AssetType, MetadataSource, SourceReference}; +use domain::errors::DomainError; +use domain::ports::{AssetRepository, AssetMetadataRepository}; +use domain::value_objects::{Checksum, MetadataValue, StructuredData, SystemId}; + +#[tokio::test] +async fn returns_asset_with_resolved_metadata() { + let asset_repo = Arc::new(InMemoryAssetRepository::new()); + let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new()); + + let source = SourceReference { + volume_id: SystemId::new(), + relative_path: "photos/img.jpg".into(), + checksum: Checksum::new("a".repeat(64)).unwrap(), + }; + let asset = Asset::new(source, AssetType::Image, "image/jpeg", 1024, SystemId::new()); + asset_repo.save(&asset).await.unwrap(); + + // Add exif layer + let mut exif_data = StructuredData::new(); + exif_data.insert("camera", MetadataValue::String("Nikon".into())); + exif_data.insert("title", MetadataValue::String("EXIF title".into())); + let exif = AssetMetadata::new(asset.asset_id, MetadataSource::ExifExtracted, exif_data); + meta_repo.save(&exif).await.unwrap(); + + // Add user layer (overrides title) + let mut user_data = StructuredData::new(); + user_data.insert("title", MetadataValue::String("My Photo".into())); + let user_meta = AssetMetadata::new(asset.asset_id, MetadataSource::UserEdited, user_data); + meta_repo.save(&user_meta).await.unwrap(); + + let handler = GetAssetHandler::new(asset_repo, meta_repo); + let (returned, resolved) = handler.execute(GetAssetQuery { + asset_id: asset.asset_id, + }).await.unwrap(); + + assert_eq!(returned.asset_id, asset.asset_id); + // UserEdited overrides ExifExtracted + assert_eq!(resolved.get_string("title"), Some("My Photo")); + // ExifExtracted field preserved + assert_eq!(resolved.get_string("camera"), Some("Nikon")); +} + +#[tokio::test] +async fn rejects_nonexistent() { + let asset_repo = Arc::new(InMemoryAssetRepository::new()); + let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new()); + + let handler = GetAssetHandler::new(asset_repo, meta_repo); + let result = handler.execute(GetAssetQuery { + asset_id: SystemId::new(), + }).await; + + assert!(matches!(result, Err(DomainError::NotFound(_)))); +} diff --git a/crates/application/tests/catalog/queries/get_timeline.rs b/crates/application/tests/catalog/queries/get_timeline.rs new file mode 100644 index 0000000..e8ecbb6 --- /dev/null +++ b/crates/application/tests/catalog/queries/get_timeline.rs @@ -0,0 +1,54 @@ +use std::sync::Arc; +use application::catalog::{GetTimelineQuery, GetTimelineHandler}; +use application::testing::{InMemoryAssetRepository, InMemoryAssetMetadataRepository}; +use domain::catalog::entities::{Asset, AssetType, SourceReference}; +use domain::ports::AssetRepository; +use domain::value_objects::{Checksum, SystemId}; + +async fn seed_assets(repo: &InMemoryAssetRepository, owner: SystemId, count: usize) { + for i in 0..count { + let hex = format!("{:0>64x}", i + 1); + let source = SourceReference { + volume_id: SystemId::new(), + relative_path: format!("photos/img{i}.jpg"), + checksum: Checksum::new(hex).unwrap(), + }; + let asset = Asset::new(source, AssetType::Image, "image/jpeg", 1024, owner); + repo.save(&asset).await.unwrap(); + } +} + +#[tokio::test] +async fn returns_paginated_assets() { + let asset_repo = Arc::new(InMemoryAssetRepository::new()); + let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new()); + let owner = SystemId::new(); + + seed_assets(&asset_repo, owner, 5).await; + + let handler = GetTimelineHandler::new(asset_repo, meta_repo); + + let page = handler.execute(GetTimelineQuery { + owner_id: owner, + limit: 3, + offset: 0, + }).await.unwrap(); + + assert_eq!(page.len(), 3); +} + +#[tokio::test] +async fn returns_empty_for_no_assets() { + let asset_repo = Arc::new(InMemoryAssetRepository::new()); + let meta_repo = Arc::new(InMemoryAssetMetadataRepository::new()); + + let handler = GetTimelineHandler::new(asset_repo, meta_repo); + + let page = handler.execute(GetTimelineQuery { + owner_id: SystemId::new(), + limit: 10, + offset: 0, + }).await.unwrap(); + + assert!(page.is_empty()); +} diff --git a/crates/application/tests/catalog/queries/mod.rs b/crates/application/tests/catalog/queries/mod.rs new file mode 100644 index 0000000..80aa28a --- /dev/null +++ b/crates/application/tests/catalog/queries/mod.rs @@ -0,0 +1,2 @@ +mod get_timeline; +mod get_asset;