feat: thumbnail generator plugin with configurable size/format

- ThumbnailGeneratorPort in domain (bytes + config → resized bytes)
- adapters-thumbnail: ImageThumbnailGenerator using image crate
- ThumbnailGeneratorPlugin reads width/height/format/profile from step config
- PostgresDerivativeRepository + 012_derivatives migration
- Seeded in extract_metadata pipeline as step 2 (300x300 webp)
- Standalone generate_derivative pipeline for on-demand use
This commit is contained in:
2026-05-31 20:44:55 +02:00
parent 45669ec848
commit 35d5baf7be
15 changed files with 1155 additions and 18 deletions

View File

@@ -1,17 +1,28 @@
-- Default plugins matching worker's InMemoryPluginRegistry
INSERT INTO plugins (plugin_id, name, plugin_type, is_enabled, configuration)
VALUES
('a0000000-0000-4000-8000-000000000001', 'metadata_extractor', 'media_processor', true, '{}'),
('a0000000-0000-4000-8000-000000000002', 'sidecar_sync', 'sidecar_writer', true, '{}'),
('a0000000-0000-4000-8000-000000000003', 'no_op', 'scheduled_task', true, '{}')
('a0000000-0000-4000-8000-000000000001', 'metadata_extractor', 'media_processor', true, '{}'),
('a0000000-0000-4000-8000-000000000002', 'sidecar_sync', 'sidecar_writer', true, '{}'),
('a0000000-0000-4000-8000-000000000003', 'no_op', 'scheduled_task', true, '{}'),
('a0000000-0000-4000-8000-000000000004', 'thumbnail_generator', 'media_processor', true, '{}')
ON CONFLICT (plugin_id) DO NOTHING;
-- Pipeline: extract_metadata → metadata_extractor
-- Pipeline: extract_metadata → metadata_extractor, then thumbnail_generator
INSERT INTO processing_pipelines (pipeline_id, trigger_event, steps)
VALUES (
'b0000000-0000-4000-8000-000000000001',
'extract_metadata',
'[{"plugin_id": "a0000000-0000-4000-8000-000000000001", "step_order": 0, "configuration": {}}]'
'[{"plugin_id": "a0000000-0000-4000-8000-000000000001", "step_order": 0, "configuration": {}},
{"plugin_id": "a0000000-0000-4000-8000-000000000004", "step_order": 1, "configuration": {"width": "300", "height": "300", "format": "webp", "profile": "ThumbnailSquare"}}]'
)
ON CONFLICT (pipeline_id) DO NOTHING;
-- Pipeline: generate_derivative (standalone, configurable per-step)
INSERT INTO processing_pipelines (pipeline_id, trigger_event, steps)
VALUES (
'b0000000-0000-4000-8000-000000000003',
'generate_derivative',
'[{"plugin_id": "a0000000-0000-4000-8000-000000000004", "step_order": 0, "configuration": {"width": "300", "height": "300", "format": "webp", "profile": "ThumbnailSquare"}}]'
)
ON CONFLICT (pipeline_id) DO NOTHING;

View File

@@ -0,0 +1,14 @@
CREATE TABLE derivatives (
derivative_id UUID PRIMARY KEY,
parent_asset_id UUID NOT NULL REFERENCES assets(asset_id),
profile_type TEXT NOT NULL,
storage_path TEXT NOT NULL,
mime_type TEXT NOT NULL DEFAULT '',
file_size BIGINT NOT NULL DEFAULT 0,
width INTEGER NOT NULL DEFAULT 0,
height INTEGER NOT NULL DEFAULT 0,
generation_status TEXT NOT NULL DEFAULT 'pending'
);
CREATE INDEX idx_derivatives_parent ON derivatives(parent_asset_id);
CREATE INDEX idx_derivatives_parent_profile ON derivatives(parent_asset_id, profile_type);

View File

@@ -3,11 +3,12 @@ use async_trait::async_trait;
use chrono::{DateTime, Utc};
use domain::{
entities::{
Asset, AssetMetadata, AssetType, DetectionMethod, DuplicateCandidate, DuplicateGroup,
DuplicateStatus, MetadataSource, SourceReference,
Asset, AssetMetadata, AssetType, DerivativeAsset, DerivativeProfile, DetectionMethod,
DuplicateCandidate, DuplicateGroup, DuplicateStatus, GenerationStatus, MetadataSource,
SourceReference,
},
errors::DomainError,
ports::{AssetMetadataRepository, AssetRepository, DuplicateRepository},
ports::{AssetMetadataRepository, AssetRepository, DerivativeRepository, DuplicateRepository},
value_objects::{Checksum, DateTimeStamp, MetadataValue, StructuredData, SystemId},
};
use uuid::Uuid;
@@ -452,3 +453,147 @@ impl DuplicateRepository for PostgresDuplicateRepository {
Ok(())
}
}
// ── DerivativeRepository ──────────────────────────────────────────────
#[derive(sqlx::FromRow)]
struct DerivativeRow {
derivative_id: Uuid,
parent_asset_id: Uuid,
profile_type: String,
storage_path: String,
mime_type: String,
file_size: i64,
width: i32,
height: i32,
generation_status: String,
}
fn profile_from_str(s: &str) -> DerivativeProfile {
match s {
"thumbnail_square" => DerivativeProfile::ThumbnailSquare,
"thumbnail_large" => DerivativeProfile::ThumbnailLarge,
"web_optimized" => DerivativeProfile::WebOptimized,
"video_sd" => DerivativeProfile::VideoSd,
_ => DerivativeProfile::ThumbnailSquare,
}
}
fn profile_to_str(p: &DerivativeProfile) -> &'static str {
match p {
DerivativeProfile::ThumbnailSquare => "thumbnail_square",
DerivativeProfile::ThumbnailLarge => "thumbnail_large",
DerivativeProfile::WebOptimized => "web_optimized",
DerivativeProfile::VideoSd => "video_sd",
}
}
fn gen_status_from_str(s: &str) -> GenerationStatus {
match s {
"pending" => GenerationStatus::Pending,
"ready" => GenerationStatus::Ready,
"failed" => GenerationStatus::Failed,
_ => GenerationStatus::Pending,
}
}
fn gen_status_to_str(s: &GenerationStatus) -> &'static str {
match s {
GenerationStatus::Pending => "pending",
GenerationStatus::Ready => "ready",
GenerationStatus::Failed => "failed",
}
}
impl From<DerivativeRow> for DerivativeAsset {
fn from(r: DerivativeRow) -> Self {
Self {
derivative_id: SystemId::from_uuid(r.derivative_id),
parent_asset_id: SystemId::from_uuid(r.parent_asset_id),
profile_type: profile_from_str(&r.profile_type),
storage_path: r.storage_path,
mime_type: r.mime_type,
file_size: r.file_size as u64,
dimensions: (r.width as u32, r.height as u32),
generation_status: gen_status_from_str(&r.generation_status),
}
}
}
pg_repo!(PostgresDerivativeRepository);
#[async_trait]
impl DerivativeRepository for PostgresDerivativeRepository {
async fn find_by_asset(
&self,
asset_id: &SystemId,
) -> Result<Vec<DerivativeAsset>, DomainError> {
let rows = sqlx::query_as::<_, DerivativeRow>(
"SELECT derivative_id, parent_asset_id, profile_type, storage_path,
mime_type, file_size, width, height, generation_status
FROM derivatives WHERE parent_asset_id = $1",
)
.bind(*asset_id.as_uuid())
.fetch_all(&self.pool)
.await
.map_pg()?;
Ok(rows.into_iter().map(Into::into).collect())
}
async fn find_by_asset_and_profile(
&self,
asset_id: &SystemId,
profile: DerivativeProfile,
) -> Result<Option<DerivativeAsset>, DomainError> {
let row = sqlx::query_as::<_, DerivativeRow>(
"SELECT derivative_id, parent_asset_id, profile_type, storage_path,
mime_type, file_size, width, height, generation_status
FROM derivatives WHERE parent_asset_id = $1 AND profile_type = $2",
)
.bind(*asset_id.as_uuid())
.bind(profile_to_str(&profile))
.fetch_optional(&self.pool)
.await
.map_pg()?;
Ok(row.map(Into::into))
}
async fn save(&self, d: &DerivativeAsset) -> Result<(), DomainError> {
sqlx::query(
"INSERT INTO derivatives (derivative_id, parent_asset_id, profile_type, storage_path,
mime_type, file_size, width, height, generation_status)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (derivative_id) DO UPDATE SET
storage_path = EXCLUDED.storage_path,
mime_type = EXCLUDED.mime_type,
file_size = EXCLUDED.file_size,
width = EXCLUDED.width,
height = EXCLUDED.height,
generation_status = EXCLUDED.generation_status",
)
.bind(*d.derivative_id.as_uuid())
.bind(*d.parent_asset_id.as_uuid())
.bind(profile_to_str(&d.profile_type))
.bind(&d.storage_path)
.bind(&d.mime_type)
.bind(d.file_size as i64)
.bind(d.dimensions.0 as i32)
.bind(d.dimensions.1 as i32)
.bind(gen_status_to_str(&d.generation_status))
.execute(&self.pool)
.await
.map_pg()?;
Ok(())
}
async fn delete(&self, id: &SystemId) -> Result<(), DomainError> {
sqlx::query("DELETE FROM derivatives WHERE derivative_id = $1")
.bind(*id.as_uuid())
.execute(&self.pool)
.await
.map_pg()?;
Ok(())
}
}