feat: directory scanner plugin — walk library paths, auto-register assets

- DirectoryScannerPlugin: recursive directory walk via FileStoragePort
- Computes SHA256 checksums, classifies media by extension
- Registers each file via RegisterAssetHandler (triggers AssetIngested → extract_metadata pipeline)
- Reads library_path_id from job payload, looks up volume + path
- Seeded plugin + scan_directory pipeline
- Trigger via POST /jobs with { job_type: "ScanDirectory", payload: { library_path_id: "..." } }
This commit is contained in:
2026-05-31 21:18:23 +02:00
parent ef64e86439
commit 95916cedde
29 changed files with 1461 additions and 7 deletions

View File

@@ -4,7 +4,8 @@ VALUES
('a0000000-0000-4000-8000-000000000001', 'metadata_extractor', 'media_processor', true, '{}'),
('a0000000-0000-4000-8000-000000000002', 'sidecar_sync', 'sidecar_writer', true, '{}'),
('a0000000-0000-4000-8000-000000000003', 'no_op', 'scheduled_task', true, '{}'),
('a0000000-0000-4000-8000-000000000004', 'thumbnail_generator', 'media_processor', true, '{}')
('a0000000-0000-4000-8000-000000000004', 'thumbnail_generator', 'media_processor', true, '{}'),
('a0000000-0000-4000-8000-000000000005', 'directory_scanner', 'media_processor', true, '{}')
ON CONFLICT (plugin_id) DO NOTHING;
-- Pipeline: extract_metadata → metadata_extractor, then thumbnail_generator
@@ -26,6 +27,15 @@ VALUES (
)
ON CONFLICT (pipeline_id) DO NOTHING;
-- Pipeline: scan_directory → directory_scanner
-- Single step (step_order 0) bound to plugin a0000000-0000-4000-8000-000000000005,
-- the id this seed assigns to directory_scanner, with an empty step configuration.
-- ON CONFLICT (pipeline_id) DO NOTHING leaves an existing row with this id untouched.
INSERT INTO processing_pipelines (pipeline_id, trigger_event, steps)
VALUES (
'b0000000-0000-4000-8000-000000000004',
'scan_directory',
'[{"plugin_id": "a0000000-0000-4000-8000-000000000005", "step_order": 0, "configuration": {}}]'
)
ON CONFLICT (pipeline_id) DO NOTHING;
-- Pipeline: sync_sidecar → sidecar_sync
INSERT INTO processing_pipelines (pipeline_id, trigger_event, steps)
VALUES (

View File

@@ -19,6 +19,8 @@ adapters-exif = { workspace = true }
adapters-thumbnail = { workspace = true }
adapters-sidecar = { workspace = true }
async-nats = { workspace = true }
sha2 = { workspace = true }
uuid = { workspace = true }
futures = { workspace = true }
tokio = { workspace = true }

View File

@@ -1,7 +1,8 @@
use adapters_postgres::{
PostgresAssetMetadataRepository, PostgresAssetRepository, PostgresDerivativeRepository,
PostgresJobBatchRepository, PostgresJobRepository, PostgresPipelineRepository,
PostgresPluginRepository, PostgresSidecarRepository,
PostgresDuplicateRepository, PostgresJobBatchRepository, PostgresJobRepository,
PostgresLibraryPathRepository, PostgresPipelineRepository, PostgresPluginRepository,
PostgresSidecarRepository, PostgresStorageVolumeRepository,
};
use std::sync::Arc;
@@ -13,7 +14,10 @@ pub struct Repos {
pub asset: Arc<PostgresAssetRepository>,
pub metadata: Arc<PostgresAssetMetadataRepository>,
pub derivative: Arc<PostgresDerivativeRepository>,
pub duplicate: Arc<PostgresDuplicateRepository>,
pub sidecar: Arc<PostgresSidecarRepository>,
pub volume: Arc<PostgresStorageVolumeRepository>,
pub library_path: Arc<PostgresLibraryPathRepository>,
}
impl Repos {
@@ -26,7 +30,10 @@ impl Repos {
asset: Arc::new(PostgresAssetRepository::new(pool.clone())),
metadata: Arc::new(PostgresAssetMetadataRepository::new(pool.clone())),
derivative: Arc::new(PostgresDerivativeRepository::new(pool.clone())),
sidecar: Arc::new(PostgresSidecarRepository::new(pool)),
duplicate: Arc::new(PostgresDuplicateRepository::new(pool.clone())),
sidecar: Arc::new(PostgresSidecarRepository::new(pool.clone())),
volume: Arc::new(PostgresStorageVolumeRepository::new(pool.clone())),
library_path: Arc::new(PostgresLibraryPathRepository::new(pool)),
}
}
}

View File

@@ -1,8 +1,12 @@
use crate::plugin_registry::InMemoryPluginRegistry;
use crate::plugins::{
MetadataExtractorPlugin, NoOpPlugin, SidecarSyncPlugin, ThumbnailGeneratorPlugin,
DirectoryScannerPlugin, MetadataExtractorPlugin, NoOpPlugin, SidecarSyncPlugin,
ThumbnailGeneratorPlugin,
};
use application::catalog::RegisterAssetHandler;
use domain::ports::{
EventPublisher, MetadataExtractorPort, SidecarWriterPort, ThumbnailGeneratorPort,
};
use domain::ports::{MetadataExtractorPort, SidecarWriterPort, ThumbnailGeneratorPort};
use std::sync::Arc;
use super::Repos;
@@ -13,6 +17,7 @@ pub fn build_plugin_registry(
sidecar_writer: Arc<dyn SidecarWriterPort>,
extractor: Arc<dyn MetadataExtractorPort>,
thumbnail_gen: Arc<dyn ThumbnailGeneratorPort>,
event_pub: Arc<dyn EventPublisher>,
) -> InMemoryPluginRegistry {
let mut registry = InMemoryPluginRegistry::new();
@@ -25,11 +30,23 @@ pub fn build_plugin_registry(
)));
registry.register(Arc::new(ThumbnailGeneratorPlugin::new(
repos.asset.clone(),
file_storage,
file_storage.clone(),
repos.derivative.clone(),
thumbnail_gen,
)));
let register_handler = Arc::new(RegisterAssetHandler::new(
repos.asset.clone(),
repos.duplicate.clone(),
event_pub,
));
registry.register(Arc::new(DirectoryScannerPlugin::new(
repos.volume.clone(),
repos.library_path.clone(),
file_storage.clone(),
register_handler,
)));
let export_handler = Arc::new(application::sidecar::ExportSidecarHandler::new(
repos.metadata.clone(),
repos.sidecar.clone(),

View File

@@ -63,6 +63,7 @@ async fn main() -> anyhow::Result<()> {
sidecar_writer,
extractor,
thumbnail_gen,
event_pub.clone(),
));
let process_next = Arc::new(build_process_next_handler(
&repos,

View File

@@ -0,0 +1,181 @@
use application::catalog::{RegisterAssetCommand, RegisterAssetHandler};
use async_trait::async_trait;
use domain::{
catalog::entities::AssetType,
errors::DomainError,
ports::{FileStoragePort, LibraryPathRepository, PluginExecutor, StorageVolumeRepository},
value_objects::{MetadataValue, StructuredData, SystemId},
};
use sha2::{Digest, Sha256};
use std::sync::Arc;
use tracing::{info, warn};
/// Plugin executor that walks a library path and hands every supported media
/// file it finds to the asset registration handler.
pub struct DirectoryScannerPlugin {
/// Looks up the storage volume referenced by the library path being scanned.
volume_repo: Arc<dyn StorageVolumeRepository>,
/// Resolves the `library_path_id` given in the job payload or configuration.
path_repo: Arc<dyn LibraryPathRepository>,
/// Lists directories and reads file contents during the walk.
file_storage: Arc<dyn FileStoragePort>,
/// Receives one `RegisterAssetCommand` per supported file.
register_handler: Arc<RegisterAssetHandler>,
}
impl DirectoryScannerPlugin {
pub fn new(
volume_repo: Arc<dyn StorageVolumeRepository>,
path_repo: Arc<dyn LibraryPathRepository>,
file_storage: Arc<dyn FileStoragePort>,
register_handler: Arc<RegisterAssetHandler>,
) -> Self {
Self {
volume_repo,
path_repo,
file_storage,
register_handler,
}
}
}
/// Maps a file name to its asset type and MIME type based on the extension.
///
/// The extension is compared case-insensitively (ASCII). Returns `None` when
/// the extension is not one of the supported image/video formats, and also
/// when the name has no real extension at all: a file literally called `jpg`
/// or a dotfile such as `.jpg` is not treated as media.
fn classify(filename: &str) -> Option<(AssetType, &'static str)> {
    // `Path::extension` yields `None` for names without a dot and for
    // dotfiles, unlike `rsplit('.')`, which hands back the whole name.
    let ext = std::path::Path::new(filename)
        .extension()?
        .to_str()?
        .to_ascii_lowercase();

    match ext.as_str() {
        "jpg" | "jpeg" => Some((AssetType::Image, "image/jpeg")),
        "png" => Some((AssetType::Image, "image/png")),
        "webp" => Some((AssetType::Image, "image/webp")),
        "heic" | "heif" => Some((AssetType::Image, "image/heif")),
        "gif" => Some((AssetType::Image, "image/gif")),
        "tiff" | "tif" => Some((AssetType::Image, "image/tiff")),
        "avif" => Some((AssetType::Image, "image/avif")),
        "mp4" => Some((AssetType::Video, "video/mp4")),
        "mov" => Some((AssetType::Video, "video/quicktime")),
        "avi" => Some((AssetType::Video, "video/x-msvideo")),
        "mkv" => Some((AssetType::Video, "video/x-matroska")),
        _ => None,
    }
}
/// Joins a directory path and an entry name with exactly one `/` between them.
///
/// An empty `dir` yields `name` unchanged, and a `dir` that already ends in
/// `/` does not receive a second separator, so a scan root stored as
/// `photos/` produces `photos/a.jpg` rather than `photos//a.jpg`.
fn join_relative(dir: &str, name: &str) -> String {
    if dir.is_empty() {
        name.to_string()
    } else if dir.ends_with('/') {
        format!("{dir}{name}")
    } else {
        format!("{dir}/{name}")
    }
}

#[async_trait]
impl PluginExecutor for DirectoryScannerPlugin {
    /// Returns the fixed plugin name `directory_scanner`.
    fn plugin_name(&self) -> &str {
        "directory_scanner"
    }

    /// Walks the directory tree of a library path and submits every supported
    /// media file to the registration handler.
    ///
    /// The library path is identified by the string `library_path_id`, read
    /// from `payload` first and from `config` as a fallback. `_asset_id` is
    /// not used.
    ///
    /// # Returns
    ///
    /// A `StructuredData` with three integer entries: `files_found` (every
    /// non-directory entry seen), `files_registered` (files the handler
    /// accepted, including ones it flagged as duplicates) and `files_skipped`
    /// (unsupported extension, unreadable file, or failed registration).
    ///
    /// # Errors
    ///
    /// * `DomainError::Validation` when `library_path_id` is missing, is not a
    ///   valid UUID, or the library path has no designated owner.
    /// * `DomainError::NotFound` when the library path or its volume does not
    ///   exist.
    /// * Any error returned by the repository lookups.
    ///
    /// Failures while listing a directory, reading a file or registering an
    /// asset are logged and do not abort the scan.
    async fn execute(
        &self,
        _asset_id: Option<SystemId>,
        payload: &StructuredData,
        config: &StructuredData,
    ) -> Result<StructuredData, DomainError> {
        let path_id_str = payload
            .get_string("library_path_id")
            .or_else(|| config.get_string("library_path_id"))
            .ok_or_else(|| {
                DomainError::Validation("directory_scanner requires library_path_id".into())
            })?;
        let path_uuid = uuid::Uuid::parse_str(path_id_str)
            .map_err(|_| DomainError::Validation(format!("invalid UUID: {path_id_str}")))?;
        let path_id = SystemId::from_uuid(path_uuid);

        let library_path = self
            .path_repo
            .find_by_id(&path_id)
            .await?
            .ok_or_else(|| DomainError::NotFound(format!("LibraryPath {} not found", path_id)))?;
        let volume = self
            .volume_repo
            .find_by_id(&library_path.volume_id)
            .await?
            .ok_or_else(|| {
                DomainError::NotFound(format!("Volume {} not found", library_path.volume_id))
            })?;
        let owner_id = library_path.designated_owner_id.ok_or_else(|| {
            DomainError::Validation(format!("LibraryPath {} has no designated owner", path_id))
        })?;

        let scan_root = &library_path.relative_path;
        info!(path = scan_root, volume = %volume.volume_name, "scanning directory");

        // Counted as i64 because that is what `MetadataValue::Integer` carries;
        // this avoids a wrapping `as` cast when building the result.
        let mut found: i64 = 0;
        let mut registered: i64 = 0;
        let mut skipped: i64 = 0;

        // Explicit stack instead of recursion: directories are visited
        // depth-first, most recently discovered first.
        let mut pending_dirs = vec![scan_root.to_string()];
        while let Some(dir) = pending_dirs.pop() {
            // Best effort: an unlistable directory is logged and skipped. This
            // also applies to the scan root, in which case the scan still
            // succeeds and reports zero files.
            let entries = match self.file_storage.list_directory(&dir).await {
                Ok(entries) => entries,
                Err(e) => {
                    warn!(dir = dir, error = %e, "failed to list directory, skipping");
                    continue;
                }
            };

            for entry in entries {
                // `entry.path` is treated as relative to `dir`.
                let full_path = join_relative(&dir, &entry.path);

                if entry.is_directory {
                    pending_dirs.push(full_path);
                    continue;
                }

                found += 1;

                let (asset_type, mime_type) = match classify(&entry.path) {
                    Some(kind) => kind,
                    None => {
                        skipped += 1;
                        continue;
                    }
                };

                // The whole file is loaded so its bytes can be hashed below.
                let data = match self.file_storage.read_file(&full_path).await {
                    Ok(bytes) => bytes,
                    Err(e) => {
                        warn!(path = full_path, error = %e, "failed to read file, skipping");
                        skipped += 1;
                        continue;
                    }
                };

                // Lower-case hex SHA-256 of the file contents.
                let checksum = format!("{:x}", Sha256::digest(&data));

                let cmd = RegisterAssetCommand {
                    volume_id: library_path.volume_id,
                    relative_path: full_path.clone(),
                    checksum,
                    asset_type,
                    mime_type: mime_type.to_string(),
                    file_size: data.len() as u64,
                    owner_id,
                };

                match self.register_handler.execute(cmd).await {
                    Ok((asset, dup)) => {
                        // A detected duplicate still counts as registered.
                        registered += 1;
                        if dup.is_some() {
                            info!(path = full_path, asset_id = %asset.asset_id, "registered (duplicate detected)");
                        }
                    }
                    Err(e) => {
                        warn!(path = full_path, error = %e, "failed to register asset");
                        skipped += 1;
                    }
                }
            }
        }

        info!(found, registered, skipped, "directory scan complete");

        let mut result = StructuredData::new();
        result.insert("files_found", MetadataValue::Integer(found));
        result.insert("files_registered", MetadataValue::Integer(registered));
        result.insert("files_skipped", MetadataValue::Integer(skipped));
        Ok(result)
    }
}

View File

@@ -1,8 +1,10 @@
pub mod directory_scanner;
pub mod metadata_extractor;
pub mod no_op;
pub mod sidecar_sync;
pub mod thumbnail_generator;
pub use directory_scanner::DirectoryScannerPlugin;
pub use metadata_extractor::MetadataExtractorPlugin;
pub use no_op::NoOpPlugin;
pub use sidecar_sync::SidecarSyncPlugin;