app: add storage commands/queries + missing in-memory test repos
This commit is contained in:
129
crates/application/src/storage/commands/ingest_asset.rs
Normal file
129
crates/application/src/storage/commands/ingest_asset.rs
Normal file
@@ -0,0 +1,129 @@
|
||||
use std::sync::Arc;
|
||||
use bytes::Bytes;
|
||||
use domain::{
|
||||
entities::{Asset, AssetType, IngestSession, IngestStatus, SourceReference, UsageLedgerEntry, UsageType},
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
ports::{
|
||||
AssetRepository, EventPublisher, FileStoragePort,
|
||||
IngestSessionRepository, LibraryPathRepository, QuotaRepository, UsageLedgerRepository,
|
||||
},
|
||||
value_objects::{Checksum, DateTimeStamp, SystemId},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub struct IngestAssetCommand {
|
||||
pub uploader_id: SystemId,
|
||||
pub client_device_id: String,
|
||||
pub filename: String,
|
||||
pub checksum: String,
|
||||
pub target_path_id: SystemId,
|
||||
pub file_size: u64,
|
||||
#[serde(skip)]
|
||||
pub data: Bytes,
|
||||
}
|
||||
|
||||
pub struct IngestAssetHandler {
|
||||
ingest_repo: Arc<dyn IngestSessionRepository>,
|
||||
path_repo: Arc<dyn LibraryPathRepository>,
|
||||
quota_repo: Arc<dyn QuotaRepository>,
|
||||
ledger_repo: Arc<dyn UsageLedgerRepository>,
|
||||
asset_repo: Arc<dyn AssetRepository>,
|
||||
file_storage: Arc<dyn FileStoragePort>,
|
||||
event_pub: Arc<dyn EventPublisher>,
|
||||
}
|
||||
|
||||
impl IngestAssetHandler {
|
||||
pub fn new(
|
||||
ingest_repo: Arc<dyn IngestSessionRepository>,
|
||||
path_repo: Arc<dyn LibraryPathRepository>,
|
||||
quota_repo: Arc<dyn QuotaRepository>,
|
||||
ledger_repo: Arc<dyn UsageLedgerRepository>,
|
||||
asset_repo: Arc<dyn AssetRepository>,
|
||||
file_storage: Arc<dyn FileStoragePort>,
|
||||
event_pub: Arc<dyn EventPublisher>,
|
||||
) -> Self {
|
||||
Self { ingest_repo, path_repo, quota_repo, ledger_repo, asset_repo, file_storage, event_pub }
|
||||
}
|
||||
|
||||
pub async fn execute(&self, cmd: IngestAssetCommand) -> Result<(Asset, IngestSession), DomainError> {
|
||||
let checksum = Checksum::new(&cmd.checksum)?;
|
||||
|
||||
let path = self.path_repo.find_by_id(&cmd.target_path_id).await?
|
||||
.ok_or_else(|| DomainError::NotFound(format!("Library path {} not found", cmd.target_path_id)))?;
|
||||
|
||||
if !path.is_ingest_destination {
|
||||
return Err(DomainError::Validation("Target path is not an ingest destination".to_string()));
|
||||
}
|
||||
|
||||
if let Some(quota) = self.quota_repo.find_by_owner(&cmd.uploader_id).await? {
|
||||
let current = self.ledger_repo.sum_usage(&cmd.uploader_id, UsageType::StorageBytes, None).await?;
|
||||
let result = domain::storage::services::check_quota("a, UsageType::StorageBytes, current, cmd.file_size);
|
||||
if !result.allowed {
|
||||
return Err(DomainError::QuotaExceeded(format!(
|
||||
"Storage quota exceeded: {} / {} bytes", current + cmd.file_size, result.limit
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
let mut session = IngestSession::new(
|
||||
cmd.uploader_id,
|
||||
&cmd.client_device_id,
|
||||
&cmd.filename,
|
||||
checksum.clone(),
|
||||
cmd.target_path_id,
|
||||
);
|
||||
|
||||
let storage_path = format!("{}/{}", path.relative_path, cmd.filename);
|
||||
self.file_storage.store_file(&storage_path, cmd.data).await?;
|
||||
|
||||
let mime_type = mime_type_from_filename(&cmd.filename);
|
||||
let asset_type = if mime_type.starts_with("video") { AssetType::Video } else { AssetType::Image };
|
||||
|
||||
let asset = Asset::new(
|
||||
SourceReference {
|
||||
volume_id: path.volume_id,
|
||||
relative_path: storage_path,
|
||||
checksum,
|
||||
},
|
||||
asset_type,
|
||||
&mime_type,
|
||||
cmd.file_size,
|
||||
cmd.uploader_id,
|
||||
);
|
||||
|
||||
self.asset_repo.save(&asset).await?;
|
||||
|
||||
session.advance_to(IngestStatus::AwaitingProcessing)?;
|
||||
self.ingest_repo.save(&session).await?;
|
||||
|
||||
let entry = UsageLedgerEntry::new(
|
||||
cmd.uploader_id,
|
||||
UsageType::StorageBytes,
|
||||
cmd.file_size,
|
||||
format!("Ingested {}", cmd.filename),
|
||||
);
|
||||
self.ledger_repo.record(&entry).await?;
|
||||
|
||||
self.event_pub.publish(DomainEvent::AssetIngested {
|
||||
asset_id: asset.asset_id,
|
||||
owner_user_id: cmd.uploader_id,
|
||||
timestamp: DateTimeStamp::now(),
|
||||
}).await?;
|
||||
|
||||
Ok((asset, session))
|
||||
}
|
||||
}
|
||||
|
||||
fn mime_type_from_filename(filename: &str) -> String {
|
||||
let lower = filename.to_lowercase();
|
||||
if lower.ends_with(".jpg") || lower.ends_with(".jpeg") {
|
||||
"image/jpeg".to_string()
|
||||
} else if lower.ends_with(".png") {
|
||||
"image/png".to_string()
|
||||
} else if lower.ends_with(".mp4") {
|
||||
"video/mp4".to_string()
|
||||
} else {
|
||||
"application/octet-stream".to_string()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user