use bytes::Bytes; use domain::{ entities::{ Asset, AssetType, IngestSession, IngestStatus, SourceReference, UsageLedgerEntry, UsageType, }, errors::DomainError, events::DomainEvent, ports::{ AssetRepository, EventPublisher, FileStoragePort, IngestSessionRepository, LibraryPathRepository, QuotaRepository, UsageLedgerRepository, }, value_objects::{Checksum, DateTimeStamp, SystemId}, }; use sha2::{Digest, Sha256}; use std::sync::Arc; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct IngestAssetCommand { pub uploader_id: SystemId, pub client_device_id: String, pub filename: String, pub target_path_id: SystemId, pub file_size: u64, #[serde(skip)] pub data: Bytes, } pub struct IngestAssetHandler { ingest_repo: Arc, path_repo: Arc, quota_repo: Arc, ledger_repo: Arc, asset_repo: Arc, file_storage: Arc, event_pub: Arc, } impl IngestAssetHandler { pub fn new( ingest_repo: Arc, path_repo: Arc, quota_repo: Arc, ledger_repo: Arc, asset_repo: Arc, file_storage: Arc, event_pub: Arc, ) -> Self { Self { ingest_repo, path_repo, quota_repo, ledger_repo, asset_repo, file_storage, event_pub, } } pub async fn execute( &self, cmd: IngestAssetCommand, ) -> Result<(Asset, IngestSession), DomainError> { let mut hasher = Sha256::new(); hasher.update(&cmd.data); let checksum_hex = format!("{:x}", hasher.finalize()); let checksum = Checksum::new(checksum_hex)?; let path = self .path_repo .find_by_id(&cmd.target_path_id) .await? .ok_or_else(|| { DomainError::NotFound(format!("Library path {} not found", cmd.target_path_id)) })?; if !path.is_ingest_destination { return Err(DomainError::Validation( "Target path is not an ingest destination".to_string(), )); } if let Some(quota) = self.quota_repo.find_by_owner(&cmd.uploader_id).await? { let current = self .ledger_repo .sum_usage(&cmd.uploader_id, UsageType::StorageBytes, None) .await?; let result = domain::storage::services::check_quota( "a, UsageType::StorageBytes, current, cmd.file_size, ); if !result.allowed { return Err(DomainError::QuotaExceeded(format!( "Storage quota exceeded: {} / {} bytes", current + cmd.file_size, result.limit ))); } } let mut session = IngestSession::new( cmd.uploader_id, &cmd.client_device_id, &cmd.filename, checksum.clone(), cmd.target_path_id, ); let storage_path = format!("{}/{}", path.relative_path, cmd.filename); self.file_storage .store_file(&storage_path, cmd.data) .await?; let mime_type = mime_type_from_filename(&cmd.filename); let asset_type = if mime_type.starts_with("video") { AssetType::Video } else { AssetType::Image }; let asset = Asset::new( SourceReference { volume_id: path.volume_id, relative_path: storage_path, checksum, }, asset_type, &mime_type, cmd.file_size, cmd.uploader_id, ); self.asset_repo.save(&asset).await?; session.advance_to(IngestStatus::AwaitingProcessing)?; self.ingest_repo.save(&session).await?; let entry = UsageLedgerEntry::new( cmd.uploader_id, UsageType::StorageBytes, cmd.file_size, format!("Ingested {}", cmd.filename), ); self.ledger_repo.record(&entry).await?; self.event_pub .publish(DomainEvent::AssetIngested { asset_id: asset.asset_id, owner_user_id: cmd.uploader_id, timestamp: DateTimeStamp::now(), }) .await?; Ok((asset, session)) } } fn mime_type_from_filename(filename: &str) -> String { let lower = filename.to_lowercase(); if lower.ends_with(".jpg") || lower.ends_with(".jpeg") { "image/jpeg".to_string() } else if lower.ends_with(".png") { "image/png".to_string() } else if lower.ends_with(".mp4") { "video/mp4".to_string() } else { "application/octet-stream".to_string() } }