diff --git a/crates/application/src/storage/commands/ingest_asset.rs b/crates/application/src/storage/commands/ingest_asset.rs new file mode 100644 index 0000000..eb63007 --- /dev/null +++ b/crates/application/src/storage/commands/ingest_asset.rs @@ -0,0 +1,129 @@ +use std::sync::Arc; +use bytes::Bytes; +use domain::{ + entities::{Asset, AssetType, IngestSession, IngestStatus, SourceReference, UsageLedgerEntry, UsageType}, + errors::DomainError, + events::DomainEvent, + ports::{ + AssetRepository, EventPublisher, FileStoragePort, + IngestSessionRepository, LibraryPathRepository, QuotaRepository, UsageLedgerRepository, + }, + value_objects::{Checksum, DateTimeStamp, SystemId}, +}; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct IngestAssetCommand { + pub uploader_id: SystemId, + pub client_device_id: String, + pub filename: String, + pub checksum: String, + pub target_path_id: SystemId, + pub file_size: u64, + #[serde(skip)] + pub data: Bytes, +} + +pub struct IngestAssetHandler { + ingest_repo: Arc, + path_repo: Arc, + quota_repo: Arc, + ledger_repo: Arc, + asset_repo: Arc, + file_storage: Arc, + event_pub: Arc, +} + +impl IngestAssetHandler { + pub fn new( + ingest_repo: Arc, + path_repo: Arc, + quota_repo: Arc, + ledger_repo: Arc, + asset_repo: Arc, + file_storage: Arc, + event_pub: Arc, + ) -> Self { + Self { ingest_repo, path_repo, quota_repo, ledger_repo, asset_repo, file_storage, event_pub } + } + + pub async fn execute(&self, cmd: IngestAssetCommand) -> Result<(Asset, IngestSession), DomainError> { + let checksum = Checksum::new(&cmd.checksum)?; + + let path = self.path_repo.find_by_id(&cmd.target_path_id).await? + .ok_or_else(|| DomainError::NotFound(format!("Library path {} not found", cmd.target_path_id)))?; + + if !path.is_ingest_destination { + return Err(DomainError::Validation("Target path is not an ingest destination".to_string())); + } + + if let Some(quota) = self.quota_repo.find_by_owner(&cmd.uploader_id).await? { + let current = self.ledger_repo.sum_usage(&cmd.uploader_id, UsageType::StorageBytes, None).await?; + let result = domain::storage::services::check_quota("a, UsageType::StorageBytes, current, cmd.file_size); + if !result.allowed { + return Err(DomainError::QuotaExceeded(format!( + "Storage quota exceeded: {} / {} bytes", current + cmd.file_size, result.limit + ))); + } + } + + let mut session = IngestSession::new( + cmd.uploader_id, + &cmd.client_device_id, + &cmd.filename, + checksum.clone(), + cmd.target_path_id, + ); + + let storage_path = format!("{}/{}", path.relative_path, cmd.filename); + self.file_storage.store_file(&storage_path, cmd.data).await?; + + let mime_type = mime_type_from_filename(&cmd.filename); + let asset_type = if mime_type.starts_with("video") { AssetType::Video } else { AssetType::Image }; + + let asset = Asset::new( + SourceReference { + volume_id: path.volume_id, + relative_path: storage_path, + checksum, + }, + asset_type, + &mime_type, + cmd.file_size, + cmd.uploader_id, + ); + + self.asset_repo.save(&asset).await?; + + session.advance_to(IngestStatus::AwaitingProcessing)?; + self.ingest_repo.save(&session).await?; + + let entry = UsageLedgerEntry::new( + cmd.uploader_id, + UsageType::StorageBytes, + cmd.file_size, + format!("Ingested {}", cmd.filename), + ); + self.ledger_repo.record(&entry).await?; + + self.event_pub.publish(DomainEvent::AssetIngested { + asset_id: asset.asset_id, + owner_user_id: cmd.uploader_id, + timestamp: DateTimeStamp::now(), + }).await?; + + Ok((asset, session)) + } +} + +fn mime_type_from_filename(filename: &str) -> String { + let lower = filename.to_lowercase(); + if lower.ends_with(".jpg") || lower.ends_with(".jpeg") { + "image/jpeg".to_string() + } else if lower.ends_with(".png") { + "image/png".to_string() + } else if lower.ends_with(".mp4") { + "video/mp4".to_string() + } else { + "application/octet-stream".to_string() + } +} diff --git a/crates/application/src/storage/commands/mod.rs b/crates/application/src/storage/commands/mod.rs new file mode 100644 index 0000000..5e7f5ef --- /dev/null +++ b/crates/application/src/storage/commands/mod.rs @@ -0,0 +1,3 @@ +pub mod register_volume; +pub mod register_library_path; +pub mod ingest_asset; diff --git a/crates/application/src/storage/commands/register_library_path.rs b/crates/application/src/storage/commands/register_library_path.rs new file mode 100644 index 0000000..1abca06 --- /dev/null +++ b/crates/application/src/storage/commands/register_library_path.rs @@ -0,0 +1,43 @@ +use std::sync::Arc; +use domain::{ + entities::LibraryPath, + errors::DomainError, + ports::{LibraryPathRepository, StorageVolumeRepository}, + value_objects::SystemId, +}; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct RegisterLibraryPathCommand { + pub volume_id: SystemId, + pub relative_path: String, + pub owner_id: SystemId, + pub is_ingest_destination: bool, +} + +pub struct RegisterLibraryPathHandler { + volume_repo: Arc, + path_repo: Arc, +} + +impl RegisterLibraryPathHandler { + pub fn new( + volume_repo: Arc, + path_repo: Arc, + ) -> Self { + Self { volume_repo, path_repo } + } + + pub async fn execute(&self, cmd: RegisterLibraryPathCommand) -> Result { + self.volume_repo.find_by_id(&cmd.volume_id).await? + .ok_or_else(|| DomainError::NotFound(format!("Volume {} not found", cmd.volume_id)))?; + + let path = LibraryPath::new_user_owned( + cmd.volume_id, + cmd.relative_path, + cmd.owner_id, + cmd.is_ingest_destination, + ); + self.path_repo.save(&path).await?; + Ok(path) + } +} diff --git a/crates/application/src/storage/commands/register_volume.rs b/crates/application/src/storage/commands/register_volume.rs new file mode 100644 index 0000000..5b6e1f6 --- /dev/null +++ b/crates/application/src/storage/commands/register_volume.rs @@ -0,0 +1,32 @@ +use std::sync::Arc; +use domain::{ + entities::StorageVolume, + errors::DomainError, + ports::StorageVolumeRepository, +}; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct RegisterVolumeCommand { + pub volume_name: String, + pub uri_prefix: String, + pub is_writable: bool, +} + +pub struct RegisterVolumeHandler { + volume_repo: Arc, +} + +impl RegisterVolumeHandler { + pub fn new(volume_repo: Arc) -> Self { + Self { volume_repo } + } + + pub async fn execute(&self, cmd: RegisterVolumeCommand) -> Result { + if cmd.volume_name.is_empty() { + return Err(DomainError::Validation("Volume name must not be empty".to_string())); + } + let volume = StorageVolume::new(cmd.volume_name, cmd.uri_prefix, cmd.is_writable); + self.volume_repo.save(&volume).await?; + Ok(volume) + } +} diff --git a/crates/application/src/storage/mod.rs b/crates/application/src/storage/mod.rs index 180ec5a..1b7b2f0 100644 --- a/crates/application/src/storage/mod.rs +++ b/crates/application/src/storage/mod.rs @@ -1 +1,7 @@ -// Storage commands/queries (future: IngestAsset, ManageVolume, etc.) +pub mod commands; +pub mod queries; + +pub use commands::register_volume::{RegisterVolumeCommand, RegisterVolumeHandler}; +pub use commands::register_library_path::{RegisterLibraryPathCommand, RegisterLibraryPathHandler}; +pub use commands::ingest_asset::{IngestAssetCommand, IngestAssetHandler}; +pub use queries::check_quota::{CheckQuotaQuery, CheckQuotaHandler}; diff --git a/crates/application/src/storage/queries/check_quota.rs b/crates/application/src/storage/queries/check_quota.rs new file mode 100644 index 0000000..31faa27 --- /dev/null +++ b/crates/application/src/storage/queries/check_quota.rs @@ -0,0 +1,45 @@ +use std::sync::Arc; +use domain::{ + entities::UsageType, + errors::DomainError, + ports::{QuotaRepository, UsageLedgerRepository}, + storage::services::{check_quota, QuotaCheckResult}, + value_objects::SystemId, +}; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct CheckQuotaQuery { + pub user_id: SystemId, + pub usage_type: UsageType, + pub requested_amount: u64, +} + +pub struct CheckQuotaHandler { + quota_repo: Arc, + ledger_repo: Arc, +} + +impl CheckQuotaHandler { + pub fn new( + quota_repo: Arc, + ledger_repo: Arc, + ) -> Self { + Self { quota_repo, ledger_repo } + } + + pub async fn execute(&self, query: CheckQuotaQuery) -> Result { + let quota = self.quota_repo.find_by_owner(&query.user_id).await?; + + let Some(quota) = quota else { + return Ok(QuotaCheckResult { + allowed: true, + current_usage: 0, + limit: 0, + is_unlimited: true, + }); + }; + + let current = self.ledger_repo.sum_usage(&query.user_id, query.usage_type, None).await?; + Ok(check_quota("a, query.usage_type, current, query.requested_amount)) + } +} diff --git a/crates/application/src/storage/queries/mod.rs b/crates/application/src/storage/queries/mod.rs new file mode 100644 index 0000000..384f573 --- /dev/null +++ b/crates/application/src/storage/queries/mod.rs @@ -0,0 +1 @@ +pub mod check_quota; diff --git a/crates/application/src/testing/repositories.rs b/crates/application/src/testing/repositories.rs index ea59c8e..66edd6a 100644 --- a/crates/application/src/testing/repositories.rs +++ b/crates/application/src/testing/repositories.rs @@ -2,13 +2,21 @@ use std::collections::HashMap; use async_trait::async_trait; use tokio::sync::Mutex; use domain::{ - entities::{Album, Asset, Group, Job, JobStatus, Role, User}, + entities::{ + Album, Asset, AssetMetadata, AssetTag, DuplicateGroup, DuplicateStatus, + Group, IngestSession, InviteCode, Job, JobStatus, LibraryPath, + MetadataSource, QuotaDefinition, Role, ShareLink, ShareScope, ShareTarget, + StorageVolume, Tag, UsageLedgerEntry, UsageType, User, + }, errors::DomainError, ports::{ - AlbumRepository, AssetRepository, GroupRepository, - JobRepository, RoleRepository, UserRepository, + AlbumRepository, AssetMetadataRepository, AssetRepository, + DuplicateRepository, GroupRepository, IngestSessionRepository, + JobRepository, LibraryPathRepository, QuotaRepository, + RoleRepository, ShareRepository, StorageVolumeRepository, + TagRepository, UsageLedgerRepository, UserRepository, }, - value_objects::{Checksum, Email, SystemId}, + value_objects::{Checksum, DateTimeStamp, Email, SystemId}, }; // --- InMemoryUserRepository --- @@ -276,3 +284,435 @@ impl GroupRepository for InMemoryGroupRepository { Ok(()) } } + +// --- InMemoryStorageVolumeRepository --- + +pub struct InMemoryStorageVolumeRepository { + data: Mutex>, +} + +impl InMemoryStorageVolumeRepository { + pub fn new() -> Self { + Self { data: Mutex::new(HashMap::new()) } + } +} + +impl Default for InMemoryStorageVolumeRepository { + fn default() -> Self { Self::new() } +} + +#[async_trait] +impl StorageVolumeRepository for InMemoryStorageVolumeRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + Ok(self.data.lock().await.get(&id.to_string()).cloned()) + } + + async fn find_all(&self) -> Result, DomainError> { + Ok(self.data.lock().await.values().cloned().collect()) + } + + async fn save(&self, volume: &StorageVolume) -> Result<(), DomainError> { + self.data.lock().await.insert(volume.volume_id.to_string(), volume.clone()); + Ok(()) + } + + async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { + self.data.lock().await.remove(&id.to_string()); + Ok(()) + } +} + +// --- InMemoryLibraryPathRepository --- + +pub struct InMemoryLibraryPathRepository { + data: Mutex>, +} + +impl InMemoryLibraryPathRepository { + pub fn new() -> Self { + Self { data: Mutex::new(HashMap::new()) } + } +} + +impl Default for InMemoryLibraryPathRepository { + fn default() -> Self { Self::new() } +} + +#[async_trait] +impl LibraryPathRepository for InMemoryLibraryPathRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + Ok(self.data.lock().await.get(&id.to_string()).cloned()) + } + + async fn find_by_volume(&self, volume_id: &SystemId) -> Result, DomainError> { + Ok(self.data.lock().await.values() + .filter(|p| &p.volume_id == volume_id) + .cloned() + .collect()) + } + + async fn find_ingest_destinations(&self, owner_id: &SystemId) -> Result, DomainError> { + Ok(self.data.lock().await.values() + .filter(|p| p.is_ingest_destination && p.designated_owner_id.as_ref() == Some(owner_id)) + .cloned() + .collect()) + } + + async fn save(&self, path: &LibraryPath) -> Result<(), DomainError> { + self.data.lock().await.insert(path.path_id.to_string(), path.clone()); + Ok(()) + } + + async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { + self.data.lock().await.remove(&id.to_string()); + Ok(()) + } +} + +// --- InMemoryIngestSessionRepository --- + +pub struct InMemoryIngestSessionRepository { + data: Mutex>, +} + +impl InMemoryIngestSessionRepository { + pub fn new() -> Self { + Self { data: Mutex::new(HashMap::new()) } + } +} + +impl Default for InMemoryIngestSessionRepository { + fn default() -> Self { Self::new() } +} + +#[async_trait] +impl IngestSessionRepository for InMemoryIngestSessionRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + Ok(self.data.lock().await.get(&id.to_string()).cloned()) + } + + async fn find_by_user(&self, user_id: &SystemId) -> Result, DomainError> { + Ok(self.data.lock().await.values() + .filter(|s| &s.uploader_user_id == user_id) + .cloned() + .collect()) + } + + async fn save(&self, session: &IngestSession) -> Result<(), DomainError> { + self.data.lock().await.insert(session.session_id.to_string(), session.clone()); + Ok(()) + } +} + +// --- InMemoryQuotaRepository --- + +pub struct InMemoryQuotaRepository { + data: Mutex>, +} + +impl InMemoryQuotaRepository { + pub fn new() -> Self { + Self { data: Mutex::new(HashMap::new()) } + } +} + +impl Default for InMemoryQuotaRepository { + fn default() -> Self { Self::new() } +} + +#[async_trait] +impl QuotaRepository for InMemoryQuotaRepository { + async fn find_by_owner(&self, owner_id: &SystemId) -> Result, DomainError> { + Ok(self.data.lock().await.values() + .find(|q| &q.owner_scope == owner_id) + .cloned()) + } + + async fn save(&self, quota: &QuotaDefinition) -> Result<(), DomainError> { + self.data.lock().await.insert(quota.quota_id.to_string(), quota.clone()); + Ok(()) + } + + async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { + self.data.lock().await.remove(&id.to_string()); + Ok(()) + } +} + +// --- InMemoryUsageLedgerRepository --- + +pub struct InMemoryUsageLedgerRepository { + entries: Mutex>, +} + +impl InMemoryUsageLedgerRepository { + pub fn new() -> Self { + Self { entries: Mutex::new(Vec::new()) } + } +} + +impl Default for InMemoryUsageLedgerRepository { + fn default() -> Self { Self::new() } +} + +#[async_trait] +impl UsageLedgerRepository for InMemoryUsageLedgerRepository { + async fn record(&self, entry: &UsageLedgerEntry) -> Result<(), DomainError> { + self.entries.lock().await.push(entry.clone()); + Ok(()) + } + + async fn sum_usage( + &self, + user_id: &SystemId, + usage_type: UsageType, + since: Option, + ) -> Result { + let entries = self.entries.lock().await; + let total = entries.iter() + .filter(|e| &e.user_id == user_id && e.usage_type == usage_type) + .filter(|e| match &since { + Some(ts) => &e.timestamp >= ts, + None => true, + }) + .map(|e| e.consumed_amount) + .sum(); + Ok(total) + } +} + +// --- InMemoryAssetMetadataRepository --- + +pub struct InMemoryAssetMetadataRepository { + data: Mutex>, +} + +impl InMemoryAssetMetadataRepository { + pub fn new() -> Self { + Self { data: Mutex::new(HashMap::new()) } + } + + fn key(asset_id: &SystemId, source: MetadataSource) -> String { + format!("{asset_id}:{source:?}") + } +} + +impl Default for InMemoryAssetMetadataRepository { + fn default() -> Self { Self::new() } +} + +#[async_trait] +impl AssetMetadataRepository for InMemoryAssetMetadataRepository { + async fn find_by_asset(&self, asset_id: &SystemId) -> Result, DomainError> { + let prefix = format!("{asset_id}:"); + Ok(self.data.lock().await.iter() + .filter(|(k, _)| k.starts_with(&prefix)) + .map(|(_, v)| v.clone()) + .collect()) + } + + async fn find_by_asset_and_source(&self, asset_id: &SystemId, source: MetadataSource) -> Result, DomainError> { + Ok(self.data.lock().await.get(&Self::key(asset_id, source)).cloned()) + } + + async fn save(&self, metadata: &AssetMetadata) -> Result<(), DomainError> { + let key = Self::key(&metadata.asset_id, metadata.metadata_source); + self.data.lock().await.insert(key, metadata.clone()); + Ok(()) + } + + async fn delete_by_asset_and_source(&self, asset_id: &SystemId, source: MetadataSource) -> Result<(), DomainError> { + self.data.lock().await.remove(&Self::key(asset_id, source)); + Ok(()) + } +} + +// --- InMemoryShareRepository --- + +pub struct InMemoryShareRepository { + scopes: Mutex>, + targets: Mutex>, + links: Mutex>, + invites: Mutex>, +} + +impl InMemoryShareRepository { + pub fn new() -> Self { + Self { + scopes: Mutex::new(HashMap::new()), + targets: Mutex::new(HashMap::new()), + links: Mutex::new(HashMap::new()), + invites: Mutex::new(HashMap::new()), + } + } +} + +impl Default for InMemoryShareRepository { + fn default() -> Self { Self::new() } +} + +#[async_trait] +impl ShareRepository for InMemoryShareRepository { + async fn save_scope(&self, scope: &ShareScope) -> Result<(), DomainError> { + self.scopes.lock().await.insert(scope.scope_id.to_string(), scope.clone()); + Ok(()) + } + + async fn find_scope_by_id(&self, id: &SystemId) -> Result, DomainError> { + Ok(self.scopes.lock().await.get(&id.to_string()).cloned()) + } + + async fn find_scopes_for_resource(&self, resource_id: &SystemId) -> Result, DomainError> { + Ok(self.scopes.lock().await.values() + .filter(|s| &s.shareable_id == resource_id) + .cloned() + .collect()) + } + + async fn delete_scope(&self, id: &SystemId) -> Result<(), DomainError> { + self.scopes.lock().await.remove(&id.to_string()); + Ok(()) + } + + async fn save_target(&self, target: &ShareTarget) -> Result<(), DomainError> { + let key = format!("{}:{}", target.scope_id, target.target_id); + self.targets.lock().await.insert(key, target.clone()); + Ok(()) + } + + async fn find_targets_for_scope(&self, scope_id: &SystemId) -> Result, DomainError> { + Ok(self.targets.lock().await.values() + .filter(|t| &t.scope_id == scope_id) + .cloned() + .collect()) + } + + async fn find_targets_for_user(&self, user_id: &SystemId) -> Result, DomainError> { + Ok(self.targets.lock().await.values() + .filter(|t| &t.target_id == user_id) + .cloned() + .collect()) + } + + async fn save_link(&self, link: &ShareLink) -> Result<(), DomainError> { + self.links.lock().await.insert(link.token.clone(), link.clone()); + Ok(()) + } + + async fn find_link_by_token(&self, token: &str) -> Result, DomainError> { + Ok(self.links.lock().await.get(token).cloned()) + } + + async fn save_invite(&self, invite: &InviteCode) -> Result<(), DomainError> { + self.invites.lock().await.insert(invite.code_id.to_string(), invite.clone()); + Ok(()) + } + + async fn find_invite_by_id(&self, id: &SystemId) -> Result, DomainError> { + Ok(self.invites.lock().await.get(&id.to_string()).cloned()) + } +} + +// --- InMemoryTagRepository --- + +pub struct InMemoryTagRepository { + tags: Mutex>, + asset_tags: Mutex>, +} + +impl InMemoryTagRepository { + pub fn new() -> Self { + Self { + tags: Mutex::new(HashMap::new()), + asset_tags: Mutex::new(HashMap::new()), + } + } +} + +impl Default for InMemoryTagRepository { + fn default() -> Self { Self::new() } +} + +#[async_trait] +impl TagRepository for InMemoryTagRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + Ok(self.tags.lock().await.get(&id.to_string()).cloned()) + } + + async fn find_by_name(&self, name: &str) -> Result, DomainError> { + Ok(self.tags.lock().await.values() + .find(|t| t.name == name) + .cloned()) + } + + async fn find_tags_for_asset(&self, asset_id: &SystemId) -> Result, DomainError> { + let asset_tags = self.asset_tags.lock().await; + let tags = self.tags.lock().await; + let mut result = Vec::new(); + for at in asset_tags.values() { + if &at.asset_id == asset_id && let Some(tag) = tags.get(&at.tag_id.to_string()) { + result.push((tag.clone(), at.clone())); + } + } + Ok(result) + } + + async fn save_tag(&self, tag: &Tag) -> Result<(), DomainError> { + self.tags.lock().await.insert(tag.tag_id.to_string(), tag.clone()); + Ok(()) + } + + async fn save_asset_tag(&self, asset_tag: &AssetTag) -> Result<(), DomainError> { + let key = format!("{}:{}", asset_tag.asset_id, asset_tag.tag_id); + self.asset_tags.lock().await.insert(key, asset_tag.clone()); + Ok(()) + } + + async fn remove_asset_tag(&self, asset_id: &SystemId, tag_id: &SystemId) -> Result<(), DomainError> { + let key = format!("{asset_id}:{tag_id}"); + self.asset_tags.lock().await.remove(&key); + Ok(()) + } +} + +// --- InMemoryDuplicateRepository --- + +pub struct InMemoryDuplicateRepository { + data: Mutex>, +} + +impl InMemoryDuplicateRepository { + pub fn new() -> Self { + Self { data: Mutex::new(HashMap::new()) } + } +} + +impl Default for InMemoryDuplicateRepository { + fn default() -> Self { Self::new() } +} + +#[async_trait] +impl DuplicateRepository for InMemoryDuplicateRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + Ok(self.data.lock().await.get(&id.to_string()).cloned()) + } + + async fn find_unresolved(&self) -> Result, DomainError> { + Ok(self.data.lock().await.values() + .filter(|g| g.status == DuplicateStatus::Unresolved) + .cloned() + .collect()) + } + + async fn find_by_asset(&self, asset_id: &SystemId) -> Result, DomainError> { + Ok(self.data.lock().await.values() + .filter(|g| g.candidates.iter().any(|c| &c.asset_id == asset_id)) + .cloned() + .collect()) + } + + async fn save(&self, group: &DuplicateGroup) -> Result<(), DomainError> { + self.data.lock().await.insert(group.group_id.to_string(), group.clone()); + Ok(()) + } +} diff --git a/crates/application/tests/app_tests.rs b/crates/application/tests/app_tests.rs index 6b57d05..5310b62 100644 --- a/crates/application/tests/app_tests.rs +++ b/crates/application/tests/app_tests.rs @@ -1,2 +1,3 @@ mod identity; mod organization; +mod storage; diff --git a/crates/application/tests/storage/commands/ingest_asset.rs b/crates/application/tests/storage/commands/ingest_asset.rs new file mode 100644 index 0000000..90ed05c --- /dev/null +++ b/crates/application/tests/storage/commands/ingest_asset.rs @@ -0,0 +1,181 @@ +use std::sync::Arc; +use bytes::Bytes; +use application::testing::{ + InMemoryAssetRepository, InMemoryIngestSessionRepository, + InMemoryLibraryPathRepository, InMemoryQuotaRepository, + InMemoryStorageVolumeRepository, InMemoryUsageLedgerRepository, + InMemoryFileStorage, StubEventPublisher, +}; +use application::storage::{ + IngestAssetCommand, IngestAssetHandler, + RegisterVolumeCommand, RegisterVolumeHandler, + RegisterLibraryPathCommand, RegisterLibraryPathHandler, +}; +use domain::entities::{IngestStatus, QuotaDefinition, TimePeriod, UsageType}; +use domain::errors::DomainError; +use domain::ports::QuotaRepository; +use domain::value_objects::SystemId; + +struct Harness { + ingest_repo: Arc, + path_repo: Arc, + quota_repo: Arc, + ledger_repo: Arc, + asset_repo: Arc, + file_storage: Arc, + event_pub: Arc, + vol_repo: Arc, +} + +impl Harness { + fn new() -> Self { + Self { + ingest_repo: Arc::new(InMemoryIngestSessionRepository::new()), + path_repo: Arc::new(InMemoryLibraryPathRepository::new()), + quota_repo: Arc::new(InMemoryQuotaRepository::new()), + ledger_repo: Arc::new(InMemoryUsageLedgerRepository::new()), + asset_repo: Arc::new(InMemoryAssetRepository::new()), + file_storage: Arc::new(InMemoryFileStorage::new()), + event_pub: Arc::new(StubEventPublisher::new()), + vol_repo: Arc::new(InMemoryStorageVolumeRepository::new()), + } + } + + fn ingest_handler(&self) -> IngestAssetHandler { + IngestAssetHandler::new( + self.ingest_repo.clone(), + self.path_repo.clone(), + self.quota_repo.clone(), + self.ledger_repo.clone(), + self.asset_repo.clone(), + self.file_storage.clone(), + self.event_pub.clone(), + ) + } + + async fn setup_volume_and_path(&self, owner: SystemId) -> SystemId { + let vol_handler = RegisterVolumeHandler::new(self.vol_repo.clone()); + let vol = vol_handler.execute(RegisterVolumeCommand { + volume_name: "main".into(), + uri_prefix: "file:///data".into(), + is_writable: true, + }).await.unwrap(); + + let path_handler = RegisterLibraryPathHandler::new(self.vol_repo.clone(), self.path_repo.clone()); + let path = path_handler.execute(RegisterLibraryPathCommand { + volume_id: vol.volume_id, + relative_path: "photos/inbox".into(), + owner_id: owner, + is_ingest_destination: true, + }).await.unwrap(); + path.path_id + } +} + +fn valid_checksum() -> String { + "a".repeat(64) +} + +#[tokio::test] +async fn ingests_successfully() { + let h = Harness::new(); + let user = SystemId::new(); + let path_id = h.setup_volume_and_path(user).await; + + let handler = h.ingest_handler(); + let (asset, session) = handler.execute(IngestAssetCommand { + uploader_id: user, + client_device_id: "iphone-1".into(), + filename: "photo.jpg".into(), + checksum: valid_checksum(), + target_path_id: path_id, + file_size: 1024, + data: Bytes::from(vec![0u8; 1024]), + }).await.unwrap(); + + assert_eq!(asset.mime_type, "image/jpeg"); + assert_eq!(asset.file_size, 1024); + assert_eq!(asset.owner_user_id, user); + assert_eq!(session.status, IngestStatus::AwaitingProcessing); + assert!(!h.event_pub.published().await.is_empty()); +} + +#[tokio::test] +async fn rejects_quota_exceeded() { + let h = Harness::new(); + let user = SystemId::new(); + let path_id = h.setup_volume_and_path(user).await; + + let mut quota = QuotaDefinition::new(user); + quota.add_rule(UsageType::StorageBytes, 500, TimePeriod::Lifetime); + h.quota_repo.save("a).await.unwrap(); + + let handler = h.ingest_handler(); + let result = handler.execute(IngestAssetCommand { + uploader_id: user, + client_device_id: "iphone-1".into(), + filename: "big.jpg".into(), + checksum: valid_checksum(), + target_path_id: path_id, + file_size: 1024, + data: Bytes::from(vec![0u8; 1024]), + }).await; + + assert!(matches!(result, Err(DomainError::QuotaExceeded(_)))); +} + +#[tokio::test] +async fn rejects_invalid_checksum() { + let h = Harness::new(); + let user = SystemId::new(); + let path_id = h.setup_volume_and_path(user).await; + + let handler = h.ingest_handler(); + let result = handler.execute(IngestAssetCommand { + uploader_id: user, + client_device_id: "iphone-1".into(), + filename: "photo.jpg".into(), + checksum: "tooshort".into(), + target_path_id: path_id, + file_size: 1024, + data: Bytes::from(vec![0u8; 1024]), + }).await; + + assert!(matches!(result, Err(DomainError::Validation(_)))); +} + +#[tokio::test] +async fn rejects_non_ingest_path() { + let h = Harness::new(); + let user = SystemId::new(); + + // Create volume + non-ingest path directly + let vol_handler = RegisterVolumeHandler::new(h.vol_repo.clone()); + let vol = vol_handler.execute(RegisterVolumeCommand { + volume_name: "main".into(), + uri_prefix: "file:///data".into(), + is_writable: true, + }).await.unwrap(); + + let path = domain::entities::LibraryPath::new_user_owned( + vol.volume_id, + "photos/archive", + user, + false, // not an ingest destination + ); + use domain::ports::LibraryPathRepository; + h.path_repo.save(&path).await.unwrap(); + + let handler = h.ingest_handler(); + let result = handler.execute(IngestAssetCommand { + uploader_id: user, + client_device_id: "iphone-1".into(), + filename: "photo.jpg".into(), + checksum: valid_checksum(), + target_path_id: path.path_id, + file_size: 1024, + data: Bytes::from(vec![0u8; 1024]), + }).await; + + assert!(matches!(result, Err(DomainError::Validation(_)))); +} diff --git a/crates/application/tests/storage/commands/mod.rs b/crates/application/tests/storage/commands/mod.rs new file mode 100644 index 0000000..c15fb27 --- /dev/null +++ b/crates/application/tests/storage/commands/mod.rs @@ -0,0 +1,3 @@ +mod register_volume; +mod register_library_path; +mod ingest_asset; diff --git a/crates/application/tests/storage/commands/register_library_path.rs b/crates/application/tests/storage/commands/register_library_path.rs new file mode 100644 index 0000000..e02635f --- /dev/null +++ b/crates/application/tests/storage/commands/register_library_path.rs @@ -0,0 +1,47 @@ +use std::sync::Arc; +use application::testing::{InMemoryStorageVolumeRepository, InMemoryLibraryPathRepository}; +use application::storage::{RegisterVolumeCommand, RegisterVolumeHandler, RegisterLibraryPathCommand, RegisterLibraryPathHandler}; +use domain::errors::DomainError; +use domain::value_objects::SystemId; + +#[tokio::test] +async fn creates_path() { + let vol_repo = Arc::new(InMemoryStorageVolumeRepository::new()); + let path_repo = Arc::new(InMemoryLibraryPathRepository::new()); + + let vol_handler = RegisterVolumeHandler::new(vol_repo.clone()); + let vol = vol_handler.execute(RegisterVolumeCommand { + volume_name: "main".into(), + uri_prefix: "file:///data".into(), + is_writable: true, + }).await.unwrap(); + + let handler = RegisterLibraryPathHandler::new(vol_repo, path_repo); + let owner = SystemId::new(); + let path = handler.execute(RegisterLibraryPathCommand { + volume_id: vol.volume_id, + relative_path: "photos/inbox".into(), + owner_id: owner, + is_ingest_destination: true, + }).await.unwrap(); + + assert_eq!(path.volume_id, vol.volume_id); + assert_eq!(path.relative_path, "photos/inbox"); + assert!(path.is_ingest_destination); + assert_eq!(path.designated_owner_id, Some(owner)); +} + +#[tokio::test] +async fn rejects_nonexistent_volume() { + let vol_repo = Arc::new(InMemoryStorageVolumeRepository::new()); + let path_repo = Arc::new(InMemoryLibraryPathRepository::new()); + let handler = RegisterLibraryPathHandler::new(vol_repo, path_repo); + + let result = handler.execute(RegisterLibraryPathCommand { + volume_id: SystemId::new(), + relative_path: "photos/inbox".into(), + owner_id: SystemId::new(), + is_ingest_destination: true, + }).await; + assert!(matches!(result, Err(DomainError::NotFound(_)))); +} diff --git a/crates/application/tests/storage/commands/register_volume.rs b/crates/application/tests/storage/commands/register_volume.rs new file mode 100644 index 0000000..602fa53 --- /dev/null +++ b/crates/application/tests/storage/commands/register_volume.rs @@ -0,0 +1,30 @@ +use std::sync::Arc; +use application::testing::InMemoryStorageVolumeRepository; +use application::storage::{RegisterVolumeCommand, RegisterVolumeHandler}; +use domain::errors::DomainError; + +#[tokio::test] +async fn creates_volume() { + let repo = Arc::new(InMemoryStorageVolumeRepository::new()); + let handler = RegisterVolumeHandler::new(repo); + let vol = handler.execute(RegisterVolumeCommand { + volume_name: "primary".into(), + uri_prefix: "file:///data".into(), + is_writable: true, + }).await.unwrap(); + assert_eq!(vol.volume_name, "primary"); + assert_eq!(vol.uri_prefix, "file:///data"); + assert!(vol.is_writable); +} + +#[tokio::test] +async fn rejects_empty_name() { + let repo = Arc::new(InMemoryStorageVolumeRepository::new()); + let handler = RegisterVolumeHandler::new(repo); + let result = handler.execute(RegisterVolumeCommand { + volume_name: "".into(), + uri_prefix: "file:///data".into(), + is_writable: true, + }).await; + assert!(matches!(result, Err(DomainError::Validation(_)))); +} diff --git a/crates/application/tests/storage/mod.rs b/crates/application/tests/storage/mod.rs new file mode 100644 index 0000000..2406e7d --- /dev/null +++ b/crates/application/tests/storage/mod.rs @@ -0,0 +1,2 @@ +mod commands; +mod queries; diff --git a/crates/application/tests/storage/queries/check_quota.rs b/crates/application/tests/storage/queries/check_quota.rs new file mode 100644 index 0000000..b3a1b45 --- /dev/null +++ b/crates/application/tests/storage/queries/check_quota.rs @@ -0,0 +1,71 @@ +use std::sync::Arc; +use application::testing::{InMemoryQuotaRepository, InMemoryUsageLedgerRepository}; +use application::storage::{CheckQuotaQuery, CheckQuotaHandler}; +use domain::entities::{QuotaDefinition, TimePeriod, UsageLedgerEntry, UsageType}; +use domain::ports::UsageLedgerRepository; +use domain::value_objects::SystemId; + +#[tokio::test] +async fn returns_allowed() { + let quota_repo = Arc::new(InMemoryQuotaRepository::new()); + let ledger_repo = Arc::new(InMemoryUsageLedgerRepository::new()); + + let user = SystemId::new(); + let mut quota = QuotaDefinition::new(user); + quota.add_rule(UsageType::StorageBytes, 10_000, TimePeriod::Lifetime); + use domain::ports::QuotaRepository; + quota_repo.save("a).await.unwrap(); + + let handler = CheckQuotaHandler::new(quota_repo, ledger_repo); + let result = handler.execute(CheckQuotaQuery { + user_id: user, + usage_type: UsageType::StorageBytes, + requested_amount: 5_000, + }).await.unwrap(); + + assert!(result.allowed); + assert_eq!(result.limit, 10_000); + assert!(!result.is_unlimited); +} + +#[tokio::test] +async fn returns_denied() { + let quota_repo = Arc::new(InMemoryQuotaRepository::new()); + let ledger_repo = Arc::new(InMemoryUsageLedgerRepository::new()); + + let user = SystemId::new(); + let mut quota = QuotaDefinition::new(user); + quota.add_rule(UsageType::StorageBytes, 1_000, TimePeriod::Lifetime); + use domain::ports::QuotaRepository; + quota_repo.save("a).await.unwrap(); + + // record existing usage + let entry = UsageLedgerEntry::new(user, UsageType::StorageBytes, 900, "prior upload"); + ledger_repo.record(&entry).await.unwrap(); + + let handler = CheckQuotaHandler::new(quota_repo, ledger_repo); + let result = handler.execute(CheckQuotaQuery { + user_id: user, + usage_type: UsageType::StorageBytes, + requested_amount: 200, + }).await.unwrap(); + + assert!(!result.allowed); + assert_eq!(result.current_usage, 900); +} + +#[tokio::test] +async fn returns_unlimited_when_no_quota() { + let quota_repo = Arc::new(InMemoryQuotaRepository::new()); + let ledger_repo = Arc::new(InMemoryUsageLedgerRepository::new()); + + let handler = CheckQuotaHandler::new(quota_repo, ledger_repo); + let result = handler.execute(CheckQuotaQuery { + user_id: SystemId::new(), + usage_type: UsageType::StorageBytes, + requested_amount: 999_999, + }).await.unwrap(); + + assert!(result.allowed); + assert!(result.is_unlimited); +} diff --git a/crates/application/tests/storage/queries/mod.rs b/crates/application/tests/storage/queries/mod.rs new file mode 100644 index 0000000..9859251 --- /dev/null +++ b/crates/application/tests/storage/queries/mod.rs @@ -0,0 +1 @@ +mod check_quota;