From d1c7243f5bd929bb3d4b377bf882a55634b23c7f Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Sun, 31 May 2026 20:12:42 +0200 Subject: [PATCH] feat: seed default plugins/pipelines, auto-enqueue jobs on asset ingest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Migration seeds metadata_extractor, sidecar_sync, no_op plugins - Pipelines: extract_metadata → metadata_extractor, sync_sidecar → sidecar_sync - Worker reacts to AssetIngested → enqueues ExtractMetadata job - Worker reacts to SidecarSyncRequested → enqueues SyncSidecar job - Closes the ingest-to-processing loop end-to-end --- .../migrations/011_seed_default_plugins.sql | 25 ++++++++ crates/adapters/postgres/src/catalog/mod.rs | 2 +- crates/adapters/postgres/src/event_store.rs | 16 +++--- crates/adapters/postgres/src/identity/mod.rs | 2 +- .../adapters/postgres/src/organization/mod.rs | 2 +- .../adapters/postgres/src/processing/mod.rs | 2 +- crates/adapters/postgres/src/sharing/mod.rs | 2 +- crates/adapters/postgres/src/sidecar/mod.rs | 2 +- crates/adapters/postgres/src/storage/mod.rs | 2 +- crates/application/src/catalog/visibility.rs | 42 ++++---------- crates/bootstrap/src/factory.rs | 8 ++- crates/bootstrap/src/services/catalog.rs | 4 +- crates/bootstrap/src/services/sharing.rs | 7 +-- crates/bootstrap/src/services/storage.rs | 6 +- crates/domain/src/common/ports.rs | 5 +- crates/domain/src/storage/ports.rs | 3 +- crates/worker/src/factories/mod.rs | 2 +- crates/worker/src/factories/processing.rs | 9 ++- crates/worker/src/main.rs | 57 ++++++++++++++++--- 19 files changed, 127 insertions(+), 71 deletions(-) create mode 100644 crates/adapters/postgres/migrations/011_seed_default_plugins.sql diff --git a/crates/adapters/postgres/migrations/011_seed_default_plugins.sql b/crates/adapters/postgres/migrations/011_seed_default_plugins.sql new file mode 100644 index 0000000..72cee7a --- /dev/null +++ b/crates/adapters/postgres/migrations/011_seed_default_plugins.sql @@ -0,0 +1,25 @@ +-- Default plugins matching worker's InMemoryPluginRegistry +INSERT INTO plugins (plugin_id, name, plugin_type, is_enabled, configuration) +VALUES + ('a0000000-0000-4000-8000-000000000001', 'metadata_extractor', 'media_processor', true, '{}'), + ('a0000000-0000-4000-8000-000000000002', 'sidecar_sync', 'sidecar_writer', true, '{}'), + ('a0000000-0000-4000-8000-000000000003', 'no_op', 'scheduled_task', true, '{}') +ON CONFLICT (plugin_id) DO NOTHING; + +-- Pipeline: extract_metadata → metadata_extractor +INSERT INTO processing_pipelines (pipeline_id, trigger_event, steps) +VALUES ( + 'b0000000-0000-4000-8000-000000000001', + 'extract_metadata', + '[{"plugin_id": "a0000000-0000-4000-8000-000000000001", "step_order": 0, "configuration": {}}]' +) +ON CONFLICT (pipeline_id) DO NOTHING; + +-- Pipeline: sync_sidecar → sidecar_sync +INSERT INTO processing_pipelines (pipeline_id, trigger_event, steps) +VALUES ( + 'b0000000-0000-4000-8000-000000000002', + 'sync_sidecar', + '[{"plugin_id": "a0000000-0000-4000-8000-000000000002", "step_order": 0, "configuration": {}}]' +) +ON CONFLICT (pipeline_id) DO NOTHING; diff --git a/crates/adapters/postgres/src/catalog/mod.rs b/crates/adapters/postgres/src/catalog/mod.rs index 55068c8..a306d4d 100644 --- a/crates/adapters/postgres/src/catalog/mod.rs +++ b/crates/adapters/postgres/src/catalog/mod.rs @@ -1,4 +1,4 @@ -use crate::helpers::{pg_repo, MapDomainError}; +use crate::helpers::{MapDomainError, pg_repo}; use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ diff --git a/crates/adapters/postgres/src/event_store.rs b/crates/adapters/postgres/src/event_store.rs index 3f8cbc2..a084476 100644 --- a/crates/adapters/postgres/src/event_store.rs +++ b/crates/adapters/postgres/src/event_store.rs @@ -1,10 +1,7 @@ -use crate::helpers::{pg_repo, MapDomainError}; +use crate::helpers::{MapDomainError, pg_repo}; use async_trait::async_trait; use domain::{ - errors::DomainError, - events::DomainEvent, - ports::EventStore, - value_objects::SystemId, + errors::DomainError, events::DomainEvent, ports::EventStore, value_objects::SystemId, }; use event_payload::EventPayload; use uuid::Uuid; @@ -19,8 +16,9 @@ fn aggregate_id(event: &DomainEvent) -> Uuid { | DomainEvent::AssetDeleted { asset_id, .. } | DomainEvent::SidecarSyncRequested { asset_id, .. } => *asset_id.as_uuid(), - DomainEvent::ShareCreated { scope_id, .. } - | DomainEvent::ShareRevoked { scope_id, .. } => *scope_id.as_uuid(), + DomainEvent::ShareCreated { scope_id, .. } | DomainEvent::ShareRevoked { scope_id, .. } => { + *scope_id.as_uuid() + } DomainEvent::JobEnqueued { job_id, .. } | DomainEvent::JobCompleted { job_id, .. } @@ -33,8 +31,8 @@ impl EventStore for PostgresEventStore { async fn append(&self, event: &DomainEvent) -> Result<(), DomainError> { let payload = EventPayload::from(event); let event_type = payload.subject().to_string(); - let json = serde_json::to_value(&payload) - .map_err(|e| DomainError::Internal(e.to_string()))?; + let json = + serde_json::to_value(&payload).map_err(|e| DomainError::Internal(e.to_string()))?; let agg_id = aggregate_id(event); sqlx::query( diff --git a/crates/adapters/postgres/src/identity/mod.rs b/crates/adapters/postgres/src/identity/mod.rs index 3463d4e..1727214 100644 --- a/crates/adapters/postgres/src/identity/mod.rs +++ b/crates/adapters/postgres/src/identity/mod.rs @@ -1,4 +1,4 @@ -use crate::helpers::{pg_repo, MapDomainError}; +use crate::helpers::{MapDomainError, pg_repo}; use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ diff --git a/crates/adapters/postgres/src/organization/mod.rs b/crates/adapters/postgres/src/organization/mod.rs index 8760b1f..c7ae407 100644 --- a/crates/adapters/postgres/src/organization/mod.rs +++ b/crates/adapters/postgres/src/organization/mod.rs @@ -1,4 +1,4 @@ -use crate::helpers::{pg_repo, MapDomainError}; +use crate::helpers::{MapDomainError, pg_repo}; use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ diff --git a/crates/adapters/postgres/src/processing/mod.rs b/crates/adapters/postgres/src/processing/mod.rs index 4465470..0c4ada4 100644 --- a/crates/adapters/postgres/src/processing/mod.rs +++ b/crates/adapters/postgres/src/processing/mod.rs @@ -1,4 +1,4 @@ -use crate::helpers::{pg_repo, MapDomainError}; +use crate::helpers::{MapDomainError, pg_repo}; use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ diff --git a/crates/adapters/postgres/src/sharing/mod.rs b/crates/adapters/postgres/src/sharing/mod.rs index 50ca08e..068e5cc 100644 --- a/crates/adapters/postgres/src/sharing/mod.rs +++ b/crates/adapters/postgres/src/sharing/mod.rs @@ -1,4 +1,4 @@ -use crate::helpers::{pg_repo, MapDomainError}; +use crate::helpers::{MapDomainError, pg_repo}; use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ diff --git a/crates/adapters/postgres/src/sidecar/mod.rs b/crates/adapters/postgres/src/sidecar/mod.rs index 6fda3eb..f50c330 100644 --- a/crates/adapters/postgres/src/sidecar/mod.rs +++ b/crates/adapters/postgres/src/sidecar/mod.rs @@ -1,4 +1,4 @@ -use crate::helpers::{pg_repo, MapDomainError}; +use crate::helpers::{MapDomainError, pg_repo}; use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ diff --git a/crates/adapters/postgres/src/storage/mod.rs b/crates/adapters/postgres/src/storage/mod.rs index 553858b..1e12824 100644 --- a/crates/adapters/postgres/src/storage/mod.rs +++ b/crates/adapters/postgres/src/storage/mod.rs @@ -1,4 +1,4 @@ -use crate::helpers::{pg_repo, MapDomainError}; +use crate::helpers::{MapDomainError, pg_repo}; use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ diff --git a/crates/application/src/catalog/visibility.rs b/crates/application/src/catalog/visibility.rs index 83af003..c1a50df 100644 --- a/crates/application/src/catalog/visibility.rs +++ b/crates/application/src/catalog/visibility.rs @@ -162,11 +162,8 @@ mod tests { inner.save(&asset).await.unwrap(); let share_repo = Arc::new(InMemoryShareRepository::new()); - let filtered = VisibilityFilteredAssetRepository::new( - inner.clone(), - share_repo.clone(), - owner_id, - ); + let filtered = + VisibilityFilteredAssetRepository::new(inner.clone(), share_repo.clone(), owner_id); let found = filtered.find_by_id(&asset.asset_id).await.unwrap(); assert!(found.is_some()); @@ -183,11 +180,8 @@ mod tests { inner.save(&asset).await.unwrap(); let share_repo = Arc::new(InMemoryShareRepository::new()); - let filtered = VisibilityFilteredAssetRepository::new( - inner.clone(), - share_repo.clone(), - stranger_id, - ); + let filtered = + VisibilityFilteredAssetRepository::new(inner.clone(), share_repo.clone(), stranger_id); let found = filtered.find_by_id(&asset.asset_id).await.unwrap(); assert!(found.is_none()); @@ -211,11 +205,8 @@ mod tests { let target = target_user(scope.scope_id, friend_id); share_repo.save_target(&target).await.unwrap(); - let filtered = VisibilityFilteredAssetRepository::new( - inner.clone(), - share_repo.clone(), - friend_id, - ); + let filtered = + VisibilityFilteredAssetRepository::new(inner.clone(), share_repo.clone(), friend_id); let found = filtered.find_by_id(&asset.asset_id).await.unwrap(); assert!(found.is_some()); @@ -239,11 +230,8 @@ mod tests { let share_repo = Arc::new(InMemoryShareRepository::new()); // Stranger queries by checksum — should only see their own - let filtered = VisibilityFilteredAssetRepository::new( - inner.clone(), - share_repo.clone(), - stranger_id, - ); + let filtered = + VisibilityFilteredAssetRepository::new(inner.clone(), share_repo.clone(), stranger_id); let results = filtered .find_by_checksum(&asset_a.source_reference.checksum) @@ -262,11 +250,8 @@ mod tests { inner.save(&asset).await.unwrap(); let share_repo = Arc::new(InMemoryShareRepository::new()); - let filtered = VisibilityFilteredAssetRepository::new( - inner.clone(), - share_repo.clone(), - owner_id, - ); + let filtered = + VisibilityFilteredAssetRepository::new(inner.clone(), share_repo.clone(), owner_id); let results = filtered.find_by_owner(&owner_id, 10, 0).await.unwrap(); assert_eq!(results.len(), 1); @@ -284,11 +269,8 @@ mod tests { let share_repo = Arc::new(InMemoryShareRepository::new()); // Stranger queries owner's assets without a share — should get nothing - let filtered = VisibilityFilteredAssetRepository::new( - inner.clone(), - share_repo.clone(), - stranger_id, - ); + let filtered = + VisibilityFilteredAssetRepository::new(inner.clone(), share_repo.clone(), stranger_id); let results = filtered.find_by_owner(&owner_id, 10, 0).await.unwrap(); assert!(results.is_empty()); diff --git a/crates/bootstrap/src/factory.rs b/crates/bootstrap/src/factory.rs index a9f37b3..2396346 100644 --- a/crates/bootstrap/src/factory.rs +++ b/crates/bootstrap/src/factory.rs @@ -26,8 +26,9 @@ pub async fn build_app(config: &Config) -> Result { Arc::new(event_transport::EventPublisherAdapter::new(transport)); let event_store: Arc = Arc::new(adapters_postgres::PostgresEventStore::new(pool.clone())); - let event_publisher: Arc = - Arc::new(event_transport::CompositeEventPublisher::new(nats_publisher, event_store)); + let event_publisher: Arc = Arc::new( + event_transport::CompositeEventPublisher::new(nats_publisher, event_store), + ); let storage_path = std::env::var("STORAGE_PATH").unwrap_or_else(|_| "./data/media".to_string()); let file_storage: Arc = Arc::new(LocalFileStorage::new(&storage_path)); @@ -35,7 +36,8 @@ pub async fn build_app(config: &Config) -> Result { // Build per-context services let identity = services::identity::build(&pool, &config.jwt_secret); let (storage_repos, storage) = services::storage::build(&pool); - let catalog = services::catalog::build(&pool, &storage_repos, file_storage, event_publisher.clone()); + let catalog = + services::catalog::build(&pool, &storage_repos, file_storage, event_publisher.clone()); let organization = services::organization::build(&pool); let sidecar = services::sidecar::build(&pool); let processing = services::processing::build(&pool, event_publisher.clone()); diff --git a/crates/bootstrap/src/services/catalog.rs b/crates/bootstrap/src/services/catalog.rs index 5250512..15b6aa1 100644 --- a/crates/bootstrap/src/services/catalog.rs +++ b/crates/bootstrap/src/services/catalog.rs @@ -1,8 +1,8 @@ use std::sync::Arc; use adapters_postgres::{ - PgPool, PostgresAssetMetadataRepository, PostgresAssetRepository, - PostgresDuplicateRepository, PostgresIngestTransaction, + PgPool, PostgresAssetMetadataRepository, PostgresAssetRepository, PostgresDuplicateRepository, + PostgresIngestTransaction, }; use adapters_storage::LocalFileStorage; use application::catalog::{ diff --git a/crates/bootstrap/src/services/sharing.rs b/crates/bootstrap/src/services/sharing.rs index 9a45cad..fc5f24c 100644 --- a/crates/bootstrap/src/services/sharing.rs +++ b/crates/bootstrap/src/services/sharing.rs @@ -1,11 +1,8 @@ use std::sync::Arc; -use adapters_postgres::{ - PgPool, PostgresShareRepository, PostgresVisibilityFilterRepository, -}; +use adapters_postgres::{PgPool, PostgresShareRepository, PostgresVisibilityFilterRepository}; use application::sharing::{ - AccessSharedResourceHandler, GenerateShareLinkHandler, RevokeShareHandler, - ShareResourceHandler, + AccessSharedResourceHandler, GenerateShareLinkHandler, RevokeShareHandler, ShareResourceHandler, }; use domain::ports::EventPublisher; use presentation::state::SharingHandlers; diff --git a/crates/bootstrap/src/services/storage.rs b/crates/bootstrap/src/services/storage.rs index 606a6e5..da16e5a 100644 --- a/crates/bootstrap/src/services/storage.rs +++ b/crates/bootstrap/src/services/storage.rs @@ -19,8 +19,10 @@ pub fn build(pool: &PgPool) -> (StorageRepos, StorageHandlers) { let ledger_repo = Arc::new(PostgresUsageLedgerRepository::new(pool.clone())); let register_volume = Arc::new(RegisterVolumeHandler::new(volume_repo.clone())); - let register_library_path = - Arc::new(RegisterLibraryPathHandler::new(volume_repo, path_repo.clone())); + let register_library_path = Arc::new(RegisterLibraryPathHandler::new( + volume_repo, + path_repo.clone(), + )); let check_quota = Arc::new(CheckQuotaHandler::new(quota_repo, ledger_repo)); let handlers = StorageHandlers { diff --git a/crates/domain/src/common/ports.rs b/crates/domain/src/common/ports.rs index 9a2fb08..147bea5 100644 --- a/crates/domain/src/common/ports.rs +++ b/crates/domain/src/common/ports.rs @@ -16,5 +16,8 @@ pub trait EventConsumer: Send + Sync { #[async_trait] pub trait EventStore: Send + Sync { async fn append(&self, event: &DomainEvent) -> Result<(), DomainError>; - async fn query_by_aggregate(&self, aggregate_id: &SystemId) -> Result, DomainError>; + async fn query_by_aggregate( + &self, + aggregate_id: &SystemId, + ) -> Result, DomainError>; } diff --git a/crates/domain/src/storage/ports.rs b/crates/domain/src/storage/ports.rs index d296d9b..7ede133 100644 --- a/crates/domain/src/storage/ports.rs +++ b/crates/domain/src/storage/ports.rs @@ -73,7 +73,8 @@ pub trait UsageLedgerRepository: Send + Sync { pub trait IngestTransaction: Send + Sync { async fn save_asset(&self, asset: &Asset) -> Result<(), DomainError>; async fn save_session(&self, session: &IngestSession) -> Result<(), DomainError>; - async fn find_quota(&self, owner_id: &SystemId) -> Result, DomainError>; + async fn find_quota(&self, owner_id: &SystemId) + -> Result, DomainError>; async fn sum_usage( &self, user_id: &SystemId, diff --git a/crates/worker/src/factories/mod.rs b/crates/worker/src/factories/mod.rs index fbedfe3..0b82f2a 100644 --- a/crates/worker/src/factories/mod.rs +++ b/crates/worker/src/factories/mod.rs @@ -4,4 +4,4 @@ mod processing; pub use infra::Repos; pub use plugins::build_plugin_registry; -pub use processing::build_process_next_handler; +pub use processing::{build_enqueue_handler, build_process_next_handler}; diff --git a/crates/worker/src/factories/processing.rs b/crates/worker/src/factories/processing.rs index ddaf152..7b451bc 100644 --- a/crates/worker/src/factories/processing.rs +++ b/crates/worker/src/factories/processing.rs @@ -1,4 +1,4 @@ -use application::processing::{ExecutePipelineHandler, ProcessNextJobHandler}; +use application::processing::{EnqueueJobHandler, ExecutePipelineHandler, ProcessNextJobHandler}; use domain::ports::{EventPublisher, PluginRegistry}; use std::sync::Arc; @@ -20,3 +20,10 @@ pub fn build_process_next_handler( ProcessNextJobHandler::new(repos.job.clone(), execute_pipeline) } + +pub fn build_enqueue_handler( + repos: &Repos, + event_pub: Arc, +) -> EnqueueJobHandler { + EnqueueJobHandler::new(repos.job.clone(), event_pub) +} diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index d6ebac6..3060303 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -4,16 +4,18 @@ use std::time::Duration; use futures::StreamExt; use tracing::{error, info, warn}; -use application::processing::ProcessNextJobCommand; +use application::processing::{EnqueueJobCommand, ProcessNextJobCommand}; +use domain::entities::JobType; use domain::events::DomainEvent; use domain::ports::EventConsumer; +use domain::value_objects::StructuredData; mod config; mod factories; mod plugin_registry; mod plugins; -use factories::{Repos, build_plugin_registry, build_process_next_handler}; +use factories::{Repos, build_enqueue_handler, build_plugin_registry, build_process_next_handler}; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -45,17 +47,26 @@ async fn main() -> anyhow::Result<()> { let pub_transport = adapters_nats::NatsTransport::new(nats_client.clone()); let nats_publisher: Arc = Arc::new(event_transport::EventPublisherAdapter::new(pub_transport)); - let event_pub: Arc = - Arc::new(event_transport::CompositeEventPublisher::new(nats_publisher, event_store)); + let event_pub: Arc = Arc::new( + event_transport::CompositeEventPublisher::new(nats_publisher, event_store), + ); let registry = Arc::new(build_plugin_registry(&repos, file_storage, sidecar_writer)); - let process_next = Arc::new(build_process_next_handler(&repos, registry, event_pub)); + let process_next = Arc::new(build_process_next_handler( + &repos, + registry, + event_pub.clone(), + )); + let enqueue = Arc::new(build_enqueue_handler(&repos, event_pub)); // ── Fallback sweep task ──────────────────────────────────────────── let sweep_interval = Duration::from_secs(config.fallback_sweep_secs); let sweep_handler = Arc::clone(&process_next); tokio::spawn(async move { - info!(every_secs = config.fallback_sweep_secs, "fallback sweep task started"); + info!( + every_secs = config.fallback_sweep_secs, + "fallback sweep task started" + ); loop { tokio::time::sleep(sweep_interval).await; info!("fallback sweep: draining queued jobs"); @@ -91,17 +102,45 @@ async fn main() -> anyhow::Result<()> { }; match &envelope.event { + DomainEvent::AssetIngested { asset_id, .. } => { + info!(asset_id = %asset_id, "event loop: AssetIngested → enqueue ExtractMetadata"); + (envelope.ack)(); + let cmd = EnqueueJobCommand { + job_type: JobType::ExtractMetadata, + priority: 10, + payload: StructuredData::new(), + target_asset_id: Some(*asset_id), + batch_id: None, + }; + if let Err(e) = enqueue.execute(cmd).await { + error!(error = %e, "event loop: failed to enqueue ExtractMetadata"); + } + } + DomainEvent::SidecarSyncRequested { asset_id, .. } => { + info!(asset_id = %asset_id, "event loop: SidecarSyncRequested → enqueue SyncSidecar"); + (envelope.ack)(); + let cmd = EnqueueJobCommand { + job_type: JobType::SyncSidecar, + priority: 5, + payload: StructuredData::new(), + target_asset_id: Some(*asset_id), + batch_id: None, + }; + if let Err(e) = enqueue.execute(cmd).await { + error!(error = %e, "event loop: failed to enqueue SyncSidecar"); + } + } DomainEvent::JobEnqueued { job_id, job_type, .. } => { - info!(job_id = %job_id, job_type = %job_type, "event loop: JobEnqueued received"); + info!(job_id = %job_id, job_type = %job_type, "event loop: JobEnqueued → process"); (envelope.ack)(); match process_next.execute(ProcessNextJobCommand).await { Ok(Some(job)) => { info!(job_id = %job.job_id, status = ?job.status, "event loop: processed job"); } Ok(None) => { - warn!("event loop: JobEnqueued event but no queued job found"); + warn!("event loop: JobEnqueued but no queued job found"); } Err(e) => { error!(error = %e, "event loop: error processing job"); @@ -109,8 +148,8 @@ async fn main() -> anyhow::Result<()> { } } other => { - info!(event = ?other, "event loop: non-job event, acking"); (envelope.ack)(); + tracing::debug!(event = ?other, "event loop: unhandled event, acked"); } } }