From 0b2237860e93170fc568f57a25cb88a45319e348 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Sun, 31 May 2026 18:44:51 +0200 Subject: [PATCH] refactor: introduce IngestTransaction port to reduce IngestAssetHandler from 7 to 4 ports --- crates/adapters/postgres/src/storage/mod.rs | 56 ++++++++++- .../src/storage/commands/ingest_asset.rs | 30 ++---- .../application/src/testing/repositories.rs | 98 ++++++++++++++++++- .../tests/storage/commands/ingest_asset.rs | 23 ++--- crates/bootstrap/src/services/catalog.rs | 8 +- crates/bootstrap/src/services/storage.rs | 21 +--- crates/domain/src/storage/ports.rs | 20 ++++ 7 files changed, 189 insertions(+), 67 deletions(-) diff --git a/crates/adapters/postgres/src/storage/mod.rs b/crates/adapters/postgres/src/storage/mod.rs index cf3a33f..553858b 100644 --- a/crates/adapters/postgres/src/storage/mod.rs +++ b/crates/adapters/postgres/src/storage/mod.rs @@ -3,13 +3,13 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ entities::{ - IngestSession, IngestStatus, LibraryPath, OwnershipPolicy, QuotaDefinition, QuotaRule, - StorageVolume, TimePeriod, UsageLedgerEntry, UsageType, + Asset, IngestSession, IngestStatus, LibraryPath, OwnershipPolicy, QuotaDefinition, + QuotaRule, StorageVolume, TimePeriod, UsageLedgerEntry, UsageType, }, errors::DomainError, ports::{ - IngestSessionRepository, LibraryPathRepository, QuotaRepository, StorageVolumeRepository, - UsageLedgerRepository, + IngestSessionRepository, IngestTransaction, LibraryPathRepository, QuotaRepository, + StorageVolumeRepository, UsageLedgerRepository, }, value_objects::{Checksum, DateTimeStamp, SystemId}, }; @@ -565,3 +565,51 @@ impl UsageLedgerRepository for PostgresUsageLedgerRepository { Ok(row.total as u64) } } + +// ────────────────────────────────────────────── +// IngestTransaction (composite port) +// ────────────────────────────────────────────── + +pg_repo!(PostgresIngestTransaction); + +#[async_trait] +impl IngestTransaction for PostgresIngestTransaction { + async fn save_asset(&self, asset: &Asset) -> Result<(), DomainError> { + use domain::ports::AssetRepository; + crate::PostgresAssetRepository::new(self.pool.clone()) + .save(asset) + .await + } + + async fn save_session(&self, session: &IngestSession) -> Result<(), DomainError> { + PostgresIngestSessionRepository::new(self.pool.clone()) + .save(session) + .await + } + + async fn find_quota( + &self, + owner_id: &SystemId, + ) -> Result, DomainError> { + PostgresQuotaRepository::new(self.pool.clone()) + .find_by_owner(owner_id) + .await + } + + async fn sum_usage( + &self, + user_id: &SystemId, + usage_type: UsageType, + since: Option, + ) -> Result { + PostgresUsageLedgerRepository::new(self.pool.clone()) + .sum_usage(user_id, usage_type, since) + .await + } + + async fn record_usage(&self, entry: &UsageLedgerEntry) -> Result<(), DomainError> { + PostgresUsageLedgerRepository::new(self.pool.clone()) + .record(entry) + .await + } +} diff --git a/crates/application/src/storage/commands/ingest_asset.rs b/crates/application/src/storage/commands/ingest_asset.rs index 6941a74..3a18205 100644 --- a/crates/application/src/storage/commands/ingest_asset.rs +++ b/crates/application/src/storage/commands/ingest_asset.rs @@ -5,10 +5,7 @@ use domain::{ }, errors::DomainError, events::DomainEvent, - ports::{ - AssetRepository, EventPublisher, FileStoragePort, IngestSessionRepository, - LibraryPathRepository, QuotaRepository, UsageLedgerRepository, - }, + ports::{EventPublisher, FileStoragePort, IngestTransaction, LibraryPathRepository}, value_objects::{Checksum, DateTimeStamp, SystemId}, }; use sha2::{Digest, Sha256}; @@ -26,31 +23,22 @@ pub struct IngestAssetCommand { } pub struct IngestAssetHandler { - ingest_repo: Arc, + tx: Arc, path_repo: Arc, - quota_repo: Arc, - ledger_repo: Arc, - asset_repo: Arc, file_storage: Arc, event_pub: Arc, } impl IngestAssetHandler { pub fn new( - ingest_repo: Arc, + tx: Arc, path_repo: Arc, - quota_repo: Arc, - ledger_repo: Arc, - asset_repo: Arc, file_storage: Arc, event_pub: Arc, ) -> Self { Self { - ingest_repo, + tx, path_repo, - quota_repo, - ledger_repo, - asset_repo, file_storage, event_pub, } @@ -79,9 +67,9 @@ impl IngestAssetHandler { )); } - if let Some(quota) = self.quota_repo.find_by_owner(&cmd.uploader_id).await? { + if let Some(quota) = self.tx.find_quota(&cmd.uploader_id).await? { let current = self - .ledger_repo + .tx .sum_usage(&cmd.uploader_id, UsageType::StorageBytes, None) .await?; let result = domain::storage::services::check_quota( @@ -131,10 +119,10 @@ impl IngestAssetHandler { cmd.uploader_id, ); - self.asset_repo.save(&asset).await?; + self.tx.save_asset(&asset).await?; session.advance_to(IngestStatus::AwaitingProcessing)?; - self.ingest_repo.save(&session).await?; + self.tx.save_session(&session).await?; let entry = UsageLedgerEntry::new( cmd.uploader_id, @@ -142,7 +130,7 @@ impl IngestAssetHandler { cmd.file_size, format!("Ingested {}", cmd.filename), ); - self.ledger_repo.record(&entry).await?; + self.tx.record_usage(&entry).await?; self.event_pub .publish(&DomainEvent::AssetIngested { diff --git a/crates/application/src/testing/repositories.rs b/crates/application/src/testing/repositories.rs index fd46ba1..cc49b92 100644 --- a/crates/application/src/testing/repositories.rs +++ b/crates/application/src/testing/repositories.rs @@ -9,10 +9,10 @@ use domain::{ errors::DomainError, ports::{ AlbumRepository, AssetMetadataRepository, AssetRepository, DuplicateRepository, - GroupRepository, IngestSessionRepository, JobBatchRepository, JobRepository, - LibraryPathRepository, PipelineRepository, PluginRepository, QuotaRepository, - RoleRepository, ShareRepository, SidecarRepository, StorageVolumeRepository, TagRepository, - UsageLedgerRepository, UserRepository, + GroupRepository, IngestSessionRepository, IngestTransaction, JobBatchRepository, + JobRepository, LibraryPathRepository, PipelineRepository, PluginRepository, + QuotaRepository, RoleRepository, ShareRepository, SidecarRepository, + StorageVolumeRepository, TagRepository, UsageLedgerRepository, UserRepository, }, value_objects::{Checksum, DateTimeStamp, Email, SystemId}, }; @@ -1126,3 +1126,93 @@ impl PipelineRepository for InMemoryPipelineRepository { Ok(()) } } + +// --- InMemoryIngestTransaction --- + +pub struct InMemoryIngestTransaction { + assets: Mutex>, + sessions: Mutex>, + quotas: Mutex>, + ledger: Mutex>, +} + +impl InMemoryIngestTransaction { + pub fn new() -> Self { + Self { + assets: Mutex::new(HashMap::new()), + sessions: Mutex::new(HashMap::new()), + quotas: Mutex::new(HashMap::new()), + ledger: Mutex::new(Vec::new()), + } + } + + /// Pre-seed a quota for testing. + pub async fn insert_quota(&self, quota: &QuotaDefinition) { + self.quotas + .lock() + .await + .insert(quota.owner_scope.to_string(), quota.clone()); + } +} + +impl Default for InMemoryIngestTransaction { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl IngestTransaction for InMemoryIngestTransaction { + async fn save_asset(&self, asset: &Asset) -> Result<(), DomainError> { + self.assets + .lock() + .await + .insert(asset.asset_id.to_string(), asset.clone()); + Ok(()) + } + + async fn save_session(&self, session: &IngestSession) -> Result<(), DomainError> { + self.sessions + .lock() + .await + .insert(session.session_id.to_string(), session.clone()); + Ok(()) + } + + async fn find_quota( + &self, + owner_id: &SystemId, + ) -> Result, DomainError> { + Ok(self + .quotas + .lock() + .await + .values() + .find(|q| &q.owner_scope == owner_id) + .cloned()) + } + + async fn sum_usage( + &self, + user_id: &SystemId, + usage_type: UsageType, + since: Option, + ) -> Result { + let entries = self.ledger.lock().await; + let total = entries + .iter() + .filter(|e| &e.user_id == user_id && e.usage_type == usage_type) + .filter(|e| match &since { + Some(ts) => &e.timestamp >= ts, + None => true, + }) + .map(|e| e.consumed_amount) + .sum(); + Ok(total) + } + + async fn record_usage(&self, entry: &UsageLedgerEntry) -> Result<(), DomainError> { + self.ledger.lock().await.push(entry.clone()); + Ok(()) + } +} diff --git a/crates/application/tests/storage/commands/ingest_asset.rs b/crates/application/tests/storage/commands/ingest_asset.rs index fb76d33..f133a7d 100644 --- a/crates/application/tests/storage/commands/ingest_asset.rs +++ b/crates/application/tests/storage/commands/ingest_asset.rs @@ -3,23 +3,18 @@ use application::storage::{ RegisterVolumeCommand, RegisterVolumeHandler, }; use application::testing::{ - InMemoryAssetRepository, InMemoryFileStorage, InMemoryIngestSessionRepository, - InMemoryLibraryPathRepository, InMemoryQuotaRepository, InMemoryStorageVolumeRepository, - InMemoryUsageLedgerRepository, StubEventPublisher, + InMemoryFileStorage, InMemoryIngestTransaction, InMemoryLibraryPathRepository, + InMemoryStorageVolumeRepository, StubEventPublisher, }; use bytes::Bytes; use domain::entities::{IngestStatus, QuotaDefinition, TimePeriod, UsageType}; use domain::errors::DomainError; -use domain::ports::QuotaRepository; use domain::value_objects::SystemId; use std::sync::Arc; struct Harness { - ingest_repo: Arc, + tx: Arc, path_repo: Arc, - quota_repo: Arc, - ledger_repo: Arc, - asset_repo: Arc, file_storage: Arc, event_pub: Arc, vol_repo: Arc, @@ -28,11 +23,8 @@ struct Harness { impl Harness { fn new() -> Self { Self { - ingest_repo: Arc::new(InMemoryIngestSessionRepository::new()), + tx: Arc::new(InMemoryIngestTransaction::new()), path_repo: Arc::new(InMemoryLibraryPathRepository::new()), - quota_repo: Arc::new(InMemoryQuotaRepository::new()), - ledger_repo: Arc::new(InMemoryUsageLedgerRepository::new()), - asset_repo: Arc::new(InMemoryAssetRepository::new()), file_storage: Arc::new(InMemoryFileStorage::new()), event_pub: Arc::new(StubEventPublisher::new()), vol_repo: Arc::new(InMemoryStorageVolumeRepository::new()), @@ -41,11 +33,8 @@ impl Harness { fn ingest_handler(&self) -> IngestAssetHandler { IngestAssetHandler::new( - self.ingest_repo.clone(), + self.tx.clone(), self.path_repo.clone(), - self.quota_repo.clone(), - self.ledger_repo.clone(), - self.asset_repo.clone(), self.file_storage.clone(), self.event_pub.clone(), ) @@ -111,7 +100,7 @@ async fn rejects_quota_exceeded() { let mut quota = QuotaDefinition::new(user); quota.add_rule(UsageType::StorageBytes, 500, TimePeriod::Lifetime); - h.quota_repo.save("a).await.unwrap(); + h.tx.insert_quota("a).await; let handler = h.ingest_handler(); let result = handler diff --git a/crates/bootstrap/src/services/catalog.rs b/crates/bootstrap/src/services/catalog.rs index f9c282b..5250512 100644 --- a/crates/bootstrap/src/services/catalog.rs +++ b/crates/bootstrap/src/services/catalog.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use adapters_postgres::{ PgPool, PostgresAssetMetadataRepository, PostgresAssetRepository, - PostgresDuplicateRepository, + PostgresDuplicateRepository, PostgresIngestTransaction, }; use adapters_storage::LocalFileStorage; use application::catalog::{ @@ -24,13 +24,11 @@ pub fn build( let asset_repo = Arc::new(PostgresAssetRepository::new(pool.clone())); let metadata_repo = Arc::new(PostgresAssetMetadataRepository::new(pool.clone())); let duplicate_repo = Arc::new(PostgresDuplicateRepository::new(pool.clone())); + let ingest_tx = Arc::new(PostgresIngestTransaction::new(pool.clone())); let ingest_asset = Arc::new(IngestAssetHandler::new( - storage_repos.session_repo.clone(), + ingest_tx, storage_repos.path_repo.clone(), - storage_repos.quota_repo.clone(), - storage_repos.ledger_repo.clone(), - asset_repo.clone(), file_storage.clone(), event_publisher.clone(), )); diff --git a/crates/bootstrap/src/services/storage.rs b/crates/bootstrap/src/services/storage.rs index 54724a5..606a6e5 100644 --- a/crates/bootstrap/src/services/storage.rs +++ b/crates/bootstrap/src/services/storage.rs @@ -1,32 +1,27 @@ use std::sync::Arc; use adapters_postgres::{ - PgPool, PostgresIngestSessionRepository, PostgresLibraryPathRepository, - PostgresQuotaRepository, PostgresStorageVolumeRepository, PostgresUsageLedgerRepository, + PgPool, PostgresLibraryPathRepository, PostgresQuotaRepository, + PostgresStorageVolumeRepository, PostgresUsageLedgerRepository, }; use application::storage::{CheckQuotaHandler, RegisterLibraryPathHandler, RegisterVolumeHandler}; use presentation::state::StorageHandlers; /// Shared storage repos needed by other bounded contexts (catalog ingest, etc.). pub struct StorageRepos { - pub volume_repo: Arc, pub path_repo: Arc, - pub session_repo: Arc, - pub quota_repo: Arc, - pub ledger_repo: Arc, } pub fn build(pool: &PgPool) -> (StorageRepos, StorageHandlers) { let volume_repo = Arc::new(PostgresStorageVolumeRepository::new(pool.clone())); let path_repo = Arc::new(PostgresLibraryPathRepository::new(pool.clone())); - let session_repo = Arc::new(PostgresIngestSessionRepository::new(pool.clone())); let quota_repo = Arc::new(PostgresQuotaRepository::new(pool.clone())); let ledger_repo = Arc::new(PostgresUsageLedgerRepository::new(pool.clone())); let register_volume = Arc::new(RegisterVolumeHandler::new(volume_repo.clone())); let register_library_path = - Arc::new(RegisterLibraryPathHandler::new(volume_repo.clone(), path_repo.clone())); - let check_quota = Arc::new(CheckQuotaHandler::new(quota_repo.clone(), ledger_repo.clone())); + Arc::new(RegisterLibraryPathHandler::new(volume_repo, path_repo.clone())); + let check_quota = Arc::new(CheckQuotaHandler::new(quota_repo, ledger_repo)); let handlers = StorageHandlers { register_volume, @@ -34,13 +29,7 @@ pub fn build(pool: &PgPool) -> (StorageRepos, StorageHandlers) { check_quota, }; - let repos = StorageRepos { - volume_repo, - path_repo, - session_repo, - quota_repo, - ledger_repo, - }; + let repos = StorageRepos { path_repo }; (repos, handlers) } diff --git a/crates/domain/src/storage/ports.rs b/crates/domain/src/storage/ports.rs index e7fad5b..d296d9b 100644 --- a/crates/domain/src/storage/ports.rs +++ b/crates/domain/src/storage/ports.rs @@ -1,6 +1,7 @@ use super::entities::{ IngestSession, LibraryPath, QuotaDefinition, StorageVolume, UsageLedgerEntry, UsageType, }; +use crate::catalog::entities::Asset; use crate::common::errors::DomainError; use crate::common::value_objects::{DateTimeStamp, SystemId}; use async_trait::async_trait; @@ -63,6 +64,25 @@ pub trait UsageLedgerRepository: Send + Sync { ) -> Result; } +// --- IngestTransaction --- + +/// Bundles the four persistence concerns that the ingest use-case touches +/// (asset, session, quota, ledger) behind a single port so the handler only +/// needs one `Arc` instead of four separate repos. +#[async_trait] +pub trait IngestTransaction: Send + Sync { + async fn save_asset(&self, asset: &Asset) -> Result<(), DomainError>; + async fn save_session(&self, session: &IngestSession) -> Result<(), DomainError>; + async fn find_quota(&self, owner_id: &SystemId) -> Result, DomainError>; + async fn sum_usage( + &self, + user_id: &SystemId, + usage_type: UsageType, + since: Option, + ) -> Result; + async fn record_usage(&self, entry: &UsageLedgerEntry) -> Result<(), DomainError>; +} + // --- FileStoragePort --- #[derive(Debug, Clone)]