style: cargo fmt --all

This commit is contained in:
2026-05-31 05:31:42 +02:00
parent 4b31a0f74b
commit c2ebca0da0
138 changed files with 2422 additions and 1164 deletions

View File

@@ -1,4 +1,3 @@
use std::sync::Arc;
use domain::{
catalog::entities::{Asset, AssetType, DuplicateGroup, SourceReference},
errors::DomainError,
@@ -6,6 +5,7 @@ use domain::{
ports::{AssetRepository, DuplicateRepository, EventPublisher},
value_objects::{Checksum, DateTimeStamp, SystemId},
};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RegisterAssetCommand {
@@ -30,10 +30,17 @@ impl RegisterAssetHandler {
duplicate_repo: Arc<dyn DuplicateRepository>,
event_pub: Arc<dyn EventPublisher>,
) -> Self {
Self { asset_repo, duplicate_repo, event_pub }
Self {
asset_repo,
duplicate_repo,
event_pub,
}
}
pub async fn execute(&self, cmd: RegisterAssetCommand) -> Result<(Asset, Option<DuplicateGroup>), DomainError> {
pub async fn execute(
&self,
cmd: RegisterAssetCommand,
) -> Result<(Asset, Option<DuplicateGroup>), DomainError> {
let checksum = Checksum::new(&cmd.checksum)?;
let existing = self.asset_repo.find_by_checksum(&checksum).await?;
@@ -44,7 +51,13 @@ impl RegisterAssetHandler {
checksum,
};
let asset = Asset::new(source_ref, cmd.asset_type, cmd.mime_type, cmd.file_size, cmd.owner_id);
let asset = Asset::new(
source_ref,
cmd.asset_type,
cmd.mime_type,
cmd.file_size,
cmd.owner_id,
);
self.asset_repo.save(&asset).await?;
let dup_group = if let Some(first) = existing.first() {
@@ -55,11 +68,13 @@ impl RegisterAssetHandler {
None
};
self.event_pub.publish(DomainEvent::AssetIngested {
asset_id: asset.asset_id,
owner_user_id: asset.owner_user_id,
timestamp: DateTimeStamp::now(),
}).await?;
self.event_pub
.publish(DomainEvent::AssetIngested {
asset_id: asset.asset_id,
owner_user_id: asset.owner_user_id,
timestamp: DateTimeStamp::now(),
})
.await?;
Ok((asset, dup_group))
}

View File

@@ -1,11 +1,11 @@
use std::sync::Arc;
use domain::{
catalog::entities::{AssetMetadata, MetadataSource},
errors::DomainError,
events::DomainEvent,
ports::{AssetRepository, AssetMetadataRepository, EventPublisher},
ports::{AssetMetadataRepository, AssetRepository, EventPublisher},
value_objects::{DateTimeStamp, StructuredData, SystemId},
};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct UpdateMetadataCommand {
@@ -26,21 +26,29 @@ impl UpdateMetadataHandler {
metadata_repo: Arc<dyn AssetMetadataRepository>,
event_pub: Arc<dyn EventPublisher>,
) -> Self {
Self { asset_repo, metadata_repo, event_pub }
Self {
asset_repo,
metadata_repo,
event_pub,
}
}
pub async fn execute(&self, cmd: UpdateMetadataCommand) -> Result<AssetMetadata, DomainError> {
self.asset_repo.find_by_id(&cmd.asset_id).await?
self.asset_repo
.find_by_id(&cmd.asset_id)
.await?
.ok_or_else(|| DomainError::NotFound(format!("Asset {} not found", cmd.asset_id)))?;
let metadata = AssetMetadata::new(cmd.asset_id, MetadataSource::UserEdited, cmd.data);
self.metadata_repo.save(&metadata).await?;
self.event_pub.publish(DomainEvent::MetadataUpdated {
asset_id: cmd.asset_id,
updated_by: cmd.user_id,
timestamp: DateTimeStamp::now(),
}).await?;
self.event_pub
.publish(DomainEvent::MetadataUpdated {
asset_id: cmd.asset_id,
updated_by: cmd.user_id,
timestamp: DateTimeStamp::now(),
})
.await?;
Ok(metadata)
}

View File

@@ -3,5 +3,5 @@ pub mod queries;
pub use commands::register_asset::{RegisterAssetCommand, RegisterAssetHandler};
pub use commands::update_metadata::{UpdateMetadataCommand, UpdateMetadataHandler};
pub use queries::get_timeline::{GetTimelineQuery, GetTimelineHandler};
pub use queries::get_asset::{GetAssetQuery, GetAssetHandler};
pub use queries::get_asset::{GetAssetHandler, GetAssetQuery};
pub use queries::get_timeline::{GetTimelineHandler, GetTimelineQuery};

View File

@@ -1,11 +1,11 @@
use std::sync::Arc;
use domain::{
catalog::entities::Asset,
catalog::services::resolve_metadata,
errors::DomainError,
ports::{AssetRepository, AssetMetadataRepository},
ports::{AssetMetadataRepository, AssetRepository},
value_objects::{StructuredData, SystemId},
};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct GetAssetQuery {
@@ -22,11 +22,20 @@ impl GetAssetHandler {
asset_repo: Arc<dyn AssetRepository>,
metadata_repo: Arc<dyn AssetMetadataRepository>,
) -> Self {
Self { asset_repo, metadata_repo }
Self {
asset_repo,
metadata_repo,
}
}
pub async fn execute(&self, query: GetAssetQuery) -> Result<(Asset, StructuredData), DomainError> {
let asset = self.asset_repo.find_by_id(&query.asset_id).await?
pub async fn execute(
&self,
query: GetAssetQuery,
) -> Result<(Asset, StructuredData), DomainError> {
let asset = self
.asset_repo
.find_by_id(&query.asset_id)
.await?
.ok_or_else(|| DomainError::NotFound(format!("Asset {} not found", query.asset_id)))?;
let layers = self.metadata_repo.find_by_asset(&asset.asset_id).await?;

View File

@@ -1,11 +1,11 @@
use std::sync::Arc;
use domain::{
catalog::entities::Asset,
catalog::services::resolve_metadata,
errors::DomainError,
ports::{AssetRepository, AssetMetadataRepository},
ports::{AssetMetadataRepository, AssetRepository},
value_objects::{StructuredData, SystemId},
};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct GetTimelineQuery {
@@ -24,11 +24,20 @@ impl GetTimelineHandler {
asset_repo: Arc<dyn AssetRepository>,
metadata_repo: Arc<dyn AssetMetadataRepository>,
) -> Self {
Self { asset_repo, metadata_repo }
Self {
asset_repo,
metadata_repo,
}
}
pub async fn execute(&self, query: GetTimelineQuery) -> Result<Vec<(Asset, StructuredData)>, DomainError> {
let assets = self.asset_repo.find_by_owner(&query.owner_id, query.limit, query.offset).await?;
pub async fn execute(
&self,
query: GetTimelineQuery,
) -> Result<Vec<(Asset, StructuredData)>, DomainError> {
let assets = self
.asset_repo
.find_by_owner(&query.owner_id, query.limit, query.offset)
.await?;
let mut results = Vec::with_capacity(assets.len());
for asset in assets {

View File

@@ -1,2 +1,2 @@
pub mod get_timeline;
pub mod get_asset;
pub mod get_timeline;

View File

@@ -1,10 +1,10 @@
use std::sync::Arc;
use domain::{
entities::User,
errors::DomainError,
ports::{PasswordHasher, TokenIssuer, UserRepository},
value_objects::Email,
};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct LoginUserCommand {
@@ -24,14 +24,24 @@ impl LoginUserHandler {
hasher: Arc<dyn PasswordHasher>,
issuer: Arc<dyn TokenIssuer>,
) -> Self {
Self { repo, hasher, issuer }
Self {
repo,
hasher,
issuer,
}
}
pub async fn execute(&self, cmd: LoginUserCommand) -> Result<(User, String), DomainError> {
let email = Email::new(&cmd.email)?;
let user = self.repo.find_by_email(&email).await?
let user = self
.repo
.find_by_email(&email)
.await?
.ok_or_else(|| DomainError::Unauthorized("Invalid credentials".to_string()))?;
let valid = self.hasher.verify(&cmd.password, &user.password_hash).await?;
let valid = self
.hasher
.verify(&cmd.password, &user.password_hash)
.await?;
if !valid {
return Err(DomainError::Unauthorized("Invalid credentials".to_string()));
}

View File

@@ -1,5 +1,5 @@
pub mod register_user;
pub mod login_user;
pub mod register_user;
pub use register_user::{RegisterUserCommand, RegisterUserHandler};
pub use login_user::{LoginUserCommand, LoginUserHandler};
pub use register_user::{RegisterUserCommand, RegisterUserHandler};

View File

@@ -1,10 +1,10 @@
use std::sync::Arc;
use domain::{
entities::User,
errors::DomainError,
ports::{PasswordHasher, UserRepository},
value_objects::Email,
};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RegisterUserCommand {
@@ -25,17 +25,32 @@ impl RegisterUserHandler {
pub async fn execute(&self, cmd: RegisterUserCommand) -> Result<User, DomainError> {
if cmd.username.is_empty() {
return Err(DomainError::Validation("Username must not be empty".to_string()));
return Err(DomainError::Validation(
"Username must not be empty".to_string(),
));
}
if cmd.password.len() < 8 {
return Err(DomainError::Validation("Password must be at least 8 characters".to_string()));
return Err(DomainError::Validation(
"Password must be at least 8 characters".to_string(),
));
}
let email = Email::new(&cmd.email)?;
if self.user_repo.find_by_email(&email).await?.is_some() {
return Err(DomainError::Conflict(format!("Email {} is already registered", email.as_str())));
return Err(DomainError::Conflict(format!(
"Email {} is already registered",
email.as_str()
)));
}
if self.user_repo.find_by_username(&cmd.username).await?.is_some() {
return Err(DomainError::Conflict(format!("Username {} is already taken", cmd.username)));
if self
.user_repo
.find_by_username(&cmd.username)
.await?
.is_some()
{
return Err(DomainError::Conflict(format!(
"Username {} is already taken",
cmd.username
)));
}
let hash = self.hasher.hash(&cmd.password).await?;
let user = User::new(&cmd.username, email, hash);

View File

@@ -1,5 +1,5 @@
pub mod commands;
pub mod queries;
pub use commands::{RegisterUserCommand, RegisterUserHandler, LoginUserCommand, LoginUserHandler};
pub use queries::{GetProfileQuery, GetProfileHandler};
pub use commands::{LoginUserCommand, LoginUserHandler, RegisterUserCommand, RegisterUserHandler};
pub use queries::{GetProfileHandler, GetProfileQuery};

View File

@@ -1,5 +1,5 @@
use std::sync::Arc;
use domain::{entities::User, errors::DomainError, ports::UserRepository, value_objects::SystemId};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct GetProfileQuery {
@@ -16,7 +16,9 @@ impl GetProfileHandler {
}
pub async fn execute(&self, query: GetProfileQuery) -> Result<User, DomainError> {
self.repo.find_by_id(&query.user_id).await?
self.repo
.find_by_id(&query.user_id)
.await?
.ok_or_else(|| DomainError::NotFound(format!("User {} not found", query.user_id)))
}
}

View File

@@ -1,3 +1,3 @@
pub mod get_profile;
pub use get_profile::{GetProfileQuery, GetProfileHandler};
pub use get_profile::{GetProfileHandler, GetProfileQuery};

View File

@@ -1,8 +1,8 @@
pub mod catalog;
pub mod identity;
pub mod organization;
pub mod storage;
pub mod catalog;
pub mod processing;
pub mod sharing;
pub mod sidecar;
pub mod processing;
pub mod storage;
pub mod testing;

View File

@@ -1,10 +1,7 @@
use std::sync::Arc;
use domain::{
entities::Album,
errors::DomainError,
ports::AlbumRepository,
value_objects::SystemId,
entities::Album, errors::DomainError, ports::AlbumRepository, value_objects::SystemId,
};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CreateAlbumCommand {
@@ -23,7 +20,9 @@ impl CreateAlbumHandler {
pub async fn execute(&self, cmd: CreateAlbumCommand) -> Result<Album, DomainError> {
if cmd.title.is_empty() {
return Err(DomainError::Validation("Album title must not be empty".to_string()));
return Err(DomainError::Validation(
"Album title must not be empty".to_string(),
));
}
let album = Album::new(&cmd.title, cmd.creator_id);
self.album_repo.save(&album).await?;

View File

@@ -1,10 +1,7 @@
use std::sync::Arc;
use domain::{
entities::Album,
errors::DomainError,
ports::AlbumRepository,
value_objects::SystemId,
entities::Album, errors::DomainError, ports::AlbumRepository, value_objects::SystemId,
};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum AlbumAction {
@@ -29,7 +26,10 @@ impl ManageAlbumEntriesHandler {
}
pub async fn execute(&self, cmd: ManageAlbumEntriesCommand) -> Result<Album, DomainError> {
let mut album = self.album_repo.find_by_id(&cmd.album_id).await?
let mut album = self
.album_repo
.find_by_id(&cmd.album_id)
.await?
.ok_or_else(|| DomainError::NotFound(format!("Album {} not found", cmd.album_id)))?;
match cmd.action {

View File

@@ -1,10 +1,10 @@
use std::sync::Arc;
use domain::{
entities::{AssetTag, Tag},
errors::DomainError,
ports::{AssetRepository, TagRepository},
value_objects::SystemId,
};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TagAssetCommand {
@@ -20,11 +20,16 @@ pub struct TagAssetHandler {
impl TagAssetHandler {
pub fn new(asset_repo: Arc<dyn AssetRepository>, tag_repo: Arc<dyn TagRepository>) -> Self {
Self { asset_repo, tag_repo }
Self {
asset_repo,
tag_repo,
}
}
pub async fn execute(&self, cmd: TagAssetCommand) -> Result<(Tag, AssetTag), DomainError> {
self.asset_repo.find_by_id(&cmd.asset_id).await?
self.asset_repo
.find_by_id(&cmd.asset_id)
.await?
.ok_or_else(|| DomainError::NotFound(format!("Asset {} not found", cmd.asset_id)))?;
let tag = match self.tag_repo.find_by_name(&cmd.tag_name).await? {

View File

@@ -1,7 +1,7 @@
pub mod commands;
pub mod queries;
pub use commands::{CreateAlbumCommand, CreateAlbumHandler};
pub use commands::{AlbumAction, ManageAlbumEntriesCommand, ManageAlbumEntriesHandler};
pub use commands::{CreateAlbumCommand, CreateAlbumHandler};
pub use commands::{TagAssetCommand, TagAssetHandler};
pub use queries::get_album::{GetAlbumQuery, GetAlbumHandler};
pub use queries::get_album::{GetAlbumHandler, GetAlbumQuery};

View File

@@ -1,10 +1,7 @@
use std::sync::Arc;
use domain::{
entities::Album,
errors::DomainError,
ports::AlbumRepository,
value_objects::SystemId,
entities::Album, errors::DomainError, ports::AlbumRepository, value_objects::SystemId,
};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct GetAlbumQuery {
@@ -21,7 +18,9 @@ impl GetAlbumHandler {
}
pub async fn execute(&self, query: GetAlbumQuery) -> Result<Album, DomainError> {
self.album_repo.find_by_id(&query.album_id).await?
self.album_repo
.find_by_id(&query.album_id)
.await?
.ok_or_else(|| DomainError::NotFound(format!("Album {} not found", query.album_id)))
}
}

View File

@@ -1,4 +1,3 @@
use std::sync::Arc;
use domain::{
entities::Job,
errors::DomainError,
@@ -6,6 +5,7 @@ use domain::{
ports::{EventPublisher, JobBatchRepository, JobRepository},
value_objects::{DateTimeStamp, StructuredData, SystemId},
};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CompleteJobCommand {
@@ -25,24 +25,35 @@ impl CompleteJobHandler {
batch_repo: Arc<dyn JobBatchRepository>,
event_pub: Arc<dyn EventPublisher>,
) -> Self {
Self { job_repo, batch_repo, event_pub }
Self {
job_repo,
batch_repo,
event_pub,
}
}
pub async fn execute(&self, cmd: CompleteJobCommand) -> Result<Job, DomainError> {
let mut job = self.job_repo.find_by_id(&cmd.job_id).await?
let mut job = self
.job_repo
.find_by_id(&cmd.job_id)
.await?
.ok_or_else(|| DomainError::NotFound(format!("Job {} not found", cmd.job_id)))?;
job.complete(cmd.result);
self.job_repo.save(&job).await?;
if let Some(ref batch_id) = job.batch_id {
let mut batch = self.batch_repo.find_by_id(batch_id).await?
.ok_or_else(|| DomainError::NotFound(format!("Batch {} not found", batch_id)))?;
let mut batch =
self.batch_repo.find_by_id(batch_id).await?.ok_or_else(|| {
DomainError::NotFound(format!("Batch {} not found", batch_id))
})?;
batch.record_completion();
self.batch_repo.save(&batch).await?;
}
self.event_pub.publish(DomainEvent::JobCompleted {
job_id: job.job_id,
timestamp: DateTimeStamp::now(),
}).await?;
self.event_pub
.publish(DomainEvent::JobCompleted {
job_id: job.job_id,
timestamp: DateTimeStamp::now(),
})
.await?;
Ok(job)
}
}

View File

@@ -1,10 +1,10 @@
use std::sync::Arc;
use domain::{
entities::ProcessingPipeline,
errors::DomainError,
ports::{PipelineRepository, PluginRepository},
value_objects::{StructuredData, SystemId},
};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PipelineStepConfig {
@@ -28,13 +28,23 @@ impl ConfigurePipelineHandler {
pipeline_repo: Arc<dyn PipelineRepository>,
plugin_repo: Arc<dyn PluginRepository>,
) -> Self {
Self { pipeline_repo, plugin_repo }
Self {
pipeline_repo,
plugin_repo,
}
}
pub async fn execute(&self, cmd: ConfigurePipelineCommand) -> Result<ProcessingPipeline, DomainError> {
pub async fn execute(
&self,
cmd: ConfigurePipelineCommand,
) -> Result<ProcessingPipeline, DomainError> {
for step in &cmd.steps {
self.plugin_repo.find_by_id(&step.plugin_id).await?
.ok_or_else(|| DomainError::NotFound(format!("Plugin {} not found", step.plugin_id)))?;
self.plugin_repo
.find_by_id(&step.plugin_id)
.await?
.ok_or_else(|| {
DomainError::NotFound(format!("Plugin {} not found", step.plugin_id))
})?;
}
let mut pipeline = ProcessingPipeline::new(cmd.trigger_event);
for step in cmd.steps {

View File

@@ -1,4 +1,3 @@
use std::sync::Arc;
use domain::{
entities::{Job, JobType},
errors::DomainError,
@@ -6,6 +5,7 @@ use domain::{
ports::{EventPublisher, JobRepository},
value_objects::{DateTimeStamp, StructuredData, SystemId},
};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct EnqueueJobCommand {
@@ -23,7 +23,10 @@ pub struct EnqueueJobHandler {
impl EnqueueJobHandler {
pub fn new(job_repo: Arc<dyn JobRepository>, event_pub: Arc<dyn EventPublisher>) -> Self {
Self { job_repo, event_pub }
Self {
job_repo,
event_pub,
}
}
pub async fn execute(&self, cmd: EnqueueJobCommand) -> Result<Job, DomainError> {
@@ -35,11 +38,13 @@ impl EnqueueJobHandler {
job = job.with_batch(id);
}
self.job_repo.save(&job).await?;
self.event_pub.publish(DomainEvent::JobEnqueued {
job_id: job.job_id,
job_type: format!("{:?}", cmd.job_type),
timestamp: DateTimeStamp::now(),
}).await?;
self.event_pub
.publish(DomainEvent::JobEnqueued {
job_id: job.job_id,
job_type: format!("{:?}", cmd.job_type),
timestamp: DateTimeStamp::now(),
})
.await?;
Ok(job)
}
}

View File

@@ -1,4 +1,3 @@
use std::sync::Arc;
use domain::{
entities::{Job, JobStatus},
errors::DomainError,
@@ -6,6 +5,7 @@ use domain::{
ports::{EventPublisher, JobBatchRepository, JobRepository},
value_objects::{DateTimeStamp, SystemId},
};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct FailJobCommand {
@@ -25,32 +25,44 @@ impl FailJobHandler {
batch_repo: Arc<dyn JobBatchRepository>,
event_pub: Arc<dyn EventPublisher>,
) -> Self {
Self { job_repo, batch_repo, event_pub }
Self {
job_repo,
batch_repo,
event_pub,
}
}
pub async fn execute(&self, cmd: FailJobCommand) -> Result<Job, DomainError> {
let mut job = self.job_repo.find_by_id(&cmd.job_id).await?
let mut job = self
.job_repo
.find_by_id(&cmd.job_id)
.await?
.ok_or_else(|| DomainError::NotFound(format!("Job {} not found", cmd.job_id)))?;
job.fail(&cmd.error);
self.job_repo.save(&job).await?;
if job.status == JobStatus::Failed {
if let Some(ref batch_id) = job.batch_id {
let mut batch = self.batch_repo.find_by_id(batch_id).await?
.ok_or_else(|| DomainError::NotFound(format!("Batch {} not found", batch_id)))?;
let mut batch = self.batch_repo.find_by_id(batch_id).await?.ok_or_else(|| {
DomainError::NotFound(format!("Batch {} not found", batch_id))
})?;
batch.record_failure();
self.batch_repo.save(&batch).await?;
}
self.event_pub.publish(DomainEvent::JobFailed {
job_id: job.job_id,
error: cmd.error,
timestamp: DateTimeStamp::now(),
}).await?;
self.event_pub
.publish(DomainEvent::JobFailed {
job_id: job.job_id,
error: cmd.error,
timestamp: DateTimeStamp::now(),
})
.await?;
} else if job.status == JobStatus::Queued {
self.event_pub.publish(DomainEvent::JobEnqueued {
job_id: job.job_id,
job_type: format!("{:?}", job.job_type),
timestamp: DateTimeStamp::now(),
}).await?;
self.event_pub
.publish(DomainEvent::JobEnqueued {
job_id: job.job_id,
job_type: format!("{:?}", job.job_type),
timestamp: DateTimeStamp::now(),
})
.await?;
}
Ok(job)
}

View File

@@ -1,10 +1,10 @@
use std::sync::Arc;
use domain::{
entities::{Plugin, PluginType},
errors::DomainError,
ports::PluginRepository,
value_objects::{StructuredData, SystemId},
};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum PluginAction {
@@ -34,25 +34,37 @@ impl ManagePluginHandler {
pub async fn execute(&self, cmd: ManagePluginCommand) -> Result<Plugin, DomainError> {
match cmd.action {
PluginAction::Create { name, plugin_type, config } => {
PluginAction::Create {
name,
plugin_type,
config,
} => {
let mut plugin = Plugin::new(name, plugin_type);
plugin.configuration = config;
self.plugin_repo.save(&plugin).await?;
Ok(plugin)
}
PluginAction::Enable => {
let id = cmd.plugin_id
.ok_or_else(|| DomainError::Validation("plugin_id required for Enable".into()))?;
let mut plugin = self.plugin_repo.find_by_id(&id).await?
let id = cmd.plugin_id.ok_or_else(|| {
DomainError::Validation("plugin_id required for Enable".into())
})?;
let mut plugin = self
.plugin_repo
.find_by_id(&id)
.await?
.ok_or_else(|| DomainError::NotFound(format!("Plugin {} not found", id)))?;
plugin.enable();
self.plugin_repo.save(&plugin).await?;
Ok(plugin)
}
PluginAction::Disable => {
let id = cmd.plugin_id
.ok_or_else(|| DomainError::Validation("plugin_id required for Disable".into()))?;
let mut plugin = self.plugin_repo.find_by_id(&id).await?
let id = cmd.plugin_id.ok_or_else(|| {
DomainError::Validation("plugin_id required for Disable".into())
})?;
let mut plugin = self
.plugin_repo
.find_by_id(&id)
.await?
.ok_or_else(|| DomainError::NotFound(format!("Plugin {} not found", id)))?;
plugin.disable();
self.plugin_repo.save(&plugin).await?;

View File

@@ -1,6 +1,6 @@
pub mod enqueue_job;
pub mod start_job;
pub mod complete_job;
pub mod configure_pipeline;
pub mod enqueue_job;
pub mod fail_job;
pub mod manage_plugin;
pub mod configure_pipeline;
pub mod start_job;

View File

@@ -1,10 +1,5 @@
use domain::{entities::Job, errors::DomainError, ports::JobRepository, value_objects::SystemId};
use std::sync::Arc;
use domain::{
entities::Job,
errors::DomainError,
ports::JobRepository,
value_objects::SystemId,
};
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct StartJobCommand {
@@ -21,7 +16,10 @@ impl StartJobHandler {
}
pub async fn execute(&self, cmd: StartJobCommand) -> Result<Job, DomainError> {
let mut job = self.job_repo.find_by_id(&cmd.job_id).await?
let mut job = self
.job_repo
.find_by_id(&cmd.job_id)
.await?
.ok_or_else(|| DomainError::NotFound(format!("Job {} not found", cmd.job_id)))?;
job.start()?;
self.job_repo.save(&job).await?;

View File

@@ -1,10 +1,14 @@
pub mod commands;
pub mod queries;
pub use commands::enqueue_job::{EnqueueJobCommand, EnqueueJobHandler};
pub use commands::start_job::{StartJobCommand, StartJobHandler};
pub use commands::complete_job::{CompleteJobCommand, CompleteJobHandler};
pub use commands::configure_pipeline::{
ConfigurePipelineCommand, ConfigurePipelineHandler, PipelineStepConfig,
};
pub use commands::enqueue_job::{EnqueueJobCommand, EnqueueJobHandler};
pub use commands::fail_job::{FailJobCommand, FailJobHandler};
pub use commands::manage_plugin::{ManagePluginCommand, ManagePluginHandler, PluginAction};
pub use commands::configure_pipeline::{ConfigurePipelineCommand, ConfigurePipelineHandler, PipelineStepConfig};
pub use queries::report_batch_progress::{ReportBatchProgressQuery, ReportBatchProgressHandler, BatchProgress};
pub use commands::start_job::{StartJobCommand, StartJobHandler};
pub use queries::report_batch_progress::{
BatchProgress, ReportBatchProgressHandler, ReportBatchProgressQuery,
};

View File

@@ -1,10 +1,10 @@
use std::sync::Arc;
use domain::{
entities::{Job, JobBatch},
errors::DomainError,
ports::{JobBatchRepository, JobRepository},
value_objects::SystemId,
};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ReportBatchProgressQuery {
@@ -24,11 +24,20 @@ pub struct ReportBatchProgressHandler {
impl ReportBatchProgressHandler {
pub fn new(batch_repo: Arc<dyn JobBatchRepository>, job_repo: Arc<dyn JobRepository>) -> Self {
Self { batch_repo, job_repo }
Self {
batch_repo,
job_repo,
}
}
pub async fn execute(&self, query: ReportBatchProgressQuery) -> Result<BatchProgress, DomainError> {
let batch = self.batch_repo.find_by_id(&query.batch_id).await?
pub async fn execute(
&self,
query: ReportBatchProgressQuery,
) -> Result<BatchProgress, DomainError> {
let batch = self
.batch_repo
.find_by_id(&query.batch_id)
.await?
.ok_or_else(|| DomainError::NotFound(format!("Batch {} not found", query.batch_id)))?;
let jobs = self.job_repo.find_by_batch(&query.batch_id).await?;
Ok(BatchProgress { batch, jobs })

View File

@@ -1,10 +1,10 @@
use std::sync::Arc;
use domain::{
entities::{LinkAccessLevel, ScopeType, ShareLink, ShareScope, ShareableType},
errors::DomainError,
ports::ShareRepository,
value_objects::{DateTimeStamp, SystemId},
};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct GenerateShareLinkCommand {
@@ -25,8 +25,16 @@ impl GenerateShareLinkHandler {
Self { share_repo }
}
pub async fn execute(&self, cmd: GenerateShareLinkCommand) -> Result<(ShareScope, ShareLink), DomainError> {
let scope = ShareScope::new(ScopeType::Link, cmd.shareable_type, cmd.shareable_id, cmd.created_by);
pub async fn execute(
&self,
cmd: GenerateShareLinkCommand,
) -> Result<(ShareScope, ShareLink), DomainError> {
let scope = ShareScope::new(
ScopeType::Link,
cmd.shareable_type,
cmd.shareable_id,
cmd.created_by,
);
let token = uuid::Uuid::new_v4().to_string();
let mut link = ShareLink::new(scope.scope_id, token, cmd.access_level);
link.expires_at = cmd.expires_at;

View File

@@ -1,7 +1,7 @@
pub mod share_resource;
pub mod generate_share_link;
pub mod revoke_share;
pub mod share_resource;
pub use share_resource::{ShareResourceCommand, ShareResourceHandler};
pub use generate_share_link::{GenerateShareLinkCommand, GenerateShareLinkHandler};
pub use revoke_share::{RevokeShareCommand, RevokeShareHandler};
pub use share_resource::{ShareResourceCommand, ShareResourceHandler};

View File

@@ -1,10 +1,10 @@
use std::sync::Arc;
use domain::{
errors::DomainError,
events::DomainEvent,
ports::{EventPublisher, ShareRepository},
value_objects::{DateTimeStamp, SystemId},
};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RevokeShareCommand {
@@ -19,20 +19,29 @@ pub struct RevokeShareHandler {
impl RevokeShareHandler {
pub fn new(share_repo: Arc<dyn ShareRepository>, event_pub: Arc<dyn EventPublisher>) -> Self {
Self { share_repo, event_pub }
Self {
share_repo,
event_pub,
}
}
pub async fn execute(&self, cmd: RevokeShareCommand) -> Result<(), DomainError> {
self.share_repo.find_scope_by_id(&cmd.scope_id).await?
.ok_or_else(|| DomainError::NotFound(format!("Share scope {} not found", cmd.scope_id)))?;
self.share_repo
.find_scope_by_id(&cmd.scope_id)
.await?
.ok_or_else(|| {
DomainError::NotFound(format!("Share scope {} not found", cmd.scope_id))
})?;
self.share_repo.delete_scope(&cmd.scope_id).await?;
self.event_pub.publish(DomainEvent::ShareRevoked {
scope_id: cmd.scope_id,
revoked_by: cmd.revoked_by,
timestamp: DateTimeStamp::now(),
}).await?;
self.event_pub
.publish(DomainEvent::ShareRevoked {
scope_id: cmd.scope_id,
revoked_by: cmd.revoked_by,
timestamp: DateTimeStamp::now(),
})
.await?;
Ok(())
}

View File

@@ -1,4 +1,3 @@
use std::sync::Arc;
use domain::{
entities::{ScopeType, ShareScope, ShareTarget, ShareableType, TargetType},
errors::DomainError,
@@ -6,6 +5,7 @@ use domain::{
ports::{EventPublisher, ShareRepository},
value_objects::{DateTimeStamp, SystemId},
};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ShareResourceCommand {
@@ -24,27 +24,40 @@ pub struct ShareResourceHandler {
impl ShareResourceHandler {
pub fn new(share_repo: Arc<dyn ShareRepository>, event_pub: Arc<dyn EventPublisher>) -> Self {
Self { share_repo, event_pub }
Self {
share_repo,
event_pub,
}
}
pub async fn execute(&self, cmd: ShareResourceCommand) -> Result<(ShareScope, ShareTarget), DomainError> {
pub async fn execute(
&self,
cmd: ShareResourceCommand,
) -> Result<(ShareScope, ShareTarget), DomainError> {
let scope_type = match cmd.target_type {
TargetType::User => ScopeType::User,
TargetType::Group => ScopeType::Group,
};
let scope = ShareScope::new(scope_type, cmd.shareable_type, cmd.shareable_id, cmd.created_by);
let scope = ShareScope::new(
scope_type,
cmd.shareable_type,
cmd.shareable_id,
cmd.created_by,
);
let target = ShareTarget::new(scope.scope_id, cmd.target_type, cmd.target_id, cmd.role_id);
self.share_repo.save_scope(&scope).await?;
self.share_repo.save_target(&target).await?;
self.event_pub.publish(DomainEvent::ShareCreated {
scope_id: scope.scope_id,
shareable_id: cmd.shareable_id,
created_by: cmd.created_by,
timestamp: DateTimeStamp::now(),
}).await?;
self.event_pub
.publish(DomainEvent::ShareCreated {
scope_id: scope.scope_id,
shareable_id: cmd.shareable_id,
created_by: cmd.created_by,
timestamp: DateTimeStamp::now(),
})
.await?;
Ok((scope, target))
}

View File

@@ -1,7 +1,7 @@
pub mod commands;
pub mod queries;
pub use commands::{ShareResourceCommand, ShareResourceHandler};
pub use commands::{GenerateShareLinkCommand, GenerateShareLinkHandler};
pub use commands::{RevokeShareCommand, RevokeShareHandler};
pub use queries::access_shared_resource::{AccessSharedResourceQuery, AccessSharedResourceHandler};
pub use commands::{ShareResourceCommand, ShareResourceHandler};
pub use queries::access_shared_resource::{AccessSharedResourceHandler, AccessSharedResourceQuery};

View File

@@ -1,9 +1,9 @@
use std::sync::Arc;
use domain::{
entities::{LinkAccessLevel, ShareScope},
errors::DomainError,
ports::ShareRepository,
};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct AccessSharedResourceQuery {
@@ -19,18 +19,29 @@ impl AccessSharedResourceHandler {
Self { share_repo }
}
pub async fn execute(&self, query: AccessSharedResourceQuery) -> Result<(ShareScope, LinkAccessLevel), DomainError> {
let mut link = self.share_repo.find_link_by_token(&query.token).await?
pub async fn execute(
&self,
query: AccessSharedResourceQuery,
) -> Result<(ShareScope, LinkAccessLevel), DomainError> {
let mut link = self
.share_repo
.find_link_by_token(&query.token)
.await?
.ok_or_else(|| DomainError::NotFound("Share link not found".to_string()))?;
if !link.is_valid() {
return Err(DomainError::Forbidden("Link expired or exhausted".to_string()));
return Err(DomainError::Forbidden(
"Link expired or exhausted".to_string(),
));
}
link.record_use();
self.share_repo.save_link(&link).await?;
let scope = self.share_repo.find_scope_by_id(&link.scope_id).await?
let scope = self
.share_repo
.find_scope_by_id(&link.scope_id)
.await?
.ok_or_else(|| DomainError::NotFound("Share scope not found".to_string()))?;
Ok((scope, link.access_level))

View File

@@ -1,10 +1,10 @@
use std::sync::Arc;
use crate::sidecar::hash_helper::hash_structured_data;
use domain::{
entities::SyncStatus,
errors::DomainError,
ports::{SidecarRepository, SidecarWriterPort},
};
use crate::sidecar::hash_helper::hash_structured_data;
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DetectExternalChangesCommand;
@@ -19,7 +19,10 @@ impl DetectExternalChangesHandler {
sidecar_repo: Arc<dyn SidecarRepository>,
writer: Arc<dyn SidecarWriterPort>,
) -> Self {
Self { sidecar_repo, writer }
Self {
sidecar_repo,
writer,
}
}
pub async fn execute(&self, _cmd: DetectExternalChangesCommand) -> Result<u32, DomainError> {

View File

@@ -1,4 +1,4 @@
use std::sync::Arc;
use crate::sidecar::hash_helper::hash_structured_data;
use domain::{
catalog::services::resolve_metadata,
entities::SidecarRecord,
@@ -6,7 +6,7 @@ use domain::{
ports::{AssetMetadataRepository, SidecarRepository, SidecarWriterPort},
value_objects::SystemId,
};
use crate::sidecar::hash_helper::hash_structured_data;
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ExportSidecarCommand {
@@ -25,7 +25,11 @@ impl ExportSidecarHandler {
sidecar_repo: Arc<dyn SidecarRepository>,
writer: Arc<dyn SidecarWriterPort>,
) -> Self {
Self { metadata_repo, sidecar_repo, writer }
Self {
metadata_repo,
sidecar_repo,
writer,
}
}
pub async fn execute(&self, cmd: ExportSidecarCommand) -> Result<SidecarRecord, DomainError> {
@@ -37,7 +41,9 @@ impl ExportSidecarHandler {
None => SidecarRecord::new(cmd.asset_id, format!("sidecars/{}.xmp", cmd.asset_id)),
};
self.writer.write_sidecar(&resolved, &record.sidecar_storage_path).await?;
self.writer
.write_sidecar(&resolved, &record.sidecar_storage_path)
.await?;
let hash = hash_structured_data(&resolved);
record.mark_synced(hash);

View File

@@ -1,12 +1,12 @@
use std::sync::Arc;
use crate::sidecar::hash_helper::hash_structured_data;
use domain::{
catalog::services::resolve_metadata,
entities::SidecarRecord,
errors::DomainError,
ports::{AssetRepository, AssetMetadataRepository, SidecarRepository, SidecarWriterPort},
ports::{AssetMetadataRepository, AssetRepository, SidecarRepository, SidecarWriterPort},
value_objects::SystemId,
};
use crate::sidecar::hash_helper::hash_structured_data;
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct FullExportCommand {
@@ -27,11 +27,19 @@ impl FullExportHandler {
sidecar_repo: Arc<dyn SidecarRepository>,
writer: Arc<dyn SidecarWriterPort>,
) -> Self {
Self { asset_repo, metadata_repo, sidecar_repo, writer }
Self {
asset_repo,
metadata_repo,
sidecar_repo,
writer,
}
}
pub async fn execute(&self, cmd: FullExportCommand) -> Result<u32, DomainError> {
let assets = self.asset_repo.find_by_owner(&cmd.owner_id, u32::MAX, 0).await?;
let assets = self
.asset_repo
.find_by_owner(&cmd.owner_id, u32::MAX, 0)
.await?;
let mut count = 0u32;
for asset in &assets {
@@ -40,10 +48,14 @@ impl FullExportHandler {
let mut record = match self.sidecar_repo.find_by_asset(&asset.asset_id).await? {
Some(r) => r,
None => SidecarRecord::new(asset.asset_id, format!("sidecars/{}.xmp", asset.asset_id)),
None => {
SidecarRecord::new(asset.asset_id, format!("sidecars/{}.xmp", asset.asset_id))
}
};
self.writer.write_sidecar(&resolved, &record.sidecar_storage_path).await?;
self.writer
.write_sidecar(&resolved, &record.sidecar_storage_path)
.await?;
let hash = hash_structured_data(&resolved);
record.mark_synced(hash);
self.sidecar_repo.save(&record).await?;

View File

@@ -1,12 +1,12 @@
use std::sync::Arc;
use crate::sidecar::hash_helper::hash_structured_data;
use domain::{
catalog::entities::{AssetMetadata, MetadataSource},
entities::SidecarRecord,
errors::DomainError,
ports::{AssetRepository, AssetMetadataRepository, SidecarRepository, SidecarWriterPort},
ports::{AssetMetadataRepository, AssetRepository, SidecarRepository, SidecarWriterPort},
value_objects::SystemId,
};
use crate::sidecar::hash_helper::hash_structured_data;
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct FullImportCommand {
@@ -27,11 +27,19 @@ impl FullImportHandler {
sidecar_repo: Arc<dyn SidecarRepository>,
writer: Arc<dyn SidecarWriterPort>,
) -> Self {
Self { asset_repo, metadata_repo, sidecar_repo, writer }
Self {
asset_repo,
metadata_repo,
sidecar_repo,
writer,
}
}
pub async fn execute(&self, cmd: FullImportCommand) -> Result<u32, DomainError> {
let assets = self.asset_repo.find_by_owner(&cmd.owner_id, u32::MAX, 0).await?;
let assets = self
.asset_repo
.find_by_owner(&cmd.owner_id, u32::MAX, 0)
.await?;
let mut count = 0u32;
for asset in &assets {
@@ -45,7 +53,11 @@ impl FullImportHandler {
match self.writer.read_sidecar(&record.sidecar_storage_path).await {
Ok(data) => {
let metadata = AssetMetadata::new(asset.asset_id, MetadataSource::ExifExtracted, data.clone());
let metadata = AssetMetadata::new(
asset.asset_id,
MetadataSource::ExifExtracted,
data.clone(),
);
self.metadata_repo.save(&metadata).await?;
let hash = hash_structured_data(&data);
let mut record = record;

View File

@@ -1,4 +1,4 @@
use std::sync::Arc;
use crate::sidecar::hash_helper::hash_structured_data;
use domain::{
catalog::entities::{AssetMetadata, MetadataSource},
entities::SyncStatus,
@@ -6,7 +6,7 @@ use domain::{
ports::{AssetMetadataRepository, SidecarRepository, SidecarWriterPort},
value_objects::SystemId,
};
use crate::sidecar::hash_helper::hash_structured_data;
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ImportSidecarCommand {
@@ -25,21 +25,35 @@ impl ImportSidecarHandler {
writer: Arc<dyn SidecarWriterPort>,
metadata_repo: Arc<dyn AssetMetadataRepository>,
) -> Self {
Self { sidecar_repo, writer, metadata_repo }
Self {
sidecar_repo,
writer,
metadata_repo,
}
}
pub async fn execute(&self, cmd: ImportSidecarCommand) -> Result<AssetMetadata, DomainError> {
let mut record = self.sidecar_repo.find_by_asset(&cmd.asset_id).await?
.ok_or_else(|| DomainError::NotFound(format!("Sidecar record for {} not found", cmd.asset_id)))?;
let mut record = self
.sidecar_repo
.find_by_asset(&cmd.asset_id)
.await?
.ok_or_else(|| {
DomainError::NotFound(format!("Sidecar record for {} not found", cmd.asset_id))
})?;
if record.sync_status != SyncStatus::PendingRead {
return Err(DomainError::Validation(
format!("Sidecar is not pending read (status: {:?})", record.sync_status),
));
return Err(DomainError::Validation(format!(
"Sidecar is not pending read (status: {:?})",
record.sync_status
)));
}
let data = self.writer.read_sidecar(&record.sidecar_storage_path).await?;
let metadata = AssetMetadata::new(cmd.asset_id, MetadataSource::ExifExtracted, data.clone());
let data = self
.writer
.read_sidecar(&record.sidecar_storage_path)
.await?;
let metadata =
AssetMetadata::new(cmd.asset_id, MetadataSource::ExifExtracted, data.clone());
self.metadata_repo.save(&metadata).await?;
let hash = hash_structured_data(&data);

View File

@@ -1,6 +1,6 @@
pub mod export_sidecar;
pub mod detect_external_changes;
pub mod import_sidecar;
pub mod resolve_conflict;
pub mod export_sidecar;
pub mod full_export;
pub mod full_import;
pub mod import_sidecar;
pub mod resolve_conflict;

View File

@@ -1,4 +1,4 @@
use std::sync::Arc;
use crate::sidecar::hash_helper::hash_structured_data;
use domain::{
catalog::entities::{AssetMetadata, MetadataSource},
catalog::services::resolve_metadata,
@@ -7,7 +7,7 @@ use domain::{
ports::{AssetMetadataRepository, SidecarRepository, SidecarWriterPort},
value_objects::SystemId,
};
use crate::sidecar::hash_helper::hash_structured_data;
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ResolveConflictCommand {
@@ -27,36 +27,54 @@ impl ResolveConflictHandler {
writer: Arc<dyn SidecarWriterPort>,
metadata_repo: Arc<dyn AssetMetadataRepository>,
) -> Self {
Self { sidecar_repo, writer, metadata_repo }
Self {
sidecar_repo,
writer,
metadata_repo,
}
}
pub async fn execute(&self, cmd: ResolveConflictCommand) -> Result<SidecarRecord, DomainError> {
let mut record = self.sidecar_repo.find_by_asset(&cmd.asset_id).await?
.ok_or_else(|| DomainError::NotFound(format!("Sidecar record for {} not found", cmd.asset_id)))?;
let mut record = self
.sidecar_repo
.find_by_asset(&cmd.asset_id)
.await?
.ok_or_else(|| {
DomainError::NotFound(format!("Sidecar record for {} not found", cmd.asset_id))
})?;
if record.sync_status != SyncStatus::Conflict {
return Err(DomainError::Validation(
format!("Sidecar is not in conflict (status: {:?})", record.sync_status),
));
return Err(DomainError::Validation(format!(
"Sidecar is not in conflict (status: {:?})",
record.sync_status
)));
}
match cmd.policy {
ConflictPolicy::DbWins => {
let layers = self.metadata_repo.find_by_asset(&cmd.asset_id).await?;
let resolved = resolve_metadata(&layers);
self.writer.write_sidecar(&resolved, &record.sidecar_storage_path).await?;
self.writer
.write_sidecar(&resolved, &record.sidecar_storage_path)
.await?;
let hash = hash_structured_data(&resolved);
record.mark_synced(hash);
}
ConflictPolicy::FileWins => {
let data = self.writer.read_sidecar(&record.sidecar_storage_path).await?;
let metadata = AssetMetadata::new(cmd.asset_id, MetadataSource::ExifExtracted, data.clone());
let data = self
.writer
.read_sidecar(&record.sidecar_storage_path)
.await?;
let metadata =
AssetMetadata::new(cmd.asset_id, MetadataSource::ExifExtracted, data.clone());
self.metadata_repo.save(&metadata).await?;
let hash = hash_structured_data(&data);
record.mark_synced(hash);
}
ConflictPolicy::RequireUserDecision => {
return Err(DomainError::Validation("Manual resolution required".to_string()));
return Err(DomainError::Validation(
"Manual resolution required".to_string(),
));
}
}

View File

@@ -1,5 +1,5 @@
use domain::value_objects::{Checksum, StructuredData};
use sha2::{Sha256, Digest};
use sha2::{Digest, Sha256};
pub fn hash_structured_data(data: &StructuredData) -> Checksum {
let json = serde_json::to_string(data).unwrap_or_default();

View File

@@ -1,9 +1,11 @@
pub mod commands;
pub mod hash_helper;
pub use commands::detect_external_changes::{
DetectExternalChangesCommand, DetectExternalChangesHandler,
};
pub use commands::export_sidecar::{ExportSidecarCommand, ExportSidecarHandler};
pub use commands::detect_external_changes::{DetectExternalChangesCommand, DetectExternalChangesHandler};
pub use commands::import_sidecar::{ImportSidecarCommand, ImportSidecarHandler};
pub use commands::resolve_conflict::{ResolveConflictCommand, ResolveConflictHandler};
pub use commands::full_export::{FullExportCommand, FullExportHandler};
pub use commands::full_import::{FullImportCommand, FullImportHandler};
pub use commands::import_sidecar::{ImportSidecarCommand, ImportSidecarHandler};
pub use commands::resolve_conflict::{ResolveConflictCommand, ResolveConflictHandler};

View File

@@ -1,15 +1,17 @@
use std::sync::Arc;
use bytes::Bytes;
use domain::{
entities::{Asset, AssetType, IngestSession, IngestStatus, SourceReference, UsageLedgerEntry, UsageType},
entities::{
Asset, AssetType, IngestSession, IngestStatus, SourceReference, UsageLedgerEntry, UsageType,
},
errors::DomainError,
events::DomainEvent,
ports::{
AssetRepository, EventPublisher, FileStoragePort,
IngestSessionRepository, LibraryPathRepository, QuotaRepository, UsageLedgerRepository,
AssetRepository, EventPublisher, FileStoragePort, IngestSessionRepository,
LibraryPathRepository, QuotaRepository, UsageLedgerRepository,
},
value_objects::{Checksum, DateTimeStamp, SystemId},
};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct IngestAssetCommand {
@@ -43,25 +45,53 @@ impl IngestAssetHandler {
file_storage: Arc<dyn FileStoragePort>,
event_pub: Arc<dyn EventPublisher>,
) -> Self {
Self { ingest_repo, path_repo, quota_repo, ledger_repo, asset_repo, file_storage, event_pub }
Self {
ingest_repo,
path_repo,
quota_repo,
ledger_repo,
asset_repo,
file_storage,
event_pub,
}
}
pub async fn execute(&self, cmd: IngestAssetCommand) -> Result<(Asset, IngestSession), DomainError> {
pub async fn execute(
&self,
cmd: IngestAssetCommand,
) -> Result<(Asset, IngestSession), DomainError> {
let checksum = Checksum::new(&cmd.checksum)?;
let path = self.path_repo.find_by_id(&cmd.target_path_id).await?
.ok_or_else(|| DomainError::NotFound(format!("Library path {} not found", cmd.target_path_id)))?;
let path = self
.path_repo
.find_by_id(&cmd.target_path_id)
.await?
.ok_or_else(|| {
DomainError::NotFound(format!("Library path {} not found", cmd.target_path_id))
})?;
if !path.is_ingest_destination {
return Err(DomainError::Validation("Target path is not an ingest destination".to_string()));
return Err(DomainError::Validation(
"Target path is not an ingest destination".to_string(),
));
}
if let Some(quota) = self.quota_repo.find_by_owner(&cmd.uploader_id).await? {
let current = self.ledger_repo.sum_usage(&cmd.uploader_id, UsageType::StorageBytes, None).await?;
let result = domain::storage::services::check_quota(&quota, UsageType::StorageBytes, current, cmd.file_size);
let current = self
.ledger_repo
.sum_usage(&cmd.uploader_id, UsageType::StorageBytes, None)
.await?;
let result = domain::storage::services::check_quota(
&quota,
UsageType::StorageBytes,
current,
cmd.file_size,
);
if !result.allowed {
return Err(DomainError::QuotaExceeded(format!(
"Storage quota exceeded: {} / {} bytes", current + cmd.file_size, result.limit
"Storage quota exceeded: {} / {} bytes",
current + cmd.file_size,
result.limit
)));
}
}
@@ -75,10 +105,16 @@ impl IngestAssetHandler {
);
let storage_path = format!("{}/{}", path.relative_path, cmd.filename);
self.file_storage.store_file(&storage_path, cmd.data).await?;
self.file_storage
.store_file(&storage_path, cmd.data)
.await?;
let mime_type = mime_type_from_filename(&cmd.filename);
let asset_type = if mime_type.starts_with("video") { AssetType::Video } else { AssetType::Image };
let asset_type = if mime_type.starts_with("video") {
AssetType::Video
} else {
AssetType::Image
};
let asset = Asset::new(
SourceReference {
@@ -105,11 +141,13 @@ impl IngestAssetHandler {
);
self.ledger_repo.record(&entry).await?;
self.event_pub.publish(DomainEvent::AssetIngested {
asset_id: asset.asset_id,
owner_user_id: cmd.uploader_id,
timestamp: DateTimeStamp::now(),
}).await?;
self.event_pub
.publish(DomainEvent::AssetIngested {
asset_id: asset.asset_id,
owner_user_id: cmd.uploader_id,
timestamp: DateTimeStamp::now(),
})
.await?;
Ok((asset, session))
}

View File

@@ -1,3 +1,3 @@
pub mod register_volume;
pub mod register_library_path;
pub mod ingest_asset;
pub mod register_library_path;
pub mod register_volume;

View File

@@ -1,10 +1,10 @@
use std::sync::Arc;
use domain::{
entities::LibraryPath,
errors::DomainError,
ports::{LibraryPathRepository, StorageVolumeRepository},
value_objects::SystemId,
};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RegisterLibraryPathCommand {
@@ -24,11 +24,19 @@ impl RegisterLibraryPathHandler {
volume_repo: Arc<dyn StorageVolumeRepository>,
path_repo: Arc<dyn LibraryPathRepository>,
) -> Self {
Self { volume_repo, path_repo }
Self {
volume_repo,
path_repo,
}
}
pub async fn execute(&self, cmd: RegisterLibraryPathCommand) -> Result<LibraryPath, DomainError> {
self.volume_repo.find_by_id(&cmd.volume_id).await?
pub async fn execute(
&self,
cmd: RegisterLibraryPathCommand,
) -> Result<LibraryPath, DomainError> {
self.volume_repo
.find_by_id(&cmd.volume_id)
.await?
.ok_or_else(|| DomainError::NotFound(format!("Volume {} not found", cmd.volume_id)))?;
let path = LibraryPath::new_user_owned(

View File

@@ -1,9 +1,5 @@
use domain::{entities::StorageVolume, errors::DomainError, ports::StorageVolumeRepository};
use std::sync::Arc;
use domain::{
entities::StorageVolume,
errors::DomainError,
ports::StorageVolumeRepository,
};
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RegisterVolumeCommand {
@@ -23,7 +19,9 @@ impl RegisterVolumeHandler {
pub async fn execute(&self, cmd: RegisterVolumeCommand) -> Result<StorageVolume, DomainError> {
if cmd.volume_name.is_empty() {
return Err(DomainError::Validation("Volume name must not be empty".to_string()));
return Err(DomainError::Validation(
"Volume name must not be empty".to_string(),
));
}
let volume = StorageVolume::new(cmd.volume_name, cmd.uri_prefix, cmd.is_writable);
self.volume_repo.save(&volume).await?;

View File

@@ -1,7 +1,7 @@
pub mod commands;
pub mod queries;
pub use commands::register_volume::{RegisterVolumeCommand, RegisterVolumeHandler};
pub use commands::register_library_path::{RegisterLibraryPathCommand, RegisterLibraryPathHandler};
pub use commands::ingest_asset::{IngestAssetCommand, IngestAssetHandler};
pub use queries::check_quota::{CheckQuotaQuery, CheckQuotaHandler};
pub use commands::register_library_path::{RegisterLibraryPathCommand, RegisterLibraryPathHandler};
pub use commands::register_volume::{RegisterVolumeCommand, RegisterVolumeHandler};
pub use queries::check_quota::{CheckQuotaHandler, CheckQuotaQuery};

View File

@@ -1,11 +1,11 @@
use std::sync::Arc;
use domain::{
entities::UsageType,
errors::DomainError,
ports::{QuotaRepository, UsageLedgerRepository},
storage::services::{check_quota, QuotaCheckResult},
storage::services::{QuotaCheckResult, check_quota},
value_objects::SystemId,
};
use std::sync::Arc;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CheckQuotaQuery {
@@ -24,7 +24,10 @@ impl CheckQuotaHandler {
quota_repo: Arc<dyn QuotaRepository>,
ledger_repo: Arc<dyn UsageLedgerRepository>,
) -> Self {
Self { quota_repo, ledger_repo }
Self {
quota_repo,
ledger_repo,
}
}
pub async fn execute(&self, query: CheckQuotaQuery) -> Result<QuotaCheckResult, DomainError> {
@@ -39,7 +42,15 @@ impl CheckQuotaHandler {
});
};
let current = self.ledger_repo.sum_usage(&query.user_id, query.usage_type, None).await?;
Ok(check_quota(&quota, query.usage_type, current, query.requested_amount))
let current = self
.ledger_repo
.sum_usage(&query.user_id, query.usage_type, None)
.await?;
Ok(check_quota(
&quota,
query.usage_type,
current,
query.requested_amount,
))
}
}

View File

@@ -1,13 +1,15 @@
use std::collections::HashMap;
use async_trait::async_trait;
use bytes::Bytes;
use tokio::sync::Mutex;
use domain::{
errors::DomainError,
events::DomainEvent,
ports::{EventPublisher, FileStoragePort, FileEntry, PasswordHasher, TokenIssuer, SidecarWriterPort},
ports::{
EventPublisher, FileEntry, FileStoragePort, PasswordHasher, SidecarWriterPort, TokenIssuer,
},
value_objects::{PasswordHash, StructuredData, SystemId},
};
use std::collections::HashMap;
use tokio::sync::Mutex;
// --- StubEventPublisher ---
@@ -17,7 +19,9 @@ pub struct StubEventPublisher {
impl StubEventPublisher {
pub fn new() -> Self {
Self { events: Mutex::new(Vec::new()) }
Self {
events: Mutex::new(Vec::new()),
}
}
pub async fn published(&self) -> Vec<DomainEvent> {
@@ -26,7 +30,9 @@ impl StubEventPublisher {
}
impl Default for StubEventPublisher {
fn default() -> Self { Self::new() }
fn default() -> Self {
Self::new()
}
}
#[async_trait]
@@ -45,12 +51,16 @@ pub struct InMemoryFileStorage {
impl InMemoryFileStorage {
pub fn new() -> Self {
Self { files: Mutex::new(HashMap::new()) }
Self {
files: Mutex::new(HashMap::new()),
}
}
}
impl Default for InMemoryFileStorage {
fn default() -> Self { Self::new() }
fn default() -> Self {
Self::new()
}
}
#[async_trait]
@@ -61,7 +71,11 @@ impl FileStoragePort for InMemoryFileStorage {
}
async fn read_file(&self, path: &str) -> Result<Bytes, DomainError> {
self.files.lock().await.get(path).cloned()
self.files
.lock()
.await
.get(path)
.cloned()
.ok_or_else(|| DomainError::NotFound(format!("File not found: {path}")))
}
@@ -72,8 +86,13 @@ impl FileStoragePort for InMemoryFileStorage {
async fn list_directory(&self, path: &str) -> Result<Vec<FileEntry>, DomainError> {
let files = self.files.lock().await;
let prefix = if path.ends_with('/') { path.to_string() } else { format!("{path}/") };
Ok(files.keys()
let prefix = if path.ends_with('/') {
path.to_string()
} else {
format!("{path}/")
};
Ok(files
.keys()
.filter(|k| k.starts_with(&prefix))
.map(|k| FileEntry {
path: k.clone(),
@@ -98,7 +117,9 @@ pub struct StubSidecarWriter;
#[async_trait]
impl SidecarWriterPort for StubSidecarWriter {
fn format_name(&self) -> &str { "stub" }
fn format_name(&self) -> &str {
"stub"
}
async fn write_sidecar(&self, _data: &StructuredData, _path: &str) -> Result<(), DomainError> {
Ok(())
@@ -117,7 +138,9 @@ pub struct InMemorySidecarWriter {
impl InMemorySidecarWriter {
pub fn new() -> Self {
Self { data: Mutex::new(HashMap::new()) }
Self {
data: Mutex::new(HashMap::new()),
}
}
pub async fn get(&self, path: &str) -> Option<StructuredData> {
@@ -126,20 +149,31 @@ impl InMemorySidecarWriter {
}
impl Default for InMemorySidecarWriter {
fn default() -> Self { Self::new() }
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl SidecarWriterPort for InMemorySidecarWriter {
fn format_name(&self) -> &str { "in-memory" }
fn format_name(&self) -> &str {
"in-memory"
}
async fn write_sidecar(&self, data: &StructuredData, path: &str) -> Result<(), DomainError> {
self.data.lock().await.insert(path.to_string(), data.clone());
self.data
.lock()
.await
.insert(path.to_string(), data.clone());
Ok(())
}
async fn read_sidecar(&self, path: &str) -> Result<StructuredData, DomainError> {
self.data.lock().await.get(path).cloned()
self.data
.lock()
.await
.get(path)
.cloned()
.ok_or_else(|| DomainError::NotFound(format!("Sidecar not found: {path}")))
}
}
@@ -168,9 +202,9 @@ impl TokenIssuer for StubTokenIssuer {
Ok(format!("token:{user_id}"))
}
async fn verify(&self, token: &str) -> Result<(SystemId, String), DomainError> {
let id_str = token.strip_prefix("token:").ok_or_else(|| {
DomainError::Unauthorized("Invalid stub token".to_string())
})?;
let id_str = token
.strip_prefix("token:")
.ok_or_else(|| DomainError::Unauthorized("Invalid stub token".to_string()))?;
let uuid = uuid::Uuid::parse_str(id_str)
.map_err(|_| DomainError::Unauthorized("Bad UUID in stub token".to_string()))?;
Ok((SystemId::from_uuid(uuid), "user".to_string()))

View File

@@ -1,25 +1,23 @@
use std::collections::HashMap;
use async_trait::async_trait;
use tokio::sync::Mutex;
use domain::{
entities::{
Album, Asset, AssetMetadata, AssetTag, DuplicateGroup, DuplicateStatus,
Group, IngestSession, InviteCode, Job, JobBatch, JobStatus, LibraryPath,
MetadataSource, Plugin, ProcessingPipeline, QuotaDefinition, Role,
ShareLink, ShareScope, ShareTarget, SidecarRecord, SyncStatus,
StorageVolume, Tag, UsageLedgerEntry, UsageType, User,
Album, Asset, AssetMetadata, AssetTag, DuplicateGroup, DuplicateStatus, Group,
IngestSession, InviteCode, Job, JobBatch, JobStatus, LibraryPath, MetadataSource, Plugin,
ProcessingPipeline, QuotaDefinition, Role, ShareLink, ShareScope, ShareTarget,
SidecarRecord, StorageVolume, SyncStatus, Tag, UsageLedgerEntry, UsageType, User,
},
errors::DomainError,
ports::{
AlbumRepository, AssetMetadataRepository, AssetRepository,
DuplicateRepository, GroupRepository, IngestSessionRepository,
JobBatchRepository, JobRepository, LibraryPathRepository,
PipelineRepository, PluginRepository, QuotaRepository,
RoleRepository, ShareRepository, SidecarRepository, StorageVolumeRepository,
TagRepository, UsageLedgerRepository, UserRepository,
AlbumRepository, AssetMetadataRepository, AssetRepository, DuplicateRepository,
GroupRepository, IngestSessionRepository, JobBatchRepository, JobRepository,
LibraryPathRepository, PipelineRepository, PluginRepository, QuotaRepository,
RoleRepository, ShareRepository, SidecarRepository, StorageVolumeRepository, TagRepository,
UsageLedgerRepository, UserRepository,
},
value_objects::{Checksum, DateTimeStamp, Email, SystemId},
};
use std::collections::HashMap;
use tokio::sync::Mutex;
// --- InMemoryUserRepository ---
@@ -29,7 +27,9 @@ pub struct InMemoryUserRepository {
impl InMemoryUserRepository {
pub fn new() -> Self {
Self { users: Mutex::new(HashMap::new()) }
Self {
users: Mutex::new(HashMap::new()),
}
}
pub async fn all(&self) -> Vec<User> {
@@ -38,7 +38,9 @@ impl InMemoryUserRepository {
}
impl Default for InMemoryUserRepository {
fn default() -> Self { Self::new() }
fn default() -> Self {
Self::new()
}
}
#[async_trait]
@@ -48,19 +50,30 @@ impl UserRepository for InMemoryUserRepository {
}
async fn find_by_email(&self, email: &Email) -> Result<Option<User>, DomainError> {
Ok(self.users.lock().await.values()
Ok(self
.users
.lock()
.await
.values()
.find(|u| u.email.as_str() == email.as_str())
.cloned())
}
async fn find_by_username(&self, username: &str) -> Result<Option<User>, DomainError> {
Ok(self.users.lock().await.values()
Ok(self
.users
.lock()
.await
.values()
.find(|u| u.username == username)
.cloned())
}
async fn save(&self, user: &User) -> Result<(), DomainError> {
self.users.lock().await.insert(user.id.to_string(), user.clone());
self.users
.lock()
.await
.insert(user.id.to_string(), user.clone());
Ok(())
}
@@ -78,12 +91,16 @@ pub struct InMemoryAssetRepository {
impl InMemoryAssetRepository {
pub fn new() -> Self {
Self { data: Mutex::new(HashMap::new()) }
Self {
data: Mutex::new(HashMap::new()),
}
}
}
impl Default for InMemoryAssetRepository {
fn default() -> Self { Self::new() }
fn default() -> Self {
Self::new()
}
}
#[async_trait]
@@ -93,22 +110,42 @@ impl AssetRepository for InMemoryAssetRepository {
}
async fn find_by_checksum(&self, checksum: &Checksum) -> Result<Vec<Asset>, DomainError> {
Ok(self.data.lock().await.values()
Ok(self
.data
.lock()
.await
.values()
.filter(|a| &a.source_reference.checksum == checksum)
.cloned()
.collect())
}
async fn find_by_owner(&self, owner_id: &SystemId, limit: u32, offset: u32) -> Result<Vec<Asset>, DomainError> {
let all: Vec<Asset> = self.data.lock().await.values()
async fn find_by_owner(
&self,
owner_id: &SystemId,
limit: u32,
offset: u32,
) -> Result<Vec<Asset>, DomainError> {
let all: Vec<Asset> = self
.data
.lock()
.await
.values()
.filter(|a| &a.owner_user_id == owner_id)
.cloned()
.collect();
Ok(all.into_iter().skip(offset as usize).take(limit as usize).collect())
Ok(all
.into_iter()
.skip(offset as usize)
.take(limit as usize)
.collect())
}
async fn save(&self, asset: &Asset) -> Result<(), DomainError> {
self.data.lock().await.insert(asset.asset_id.to_string(), asset.clone());
self.data
.lock()
.await
.insert(asset.asset_id.to_string(), asset.clone());
Ok(())
}
@@ -126,12 +163,16 @@ pub struct InMemoryAlbumRepository {
impl InMemoryAlbumRepository {
pub fn new() -> Self {
Self { data: Mutex::new(HashMap::new()) }
Self {
data: Mutex::new(HashMap::new()),
}
}
}
impl Default for InMemoryAlbumRepository {
fn default() -> Self { Self::new() }
fn default() -> Self {
Self::new()
}
}
#[async_trait]
@@ -141,14 +182,21 @@ impl AlbumRepository for InMemoryAlbumRepository {
}
async fn find_by_creator(&self, creator_id: &SystemId) -> Result<Vec<Album>, DomainError> {
Ok(self.data.lock().await.values()
Ok(self
.data
.lock()
.await
.values()
.filter(|a| &a.creator_user_id == creator_id)
.cloned()
.collect())
}
async fn save(&self, album: &Album) -> Result<(), DomainError> {
self.data.lock().await.insert(album.album_id.to_string(), album.clone());
self.data
.lock()
.await
.insert(album.album_id.to_string(), album.clone());
Ok(())
}
@@ -166,12 +214,16 @@ pub struct InMemoryJobRepository {
impl InMemoryJobRepository {
pub fn new() -> Self {
Self { data: Mutex::new(HashMap::new()) }
Self {
data: Mutex::new(HashMap::new()),
}
}
}
impl Default for InMemoryJobRepository {
fn default() -> Self { Self::new() }
fn default() -> Self {
Self::new()
}
}
#[async_trait]
@@ -182,21 +234,29 @@ impl JobRepository for InMemoryJobRepository {
async fn find_next_queued(&self) -> Result<Option<Job>, DomainError> {
let data = self.data.lock().await;
Ok(data.values()
Ok(data
.values()
.filter(|j| j.status == JobStatus::Queued)
.max_by_key(|j| j.priority)
.cloned())
}
async fn find_by_batch(&self, batch_id: &SystemId) -> Result<Vec<Job>, DomainError> {
Ok(self.data.lock().await.values()
Ok(self
.data
.lock()
.await
.values()
.filter(|j| j.batch_id.as_ref() == Some(batch_id))
.cloned()
.collect())
}
async fn save(&self, job: &Job) -> Result<(), DomainError> {
self.data.lock().await.insert(job.job_id.to_string(), job.clone());
self.data
.lock()
.await
.insert(job.job_id.to_string(), job.clone());
Ok(())
}
}
@@ -209,12 +269,16 @@ pub struct InMemoryRoleRepository {
impl InMemoryRoleRepository {
pub fn new() -> Self {
Self { data: Mutex::new(HashMap::new()) }
Self {
data: Mutex::new(HashMap::new()),
}
}
}
impl Default for InMemoryRoleRepository {
fn default() -> Self { Self::new() }
fn default() -> Self {
Self::new()
}
}
#[async_trait]
@@ -224,20 +288,31 @@ impl RoleRepository for InMemoryRoleRepository {
}
async fn find_by_name(&self, name: &str) -> Result<Option<Role>, DomainError> {
Ok(self.data.lock().await.values()
Ok(self
.data
.lock()
.await
.values()
.find(|r| r.name == name)
.cloned())
}
async fn find_defaults(&self) -> Result<Vec<Role>, DomainError> {
Ok(self.data.lock().await.values()
Ok(self
.data
.lock()
.await
.values()
.filter(|r| r.is_system_default)
.cloned()
.collect())
}
async fn save(&self, role: &Role) -> Result<(), DomainError> {
self.data.lock().await.insert(role.role_id.to_string(), role.clone());
self.data
.lock()
.await
.insert(role.role_id.to_string(), role.clone());
Ok(())
}
@@ -255,12 +330,16 @@ pub struct InMemoryGroupRepository {
impl InMemoryGroupRepository {
pub fn new() -> Self {
Self { data: Mutex::new(HashMap::new()) }
Self {
data: Mutex::new(HashMap::new()),
}
}
}
impl Default for InMemoryGroupRepository {
fn default() -> Self { Self::new() }
fn default() -> Self {
Self::new()
}
}
#[async_trait]
@@ -270,14 +349,21 @@ impl GroupRepository for InMemoryGroupRepository {
}
async fn find_by_user(&self, user_id: &SystemId) -> Result<Vec<Group>, DomainError> {
Ok(self.data.lock().await.values()
Ok(self
.data
.lock()
.await
.values()
.filter(|g| g.is_member(user_id))
.cloned()
.collect())
}
async fn save(&self, group: &Group) -> Result<(), DomainError> {
self.data.lock().await.insert(group.group_id.to_string(), group.clone());
self.data
.lock()
.await
.insert(group.group_id.to_string(), group.clone());
Ok(())
}
@@ -295,12 +381,16 @@ pub struct InMemoryStorageVolumeRepository {
impl InMemoryStorageVolumeRepository {
pub fn new() -> Self {
Self { data: Mutex::new(HashMap::new()) }
Self {
data: Mutex::new(HashMap::new()),
}
}
}
impl Default for InMemoryStorageVolumeRepository {
fn default() -> Self { Self::new() }
fn default() -> Self {
Self::new()
}
}
#[async_trait]
@@ -314,7 +404,10 @@ impl StorageVolumeRepository for InMemoryStorageVolumeRepository {
}
async fn save(&self, volume: &StorageVolume) -> Result<(), DomainError> {
self.data.lock().await.insert(volume.volume_id.to_string(), volume.clone());
self.data
.lock()
.await
.insert(volume.volume_id.to_string(), volume.clone());
Ok(())
}
@@ -332,12 +425,16 @@ pub struct InMemoryLibraryPathRepository {
impl InMemoryLibraryPathRepository {
pub fn new() -> Self {
Self { data: Mutex::new(HashMap::new()) }
Self {
data: Mutex::new(HashMap::new()),
}
}
}
impl Default for InMemoryLibraryPathRepository {
fn default() -> Self { Self::new() }
fn default() -> Self {
Self::new()
}
}
#[async_trait]
@@ -347,21 +444,35 @@ impl LibraryPathRepository for InMemoryLibraryPathRepository {
}
async fn find_by_volume(&self, volume_id: &SystemId) -> Result<Vec<LibraryPath>, DomainError> {
Ok(self.data.lock().await.values()
Ok(self
.data
.lock()
.await
.values()
.filter(|p| &p.volume_id == volume_id)
.cloned()
.collect())
}
async fn find_ingest_destinations(&self, owner_id: &SystemId) -> Result<Vec<LibraryPath>, DomainError> {
Ok(self.data.lock().await.values()
async fn find_ingest_destinations(
&self,
owner_id: &SystemId,
) -> Result<Vec<LibraryPath>, DomainError> {
Ok(self
.data
.lock()
.await
.values()
.filter(|p| p.is_ingest_destination && p.designated_owner_id.as_ref() == Some(owner_id))
.cloned()
.collect())
}
async fn save(&self, path: &LibraryPath) -> Result<(), DomainError> {
self.data.lock().await.insert(path.path_id.to_string(), path.clone());
self.data
.lock()
.await
.insert(path.path_id.to_string(), path.clone());
Ok(())
}
@@ -379,12 +490,16 @@ pub struct InMemoryIngestSessionRepository {
impl InMemoryIngestSessionRepository {
pub fn new() -> Self {
Self { data: Mutex::new(HashMap::new()) }
Self {
data: Mutex::new(HashMap::new()),
}
}
}
impl Default for InMemoryIngestSessionRepository {
fn default() -> Self { Self::new() }
fn default() -> Self {
Self::new()
}
}
#[async_trait]
@@ -394,14 +509,21 @@ impl IngestSessionRepository for InMemoryIngestSessionRepository {
}
async fn find_by_user(&self, user_id: &SystemId) -> Result<Vec<IngestSession>, DomainError> {
Ok(self.data.lock().await.values()
Ok(self
.data
.lock()
.await
.values()
.filter(|s| &s.uploader_user_id == user_id)
.cloned()
.collect())
}
async fn save(&self, session: &IngestSession) -> Result<(), DomainError> {
self.data.lock().await.insert(session.session_id.to_string(), session.clone());
self.data
.lock()
.await
.insert(session.session_id.to_string(), session.clone());
Ok(())
}
}
@@ -414,24 +536,38 @@ pub struct InMemoryQuotaRepository {
impl InMemoryQuotaRepository {
pub fn new() -> Self {
Self { data: Mutex::new(HashMap::new()) }
Self {
data: Mutex::new(HashMap::new()),
}
}
}
impl Default for InMemoryQuotaRepository {
fn default() -> Self { Self::new() }
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl QuotaRepository for InMemoryQuotaRepository {
async fn find_by_owner(&self, owner_id: &SystemId) -> Result<Option<QuotaDefinition>, DomainError> {
Ok(self.data.lock().await.values()
async fn find_by_owner(
&self,
owner_id: &SystemId,
) -> Result<Option<QuotaDefinition>, DomainError> {
Ok(self
.data
.lock()
.await
.values()
.find(|q| &q.owner_scope == owner_id)
.cloned())
}
async fn save(&self, quota: &QuotaDefinition) -> Result<(), DomainError> {
self.data.lock().await.insert(quota.quota_id.to_string(), quota.clone());
self.data
.lock()
.await
.insert(quota.quota_id.to_string(), quota.clone());
Ok(())
}
@@ -449,12 +585,16 @@ pub struct InMemoryUsageLedgerRepository {
impl InMemoryUsageLedgerRepository {
pub fn new() -> Self {
Self { entries: Mutex::new(Vec::new()) }
Self {
entries: Mutex::new(Vec::new()),
}
}
}
impl Default for InMemoryUsageLedgerRepository {
fn default() -> Self { Self::new() }
fn default() -> Self {
Self::new()
}
}
#[async_trait]
@@ -471,7 +611,8 @@ impl UsageLedgerRepository for InMemoryUsageLedgerRepository {
since: Option<DateTimeStamp>,
) -> Result<u64, DomainError> {
let entries = self.entries.lock().await;
let total = entries.iter()
let total = entries
.iter()
.filter(|e| &e.user_id == user_id && e.usage_type == usage_type)
.filter(|e| match &since {
Some(ts) => &e.timestamp >= ts,
@@ -491,7 +632,9 @@ pub struct InMemoryAssetMetadataRepository {
impl InMemoryAssetMetadataRepository {
pub fn new() -> Self {
Self { data: Mutex::new(HashMap::new()) }
Self {
data: Mutex::new(HashMap::new()),
}
}
fn key(asset_id: &SystemId, source: MetadataSource) -> String {
@@ -500,21 +643,36 @@ impl InMemoryAssetMetadataRepository {
}
impl Default for InMemoryAssetMetadataRepository {
fn default() -> Self { Self::new() }
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl AssetMetadataRepository for InMemoryAssetMetadataRepository {
async fn find_by_asset(&self, asset_id: &SystemId) -> Result<Vec<AssetMetadata>, DomainError> {
let prefix = format!("{asset_id}:");
Ok(self.data.lock().await.iter()
Ok(self
.data
.lock()
.await
.iter()
.filter(|(k, _)| k.starts_with(&prefix))
.map(|(_, v)| v.clone())
.collect())
}
async fn find_by_asset_and_source(&self, asset_id: &SystemId, source: MetadataSource) -> Result<Option<AssetMetadata>, DomainError> {
Ok(self.data.lock().await.get(&Self::key(asset_id, source)).cloned())
async fn find_by_asset_and_source(
&self,
asset_id: &SystemId,
source: MetadataSource,
) -> Result<Option<AssetMetadata>, DomainError> {
Ok(self
.data
.lock()
.await
.get(&Self::key(asset_id, source))
.cloned())
}
async fn save(&self, metadata: &AssetMetadata) -> Result<(), DomainError> {
@@ -523,7 +681,11 @@ impl AssetMetadataRepository for InMemoryAssetMetadataRepository {
Ok(())
}
async fn delete_by_asset_and_source(&self, asset_id: &SystemId, source: MetadataSource) -> Result<(), DomainError> {
async fn delete_by_asset_and_source(
&self,
asset_id: &SystemId,
source: MetadataSource,
) -> Result<(), DomainError> {
self.data.lock().await.remove(&Self::key(asset_id, source));
Ok(())
}
@@ -550,13 +712,18 @@ impl InMemoryShareRepository {
}
impl Default for InMemoryShareRepository {
fn default() -> Self { Self::new() }
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl ShareRepository for InMemoryShareRepository {
async fn save_scope(&self, scope: &ShareScope) -> Result<(), DomainError> {
self.scopes.lock().await.insert(scope.scope_id.to_string(), scope.clone());
self.scopes
.lock()
.await
.insert(scope.scope_id.to_string(), scope.clone());
Ok(())
}
@@ -564,8 +731,15 @@ impl ShareRepository for InMemoryShareRepository {
Ok(self.scopes.lock().await.get(&id.to_string()).cloned())
}
async fn find_scopes_for_resource(&self, resource_id: &SystemId) -> Result<Vec<ShareScope>, DomainError> {
Ok(self.scopes.lock().await.values()
async fn find_scopes_for_resource(
&self,
resource_id: &SystemId,
) -> Result<Vec<ShareScope>, DomainError> {
Ok(self
.scopes
.lock()
.await
.values()
.filter(|s| &s.shareable_id == resource_id)
.cloned()
.collect())
@@ -582,22 +756,39 @@ impl ShareRepository for InMemoryShareRepository {
Ok(())
}
async fn find_targets_for_scope(&self, scope_id: &SystemId) -> Result<Vec<ShareTarget>, DomainError> {
Ok(self.targets.lock().await.values()
async fn find_targets_for_scope(
&self,
scope_id: &SystemId,
) -> Result<Vec<ShareTarget>, DomainError> {
Ok(self
.targets
.lock()
.await
.values()
.filter(|t| &t.scope_id == scope_id)
.cloned()
.collect())
}
async fn find_targets_for_user(&self, user_id: &SystemId) -> Result<Vec<ShareTarget>, DomainError> {
Ok(self.targets.lock().await.values()
async fn find_targets_for_user(
&self,
user_id: &SystemId,
) -> Result<Vec<ShareTarget>, DomainError> {
Ok(self
.targets
.lock()
.await
.values()
.filter(|t| &t.target_id == user_id)
.cloned()
.collect())
}
async fn save_link(&self, link: &ShareLink) -> Result<(), DomainError> {
self.links.lock().await.insert(link.token.clone(), link.clone());
self.links
.lock()
.await
.insert(link.token.clone(), link.clone());
Ok(())
}
@@ -606,7 +797,10 @@ impl ShareRepository for InMemoryShareRepository {
}
async fn save_invite(&self, invite: &InviteCode) -> Result<(), DomainError> {
self.invites.lock().await.insert(invite.code_id.to_string(), invite.clone());
self.invites
.lock()
.await
.insert(invite.code_id.to_string(), invite.clone());
Ok(())
}
@@ -632,7 +826,9 @@ impl InMemoryTagRepository {
}
impl Default for InMemoryTagRepository {
fn default() -> Self { Self::new() }
fn default() -> Self {
Self::new()
}
}
#[async_trait]
@@ -642,17 +838,26 @@ impl TagRepository for InMemoryTagRepository {
}
async fn find_by_name(&self, name: &str) -> Result<Option<Tag>, DomainError> {
Ok(self.tags.lock().await.values()
Ok(self
.tags
.lock()
.await
.values()
.find(|t| t.name == name)
.cloned())
}
async fn find_tags_for_asset(&self, asset_id: &SystemId) -> Result<Vec<(Tag, AssetTag)>, DomainError> {
async fn find_tags_for_asset(
&self,
asset_id: &SystemId,
) -> Result<Vec<(Tag, AssetTag)>, DomainError> {
let asset_tags = self.asset_tags.lock().await;
let tags = self.tags.lock().await;
let mut result = Vec::new();
for at in asset_tags.values() {
if &at.asset_id == asset_id && let Some(tag) = tags.get(&at.tag_id.to_string()) {
if &at.asset_id == asset_id
&& let Some(tag) = tags.get(&at.tag_id.to_string())
{
result.push((tag.clone(), at.clone()));
}
}
@@ -660,7 +865,10 @@ impl TagRepository for InMemoryTagRepository {
}
async fn save_tag(&self, tag: &Tag) -> Result<(), DomainError> {
self.tags.lock().await.insert(tag.tag_id.to_string(), tag.clone());
self.tags
.lock()
.await
.insert(tag.tag_id.to_string(), tag.clone());
Ok(())
}
@@ -670,7 +878,11 @@ impl TagRepository for InMemoryTagRepository {
Ok(())
}
async fn remove_asset_tag(&self, asset_id: &SystemId, tag_id: &SystemId) -> Result<(), DomainError> {
async fn remove_asset_tag(
&self,
asset_id: &SystemId,
tag_id: &SystemId,
) -> Result<(), DomainError> {
let key = format!("{asset_id}:{tag_id}");
self.asset_tags.lock().await.remove(&key);
Ok(())
@@ -685,12 +897,16 @@ pub struct InMemoryDuplicateRepository {
impl InMemoryDuplicateRepository {
pub fn new() -> Self {
Self { data: Mutex::new(HashMap::new()) }
Self {
data: Mutex::new(HashMap::new()),
}
}
}
impl Default for InMemoryDuplicateRepository {
fn default() -> Self { Self::new() }
fn default() -> Self {
Self::new()
}
}
#[async_trait]
@@ -700,21 +916,32 @@ impl DuplicateRepository for InMemoryDuplicateRepository {
}
async fn find_unresolved(&self) -> Result<Vec<DuplicateGroup>, DomainError> {
Ok(self.data.lock().await.values()
Ok(self
.data
.lock()
.await
.values()
.filter(|g| g.status == DuplicateStatus::Unresolved)
.cloned()
.collect())
}
async fn find_by_asset(&self, asset_id: &SystemId) -> Result<Vec<DuplicateGroup>, DomainError> {
Ok(self.data.lock().await.values()
Ok(self
.data
.lock()
.await
.values()
.filter(|g| g.candidates.iter().any(|c| &c.asset_id == asset_id))
.cloned()
.collect())
}
async fn save(&self, group: &DuplicateGroup) -> Result<(), DomainError> {
self.data.lock().await.insert(group.group_id.to_string(), group.clone());
self.data
.lock()
.await
.insert(group.group_id.to_string(), group.clone());
Ok(())
}
}
@@ -727,29 +954,43 @@ pub struct InMemorySidecarRepository {
impl InMemorySidecarRepository {
pub fn new() -> Self {
Self { data: Mutex::new(HashMap::new()) }
Self {
data: Mutex::new(HashMap::new()),
}
}
}
impl Default for InMemorySidecarRepository {
fn default() -> Self { Self::new() }
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl SidecarRepository for InMemorySidecarRepository {
async fn find_by_asset(&self, asset_id: &SystemId) -> Result<Option<SidecarRecord>, DomainError> {
async fn find_by_asset(
&self,
asset_id: &SystemId,
) -> Result<Option<SidecarRecord>, DomainError> {
Ok(self.data.lock().await.get(&asset_id.to_string()).cloned())
}
async fn find_by_status(&self, status: SyncStatus) -> Result<Vec<SidecarRecord>, DomainError> {
Ok(self.data.lock().await.values()
Ok(self
.data
.lock()
.await
.values()
.filter(|r| r.sync_status == status)
.cloned()
.collect())
}
async fn save(&self, record: &SidecarRecord) -> Result<(), DomainError> {
self.data.lock().await.insert(record.asset_id.to_string(), record.clone());
self.data
.lock()
.await
.insert(record.asset_id.to_string(), record.clone());
Ok(())
}
@@ -767,12 +1008,16 @@ pub struct InMemoryJobBatchRepository {
impl InMemoryJobBatchRepository {
pub fn new() -> Self {
Self { data: Mutex::new(HashMap::new()) }
Self {
data: Mutex::new(HashMap::new()),
}
}
}
impl Default for InMemoryJobBatchRepository {
fn default() -> Self { Self::new() }
fn default() -> Self {
Self::new()
}
}
#[async_trait]
@@ -782,7 +1027,10 @@ impl JobBatchRepository for InMemoryJobBatchRepository {
}
async fn save(&self, batch: &JobBatch) -> Result<(), DomainError> {
self.data.lock().await.insert(batch.batch_id.to_string(), batch.clone());
self.data
.lock()
.await
.insert(batch.batch_id.to_string(), batch.clone());
Ok(())
}
}
@@ -795,12 +1043,16 @@ pub struct InMemoryPluginRepository {
impl InMemoryPluginRepository {
pub fn new() -> Self {
Self { data: Mutex::new(HashMap::new()) }
Self {
data: Mutex::new(HashMap::new()),
}
}
}
impl Default for InMemoryPluginRepository {
fn default() -> Self { Self::new() }
fn default() -> Self {
Self::new()
}
}
#[async_trait]
@@ -810,14 +1062,21 @@ impl PluginRepository for InMemoryPluginRepository {
}
async fn find_enabled(&self) -> Result<Vec<Plugin>, DomainError> {
Ok(self.data.lock().await.values()
Ok(self
.data
.lock()
.await
.values()
.filter(|p| p.is_enabled)
.cloned()
.collect())
}
async fn save(&self, plugin: &Plugin) -> Result<(), DomainError> {
self.data.lock().await.insert(plugin.plugin_id.to_string(), plugin.clone());
self.data
.lock()
.await
.insert(plugin.plugin_id.to_string(), plugin.clone());
Ok(())
}
}
@@ -830,12 +1089,16 @@ pub struct InMemoryPipelineRepository {
impl InMemoryPipelineRepository {
pub fn new() -> Self {
Self { data: Mutex::new(HashMap::new()) }
Self {
data: Mutex::new(HashMap::new()),
}
}
}
impl Default for InMemoryPipelineRepository {
fn default() -> Self { Self::new() }
fn default() -> Self {
Self::new()
}
}
#[async_trait]
@@ -845,14 +1108,21 @@ impl PipelineRepository for InMemoryPipelineRepository {
}
async fn find_by_trigger(&self, event: &str) -> Result<Vec<ProcessingPipeline>, DomainError> {
Ok(self.data.lock().await.values()
Ok(self
.data
.lock()
.await
.values()
.filter(|p| p.trigger_event == event)
.cloned()
.collect())
}
async fn save(&self, pipeline: &ProcessingPipeline) -> Result<(), DomainError> {
self.data.lock().await.insert(pipeline.pipeline_id.to_string(), pipeline.clone());
self.data
.lock()
.await
.insert(pipeline.pipeline_id.to_string(), pipeline.clone());
Ok(())
}
}