feat: seed default plugins/pipelines, auto-enqueue jobs on asset ingest

- Migration seeds metadata_extractor, sidecar_sync, no_op plugins
- Pipelines: extract_metadata → metadata_extractor, sync_sidecar → sidecar_sync
- Worker reacts to AssetIngested → enqueues ExtractMetadata job
- Worker reacts to SidecarSyncRequested → enqueues SyncSidecar job
- Closes the ingest-to-processing loop end-to-end
This commit is contained in:
2026-05-31 20:12:42 +02:00
parent b5cda3afeb
commit d1c7243f5b
19 changed files with 127 additions and 71 deletions

View File

@@ -0,0 +1,25 @@
-- Default plugins matching worker's InMemoryPluginRegistry
INSERT INTO plugins (plugin_id, name, plugin_type, is_enabled, configuration)
VALUES
('a0000000-0000-4000-8000-000000000001', 'metadata_extractor', 'media_processor', true, '{}'),
('a0000000-0000-4000-8000-000000000002', 'sidecar_sync', 'sidecar_writer', true, '{}'),
('a0000000-0000-4000-8000-000000000003', 'no_op', 'scheduled_task', true, '{}')
ON CONFLICT (plugin_id) DO NOTHING;
-- Pipeline: extract_metadata → metadata_extractor
INSERT INTO processing_pipelines (pipeline_id, trigger_event, steps)
VALUES (
'b0000000-0000-4000-8000-000000000001',
'extract_metadata',
'[{"plugin_id": "a0000000-0000-4000-8000-000000000001", "step_order": 0, "configuration": {}}]'
)
ON CONFLICT (pipeline_id) DO NOTHING;
-- Pipeline: sync_sidecar → sidecar_sync
INSERT INTO processing_pipelines (pipeline_id, trigger_event, steps)
VALUES (
'b0000000-0000-4000-8000-000000000002',
'sync_sidecar',
'[{"plugin_id": "a0000000-0000-4000-8000-000000000002", "step_order": 0, "configuration": {}}]'
)
ON CONFLICT (pipeline_id) DO NOTHING;

View File

@@ -1,4 +1,4 @@
use crate::helpers::{pg_repo, MapDomainError};
use crate::helpers::{MapDomainError, pg_repo};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use domain::{

View File

@@ -1,10 +1,7 @@
use crate::helpers::{pg_repo, MapDomainError};
use crate::helpers::{MapDomainError, pg_repo};
use async_trait::async_trait;
use domain::{
errors::DomainError,
events::DomainEvent,
ports::EventStore,
value_objects::SystemId,
errors::DomainError, events::DomainEvent, ports::EventStore, value_objects::SystemId,
};
use event_payload::EventPayload;
use uuid::Uuid;
@@ -19,8 +16,9 @@ fn aggregate_id(event: &DomainEvent) -> Uuid {
| DomainEvent::AssetDeleted { asset_id, .. }
| DomainEvent::SidecarSyncRequested { asset_id, .. } => *asset_id.as_uuid(),
DomainEvent::ShareCreated { scope_id, .. }
| DomainEvent::ShareRevoked { scope_id, .. } => *scope_id.as_uuid(),
DomainEvent::ShareCreated { scope_id, .. } | DomainEvent::ShareRevoked { scope_id, .. } => {
*scope_id.as_uuid()
}
DomainEvent::JobEnqueued { job_id, .. }
| DomainEvent::JobCompleted { job_id, .. }
@@ -33,8 +31,8 @@ impl EventStore for PostgresEventStore {
async fn append(&self, event: &DomainEvent) -> Result<(), DomainError> {
let payload = EventPayload::from(event);
let event_type = payload.subject().to_string();
let json = serde_json::to_value(&payload)
.map_err(|e| DomainError::Internal(e.to_string()))?;
let json =
serde_json::to_value(&payload).map_err(|e| DomainError::Internal(e.to_string()))?;
let agg_id = aggregate_id(event);
sqlx::query(

View File

@@ -1,4 +1,4 @@
use crate::helpers::{pg_repo, MapDomainError};
use crate::helpers::{MapDomainError, pg_repo};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use domain::{

View File

@@ -1,4 +1,4 @@
use crate::helpers::{pg_repo, MapDomainError};
use crate::helpers::{MapDomainError, pg_repo};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use domain::{

View File

@@ -1,4 +1,4 @@
use crate::helpers::{pg_repo, MapDomainError};
use crate::helpers::{MapDomainError, pg_repo};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use domain::{

View File

@@ -1,4 +1,4 @@
use crate::helpers::{pg_repo, MapDomainError};
use crate::helpers::{MapDomainError, pg_repo};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use domain::{

View File

@@ -1,4 +1,4 @@
use crate::helpers::{pg_repo, MapDomainError};
use crate::helpers::{MapDomainError, pg_repo};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use domain::{

View File

@@ -1,4 +1,4 @@
use crate::helpers::{pg_repo, MapDomainError};
use crate::helpers::{MapDomainError, pg_repo};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use domain::{

View File

@@ -162,11 +162,8 @@ mod tests {
inner.save(&asset).await.unwrap();
let share_repo = Arc::new(InMemoryShareRepository::new());
let filtered = VisibilityFilteredAssetRepository::new(
inner.clone(),
share_repo.clone(),
owner_id,
);
let filtered =
VisibilityFilteredAssetRepository::new(inner.clone(), share_repo.clone(), owner_id);
let found = filtered.find_by_id(&asset.asset_id).await.unwrap();
assert!(found.is_some());
@@ -183,11 +180,8 @@ mod tests {
inner.save(&asset).await.unwrap();
let share_repo = Arc::new(InMemoryShareRepository::new());
let filtered = VisibilityFilteredAssetRepository::new(
inner.clone(),
share_repo.clone(),
stranger_id,
);
let filtered =
VisibilityFilteredAssetRepository::new(inner.clone(), share_repo.clone(), stranger_id);
let found = filtered.find_by_id(&asset.asset_id).await.unwrap();
assert!(found.is_none());
@@ -211,11 +205,8 @@ mod tests {
let target = target_user(scope.scope_id, friend_id);
share_repo.save_target(&target).await.unwrap();
let filtered = VisibilityFilteredAssetRepository::new(
inner.clone(),
share_repo.clone(),
friend_id,
);
let filtered =
VisibilityFilteredAssetRepository::new(inner.clone(), share_repo.clone(), friend_id);
let found = filtered.find_by_id(&asset.asset_id).await.unwrap();
assert!(found.is_some());
@@ -239,11 +230,8 @@ mod tests {
let share_repo = Arc::new(InMemoryShareRepository::new());
// Stranger queries by checksum — should only see their own
let filtered = VisibilityFilteredAssetRepository::new(
inner.clone(),
share_repo.clone(),
stranger_id,
);
let filtered =
VisibilityFilteredAssetRepository::new(inner.clone(), share_repo.clone(), stranger_id);
let results = filtered
.find_by_checksum(&asset_a.source_reference.checksum)
@@ -262,11 +250,8 @@ mod tests {
inner.save(&asset).await.unwrap();
let share_repo = Arc::new(InMemoryShareRepository::new());
let filtered = VisibilityFilteredAssetRepository::new(
inner.clone(),
share_repo.clone(),
owner_id,
);
let filtered =
VisibilityFilteredAssetRepository::new(inner.clone(), share_repo.clone(), owner_id);
let results = filtered.find_by_owner(&owner_id, 10, 0).await.unwrap();
assert_eq!(results.len(), 1);
@@ -284,11 +269,8 @@ mod tests {
let share_repo = Arc::new(InMemoryShareRepository::new());
// Stranger queries owner's assets without a share — should get nothing
let filtered = VisibilityFilteredAssetRepository::new(
inner.clone(),
share_repo.clone(),
stranger_id,
);
let filtered =
VisibilityFilteredAssetRepository::new(inner.clone(), share_repo.clone(), stranger_id);
let results = filtered.find_by_owner(&owner_id, 10, 0).await.unwrap();
assert!(results.is_empty());

View File

@@ -26,8 +26,9 @@ pub async fn build_app(config: &Config) -> Result<Router> {
Arc::new(event_transport::EventPublisherAdapter::new(transport));
let event_store: Arc<dyn domain::ports::EventStore> =
Arc::new(adapters_postgres::PostgresEventStore::new(pool.clone()));
let event_publisher: Arc<dyn domain::ports::EventPublisher> =
Arc::new(event_transport::CompositeEventPublisher::new(nats_publisher, event_store));
let event_publisher: Arc<dyn domain::ports::EventPublisher> = Arc::new(
event_transport::CompositeEventPublisher::new(nats_publisher, event_store),
);
let storage_path = std::env::var("STORAGE_PATH").unwrap_or_else(|_| "./data/media".to_string());
let file_storage: Arc<LocalFileStorage> = Arc::new(LocalFileStorage::new(&storage_path));
@@ -35,7 +36,8 @@ pub async fn build_app(config: &Config) -> Result<Router> {
// Build per-context services
let identity = services::identity::build(&pool, &config.jwt_secret);
let (storage_repos, storage) = services::storage::build(&pool);
let catalog = services::catalog::build(&pool, &storage_repos, file_storage, event_publisher.clone());
let catalog =
services::catalog::build(&pool, &storage_repos, file_storage, event_publisher.clone());
let organization = services::organization::build(&pool);
let sidecar = services::sidecar::build(&pool);
let processing = services::processing::build(&pool, event_publisher.clone());

View File

@@ -1,8 +1,8 @@
use std::sync::Arc;
use adapters_postgres::{
PgPool, PostgresAssetMetadataRepository, PostgresAssetRepository,
PostgresDuplicateRepository, PostgresIngestTransaction,
PgPool, PostgresAssetMetadataRepository, PostgresAssetRepository, PostgresDuplicateRepository,
PostgresIngestTransaction,
};
use adapters_storage::LocalFileStorage;
use application::catalog::{

View File

@@ -1,11 +1,8 @@
use std::sync::Arc;
use adapters_postgres::{
PgPool, PostgresShareRepository, PostgresVisibilityFilterRepository,
};
use adapters_postgres::{PgPool, PostgresShareRepository, PostgresVisibilityFilterRepository};
use application::sharing::{
AccessSharedResourceHandler, GenerateShareLinkHandler, RevokeShareHandler,
ShareResourceHandler,
AccessSharedResourceHandler, GenerateShareLinkHandler, RevokeShareHandler, ShareResourceHandler,
};
use domain::ports::EventPublisher;
use presentation::state::SharingHandlers;

View File

@@ -19,8 +19,10 @@ pub fn build(pool: &PgPool) -> (StorageRepos, StorageHandlers) {
let ledger_repo = Arc::new(PostgresUsageLedgerRepository::new(pool.clone()));
let register_volume = Arc::new(RegisterVolumeHandler::new(volume_repo.clone()));
let register_library_path =
Arc::new(RegisterLibraryPathHandler::new(volume_repo, path_repo.clone()));
let register_library_path = Arc::new(RegisterLibraryPathHandler::new(
volume_repo,
path_repo.clone(),
));
let check_quota = Arc::new(CheckQuotaHandler::new(quota_repo, ledger_repo));
let handlers = StorageHandlers {

View File

@@ -16,5 +16,8 @@ pub trait EventConsumer: Send + Sync {
#[async_trait]
pub trait EventStore: Send + Sync {
async fn append(&self, event: &DomainEvent) -> Result<(), DomainError>;
async fn query_by_aggregate(&self, aggregate_id: &SystemId) -> Result<Vec<DomainEvent>, DomainError>;
async fn query_by_aggregate(
&self,
aggregate_id: &SystemId,
) -> Result<Vec<DomainEvent>, DomainError>;
}

View File

@@ -73,7 +73,8 @@ pub trait UsageLedgerRepository: Send + Sync {
pub trait IngestTransaction: Send + Sync {
async fn save_asset(&self, asset: &Asset) -> Result<(), DomainError>;
async fn save_session(&self, session: &IngestSession) -> Result<(), DomainError>;
async fn find_quota(&self, owner_id: &SystemId) -> Result<Option<QuotaDefinition>, DomainError>;
async fn find_quota(&self, owner_id: &SystemId)
-> Result<Option<QuotaDefinition>, DomainError>;
async fn sum_usage(
&self,
user_id: &SystemId,

View File

@@ -4,4 +4,4 @@ mod processing;
pub use infra::Repos;
pub use plugins::build_plugin_registry;
pub use processing::build_process_next_handler;
pub use processing::{build_enqueue_handler, build_process_next_handler};

View File

@@ -1,4 +1,4 @@
use application::processing::{ExecutePipelineHandler, ProcessNextJobHandler};
use application::processing::{EnqueueJobHandler, ExecutePipelineHandler, ProcessNextJobHandler};
use domain::ports::{EventPublisher, PluginRegistry};
use std::sync::Arc;
@@ -20,3 +20,10 @@ pub fn build_process_next_handler(
ProcessNextJobHandler::new(repos.job.clone(), execute_pipeline)
}
pub fn build_enqueue_handler(
repos: &Repos,
event_pub: Arc<dyn EventPublisher>,
) -> EnqueueJobHandler {
EnqueueJobHandler::new(repos.job.clone(), event_pub)
}

View File

@@ -4,16 +4,18 @@ use std::time::Duration;
use futures::StreamExt;
use tracing::{error, info, warn};
use application::processing::ProcessNextJobCommand;
use application::processing::{EnqueueJobCommand, ProcessNextJobCommand};
use domain::entities::JobType;
use domain::events::DomainEvent;
use domain::ports::EventConsumer;
use domain::value_objects::StructuredData;
mod config;
mod factories;
mod plugin_registry;
mod plugins;
use factories::{Repos, build_plugin_registry, build_process_next_handler};
use factories::{Repos, build_enqueue_handler, build_plugin_registry, build_process_next_handler};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
@@ -45,17 +47,26 @@ async fn main() -> anyhow::Result<()> {
let pub_transport = adapters_nats::NatsTransport::new(nats_client.clone());
let nats_publisher: Arc<dyn domain::ports::EventPublisher> =
Arc::new(event_transport::EventPublisherAdapter::new(pub_transport));
let event_pub: Arc<dyn domain::ports::EventPublisher> =
Arc::new(event_transport::CompositeEventPublisher::new(nats_publisher, event_store));
let event_pub: Arc<dyn domain::ports::EventPublisher> = Arc::new(
event_transport::CompositeEventPublisher::new(nats_publisher, event_store),
);
let registry = Arc::new(build_plugin_registry(&repos, file_storage, sidecar_writer));
let process_next = Arc::new(build_process_next_handler(&repos, registry, event_pub));
let process_next = Arc::new(build_process_next_handler(
&repos,
registry,
event_pub.clone(),
));
let enqueue = Arc::new(build_enqueue_handler(&repos, event_pub));
// ── Fallback sweep task ────────────────────────────────────────────
let sweep_interval = Duration::from_secs(config.fallback_sweep_secs);
let sweep_handler = Arc::clone(&process_next);
tokio::spawn(async move {
info!(every_secs = config.fallback_sweep_secs, "fallback sweep task started");
info!(
every_secs = config.fallback_sweep_secs,
"fallback sweep task started"
);
loop {
tokio::time::sleep(sweep_interval).await;
info!("fallback sweep: draining queued jobs");
@@ -91,17 +102,45 @@ async fn main() -> anyhow::Result<()> {
};
match &envelope.event {
DomainEvent::AssetIngested { asset_id, .. } => {
info!(asset_id = %asset_id, "event loop: AssetIngested → enqueue ExtractMetadata");
(envelope.ack)();
let cmd = EnqueueJobCommand {
job_type: JobType::ExtractMetadata,
priority: 10,
payload: StructuredData::new(),
target_asset_id: Some(*asset_id),
batch_id: None,
};
if let Err(e) = enqueue.execute(cmd).await {
error!(error = %e, "event loop: failed to enqueue ExtractMetadata");
}
}
DomainEvent::SidecarSyncRequested { asset_id, .. } => {
info!(asset_id = %asset_id, "event loop: SidecarSyncRequested → enqueue SyncSidecar");
(envelope.ack)();
let cmd = EnqueueJobCommand {
job_type: JobType::SyncSidecar,
priority: 5,
payload: StructuredData::new(),
target_asset_id: Some(*asset_id),
batch_id: None,
};
if let Err(e) = enqueue.execute(cmd).await {
error!(error = %e, "event loop: failed to enqueue SyncSidecar");
}
}
DomainEvent::JobEnqueued {
job_id, job_type, ..
} => {
info!(job_id = %job_id, job_type = %job_type, "event loop: JobEnqueued received");
info!(job_id = %job_id, job_type = %job_type, "event loop: JobEnqueued → process");
(envelope.ack)();
match process_next.execute(ProcessNextJobCommand).await {
Ok(Some(job)) => {
info!(job_id = %job.job_id, status = ?job.status, "event loop: processed job");
}
Ok(None) => {
warn!("event loop: JobEnqueued event but no queued job found");
warn!("event loop: JobEnqueued but no queued job found");
}
Err(e) => {
error!(error = %e, "event loop: error processing job");
@@ -109,8 +148,8 @@ async fn main() -> anyhow::Result<()> {
}
}
other => {
info!(event = ?other, "event loop: non-job event, acking");
(envelope.ack)();
tracing::debug!(event = ?other, "event loop: unhandled event, acked");
}
}
}