diff --git a/crates/api-types/Cargo.toml b/crates/api-types/Cargo.toml index 6942318..0b44152 100644 --- a/crates/api-types/Cargo.toml +++ b/crates/api-types/Cargo.toml @@ -4,9 +4,10 @@ version = "0.1.0" edition = "2024" [dependencies] -domain = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -uuid = { workspace = true } -chrono = { workspace = true } -utoipa = { workspace = true } +domain = { workspace = true } +application = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +uuid = { workspace = true } +chrono = { workspace = true } +utoipa = { workspace = true } diff --git a/crates/api-types/src/requests.rs b/crates/api-types/src/requests.rs index 157a047..76bc3e2 100644 --- a/crates/api-types/src/requests.rs +++ b/crates/api-types/src/requests.rs @@ -58,3 +58,79 @@ pub struct GenerateShareLinkRequest { pub expires_in_hours: Option, pub max_uses: Option, } + +// --- Organization --- + +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +pub struct TagAssetRequest { + pub tag_name: String, +} + +// --- Storage --- + +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +pub struct CheckQuotaParams { + pub usage_type: Option, + pub amount: Option, +} + +// --- Catalog --- + +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +pub struct RegisterAssetRequest { + pub volume_id: uuid::Uuid, + pub relative_path: String, + pub checksum: String, + pub asset_type: String, + pub mime_type: String, + pub file_size: u64, +} + +// --- Sidecar --- + +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +pub struct ResolveConflictRequest { + pub policy: String, +} + +// --- Processing --- + +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +pub struct EnqueueJobRequest { + pub job_type: String, + pub priority: Option, + pub payload: Option>, + pub target_asset_id: Option, + pub batch_id: Option, +} + +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +pub struct CompleteJobRequest { + pub result: std::collections::HashMap, +} + +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +pub struct FailJobRequest { + pub error: String, +} + +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +pub struct ManagePluginRequest { + pub plugin_id: Option, + pub action: String, + pub name: Option, + pub plugin_type: Option, + pub config: Option>, +} + +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +pub struct ConfigurePipelineRequest { + pub trigger_event: String, + pub steps: Vec, +} + +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +pub struct PipelineStepRequest { + pub plugin_id: uuid::Uuid, + pub config: std::collections::HashMap, +} diff --git a/crates/api-types/src/responses.rs b/crates/api-types/src/responses.rs index 4833bbc..b5eb4ae 100644 --- a/crates/api-types/src/responses.rs +++ b/crates/api-types/src/responses.rs @@ -196,3 +196,157 @@ impl SharedResourceResponse { } } } + +// --- Tag --- + +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +pub struct TagResponse { + pub tag_id: Uuid, + pub name: String, + pub tag_source: String, +} + +impl TagResponse { + pub fn from_domain(tag: &domain::entities::Tag) -> Self { + Self { + tag_id: *tag.tag_id.as_uuid(), + name: tag.name.clone(), + tag_source: format!("{:?}", tag.tag_source), + } + } +} + +// --- Quota --- + +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +pub struct QuotaCheckResponse { + pub allowed: bool, + pub current_usage: u64, + pub limit: u64, + pub is_unlimited: bool, +} + +impl QuotaCheckResponse { + pub fn from_domain(result: &domain::storage::services::QuotaCheckResult) -> Self { + Self { + allowed: result.allowed, + current_usage: result.current_usage, + limit: result.limit, + is_unlimited: result.is_unlimited, + } + } +} + +// --- Sidecar --- + +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +pub struct SidecarExportResponse { + pub asset_id: Uuid, + pub status: String, + pub path: String, +} + +impl SidecarExportResponse { + pub fn from_domain(record: &domain::entities::SidecarRecord) -> Self { + Self { + asset_id: *record.asset_id.as_uuid(), + status: format!("{:?}", record.sync_status), + path: record.sidecar_storage_path.clone(), + } + } +} + +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +pub struct DetectChangesResponse { + pub changed_count: u32, +} + +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +pub struct SidecarImportResponse { + pub asset_id: Uuid, + pub status: String, +} + +// --- Processing --- + +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +pub struct JobResponse { + pub job_id: Uuid, + pub job_type: String, + pub status: String, + pub priority: u32, + pub created_at: DateTime, +} + +impl JobResponse { + pub fn from_domain(job: &domain::entities::Job) -> Self { + Self { + job_id: *job.job_id.as_uuid(), + job_type: format!("{:?}", job.job_type), + status: format!("{:?}", job.status), + priority: job.priority, + created_at: *job.created_at.as_datetime(), + } + } +} + +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +pub struct BatchProgressResponse { + pub batch_id: Uuid, + pub batch_type: String, + pub total: u32, + pub completed: u32, + pub failed: u32, + pub status: String, + pub jobs: Vec, +} + +impl BatchProgressResponse { + pub fn from_domain(progress: &application::processing::BatchProgress) -> Self { + Self { + batch_id: *progress.batch.batch_id.as_uuid(), + batch_type: progress.batch.batch_type.clone(), + total: progress.batch.total_jobs, + completed: progress.batch.completed_count, + failed: progress.batch.failed_count, + status: format!("{:?}", progress.batch.status), + jobs: progress.jobs.iter().map(JobResponse::from_domain).collect(), + } + } +} + +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +pub struct PluginResponse { + pub plugin_id: Uuid, + pub name: String, + pub plugin_type: String, + pub is_enabled: bool, +} + +impl PluginResponse { + pub fn from_domain(plugin: &domain::entities::Plugin) -> Self { + Self { + plugin_id: *plugin.plugin_id.as_uuid(), + name: plugin.name.clone(), + plugin_type: format!("{:?}", plugin.plugin_type), + is_enabled: plugin.is_enabled, + } + } +} + +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +pub struct PipelineResponse { + pub pipeline_id: Uuid, + pub trigger_event: String, + pub steps_count: usize, +} + +impl PipelineResponse { + pub fn from_domain(pipeline: &domain::entities::ProcessingPipeline) -> Self { + Self { + pipeline_id: *pipeline.pipeline_id.as_uuid(), + trigger_event: pipeline.trigger_event.clone(), + steps_count: pipeline.steps.len(), + } + } +} diff --git a/crates/bootstrap/src/factory.rs b/crates/bootstrap/src/factory.rs index 94e670d..443d34f 100644 --- a/crates/bootstrap/src/factory.rs +++ b/crates/bootstrap/src/factory.rs @@ -11,33 +11,52 @@ use adapters_auth::{BcryptPasswordHasher, JwtTokenIssuer}; use adapters_postgres::{ PostgresAlbumRepository, PostgresAssetMetadataRepository, PostgresAssetRepository, - PostgresIngestSessionRepository, PostgresLibraryPathRepository, PostgresQuotaRepository, - PostgresShareRepository, PostgresStorageVolumeRepository, PostgresUsageLedgerRepository, - PostgresUserRepository, PostgresVisibilityFilterRepository, connect, run_migrations, + PostgresDuplicateRepository, PostgresIngestSessionRepository, PostgresJobBatchRepository, + PostgresJobRepository, PostgresLibraryPathRepository, PostgresPipelineRepository, + PostgresPluginRepository, PostgresQuotaRepository, PostgresShareRepository, + PostgresSidecarRepository, PostgresStorageVolumeRepository, PostgresTagRepository, + PostgresUsageLedgerRepository, PostgresUserRepository, PostgresVisibilityFilterRepository, + connect, run_migrations, }; use adapters_storage::LocalFileStorage; use application::{ - catalog::{GetAssetHandler, GetTimelineHandler, ReadAssetFileHandler, UpdateMetadataHandler}, + catalog::{ + GetAssetHandler, GetTimelineHandler, ReadAssetFileHandler, RegisterAssetHandler, + UpdateMetadataHandler, + }, identity::{GetProfileHandler, LoginUserHandler, RegisterUserHandler}, - organization::{CreateAlbumHandler, GetAlbumHandler, ManageAlbumEntriesHandler}, + organization::{ + CreateAlbumHandler, GetAlbumHandler, ManageAlbumEntriesHandler, TagAssetHandler, + }, + processing::{ + CompleteJobHandler, ConfigurePipelineHandler, EnqueueJobHandler, FailJobHandler, + ManagePluginHandler, ReportBatchProgressHandler, StartJobHandler, + }, sharing::{ AccessSharedResourceHandler, GenerateShareLinkHandler, RevokeShareHandler, ShareResourceHandler, }, - storage::{IngestAssetHandler, RegisterLibraryPathHandler, RegisterVolumeHandler}, + sidecar::{ + DetectExternalChangesHandler, ExportSidecarHandler, FullExportHandler, FullImportHandler, + ImportSidecarHandler, ResolveConflictHandler, + }, + storage::{ + CheckQuotaHandler, IngestAssetHandler, RegisterLibraryPathHandler, RegisterVolumeHandler, + }, }; use presentation::{ routes::app_router, state::{ - AppState, CatalogHandlers, IdentityHandlers, OrganizationHandlers, SharingHandlers, - StorageHandlers, + AppState, CatalogHandlers, IdentityHandlers, OrganizationHandlers, ProcessingHandlers, + SharingHandlers, SidecarHandlers, StorageHandlers, }, }; use crate::config::Config; use crate::log_event_publisher::LogEventPublisher; +use crate::log_sidecar_writer::LogSidecarWriter; pub async fn build_app(config: &Config) -> Result { let pool = connect(&config.database_url).await?; @@ -65,7 +84,15 @@ pub async fn build_app(config: &Config) -> Result { let session_repo = Arc::new(PostgresIngestSessionRepository::new(pool.clone())); let quota_repo = Arc::new(PostgresQuotaRepository::new(pool.clone())); let ledger_repo = Arc::new(PostgresUsageLedgerRepository::new(pool.clone())); + let tag_repo = Arc::new(PostgresTagRepository::new(pool.clone())); + let duplicate_repo = Arc::new(PostgresDuplicateRepository::new(pool.clone())); + let sidecar_repo = Arc::new(PostgresSidecarRepository::new(pool.clone())); + let job_repo = Arc::new(PostgresJobRepository::new(pool.clone())); + let batch_repo = Arc::new(PostgresJobBatchRepository::new(pool.clone())); + let plugin_repo = Arc::new(PostgresPluginRepository::new(pool.clone())); + let pipeline_repo = Arc::new(PostgresPipelineRepository::new(pool.clone())); let event_publisher: Arc = Arc::new(LogEventPublisher); + let sidecar_writer: Arc = Arc::new(LogSidecarWriter); // File storage let storage_path = std::env::var("STORAGE_PATH").unwrap_or_else(|_| "./data/media".to_string()); @@ -80,8 +107,8 @@ pub async fn build_app(config: &Config) -> Result { let ingest_asset_handler = Arc::new(IngestAssetHandler::new( session_repo, path_repo.clone(), - quota_repo, - ledger_repo, + quota_repo.clone(), + ledger_repo.clone(), asset_repo.clone(), file_storage.clone(), event_publisher.clone(), @@ -96,10 +123,78 @@ pub async fn build_app(config: &Config) -> Result { )); let update_metadata_handler = Arc::new(UpdateMetadataHandler::new( asset_repo.clone(), - metadata_repo, + metadata_repo.clone(), event_publisher.clone(), )); - let read_asset_file_handler = Arc::new(ReadAssetFileHandler::new(asset_repo, file_storage)); + let read_asset_file_handler = + Arc::new(ReadAssetFileHandler::new(asset_repo.clone(), file_storage)); + + // Register asset handler + let register_asset_handler = Arc::new(RegisterAssetHandler::new( + asset_repo.clone(), + duplicate_repo, + event_publisher.clone(), + )); + + // Tag handler + let tag_asset_handler = Arc::new(TagAssetHandler::new(asset_repo.clone(), tag_repo)); + + // Check quota handler + let check_quota_handler = Arc::new(CheckQuotaHandler::new(quota_repo, ledger_repo)); + + // Sidecar handlers + let export_sidecar_handler = Arc::new(ExportSidecarHandler::new( + metadata_repo.clone(), + sidecar_repo.clone(), + sidecar_writer.clone(), + )); + let detect_changes_handler = Arc::new(DetectExternalChangesHandler::new( + sidecar_repo.clone(), + sidecar_writer.clone(), + )); + let import_sidecar_handler = Arc::new(ImportSidecarHandler::new( + sidecar_repo.clone(), + sidecar_writer.clone(), + metadata_repo.clone(), + )); + let resolve_conflict_handler = Arc::new(ResolveConflictHandler::new( + sidecar_repo.clone(), + sidecar_writer.clone(), + metadata_repo.clone(), + )); + let full_export_handler = Arc::new(FullExportHandler::new( + asset_repo.clone(), + metadata_repo.clone(), + sidecar_repo.clone(), + sidecar_writer.clone(), + )); + let full_import_handler = Arc::new(FullImportHandler::new( + asset_repo, + metadata_repo, + sidecar_repo, + sidecar_writer, + )); + + // Processing handlers + let enqueue_job_handler = Arc::new(EnqueueJobHandler::new( + job_repo.clone(), + event_publisher.clone(), + )); + let start_job_handler = Arc::new(StartJobHandler::new(job_repo.clone())); + let complete_job_handler = Arc::new(CompleteJobHandler::new( + job_repo.clone(), + batch_repo.clone(), + event_publisher.clone(), + )); + let fail_job_handler = Arc::new(FailJobHandler::new( + job_repo.clone(), + batch_repo.clone(), + event_publisher.clone(), + )); + let batch_progress_handler = Arc::new(ReportBatchProgressHandler::new(batch_repo, job_repo)); + let manage_plugin_handler = Arc::new(ManagePluginHandler::new(plugin_repo.clone())); + let configure_pipeline_handler = + Arc::new(ConfigurePipelineHandler::new(pipeline_repo, plugin_repo)); // Sharing repos & handlers let share_repo = Arc::new(PostgresShareRepository::new(pool.clone())); @@ -130,17 +225,39 @@ pub async fn build_app(config: &Config) -> Result { get_timeline: get_timeline_handler, update_metadata: update_metadata_handler, read_asset_file: read_asset_file_handler, + register_asset: register_asset_handler, }; let organization = OrganizationHandlers { create_album: create_album_handler, get_album: get_album_handler, manage_album_entries: manage_album_entries_handler, + tag_asset: tag_asset_handler, }; let storage_handlers = StorageHandlers { register_volume: register_volume_handler, register_library_path: register_library_path_handler, + check_quota: check_quota_handler, + }; + + let sidecar = SidecarHandlers { + export: export_sidecar_handler, + detect_changes: detect_changes_handler, + import: import_sidecar_handler, + resolve: resolve_conflict_handler, + full_export: full_export_handler, + full_import: full_import_handler, + }; + + let processing = ProcessingHandlers { + enqueue_job: enqueue_job_handler, + start_job: start_job_handler, + complete_job: complete_job_handler, + fail_job: fail_job_handler, + batch_progress: batch_progress_handler, + manage_plugin: manage_plugin_handler, + configure_pipeline: configure_pipeline_handler, }; let sharing = SharingHandlers { @@ -156,6 +273,8 @@ pub async fn build_app(config: &Config) -> Result { organization, storage: storage_handlers, sharing, + sidecar, + processing, token_issuer: issuer, }; diff --git a/crates/bootstrap/src/lib.rs b/crates/bootstrap/src/lib.rs index b8ba85b..b3a8391 100644 --- a/crates/bootstrap/src/lib.rs +++ b/crates/bootstrap/src/lib.rs @@ -1,3 +1,4 @@ pub mod config; pub mod factory; pub mod log_event_publisher; +pub mod log_sidecar_writer; diff --git a/crates/bootstrap/src/log_sidecar_writer.rs b/crates/bootstrap/src/log_sidecar_writer.rs new file mode 100644 index 0000000..2e6f0ba --- /dev/null +++ b/crates/bootstrap/src/log_sidecar_writer.rs @@ -0,0 +1,21 @@ +use async_trait::async_trait; +use domain::{errors::DomainError, ports::SidecarWriterPort, value_objects::StructuredData}; + +pub struct LogSidecarWriter; + +#[async_trait] +impl SidecarWriterPort for LogSidecarWriter { + fn format_name(&self) -> &str { + "log_noop" + } + + async fn write_sidecar(&self, _data: &StructuredData, path: &str) -> Result<(), DomainError> { + tracing::info!(path, "sidecar write (no-op)"); + Ok(()) + } + + async fn read_sidecar(&self, path: &str) -> Result { + tracing::info!(path, "sidecar read (no-op)"); + Ok(StructuredData::new()) + } +} diff --git a/crates/bootstrap/src/main.rs b/crates/bootstrap/src/main.rs index a65ff78..61ac860 100644 --- a/crates/bootstrap/src/main.rs +++ b/crates/bootstrap/src/main.rs @@ -4,6 +4,7 @@ use tracing::info; mod config; mod factory; mod log_event_publisher; +mod log_sidecar_writer; #[tokio::main] async fn main() -> anyhow::Result<()> { diff --git a/crates/presentation/src/handlers/assets.rs b/crates/presentation/src/handlers/assets.rs index 761f1ea..dbe5c1c 100644 --- a/crates/presentation/src/handlers/assets.rs +++ b/crates/presentation/src/handlers/assets.rs @@ -4,9 +4,16 @@ use crate::{ extractors::{JwtClaims, UploadedAsset}, state::AppState, }; -use api_types::responses::{AssetResponse, IngestResponse, TimelineResponse}; +use api_types::{ + requests::{RegisterAssetRequest, TagAssetRequest}, + responses::{AssetResponse, IngestResponse, TagResponse, TimelineResponse}, +}; use application::{ - catalog::{GetAssetQuery, GetTimelineQuery, ReadAssetFileQuery, UpdateMetadataCommand}, + catalog::{ + GetAssetQuery, GetTimelineQuery, ReadAssetFileQuery, RegisterAssetCommand, + UpdateMetadataCommand, + }, + organization::TagAssetCommand, storage::IngestAssetCommand, }; use axum::{ @@ -16,7 +23,11 @@ use axum::{ http::{StatusCode, header}, response::Response, }; -use domain::value_objects::{MetadataValue, StructuredData, SystemId}; +use domain::{ + catalog::entities::AssetType, + errors::DomainError, + value_objects::{MetadataValue, StructuredData, SystemId}, +}; #[derive(Debug, serde::Deserialize)] pub struct TimelineParams { @@ -124,3 +135,51 @@ pub async fn serve_file( .body(Body::from(result.data)) .map_err(|e| AppError::from(domain::errors::DomainError::Internal(e.to_string()))) } + +pub async fn tag_asset( + State(state): State, + claims: JwtClaims, + Path((asset_id,)): Path<(uuid::Uuid,)>, + Json(req): Json, +) -> Result<(StatusCode, Json), AppError> { + let cmd = TagAssetCommand { + asset_id: SystemId::from_uuid(asset_id), + tag_name: req.tag_name, + user_id: claims.user_id, + }; + let (tag, _asset_tag) = state.organization.tag_asset.execute(cmd).await?; + Ok((StatusCode::CREATED, Json(TagResponse::from_domain(&tag)))) +} + +fn parse_asset_type(s: &str) -> Result { + match s { + "image" => Ok(AssetType::Image), + "video" => Ok(AssetType::Video), + "live_photo" => Ok(AssetType::LivePhoto), + _ => Err(AppError::from(DomainError::Validation(format!( + "Invalid asset type: {s}" + )))), + } +} + +pub async fn register_asset( + State(state): State, + claims: JwtClaims, + Json(req): Json, +) -> Result<(StatusCode, Json), AppError> { + let asset_type = parse_asset_type(&req.asset_type)?; + let cmd = RegisterAssetCommand { + volume_id: SystemId::from_uuid(req.volume_id), + relative_path: req.relative_path, + checksum: req.checksum, + asset_type, + mime_type: req.mime_type, + file_size: req.file_size, + owner_id: claims.user_id, + }; + let (asset, _dup_group) = state.catalog.register_asset.execute(cmd).await?; + Ok(( + StatusCode::CREATED, + Json(AssetResponse::from_domain(&asset, &StructuredData::new())), + )) +} diff --git a/crates/presentation/src/handlers/mod.rs b/crates/presentation/src/handlers/mod.rs index c04f858..5d2c2cf 100644 --- a/crates/presentation/src/handlers/mod.rs +++ b/crates/presentation/src/handlers/mod.rs @@ -2,5 +2,7 @@ pub mod albums; pub mod assets; pub mod auth; pub mod health; +pub mod processing; pub mod sharing; +pub mod sidecar; pub mod storage; diff --git a/crates/presentation/src/handlers/processing.rs b/crates/presentation/src/handlers/processing.rs new file mode 100644 index 0000000..0af6e1a --- /dev/null +++ b/crates/presentation/src/handlers/processing.rs @@ -0,0 +1,197 @@ +use crate::{errors::AppError, extractors::JwtClaims, state::AppState}; +use api_types::{ + requests::{ + CompleteJobRequest, ConfigurePipelineRequest, EnqueueJobRequest, FailJobRequest, + ManagePluginRequest, + }, + responses::{BatchProgressResponse, JobResponse, PipelineResponse, PluginResponse}, +}; +use application::processing::{ + CompleteJobCommand, ConfigurePipelineCommand, EnqueueJobCommand, FailJobCommand, + ManagePluginCommand, PipelineStepConfig, PluginAction, ReportBatchProgressQuery, + StartJobCommand, +}; +use axum::{ + Json, + extract::{Path, State}, + http::StatusCode, +}; +use domain::{ + entities::{JobType, PluginType}, + errors::DomainError, + value_objects::{MetadataValue, StructuredData, SystemId}, +}; + +fn parse_job_type(s: &str) -> JobType { + match s { + "scan_directory" => JobType::ScanDirectory, + "extract_metadata" => JobType::ExtractMetadata, + "generate_derivative" => JobType::GenerateDerivative, + "sync_sidecar" => JobType::SyncSidecar, + "detect_duplicates" => JobType::DetectDuplicates, + other => JobType::Custom(other.to_string()), + } +} + +fn parse_plugin_type(s: &str) -> Result { + match s { + "media_processor" => Ok(PluginType::MediaProcessor), + "scheduled_task" => Ok(PluginType::ScheduledTask), + "sidecar_writer" => Ok(PluginType::SidecarWriter), + _ => Err(AppError::from(DomainError::Validation(format!( + "Invalid plugin type: {s}" + )))), + } +} + +fn hashmap_to_structured( + map: &std::collections::HashMap, +) -> StructuredData { + let mut sd = StructuredData::new(); + for (k, v) in map { + sd.insert(k.clone(), MetadataValue::from(v.clone())); + } + sd +} + +pub async fn enqueue_job( + State(state): State, + _claims: JwtClaims, + Json(req): Json, +) -> Result<(StatusCode, Json), AppError> { + let payload = req + .payload + .as_ref() + .map(hashmap_to_structured) + .unwrap_or_default(); + + let cmd = EnqueueJobCommand { + job_type: parse_job_type(&req.job_type), + priority: req.priority.unwrap_or(0), + payload, + target_asset_id: req.target_asset_id.map(SystemId::from_uuid), + batch_id: req.batch_id.map(SystemId::from_uuid), + }; + let job = state.processing.enqueue_job.execute(cmd).await?; + Ok((StatusCode::CREATED, Json(JobResponse::from_domain(&job)))) +} + +pub async fn start_job( + State(state): State, + _claims: JwtClaims, + Path((job_id,)): Path<(uuid::Uuid,)>, +) -> Result, AppError> { + let cmd = StartJobCommand { + job_id: SystemId::from_uuid(job_id), + }; + let job = state.processing.start_job.execute(cmd).await?; + Ok(Json(JobResponse::from_domain(&job))) +} + +pub async fn complete_job( + State(state): State, + _claims: JwtClaims, + Path((job_id,)): Path<(uuid::Uuid,)>, + Json(req): Json, +) -> Result, AppError> { + let cmd = CompleteJobCommand { + job_id: SystemId::from_uuid(job_id), + result: hashmap_to_structured(&req.result), + }; + let job = state.processing.complete_job.execute(cmd).await?; + Ok(Json(JobResponse::from_domain(&job))) +} + +pub async fn fail_job( + State(state): State, + _claims: JwtClaims, + Path((job_id,)): Path<(uuid::Uuid,)>, + Json(req): Json, +) -> Result, AppError> { + let cmd = FailJobCommand { + job_id: SystemId::from_uuid(job_id), + error: req.error, + }; + let job = state.processing.fail_job.execute(cmd).await?; + Ok(Json(JobResponse::from_domain(&job))) +} + +pub async fn batch_progress( + State(state): State, + _claims: JwtClaims, + Path((batch_id,)): Path<(uuid::Uuid,)>, +) -> Result, AppError> { + let query = ReportBatchProgressQuery { + batch_id: SystemId::from_uuid(batch_id), + }; + let progress = state.processing.batch_progress.execute(query).await?; + Ok(Json(BatchProgressResponse::from_domain(&progress))) +} + +pub async fn manage_plugin( + State(state): State, + _claims: JwtClaims, + Json(req): Json, +) -> Result<(StatusCode, Json), AppError> { + let action = match req.action.as_str() { + "create" => { + let name = req.name.ok_or_else(|| { + AppError::from(DomainError::Validation("name required for create".into())) + })?; + let pt = req.plugin_type.as_deref().unwrap_or("media_processor"); + let plugin_type = parse_plugin_type(pt)?; + let config = req + .config + .as_ref() + .map(hashmap_to_structured) + .unwrap_or_default(); + PluginAction::Create { + name, + plugin_type, + config, + } + } + "enable" => PluginAction::Enable, + "disable" => PluginAction::Disable, + other => { + return Err(AppError::from(DomainError::Validation(format!( + "Invalid plugin action: {other}. Use create, enable, or disable" + )))); + } + }; + + let cmd = ManagePluginCommand { + plugin_id: req.plugin_id.map(SystemId::from_uuid), + action, + }; + let plugin = state.processing.manage_plugin.execute(cmd).await?; + Ok(( + StatusCode::CREATED, + Json(PluginResponse::from_domain(&plugin)), + )) +} + +pub async fn configure_pipeline( + State(state): State, + _claims: JwtClaims, + Json(req): Json, +) -> Result<(StatusCode, Json), AppError> { + let steps = req + .steps + .iter() + .map(|s| PipelineStepConfig { + plugin_id: SystemId::from_uuid(s.plugin_id), + config: hashmap_to_structured(&s.config), + }) + .collect(); + + let cmd = ConfigurePipelineCommand { + trigger_event: req.trigger_event, + steps, + }; + let pipeline = state.processing.configure_pipeline.execute(cmd).await?; + Ok(( + StatusCode::CREATED, + Json(PipelineResponse::from_domain(&pipeline)), + )) +} diff --git a/crates/presentation/src/handlers/sidecar.rs b/crates/presentation/src/handlers/sidecar.rs new file mode 100644 index 0000000..c6e717b --- /dev/null +++ b/crates/presentation/src/handlers/sidecar.rs @@ -0,0 +1,103 @@ +use crate::{errors::AppError, extractors::JwtClaims, state::AppState}; +use api_types::responses::{DetectChangesResponse, SidecarExportResponse, SidecarImportResponse}; +use application::sidecar::{ + DetectExternalChangesCommand, ExportSidecarCommand, FullExportCommand, FullImportCommand, + ImportSidecarCommand, ResolveConflictCommand, +}; +use axum::{ + Json, + extract::{Path, State}, +}; +use domain::{entities::ConflictPolicy, errors::DomainError, value_objects::SystemId}; + +fn parse_conflict_policy(s: &str) -> Result { + match s { + "db_wins" => Ok(ConflictPolicy::DbWins), + "file_wins" => Ok(ConflictPolicy::FileWins), + _ => Err(AppError::from(DomainError::Validation(format!( + "Invalid conflict policy: {s}. Use db_wins or file_wins" + )))), + } +} + +pub async fn export_sidecar( + State(state): State, + _claims: JwtClaims, + Path((asset_id,)): Path<(uuid::Uuid,)>, +) -> Result, AppError> { + let cmd = ExportSidecarCommand { + asset_id: SystemId::from_uuid(asset_id), + }; + let record = state.sidecar.export.execute(cmd).await?; + Ok(Json(SidecarExportResponse::from_domain(&record))) +} + +pub async fn detect_changes( + State(state): State, + _claims: JwtClaims, +) -> Result, AppError> { + let count = state + .sidecar + .detect_changes + .execute(DetectExternalChangesCommand) + .await?; + Ok(Json(DetectChangesResponse { + changed_count: count, + })) +} + +pub async fn import_sidecar( + State(state): State, + _claims: JwtClaims, + Path((asset_id,)): Path<(uuid::Uuid,)>, +) -> Result, AppError> { + let cmd = ImportSidecarCommand { + asset_id: SystemId::from_uuid(asset_id), + }; + let metadata = state.sidecar.import.execute(cmd).await?; + Ok(Json(SidecarImportResponse { + asset_id: *metadata.asset_id.as_uuid(), + status: "imported".to_string(), + })) +} + +pub async fn resolve_conflict( + State(state): State, + _claims: JwtClaims, + Path((asset_id,)): Path<(uuid::Uuid,)>, + Json(req): Json, +) -> Result, AppError> { + let policy = parse_conflict_policy(&req.policy)?; + let cmd = ResolveConflictCommand { + asset_id: SystemId::from_uuid(asset_id), + policy, + }; + let record = state.sidecar.resolve.execute(cmd).await?; + Ok(Json(SidecarExportResponse::from_domain(&record))) +} + +pub async fn full_export( + State(state): State, + claims: JwtClaims, +) -> Result, AppError> { + let cmd = FullExportCommand { + owner_id: claims.user_id, + }; + let count = state.sidecar.full_export.execute(cmd).await?; + Ok(Json(DetectChangesResponse { + changed_count: count, + })) +} + +pub async fn full_import( + State(state): State, + claims: JwtClaims, +) -> Result, AppError> { + let cmd = FullImportCommand { + owner_id: claims.user_id, + }; + let count = state.sidecar.full_import.execute(cmd).await?; + Ok(Json(DetectChangesResponse { + changed_count: count, + })) +} diff --git a/crates/presentation/src/handlers/storage.rs b/crates/presentation/src/handlers/storage.rs index 37aa5e6..de87fda 100644 --- a/crates/presentation/src/handlers/storage.rs +++ b/crates/presentation/src/handlers/storage.rs @@ -1,11 +1,15 @@ use crate::{errors::AppError, extractors::JwtClaims, state::AppState}; use api_types::{ - requests::{RegisterLibraryPathRequest, RegisterVolumeRequest}, - responses::{LibraryPathResponse, VolumeResponse}, + requests::{CheckQuotaParams, RegisterLibraryPathRequest, RegisterVolumeRequest}, + responses::{LibraryPathResponse, QuotaCheckResponse, VolumeResponse}, }; -use application::storage::{RegisterLibraryPathCommand, RegisterVolumeCommand}; -use axum::{Json, extract::State, http::StatusCode}; -use domain::value_objects::SystemId; +use application::storage::{CheckQuotaQuery, RegisterLibraryPathCommand, RegisterVolumeCommand}; +use axum::{ + Json, + extract::{Query, State}, + http::StatusCode, +}; +use domain::{entities::UsageType, errors::DomainError, value_objects::SystemId}; pub async fn register_volume( State(state): State, @@ -41,3 +45,38 @@ pub async fn register_library_path( Json(LibraryPathResponse::from_domain(&path)), )) } + +const DEFAULT_QUOTA_USAGE_TYPE: &str = "storage_bytes"; +const DEFAULT_QUOTA_AMOUNT: u64 = 0; + +fn parse_usage_type(s: &str) -> Result { + match s { + "storage_bytes" => Ok(UsageType::StorageBytes), + "process_jobs" => Ok(UsageType::ProcessJobs), + "api_calls" => Ok(UsageType::ApiCalls), + "indexing_size" => Ok(UsageType::IndexingSize), + _ => Err(AppError::from(DomainError::Validation(format!( + "Invalid usage type: {s}" + )))), + } +} + +pub async fn check_quota( + State(state): State, + claims: JwtClaims, + Query(params): Query, +) -> Result, AppError> { + let usage_type = parse_usage_type( + params + .usage_type + .as_deref() + .unwrap_or(DEFAULT_QUOTA_USAGE_TYPE), + )?; + let query = CheckQuotaQuery { + user_id: claims.user_id, + usage_type, + requested_amount: params.amount.unwrap_or(DEFAULT_QUOTA_AMOUNT), + }; + let result = state.storage.check_quota.execute(query).await?; + Ok(Json(QuotaCheckResponse::from_domain(&result))) +} diff --git a/crates/presentation/src/routes.rs b/crates/presentation/src/routes.rs index 0e42da9..51eb975 100644 --- a/crates/presentation/src/routes.rs +++ b/crates/presentation/src/routes.rs @@ -1,5 +1,5 @@ use crate::{ - handlers::{albums, assets, auth, health, sharing, storage}, + handlers::{albums, assets, auth, health, processing, sharing, sidecar, storage}, openapi::openapi_router, state::AppState, }; @@ -24,10 +24,12 @@ pub fn api_v1_router() -> Router { ) // assets .route("/assets/ingest", post(assets::ingest)) + .route("/assets/register", post(assets::register_asset)) .route("/assets/timeline", get(assets::timeline)) .route("/assets/{id}", get(assets::get_asset)) .route("/assets/{id}/metadata", put(assets::update_metadata)) .route("/assets/{id}/file", get(assets::serve_file)) + .route("/assets/{id}/tags", post(assets::tag_asset)) // sharing .route("/sharing", post(sharing::share_resource)) .route("/sharing/links", post(sharing::generate_link)) @@ -39,6 +41,25 @@ pub fn api_v1_router() -> Router { "/storage/library-paths", post(storage::register_library_path), ) + .route("/storage/quota", get(storage::check_quota)) + // sidecar + .route("/sidecar/export/{asset_id}", post(sidecar::export_sidecar)) + .route("/sidecar/detect-changes", post(sidecar::detect_changes)) + .route("/sidecar/import/{asset_id}", post(sidecar::import_sidecar)) + .route( + "/sidecar/resolve/{asset_id}", + post(sidecar::resolve_conflict), + ) + .route("/sidecar/full-export", post(sidecar::full_export)) + .route("/sidecar/full-import", post(sidecar::full_import)) + // processing + .route("/jobs", post(processing::enqueue_job)) + .route("/jobs/{id}/start", post(processing::start_job)) + .route("/jobs/{id}/complete", post(processing::complete_job)) + .route("/jobs/{id}/fail", post(processing::fail_job)) + .route("/jobs/batches/{id}", get(processing::batch_progress)) + .route("/plugins", post(processing::manage_plugin)) + .route("/pipelines", post(processing::configure_pipeline)) } pub fn app_router() -> Router { diff --git a/crates/presentation/src/state.rs b/crates/presentation/src/state.rs index ebaed94..6490395 100644 --- a/crates/presentation/src/state.rs +++ b/crates/presentation/src/state.rs @@ -1,14 +1,29 @@ use std::sync::Arc; use application::{ - catalog::{GetAssetHandler, GetTimelineHandler, ReadAssetFileHandler, UpdateMetadataHandler}, + catalog::{ + GetAssetHandler, GetTimelineHandler, ReadAssetFileHandler, RegisterAssetHandler, + UpdateMetadataHandler, + }, identity::{GetProfileHandler, LoginUserHandler, RegisterUserHandler}, - organization::{CreateAlbumHandler, GetAlbumHandler, ManageAlbumEntriesHandler}, + organization::{ + CreateAlbumHandler, GetAlbumHandler, ManageAlbumEntriesHandler, TagAssetHandler, + }, + processing::{ + CompleteJobHandler, ConfigurePipelineHandler, EnqueueJobHandler, FailJobHandler, + ManagePluginHandler, ReportBatchProgressHandler, StartJobHandler, + }, sharing::{ AccessSharedResourceHandler, GenerateShareLinkHandler, RevokeShareHandler, ShareResourceHandler, }, - storage::{IngestAssetHandler, RegisterLibraryPathHandler, RegisterVolumeHandler}, + sidecar::{ + DetectExternalChangesHandler, ExportSidecarHandler, FullExportHandler, FullImportHandler, + ImportSidecarHandler, ResolveConflictHandler, + }, + storage::{ + CheckQuotaHandler, IngestAssetHandler, RegisterLibraryPathHandler, RegisterVolumeHandler, + }, }; use domain::ports::TokenIssuer; @@ -26,6 +41,7 @@ pub struct CatalogHandlers { pub get_timeline: Arc, pub update_metadata: Arc, pub read_asset_file: Arc, + pub register_asset: Arc, } #[derive(Clone)] @@ -33,12 +49,14 @@ pub struct OrganizationHandlers { pub create_album: Arc, pub get_album: Arc, pub manage_album_entries: Arc, + pub tag_asset: Arc, } #[derive(Clone)] pub struct StorageHandlers { pub register_volume: Arc, pub register_library_path: Arc, + pub check_quota: Arc, } #[derive(Clone)] @@ -49,6 +67,27 @@ pub struct SharingHandlers { pub access: Arc, } +#[derive(Clone)] +pub struct SidecarHandlers { + pub export: Arc, + pub detect_changes: Arc, + pub import: Arc, + pub resolve: Arc, + pub full_export: Arc, + pub full_import: Arc, +} + +#[derive(Clone)] +pub struct ProcessingHandlers { + pub enqueue_job: Arc, + pub start_job: Arc, + pub complete_job: Arc, + pub fail_job: Arc, + pub batch_progress: Arc, + pub manage_plugin: Arc, + pub configure_pipeline: Arc, +} + #[derive(Clone)] pub struct AppState { pub identity: IdentityHandlers, @@ -56,5 +95,7 @@ pub struct AppState { pub organization: OrganizationHandlers, pub storage: StorageHandlers, pub sharing: SharingHandlers, + pub sidecar: SidecarHandlers, + pub processing: ProcessingHandlers, pub token_issuer: Arc, }