refactor: introduce IngestTransaction port to reduce IngestAssetHandler from 7 to 4 ports

This commit is contained in:
2026-05-31 18:44:51 +02:00
parent aa09aec66b
commit 0b2237860e
7 changed files with 189 additions and 67 deletions

View File

@@ -3,13 +3,13 @@ use async_trait::async_trait;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use domain::{ use domain::{
entities::{ entities::{
IngestSession, IngestStatus, LibraryPath, OwnershipPolicy, QuotaDefinition, QuotaRule, Asset, IngestSession, IngestStatus, LibraryPath, OwnershipPolicy, QuotaDefinition,
StorageVolume, TimePeriod, UsageLedgerEntry, UsageType, QuotaRule, StorageVolume, TimePeriod, UsageLedgerEntry, UsageType,
}, },
errors::DomainError, errors::DomainError,
ports::{ ports::{
IngestSessionRepository, LibraryPathRepository, QuotaRepository, StorageVolumeRepository, IngestSessionRepository, IngestTransaction, LibraryPathRepository, QuotaRepository,
UsageLedgerRepository, StorageVolumeRepository, UsageLedgerRepository,
}, },
value_objects::{Checksum, DateTimeStamp, SystemId}, value_objects::{Checksum, DateTimeStamp, SystemId},
}; };
@@ -565,3 +565,51 @@ impl UsageLedgerRepository for PostgresUsageLedgerRepository {
Ok(row.total as u64) Ok(row.total as u64)
} }
} }
// ──────────────────────────────────────────────
// IngestTransaction (composite port)
// ──────────────────────────────────────────────
pg_repo!(PostgresIngestTransaction);
#[async_trait]
impl IngestTransaction for PostgresIngestTransaction {
async fn save_asset(&self, asset: &Asset) -> Result<(), DomainError> {
use domain::ports::AssetRepository;
crate::PostgresAssetRepository::new(self.pool.clone())
.save(asset)
.await
}
async fn save_session(&self, session: &IngestSession) -> Result<(), DomainError> {
PostgresIngestSessionRepository::new(self.pool.clone())
.save(session)
.await
}
async fn find_quota(
&self,
owner_id: &SystemId,
) -> Result<Option<QuotaDefinition>, DomainError> {
PostgresQuotaRepository::new(self.pool.clone())
.find_by_owner(owner_id)
.await
}
async fn sum_usage(
&self,
user_id: &SystemId,
usage_type: UsageType,
since: Option<DateTimeStamp>,
) -> Result<u64, DomainError> {
PostgresUsageLedgerRepository::new(self.pool.clone())
.sum_usage(user_id, usage_type, since)
.await
}
async fn record_usage(&self, entry: &UsageLedgerEntry) -> Result<(), DomainError> {
PostgresUsageLedgerRepository::new(self.pool.clone())
.record(entry)
.await
}
}

View File

@@ -5,10 +5,7 @@ use domain::{
}, },
errors::DomainError, errors::DomainError,
events::DomainEvent, events::DomainEvent,
ports::{ ports::{EventPublisher, FileStoragePort, IngestTransaction, LibraryPathRepository},
AssetRepository, EventPublisher, FileStoragePort, IngestSessionRepository,
LibraryPathRepository, QuotaRepository, UsageLedgerRepository,
},
value_objects::{Checksum, DateTimeStamp, SystemId}, value_objects::{Checksum, DateTimeStamp, SystemId},
}; };
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
@@ -26,31 +23,22 @@ pub struct IngestAssetCommand {
} }
pub struct IngestAssetHandler { pub struct IngestAssetHandler {
ingest_repo: Arc<dyn IngestSessionRepository>, tx: Arc<dyn IngestTransaction>,
path_repo: Arc<dyn LibraryPathRepository>, path_repo: Arc<dyn LibraryPathRepository>,
quota_repo: Arc<dyn QuotaRepository>,
ledger_repo: Arc<dyn UsageLedgerRepository>,
asset_repo: Arc<dyn AssetRepository>,
file_storage: Arc<dyn FileStoragePort>, file_storage: Arc<dyn FileStoragePort>,
event_pub: Arc<dyn EventPublisher>, event_pub: Arc<dyn EventPublisher>,
} }
impl IngestAssetHandler { impl IngestAssetHandler {
pub fn new( pub fn new(
ingest_repo: Arc<dyn IngestSessionRepository>, tx: Arc<dyn IngestTransaction>,
path_repo: Arc<dyn LibraryPathRepository>, path_repo: Arc<dyn LibraryPathRepository>,
quota_repo: Arc<dyn QuotaRepository>,
ledger_repo: Arc<dyn UsageLedgerRepository>,
asset_repo: Arc<dyn AssetRepository>,
file_storage: Arc<dyn FileStoragePort>, file_storage: Arc<dyn FileStoragePort>,
event_pub: Arc<dyn EventPublisher>, event_pub: Arc<dyn EventPublisher>,
) -> Self { ) -> Self {
Self { Self {
ingest_repo, tx,
path_repo, path_repo,
quota_repo,
ledger_repo,
asset_repo,
file_storage, file_storage,
event_pub, event_pub,
} }
@@ -79,9 +67,9 @@ impl IngestAssetHandler {
)); ));
} }
if let Some(quota) = self.quota_repo.find_by_owner(&cmd.uploader_id).await? { if let Some(quota) = self.tx.find_quota(&cmd.uploader_id).await? {
let current = self let current = self
.ledger_repo .tx
.sum_usage(&cmd.uploader_id, UsageType::StorageBytes, None) .sum_usage(&cmd.uploader_id, UsageType::StorageBytes, None)
.await?; .await?;
let result = domain::storage::services::check_quota( let result = domain::storage::services::check_quota(
@@ -131,10 +119,10 @@ impl IngestAssetHandler {
cmd.uploader_id, cmd.uploader_id,
); );
self.asset_repo.save(&asset).await?; self.tx.save_asset(&asset).await?;
session.advance_to(IngestStatus::AwaitingProcessing)?; session.advance_to(IngestStatus::AwaitingProcessing)?;
self.ingest_repo.save(&session).await?; self.tx.save_session(&session).await?;
let entry = UsageLedgerEntry::new( let entry = UsageLedgerEntry::new(
cmd.uploader_id, cmd.uploader_id,
@@ -142,7 +130,7 @@ impl IngestAssetHandler {
cmd.file_size, cmd.file_size,
format!("Ingested {}", cmd.filename), format!("Ingested {}", cmd.filename),
); );
self.ledger_repo.record(&entry).await?; self.tx.record_usage(&entry).await?;
self.event_pub self.event_pub
.publish(&DomainEvent::AssetIngested { .publish(&DomainEvent::AssetIngested {

View File

@@ -9,10 +9,10 @@ use domain::{
errors::DomainError, errors::DomainError,
ports::{ ports::{
AlbumRepository, AssetMetadataRepository, AssetRepository, DuplicateRepository, AlbumRepository, AssetMetadataRepository, AssetRepository, DuplicateRepository,
GroupRepository, IngestSessionRepository, JobBatchRepository, JobRepository, GroupRepository, IngestSessionRepository, IngestTransaction, JobBatchRepository,
LibraryPathRepository, PipelineRepository, PluginRepository, QuotaRepository, JobRepository, LibraryPathRepository, PipelineRepository, PluginRepository,
RoleRepository, ShareRepository, SidecarRepository, StorageVolumeRepository, TagRepository, QuotaRepository, RoleRepository, ShareRepository, SidecarRepository,
UsageLedgerRepository, UserRepository, StorageVolumeRepository, TagRepository, UsageLedgerRepository, UserRepository,
}, },
value_objects::{Checksum, DateTimeStamp, Email, SystemId}, value_objects::{Checksum, DateTimeStamp, Email, SystemId},
}; };
@@ -1126,3 +1126,93 @@ impl PipelineRepository for InMemoryPipelineRepository {
Ok(()) Ok(())
} }
} }
// --- InMemoryIngestTransaction ---
pub struct InMemoryIngestTransaction {
assets: Mutex<HashMap<String, Asset>>,
sessions: Mutex<HashMap<String, IngestSession>>,
quotas: Mutex<HashMap<String, QuotaDefinition>>,
ledger: Mutex<Vec<UsageLedgerEntry>>,
}
impl InMemoryIngestTransaction {
pub fn new() -> Self {
Self {
assets: Mutex::new(HashMap::new()),
sessions: Mutex::new(HashMap::new()),
quotas: Mutex::new(HashMap::new()),
ledger: Mutex::new(Vec::new()),
}
}
/// Pre-seed a quota for testing.
pub async fn insert_quota(&self, quota: &QuotaDefinition) {
self.quotas
.lock()
.await
.insert(quota.owner_scope.to_string(), quota.clone());
}
}
impl Default for InMemoryIngestTransaction {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl IngestTransaction for InMemoryIngestTransaction {
async fn save_asset(&self, asset: &Asset) -> Result<(), DomainError> {
self.assets
.lock()
.await
.insert(asset.asset_id.to_string(), asset.clone());
Ok(())
}
async fn save_session(&self, session: &IngestSession) -> Result<(), DomainError> {
self.sessions
.lock()
.await
.insert(session.session_id.to_string(), session.clone());
Ok(())
}
async fn find_quota(
&self,
owner_id: &SystemId,
) -> Result<Option<QuotaDefinition>, DomainError> {
Ok(self
.quotas
.lock()
.await
.values()
.find(|q| &q.owner_scope == owner_id)
.cloned())
}
async fn sum_usage(
&self,
user_id: &SystemId,
usage_type: UsageType,
since: Option<DateTimeStamp>,
) -> Result<u64, DomainError> {
let entries = self.ledger.lock().await;
let total = entries
.iter()
.filter(|e| &e.user_id == user_id && e.usage_type == usage_type)
.filter(|e| match &since {
Some(ts) => &e.timestamp >= ts,
None => true,
})
.map(|e| e.consumed_amount)
.sum();
Ok(total)
}
async fn record_usage(&self, entry: &UsageLedgerEntry) -> Result<(), DomainError> {
self.ledger.lock().await.push(entry.clone());
Ok(())
}
}

View File

@@ -3,23 +3,18 @@ use application::storage::{
RegisterVolumeCommand, RegisterVolumeHandler, RegisterVolumeCommand, RegisterVolumeHandler,
}; };
use application::testing::{ use application::testing::{
InMemoryAssetRepository, InMemoryFileStorage, InMemoryIngestSessionRepository, InMemoryFileStorage, InMemoryIngestTransaction, InMemoryLibraryPathRepository,
InMemoryLibraryPathRepository, InMemoryQuotaRepository, InMemoryStorageVolumeRepository, InMemoryStorageVolumeRepository, StubEventPublisher,
InMemoryUsageLedgerRepository, StubEventPublisher,
}; };
use bytes::Bytes; use bytes::Bytes;
use domain::entities::{IngestStatus, QuotaDefinition, TimePeriod, UsageType}; use domain::entities::{IngestStatus, QuotaDefinition, TimePeriod, UsageType};
use domain::errors::DomainError; use domain::errors::DomainError;
use domain::ports::QuotaRepository;
use domain::value_objects::SystemId; use domain::value_objects::SystemId;
use std::sync::Arc; use std::sync::Arc;
struct Harness { struct Harness {
ingest_repo: Arc<InMemoryIngestSessionRepository>, tx: Arc<InMemoryIngestTransaction>,
path_repo: Arc<InMemoryLibraryPathRepository>, path_repo: Arc<InMemoryLibraryPathRepository>,
quota_repo: Arc<InMemoryQuotaRepository>,
ledger_repo: Arc<InMemoryUsageLedgerRepository>,
asset_repo: Arc<InMemoryAssetRepository>,
file_storage: Arc<InMemoryFileStorage>, file_storage: Arc<InMemoryFileStorage>,
event_pub: Arc<StubEventPublisher>, event_pub: Arc<StubEventPublisher>,
vol_repo: Arc<InMemoryStorageVolumeRepository>, vol_repo: Arc<InMemoryStorageVolumeRepository>,
@@ -28,11 +23,8 @@ struct Harness {
impl Harness { impl Harness {
fn new() -> Self { fn new() -> Self {
Self { Self {
ingest_repo: Arc::new(InMemoryIngestSessionRepository::new()), tx: Arc::new(InMemoryIngestTransaction::new()),
path_repo: Arc::new(InMemoryLibraryPathRepository::new()), path_repo: Arc::new(InMemoryLibraryPathRepository::new()),
quota_repo: Arc::new(InMemoryQuotaRepository::new()),
ledger_repo: Arc::new(InMemoryUsageLedgerRepository::new()),
asset_repo: Arc::new(InMemoryAssetRepository::new()),
file_storage: Arc::new(InMemoryFileStorage::new()), file_storage: Arc::new(InMemoryFileStorage::new()),
event_pub: Arc::new(StubEventPublisher::new()), event_pub: Arc::new(StubEventPublisher::new()),
vol_repo: Arc::new(InMemoryStorageVolumeRepository::new()), vol_repo: Arc::new(InMemoryStorageVolumeRepository::new()),
@@ -41,11 +33,8 @@ impl Harness {
fn ingest_handler(&self) -> IngestAssetHandler { fn ingest_handler(&self) -> IngestAssetHandler {
IngestAssetHandler::new( IngestAssetHandler::new(
self.ingest_repo.clone(), self.tx.clone(),
self.path_repo.clone(), self.path_repo.clone(),
self.quota_repo.clone(),
self.ledger_repo.clone(),
self.asset_repo.clone(),
self.file_storage.clone(), self.file_storage.clone(),
self.event_pub.clone(), self.event_pub.clone(),
) )
@@ -111,7 +100,7 @@ async fn rejects_quota_exceeded() {
let mut quota = QuotaDefinition::new(user); let mut quota = QuotaDefinition::new(user);
quota.add_rule(UsageType::StorageBytes, 500, TimePeriod::Lifetime); quota.add_rule(UsageType::StorageBytes, 500, TimePeriod::Lifetime);
h.quota_repo.save(&quota).await.unwrap(); h.tx.insert_quota(&quota).await;
let handler = h.ingest_handler(); let handler = h.ingest_handler();
let result = handler let result = handler

View File

@@ -2,7 +2,7 @@ use std::sync::Arc;
use adapters_postgres::{ use adapters_postgres::{
PgPool, PostgresAssetMetadataRepository, PostgresAssetRepository, PgPool, PostgresAssetMetadataRepository, PostgresAssetRepository,
PostgresDuplicateRepository, PostgresDuplicateRepository, PostgresIngestTransaction,
}; };
use adapters_storage::LocalFileStorage; use adapters_storage::LocalFileStorage;
use application::catalog::{ use application::catalog::{
@@ -24,13 +24,11 @@ pub fn build(
let asset_repo = Arc::new(PostgresAssetRepository::new(pool.clone())); let asset_repo = Arc::new(PostgresAssetRepository::new(pool.clone()));
let metadata_repo = Arc::new(PostgresAssetMetadataRepository::new(pool.clone())); let metadata_repo = Arc::new(PostgresAssetMetadataRepository::new(pool.clone()));
let duplicate_repo = Arc::new(PostgresDuplicateRepository::new(pool.clone())); let duplicate_repo = Arc::new(PostgresDuplicateRepository::new(pool.clone()));
let ingest_tx = Arc::new(PostgresIngestTransaction::new(pool.clone()));
let ingest_asset = Arc::new(IngestAssetHandler::new( let ingest_asset = Arc::new(IngestAssetHandler::new(
storage_repos.session_repo.clone(), ingest_tx,
storage_repos.path_repo.clone(), storage_repos.path_repo.clone(),
storage_repos.quota_repo.clone(),
storage_repos.ledger_repo.clone(),
asset_repo.clone(),
file_storage.clone(), file_storage.clone(),
event_publisher.clone(), event_publisher.clone(),
)); ));

View File

@@ -1,32 +1,27 @@
use std::sync::Arc; use std::sync::Arc;
use adapters_postgres::{ use adapters_postgres::{
PgPool, PostgresIngestSessionRepository, PostgresLibraryPathRepository, PgPool, PostgresLibraryPathRepository, PostgresQuotaRepository,
PostgresQuotaRepository, PostgresStorageVolumeRepository, PostgresUsageLedgerRepository, PostgresStorageVolumeRepository, PostgresUsageLedgerRepository,
}; };
use application::storage::{CheckQuotaHandler, RegisterLibraryPathHandler, RegisterVolumeHandler}; use application::storage::{CheckQuotaHandler, RegisterLibraryPathHandler, RegisterVolumeHandler};
use presentation::state::StorageHandlers; use presentation::state::StorageHandlers;
/// Shared storage repos needed by other bounded contexts (catalog ingest, etc.). /// Shared storage repos needed by other bounded contexts (catalog ingest, etc.).
pub struct StorageRepos { pub struct StorageRepos {
pub volume_repo: Arc<PostgresStorageVolumeRepository>,
pub path_repo: Arc<PostgresLibraryPathRepository>, pub path_repo: Arc<PostgresLibraryPathRepository>,
pub session_repo: Arc<PostgresIngestSessionRepository>,
pub quota_repo: Arc<PostgresQuotaRepository>,
pub ledger_repo: Arc<PostgresUsageLedgerRepository>,
} }
pub fn build(pool: &PgPool) -> (StorageRepos, StorageHandlers) { pub fn build(pool: &PgPool) -> (StorageRepos, StorageHandlers) {
let volume_repo = Arc::new(PostgresStorageVolumeRepository::new(pool.clone())); let volume_repo = Arc::new(PostgresStorageVolumeRepository::new(pool.clone()));
let path_repo = Arc::new(PostgresLibraryPathRepository::new(pool.clone())); let path_repo = Arc::new(PostgresLibraryPathRepository::new(pool.clone()));
let session_repo = Arc::new(PostgresIngestSessionRepository::new(pool.clone()));
let quota_repo = Arc::new(PostgresQuotaRepository::new(pool.clone())); let quota_repo = Arc::new(PostgresQuotaRepository::new(pool.clone()));
let ledger_repo = Arc::new(PostgresUsageLedgerRepository::new(pool.clone())); let ledger_repo = Arc::new(PostgresUsageLedgerRepository::new(pool.clone()));
let register_volume = Arc::new(RegisterVolumeHandler::new(volume_repo.clone())); let register_volume = Arc::new(RegisterVolumeHandler::new(volume_repo.clone()));
let register_library_path = let register_library_path =
Arc::new(RegisterLibraryPathHandler::new(volume_repo.clone(), path_repo.clone())); Arc::new(RegisterLibraryPathHandler::new(volume_repo, path_repo.clone()));
let check_quota = Arc::new(CheckQuotaHandler::new(quota_repo.clone(), ledger_repo.clone())); let check_quota = Arc::new(CheckQuotaHandler::new(quota_repo, ledger_repo));
let handlers = StorageHandlers { let handlers = StorageHandlers {
register_volume, register_volume,
@@ -34,13 +29,7 @@ pub fn build(pool: &PgPool) -> (StorageRepos, StorageHandlers) {
check_quota, check_quota,
}; };
let repos = StorageRepos { let repos = StorageRepos { path_repo };
volume_repo,
path_repo,
session_repo,
quota_repo,
ledger_repo,
};
(repos, handlers) (repos, handlers)
} }

View File

@@ -1,6 +1,7 @@
use super::entities::{ use super::entities::{
IngestSession, LibraryPath, QuotaDefinition, StorageVolume, UsageLedgerEntry, UsageType, IngestSession, LibraryPath, QuotaDefinition, StorageVolume, UsageLedgerEntry, UsageType,
}; };
use crate::catalog::entities::Asset;
use crate::common::errors::DomainError; use crate::common::errors::DomainError;
use crate::common::value_objects::{DateTimeStamp, SystemId}; use crate::common::value_objects::{DateTimeStamp, SystemId};
use async_trait::async_trait; use async_trait::async_trait;
@@ -63,6 +64,25 @@ pub trait UsageLedgerRepository: Send + Sync {
) -> Result<u64, DomainError>; ) -> Result<u64, DomainError>;
} }
// --- IngestTransaction ---
/// Bundles the four persistence concerns that the ingest use-case touches
/// (asset, session, quota, ledger) behind a single port so the handler only
/// needs one `Arc<dyn IngestTransaction>` instead of four separate repos.
#[async_trait]
pub trait IngestTransaction: Send + Sync {
async fn save_asset(&self, asset: &Asset) -> Result<(), DomainError>;
async fn save_session(&self, session: &IngestSession) -> Result<(), DomainError>;
async fn find_quota(&self, owner_id: &SystemId) -> Result<Option<QuotaDefinition>, DomainError>;
async fn sum_usage(
&self,
user_id: &SystemId,
usage_type: UsageType,
since: Option<DateTimeStamp>,
) -> Result<u64, DomainError>;
async fn record_usage(&self, entry: &UsageLedgerEntry) -> Result<(), DomainError>;
}
// --- FileStoragePort --- // --- FileStoragePort ---
#[derive(Debug, Clone)] #[derive(Debug, Clone)]