From 957737ac9b0ad98643fe9cf0cef1579c07ffbe01 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Mon, 1 Jun 2026 01:35:43 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20frontend=20MVP=20=E2=80=94=20auth,=20ti?= =?UTF-8?q?meline,=20upload,=20albums,=20admin,=20image=20viewer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Backend: - user roles (DB + JWT + first-user-is-admin) - volume-aware file resolver (multi-volume asset serving) - directory scanner uses volume URI directly - date-summary endpoint (capture date from EXIF) - timeline ordered by capture date - list endpoints: volumes, plugins, pipelines, library paths - delete endpoints: volumes, library paths - configurable upload body limit (MAX_UPLOAD_BYTES) Frontend: - auth: login/register, token refresh, role-based admin gate - timeline: date-grouped grid, infinite scroll, date scrubber - image viewer: fullscreen zoom/pan/pinch, metadata sidebar - upload: drag-drop, sequential upload, progress tracking - albums: create, add/remove photos, asset picker dialog - admin: storage (import library), jobs (pagination, error details), plugins (list + toggle), pipelines, sidecars, duplicates - multi-select mode with add-to-album action - TanStack Query for all data fetching --- .env.example | 5 + .../postgres/migrations/016_user_roles.sql | 1 + .../017_rename_directory_scanner_plugin.sql | 1 + crates/adapters/postgres/src/catalog/mod.rs | 38 +- crates/adapters/postgres/src/identity/mod.rs | 26 +- .../adapters/postgres/src/processing/mod.rs | 22 + crates/adapters/postgres/src/storage/mod.rs | 14 +- crates/adapters/storage/src/lib.rs | 2 + .../adapters/storage/src/volume_resolver.rs | 91 +++++ crates/api-types/src/responses.rs | 17 + crates/application/src/catalog/mod.rs | 1 + .../src/catalog/queries/get_date_summary.rs | 32 ++ crates/application/src/catalog/queries/mod.rs | 1 + .../src/catalog/queries/read_asset_file.rs | 15 +- crates/application/src/catalog/visibility.rs | 7 + .../src/identity/commands/login_user.rs | 2 +- .../src/identity/commands/refresh_token.rs | 16 +- .../src/identity/commands/register_user.rs | 6 +- crates/application/src/processing/mod.rs | 2 + .../src/processing/queries/list_pipelines.rs | 18 + .../src/processing/queries/list_plugins.rs | 16 + .../application/src/processing/queries/mod.rs | 2 + .../storage/commands/delete_library_path.rs | 16 + .../src/storage/commands/delete_volume.rs | 16 + .../application/src/storage/commands/mod.rs | 2 + crates/application/src/storage/mod.rs | 5 + .../storage/queries/list_all_library_paths.rs | 16 + .../src/storage/queries/list_ingest_paths.rs | 28 ++ .../src/storage/queries/list_volumes.rs | 16 + crates/application/src/storage/queries/mod.rs | 3 + .../application/src/testing/repositories.rs | 31 ++ crates/bootstrap/src/config.rs | 5 + crates/bootstrap/src/factory.rs | 5 +- crates/bootstrap/src/services/catalog.rs | 16 +- crates/bootstrap/src/services/identity.rs | 3 +- crates/bootstrap/src/services/processing.rs | 7 +- crates/bootstrap/src/services/storage.rs | 26 +- crates/domain/src/catalog/ports.rs | 4 + crates/domain/src/identity/entities.rs | 6 + crates/domain/src/identity/ports.rs | 1 + crates/domain/src/processing/ports.rs | 2 + crates/domain/src/storage/ports.rs | 18 + crates/presentation/src/handlers/assets.rs | 26 +- crates/presentation/src/handlers/auth.rs | 2 +- .../presentation/src/handlers/processing.rs | 18 + crates/presentation/src/handlers/storage.rs | 94 ++++- crates/presentation/src/routes/catalog.rs | 1 + crates/presentation/src/routes/processing.rs | 4 +- crates/presentation/src/routes/storage.rs | 18 +- crates/presentation/src/state.rs | 22 +- crates/worker/src/factories/plugins.rs | 7 +- crates/worker/src/main.rs | 2 + .../worker/src/plugins/directory_scanner.rs | 58 ++- .../worker/src/plugins/metadata_extractor.rs | 19 +- .../worker/src/plugins/thumbnail_generator.rs | 12 +- .../app/(app)/admin/duplicates/page.tsx | 131 ++++++ .../app/(app)/admin/jobs/page.tsx | 275 +++++++++++++ k-photos-frontend/app/(app)/admin/layout.tsx | 22 + .../app/(app)/admin/pipelines/page.tsx | 128 ++++++ .../app/(app)/admin/plugins/page.tsx | 141 +++++++ .../app/(app)/admin/sidecars/page.tsx | 178 ++++++++ .../app/(app)/admin/storage/page.tsx | 384 ++++++++++++++++++ .../app/(app)/albums/[id]/page.tsx | 129 ++++++ k-photos-frontend/app/(app)/layout.tsx | 76 ++++ k-photos-frontend/app/(app)/page.tsx | 35 ++ k-photos-frontend/app/(auth)/layout.tsx | 11 + k-photos-frontend/app/(auth)/login/page.tsx | 86 ++++ .../app/(auth)/register/page.tsx | 97 +++++ k-photos-frontend/app/layout.tsx | 25 +- k-photos-frontend/app/page.tsx | 19 - k-photos-frontend/bun.lock | 8 + .../components/add-to-album-dialog.tsx | 111 +++++ .../components/admin-sidebar.tsx | 57 +++ .../components/album-sidebar.tsx | 78 ++++ .../components/asset-picker-dialog.tsx | 103 +++++ .../components/auth-provider.tsx | 81 ++++ .../components/date-scrubber.tsx | 145 +++++++ k-photos-frontend/components/image-viewer.tsx | 183 +++++++++ .../components/metadata-sidebar.tsx | 204 ++++++++++ k-photos-frontend/components/photo-card.tsx | 86 ++++ k-photos-frontend/components/photo-grid.tsx | 180 ++++++++ .../components/query-provider.tsx | 9 + k-photos-frontend/components/ui/calendar.tsx | 1 + k-photos-frontend/components/ui/dialog.tsx | 2 +- .../components/upload-dialog.tsx | 107 +++++ k-photos-frontend/hooks/use-albums.ts | 67 +++ k-photos-frontend/hooks/use-auth.ts | 10 + k-photos-frontend/hooks/use-duplicates.ts | 35 ++ k-photos-frontend/hooks/use-jobs.ts | 89 ++++ k-photos-frontend/hooks/use-pipelines.ts | 30 ++ k-photos-frontend/hooks/use-plugins.ts | 32 ++ k-photos-frontend/hooks/use-sidecars.ts | 71 ++++ k-photos-frontend/hooks/use-storage-admin.ts | 83 ++++ k-photos-frontend/hooks/use-timeline.ts | 46 +++ k-photos-frontend/hooks/use-upload.ts | 90 ++++ k-photos-frontend/lib/api.ts | 62 +++ k-photos-frontend/lib/auth.ts | 31 ++ k-photos-frontend/lib/timeline.ts | 35 ++ k-photos-frontend/lib/types.ts | 158 +++++++ k-photos-frontend/next.config.ts | 11 +- k-photos-frontend/package.json | 2 + 101 files changed, 4679 insertions(+), 109 deletions(-) create mode 100644 crates/adapters/postgres/migrations/016_user_roles.sql create mode 100644 crates/adapters/postgres/migrations/017_rename_directory_scanner_plugin.sql create mode 100644 crates/adapters/storage/src/volume_resolver.rs create mode 100644 crates/application/src/catalog/queries/get_date_summary.rs create mode 100644 crates/application/src/processing/queries/list_pipelines.rs create mode 100644 crates/application/src/processing/queries/list_plugins.rs create mode 100644 crates/application/src/storage/commands/delete_library_path.rs create mode 100644 crates/application/src/storage/commands/delete_volume.rs create mode 100644 crates/application/src/storage/queries/list_all_library_paths.rs create mode 100644 crates/application/src/storage/queries/list_ingest_paths.rs create mode 100644 crates/application/src/storage/queries/list_volumes.rs create mode 100644 k-photos-frontend/app/(app)/admin/duplicates/page.tsx create mode 100644 k-photos-frontend/app/(app)/admin/jobs/page.tsx create mode 100644 k-photos-frontend/app/(app)/admin/layout.tsx create mode 100644 k-photos-frontend/app/(app)/admin/pipelines/page.tsx create mode 100644 k-photos-frontend/app/(app)/admin/plugins/page.tsx create mode 100644 k-photos-frontend/app/(app)/admin/sidecars/page.tsx create mode 100644 k-photos-frontend/app/(app)/admin/storage/page.tsx create mode 100644 k-photos-frontend/app/(app)/albums/[id]/page.tsx create mode 100644 k-photos-frontend/app/(app)/layout.tsx create mode 100644 k-photos-frontend/app/(app)/page.tsx create mode 100644 k-photos-frontend/app/(auth)/layout.tsx create mode 100644 k-photos-frontend/app/(auth)/login/page.tsx create mode 100644 k-photos-frontend/app/(auth)/register/page.tsx delete mode 100644 k-photos-frontend/app/page.tsx create mode 100644 k-photos-frontend/components/add-to-album-dialog.tsx create mode 100644 k-photos-frontend/components/admin-sidebar.tsx create mode 100644 k-photos-frontend/components/album-sidebar.tsx create mode 100644 k-photos-frontend/components/asset-picker-dialog.tsx create mode 100644 k-photos-frontend/components/auth-provider.tsx create mode 100644 k-photos-frontend/components/date-scrubber.tsx create mode 100644 k-photos-frontend/components/image-viewer.tsx create mode 100644 k-photos-frontend/components/metadata-sidebar.tsx create mode 100644 k-photos-frontend/components/photo-card.tsx create mode 100644 k-photos-frontend/components/photo-grid.tsx create mode 100644 k-photos-frontend/components/query-provider.tsx create mode 100644 k-photos-frontend/components/upload-dialog.tsx create mode 100644 k-photos-frontend/hooks/use-albums.ts create mode 100644 k-photos-frontend/hooks/use-auth.ts create mode 100644 k-photos-frontend/hooks/use-duplicates.ts create mode 100644 k-photos-frontend/hooks/use-jobs.ts create mode 100644 k-photos-frontend/hooks/use-pipelines.ts create mode 100644 k-photos-frontend/hooks/use-plugins.ts create mode 100644 k-photos-frontend/hooks/use-sidecars.ts create mode 100644 k-photos-frontend/hooks/use-storage-admin.ts create mode 100644 k-photos-frontend/hooks/use-timeline.ts create mode 100644 k-photos-frontend/hooks/use-upload.ts create mode 100644 k-photos-frontend/lib/api.ts create mode 100644 k-photos-frontend/lib/auth.ts create mode 100644 k-photos-frontend/lib/timeline.ts create mode 100644 k-photos-frontend/lib/types.ts diff --git a/.env.example b/.env.example index cd71724..40d05be 100644 --- a/.env.example +++ b/.env.example @@ -34,6 +34,11 @@ CORS_ALLOWED_ORIGINS=http://localhost:8000,http://localhost:5173 # ============================================================================ STORAGE_PATH=./data/media +# ============================================================================ +# Uploads (default 256 MiB) +# ============================================================================ +# MAX_UPLOAD_BYTES=268435456 + # ============================================================================ # Logging # ============================================================================ diff --git a/crates/adapters/postgres/migrations/016_user_roles.sql b/crates/adapters/postgres/migrations/016_user_roles.sql new file mode 100644 index 0000000..ea2f609 --- /dev/null +++ b/crates/adapters/postgres/migrations/016_user_roles.sql @@ -0,0 +1 @@ +ALTER TABLE users ADD COLUMN role TEXT NOT NULL DEFAULT 'user'; diff --git a/crates/adapters/postgres/migrations/017_rename_directory_scanner_plugin.sql b/crates/adapters/postgres/migrations/017_rename_directory_scanner_plugin.sql new file mode 100644 index 0000000..525b700 --- /dev/null +++ b/crates/adapters/postgres/migrations/017_rename_directory_scanner_plugin.sql @@ -0,0 +1 @@ +UPDATE plugins SET name = 'scan_directory' WHERE plugin_id = 'a0000000-0000-4000-8000-000000000005'; diff --git a/crates/adapters/postgres/src/catalog/mod.rs b/crates/adapters/postgres/src/catalog/mod.rs index 1be9a41..1ee51ff 100644 --- a/crates/adapters/postgres/src/catalog/mod.rs +++ b/crates/adapters/postgres/src/catalog/mod.rs @@ -189,10 +189,16 @@ impl AssetRepository for PostgresAssetRepository { offset: u32, ) -> Result, DomainError> { let rows = sqlx::query_as::<_, AssetRow>( - "SELECT asset_id, volume_id, relative_path, checksum, asset_type, mime_type, - file_size, is_processed, owner_user_id, created_at - FROM assets WHERE owner_user_id = $1 - ORDER BY created_at DESC + "SELECT a.asset_id, a.volume_id, a.relative_path, a.checksum, a.asset_type, a.mime_type, + a.file_size, a.is_processed, a.owner_user_id, a.created_at + FROM assets a + LEFT JOIN asset_metadata am + ON am.asset_id = a.asset_id AND am.metadata_source = 'exif_extracted' + WHERE a.owner_user_id = $1 + ORDER BY COALESCE( + (am.data->>'DateTimeOriginal')::timestamptz, + a.created_at + ) DESC LIMIT $2 OFFSET $3", ) .bind(*owner_id.as_uuid()) @@ -296,6 +302,30 @@ impl AssetRepository for PostgresAssetRepository { Ok(count as u64) } + async fn date_summary( + &self, + owner_id: &SystemId, + ) -> Result, DomainError> { + let rows: Vec<(chrono::NaiveDate, i64)> = sqlx::query_as( + "SELECT COALESCE( + (am.data->>'DateTimeOriginal')::timestamptz, + a.created_at + )::date AS day, + COUNT(*) AS cnt + FROM assets a + LEFT JOIN asset_metadata am + ON am.asset_id = a.asset_id AND am.metadata_source = 'exif_extracted' + WHERE a.owner_user_id = $1 + GROUP BY day ORDER BY day DESC", + ) + .bind(*owner_id.as_uuid()) + .fetch_all(&self.pool) + .await + .map_pg()?; + + Ok(rows.into_iter().map(|(d, c)| (d, c as u64)).collect()) + } + async fn save(&self, asset: &Asset) -> Result<(), DomainError> { sqlx::query( "INSERT INTO assets (asset_id, volume_id, relative_path, checksum, asset_type, diff --git a/crates/adapters/postgres/src/identity/mod.rs b/crates/adapters/postgres/src/identity/mod.rs index dc031ab..1a561c1 100644 --- a/crates/adapters/postgres/src/identity/mod.rs +++ b/crates/adapters/postgres/src/identity/mod.rs @@ -15,6 +15,7 @@ struct UserRow { username: String, email: String, password_hash: String, + role: String, created_at: DateTime, } @@ -26,6 +27,7 @@ impl TryFrom for domain::entities::User { username: r.username, email: Email::new(r.email)?, password_hash: PasswordHash::from_hash(r.password_hash), + role: r.role, created_at: r.created_at, }) } @@ -40,7 +42,7 @@ impl UserRepository for PostgresUserRepository { id: &SystemId, ) -> Result, DomainError> { let row = sqlx::query_as::<_, UserRow>( - "SELECT id, username, email, password_hash, created_at FROM users WHERE id = $1", + "SELECT id, username, email, password_hash, role, created_at FROM users WHERE id = $1", ) .bind(*id.as_uuid()) .fetch_optional(&self.pool) @@ -55,7 +57,7 @@ impl UserRepository for PostgresUserRepository { email: &Email, ) -> Result, DomainError> { let row = sqlx::query_as::<_, UserRow>( - "SELECT id, username, email, password_hash, created_at FROM users WHERE email = $1", + "SELECT id, username, email, password_hash, role, created_at FROM users WHERE email = $1", ) .bind(email.as_str()) .fetch_optional(&self.pool) @@ -70,7 +72,7 @@ impl UserRepository for PostgresUserRepository { username: &str, ) -> Result, DomainError> { let row = sqlx::query_as::<_, UserRow>( - "SELECT id, username, email, password_hash, created_at FROM users WHERE username = $1", + "SELECT id, username, email, password_hash, role, created_at FROM users WHERE username = $1", ) .bind(username) .fetch_optional(&self.pool) @@ -82,18 +84,20 @@ impl UserRepository for PostgresUserRepository { async fn save(&self, user: &domain::entities::User) -> Result<(), DomainError> { sqlx::query_as::<_, UserRow>( - "INSERT INTO users (id, username, email, password_hash, created_at) - VALUES ($1, $2, $3, $4, $5) + "INSERT INTO users (id, username, email, password_hash, role, created_at) + VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (id) DO UPDATE SET username = EXCLUDED.username, email = EXCLUDED.email, - password_hash = EXCLUDED.password_hash - RETURNING id, username, email, password_hash, created_at", + password_hash = EXCLUDED.password_hash, + role = EXCLUDED.role + RETURNING id, username, email, password_hash, role, created_at", ) .bind(*user.id.as_uuid()) .bind(&user.username) .bind(user.email.as_str()) .bind(user.password_hash.as_str()) + .bind(&user.role) .bind(user.created_at) .fetch_one(&self.pool) .await @@ -109,6 +113,14 @@ impl UserRepository for PostgresUserRepository { .map_pg()?; Ok(()) } + + async fn count(&self) -> Result { + let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM users") + .fetch_one(&self.pool) + .await + .map_pg()?; + Ok(count as u64) + } } // --- PostgresRefreshTokenRepository --- diff --git a/crates/adapters/postgres/src/processing/mod.rs b/crates/adapters/postgres/src/processing/mod.rs index 7cf6845..c54b374 100644 --- a/crates/adapters/postgres/src/processing/mod.rs +++ b/crates/adapters/postgres/src/processing/mod.rs @@ -405,6 +405,17 @@ impl PluginRepository for PostgresPluginRepository { Ok(row.map(Into::into)) } + async fn find_all(&self) -> Result, DomainError> { + let rows = sqlx::query_as::<_, PluginRow>( + "SELECT plugin_id, name, plugin_type, is_enabled, configuration FROM plugins", + ) + .fetch_all(&self.pool) + .await + .map_pg()?; + + Ok(rows.into_iter().map(Into::into).collect()) + } + async fn find_enabled(&self) -> Result, DomainError> { let rows = sqlx::query_as::<_, PluginRow>( "SELECT plugin_id, name, plugin_type, is_enabled, configuration @@ -521,6 +532,17 @@ impl PipelineRepository for PostgresPipelineRepository { Ok(row.map(Into::into)) } + async fn find_all(&self) -> Result, DomainError> { + let rows = sqlx::query_as::<_, PipelineRow>( + "SELECT pipeline_id, trigger_event, steps FROM processing_pipelines", + ) + .fetch_all(&self.pool) + .await + .map_pg()?; + + Ok(rows.into_iter().map(Into::into).collect()) + } + async fn find_by_trigger(&self, event: &str) -> Result, DomainError> { let rows = sqlx::query_as::<_, PipelineRow>( "SELECT pipeline_id, trigger_event, steps diff --git a/crates/adapters/postgres/src/storage/mod.rs b/crates/adapters/postgres/src/storage/mod.rs index 1e12824..b848722 100644 --- a/crates/adapters/postgres/src/storage/mod.rs +++ b/crates/adapters/postgres/src/storage/mod.rs @@ -160,6 +160,18 @@ impl LibraryPathRepository for PostgresLibraryPathRepository { Ok(row.map(Into::into)) } + async fn find_all(&self) -> Result, DomainError> { + let rows = sqlx::query_as::<_, LibraryPathRow>( + "SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id + FROM library_paths", + ) + .fetch_all(&self.pool) + .await + .map_pg()?; + + Ok(rows.into_iter().map(Into::into).collect()) + } + async fn find_by_volume(&self, volume_id: &SystemId) -> Result, DomainError> { let rows = sqlx::query_as::<_, LibraryPathRow>( "SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id @@ -180,7 +192,7 @@ impl LibraryPathRepository for PostgresLibraryPathRepository { let rows = sqlx::query_as::<_, LibraryPathRow>( "SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id FROM library_paths - WHERE is_ingest_destination = true AND designated_owner_id = $1", + WHERE is_ingest_destination = true AND (designated_owner_id = $1 OR designated_owner_id IS NULL)", ) .bind(*owner_id.as_uuid()) .fetch_all(&self.pool) diff --git a/crates/adapters/storage/src/lib.rs b/crates/adapters/storage/src/lib.rs index 47ddeaf..b8ae7cf 100644 --- a/crates/adapters/storage/src/lib.rs +++ b/crates/adapters/storage/src/lib.rs @@ -1,7 +1,9 @@ pub mod adapter; pub mod config; pub mod local_file_storage; +pub mod volume_resolver; pub use adapter::ObjectStorageAdapter; pub use config::{StorageConfig, build_store}; pub use local_file_storage::LocalFileStorage; +pub use volume_resolver::LocalVolumeFileResolver; diff --git a/crates/adapters/storage/src/volume_resolver.rs b/crates/adapters/storage/src/volume_resolver.rs new file mode 100644 index 0000000..a00f11c --- /dev/null +++ b/crates/adapters/storage/src/volume_resolver.rs @@ -0,0 +1,91 @@ +use async_trait::async_trait; +use bytes::Bytes; +use domain::{ + errors::DomainError, + ports::{DataStream, StorageVolumeRepository, VolumeFileResolver}, + value_objects::SystemId, +}; +use futures::StreamExt; +use std::path::PathBuf; +use std::sync::Arc; +use tokio_util::io::ReaderStream; + +pub struct LocalVolumeFileResolver { + volume_repo: Arc, +} + +impl LocalVolumeFileResolver { + pub fn new(volume_repo: Arc) -> Self { + Self { volume_repo } + } + + async fn resolve_path( + &self, + volume_id: &SystemId, + relative_path: &str, + ) -> Result { + let volume = self + .volume_repo + .find_by_id(volume_id) + .await? + .ok_or_else(|| DomainError::NotFound(format!("Volume {} not found", volume_id)))?; + + let base = volume + .uri_prefix + .strip_prefix("file://") + .unwrap_or(&volume.uri_prefix); + + let full = if relative_path.is_empty() { + PathBuf::from(base) + } else { + PathBuf::from(base).join(relative_path) + }; + + Ok(full) + } +} + +#[async_trait] +impl VolumeFileResolver for LocalVolumeFileResolver { + async fn open_by_volume( + &self, + volume_id: &SystemId, + relative_path: &str, + ) -> Result<(DataStream, u64), DomainError> { + let full = self.resolve_path(volume_id, relative_path).await?; + let meta = tokio::fs::metadata(&full) + .await + .map_err(|e| match e.kind() { + std::io::ErrorKind::NotFound => { + DomainError::NotFound(full.display().to_string()) + } + _ => DomainError::Internal(format!("Failed to stat file: {e}")), + })?; + let file = tokio::fs::File::open(&full) + .await + .map_err(|e| match e.kind() { + std::io::ErrorKind::NotFound => { + DomainError::NotFound(full.display().to_string()) + } + _ => DomainError::Internal(format!("Failed to open file: {e}")), + })?; + let stream = ReaderStream::new(file) + .map(|r| r.map_err(|e| DomainError::Internal(format!("Read error: {e}")))); + Ok((Box::pin(stream), meta.len())) + } + + async fn read_by_volume( + &self, + volume_id: &SystemId, + relative_path: &str, + ) -> Result { + let full = self.resolve_path(volume_id, relative_path).await?; + let data = tokio::fs::read(&full).await.map_err(|e| match e.kind() { + std::io::ErrorKind::NotFound => { + DomainError::NotFound(full.display().to_string()) + } + _ => DomainError::Internal(format!("Failed to read file: {e}")), + })?; + Ok(Bytes::from(data)) + } +} diff --git a/crates/api-types/src/responses.rs b/crates/api-types/src/responses.rs index 033db74..bb31913 100644 --- a/crates/api-types/src/responses.rs +++ b/crates/api-types/src/responses.rs @@ -6,6 +6,7 @@ pub struct UserResponse { pub id: Uuid, pub username: String, pub email: String, + pub role: String, pub created_at: DateTime, } @@ -22,6 +23,7 @@ impl UserResponse { id: *user.id.as_uuid(), username: user.username.clone(), email: user.email.to_string(), + role: user.role.clone(), created_at: user.created_at, } } @@ -34,6 +36,7 @@ pub struct AlbumResponse { pub description: String, pub creator_id: Uuid, pub asset_count: usize, + pub asset_ids: Vec, pub created_at: DateTime, } @@ -45,6 +48,7 @@ impl AlbumResponse { description: album.description.clone(), creator_id: *album.creator_user_id.as_uuid(), asset_count: album.asset_count(), + asset_ids: album.entries.iter().map(|e| *e.asset_id.as_uuid()).collect(), created_at: *album.created_at.as_datetime(), } } @@ -84,6 +88,17 @@ impl AssetResponse { } } +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +pub struct DateSummaryResponse { + pub dates: Vec, +} + +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +pub struct DateCountEntry { + pub date: String, + pub count: u64, +} + #[derive(Debug, serde::Serialize, utoipa::ToSchema)] pub struct TimelineResponse { pub assets: Vec, @@ -349,6 +364,7 @@ pub struct JobResponse { pub status: String, pub priority: u32, pub created_at: DateTime, + pub error_message: Option, } impl JobResponse { @@ -359,6 +375,7 @@ impl JobResponse { status: format!("{:?}", job.status), priority: job.priority, created_at: *job.created_at.as_datetime(), + error_message: job.error_message.clone(), } } } diff --git a/crates/application/src/catalog/mod.rs b/crates/application/src/catalog/mod.rs index d717182..cce4783 100644 --- a/crates/application/src/catalog/mod.rs +++ b/crates/application/src/catalog/mod.rs @@ -13,6 +13,7 @@ pub use commands::resolve_duplicate::{ }; pub use commands::update_metadata::{UpdateMetadataCommand, UpdateMetadataHandler}; pub use queries::get_asset::{GetAssetHandler, GetAssetQuery}; +pub use queries::get_date_summary::{DateSummaryEntry, GetDateSummaryHandler, GetDateSummaryQuery}; pub use queries::get_stack::{GetStackHandler, GetStackQuery}; pub use queries::get_timeline::{GetTimelineHandler, GetTimelineQuery, TimelineResult}; pub use queries::list_stacks::{ListStacksHandler, ListStacksQuery}; diff --git a/crates/application/src/catalog/queries/get_date_summary.rs b/crates/application/src/catalog/queries/get_date_summary.rs new file mode 100644 index 0000000..f59128d --- /dev/null +++ b/crates/application/src/catalog/queries/get_date_summary.rs @@ -0,0 +1,32 @@ +use domain::{errors::DomainError, ports::AssetRepository, value_objects::SystemId}; +use std::sync::Arc; + +pub struct GetDateSummaryQuery { + pub owner_id: SystemId, +} + +pub struct DateSummaryEntry { + pub date: chrono::NaiveDate, + pub count: u64, +} + +pub struct GetDateSummaryHandler { + asset_repo: Arc, +} + +impl GetDateSummaryHandler { + pub fn new(asset_repo: Arc) -> Self { + Self { asset_repo } + } + + pub async fn execute( + &self, + query: GetDateSummaryQuery, + ) -> Result, DomainError> { + let rows = self.asset_repo.date_summary(&query.owner_id).await?; + Ok(rows + .into_iter() + .map(|(date, count)| DateSummaryEntry { date, count }) + .collect()) + } +} diff --git a/crates/application/src/catalog/queries/mod.rs b/crates/application/src/catalog/queries/mod.rs index a64f9af..bed08f7 100644 --- a/crates/application/src/catalog/queries/mod.rs +++ b/crates/application/src/catalog/queries/mod.rs @@ -1,4 +1,5 @@ pub mod get_asset; +pub mod get_date_summary; pub mod get_stack; pub mod get_timeline; pub mod list_stacks; diff --git a/crates/application/src/catalog/queries/read_asset_file.rs b/crates/application/src/catalog/queries/read_asset_file.rs index 9fc8690..b2ba704 100644 --- a/crates/application/src/catalog/queries/read_asset_file.rs +++ b/crates/application/src/catalog/queries/read_asset_file.rs @@ -1,6 +1,6 @@ use domain::{ errors::DomainError, - ports::{AssetRepository, DataStream, FileStoragePort}, + ports::{AssetRepository, DataStream, VolumeFileResolver}, value_objects::SystemId, }; use std::sync::Arc; @@ -20,17 +20,17 @@ pub struct AssetFileResult { pub struct ReadAssetFileHandler { asset_repo: Arc, - file_storage: Arc, + volume_resolver: Arc, } impl ReadAssetFileHandler { pub fn new( asset_repo: Arc, - file_storage: Arc, + volume_resolver: Arc, ) -> Self { Self { asset_repo, - file_storage, + volume_resolver, } } @@ -46,8 +46,11 @@ impl ReadAssetFileHandler { } let (stream, size) = self - .file_storage - .open_file(&asset.source_reference.relative_path) + .volume_resolver + .open_by_volume( + &asset.source_reference.volume_id, + &asset.source_reference.relative_path, + ) .await?; let filename = asset diff --git a/crates/application/src/catalog/visibility.rs b/crates/application/src/catalog/visibility.rs index 4f8e770..aae852e 100644 --- a/crates/application/src/catalog/visibility.rs +++ b/crates/application/src/catalog/visibility.rs @@ -134,6 +134,13 @@ impl AssetRepository for VisibilityFilteredAssetRepository { self.inner.count_search(owner_id, filters).await } + async fn date_summary( + &self, + owner_id: &SystemId, + ) -> Result, DomainError> { + self.inner.date_summary(owner_id).await + } + async fn save(&self, asset: &Asset) -> Result<(), DomainError> { self.inner.save(asset).await } diff --git a/crates/application/src/identity/commands/login_user.rs b/crates/application/src/identity/commands/login_user.rs index 1908354..10cd593 100644 --- a/crates/application/src/identity/commands/login_user.rs +++ b/crates/application/src/identity/commands/login_user.rs @@ -52,7 +52,7 @@ impl LoginUserHandler { if !valid { return Err(DomainError::Unauthorized("Invalid credentials".to_string())); } - let access_token = self.issuer.issue(&user.id, "user").await?; + let access_token = self.issuer.issue(&user.id, &user.role).await?; let (raw_refresh, _) = generate_refresh_token(&self.refresh_repo, &user.id).await?; Ok((user, access_token, raw_refresh)) } diff --git a/crates/application/src/identity/commands/refresh_token.rs b/crates/application/src/identity/commands/refresh_token.rs index 72c6f8d..6520f36 100644 --- a/crates/application/src/identity/commands/refresh_token.rs +++ b/crates/application/src/identity/commands/refresh_token.rs @@ -1,7 +1,7 @@ use super::login_user::generate_refresh_token; use domain::{ errors::DomainError, - ports::{RefreshTokenRepository, TokenIssuer}, + ports::{RefreshTokenRepository, TokenIssuer, UserRepository}, }; use sha2::{Digest, Sha256}; use std::sync::Arc; @@ -13,16 +13,19 @@ pub struct RefreshTokenCommand { pub struct RefreshTokenHandler { refresh_repo: Arc, + user_repo: Arc, issuer: Arc, } impl RefreshTokenHandler { pub fn new( refresh_repo: Arc, + user_repo: Arc, issuer: Arc, ) -> Self { Self { refresh_repo, + user_repo, issuer, } } @@ -42,11 +45,16 @@ impl RefreshTokenHandler { )); } - // Rotation: delete old, issue new pair + let user = self + .user_repo + .find_by_id(&token.user_id) + .await? + .ok_or_else(|| DomainError::NotFound("User not found".to_string()))?; + self.refresh_repo.delete(&token.token_id).await?; - let access_token = self.issuer.issue(&token.user_id, "user").await?; - let (raw_refresh, _) = generate_refresh_token(&self.refresh_repo, &token.user_id).await?; + let access_token = self.issuer.issue(&user.id, &user.role).await?; + let (raw_refresh, _) = generate_refresh_token(&self.refresh_repo, &user.id).await?; Ok((access_token, raw_refresh)) } diff --git a/crates/application/src/identity/commands/register_user.rs b/crates/application/src/identity/commands/register_user.rs index e2ec439..00b4784 100644 --- a/crates/application/src/identity/commands/register_user.rs +++ b/crates/application/src/identity/commands/register_user.rs @@ -53,7 +53,11 @@ impl RegisterUserHandler { ))); } let hash = self.hasher.hash(&cmd.password).await?; - let user = User::new(&cmd.username, email, hash); + let is_first = self.user_repo.count().await? == 0; + let mut user = User::new(&cmd.username, email, hash); + if is_first { + user.role = "admin".to_string(); + } self.user_repo.save(&user).await?; Ok(user) } diff --git a/crates/application/src/processing/mod.rs b/crates/application/src/processing/mod.rs index 4614268..3799b43 100644 --- a/crates/application/src/processing/mod.rs +++ b/crates/application/src/processing/mod.rs @@ -12,6 +12,8 @@ pub use commands::manage_plugin::{ManagePluginCommand, ManagePluginHandler, Plug pub use commands::process_next_job::{ProcessNextJobCommand, ProcessNextJobHandler}; pub use commands::start_job::{StartJobCommand, StartJobHandler}; pub use queries::list_jobs::{JobListResult, ListJobsHandler, ListJobsQuery}; +pub use queries::list_pipelines::ListPipelinesHandler; +pub use queries::list_plugins::ListPluginsHandler; pub use queries::report_batch_progress::{ BatchProgress, ReportBatchProgressHandler, ReportBatchProgressQuery, }; diff --git a/crates/application/src/processing/queries/list_pipelines.rs b/crates/application/src/processing/queries/list_pipelines.rs new file mode 100644 index 0000000..02f04e4 --- /dev/null +++ b/crates/application/src/processing/queries/list_pipelines.rs @@ -0,0 +1,18 @@ +use domain::{ + entities::ProcessingPipeline, errors::DomainError, ports::PipelineRepository, +}; +use std::sync::Arc; + +pub struct ListPipelinesHandler { + repo: Arc, +} + +impl ListPipelinesHandler { + pub fn new(repo: Arc) -> Self { + Self { repo } + } + + pub async fn execute(&self) -> Result, DomainError> { + self.repo.find_all().await + } +} diff --git a/crates/application/src/processing/queries/list_plugins.rs b/crates/application/src/processing/queries/list_plugins.rs new file mode 100644 index 0000000..ea0313d --- /dev/null +++ b/crates/application/src/processing/queries/list_plugins.rs @@ -0,0 +1,16 @@ +use domain::{entities::Plugin, errors::DomainError, ports::PluginRepository}; +use std::sync::Arc; + +pub struct ListPluginsHandler { + repo: Arc, +} + +impl ListPluginsHandler { + pub fn new(repo: Arc) -> Self { + Self { repo } + } + + pub async fn execute(&self) -> Result, DomainError> { + self.repo.find_all().await + } +} diff --git a/crates/application/src/processing/queries/mod.rs b/crates/application/src/processing/queries/mod.rs index 14fb23b..289710c 100644 --- a/crates/application/src/processing/queries/mod.rs +++ b/crates/application/src/processing/queries/mod.rs @@ -1,2 +1,4 @@ pub mod list_jobs; +pub mod list_pipelines; +pub mod list_plugins; pub mod report_batch_progress; diff --git a/crates/application/src/storage/commands/delete_library_path.rs b/crates/application/src/storage/commands/delete_library_path.rs new file mode 100644 index 0000000..e071925 --- /dev/null +++ b/crates/application/src/storage/commands/delete_library_path.rs @@ -0,0 +1,16 @@ +use domain::{errors::DomainError, ports::LibraryPathRepository, value_objects::SystemId}; +use std::sync::Arc; + +pub struct DeleteLibraryPathHandler { + repo: Arc, +} + +impl DeleteLibraryPathHandler { + pub fn new(repo: Arc) -> Self { + Self { repo } + } + + pub async fn execute(&self, id: SystemId) -> Result<(), DomainError> { + self.repo.delete(&id).await + } +} diff --git a/crates/application/src/storage/commands/delete_volume.rs b/crates/application/src/storage/commands/delete_volume.rs new file mode 100644 index 0000000..96c4c73 --- /dev/null +++ b/crates/application/src/storage/commands/delete_volume.rs @@ -0,0 +1,16 @@ +use domain::{errors::DomainError, ports::StorageVolumeRepository, value_objects::SystemId}; +use std::sync::Arc; + +pub struct DeleteVolumeHandler { + repo: Arc, +} + +impl DeleteVolumeHandler { + pub fn new(repo: Arc) -> Self { + Self { repo } + } + + pub async fn execute(&self, id: SystemId) -> Result<(), DomainError> { + self.repo.delete(&id).await + } +} diff --git a/crates/application/src/storage/commands/mod.rs b/crates/application/src/storage/commands/mod.rs index 855b5c7..7f15663 100644 --- a/crates/application/src/storage/commands/mod.rs +++ b/crates/application/src/storage/commands/mod.rs @@ -1,3 +1,5 @@ +pub mod delete_library_path; +pub mod delete_volume; pub mod ingest_asset; pub mod register_library_path; pub mod register_volume; diff --git a/crates/application/src/storage/mod.rs b/crates/application/src/storage/mod.rs index 9952e8c..b9bd5c3 100644 --- a/crates/application/src/storage/mod.rs +++ b/crates/application/src/storage/mod.rs @@ -1,7 +1,12 @@ pub mod commands; pub mod queries; +pub use commands::delete_library_path::DeleteLibraryPathHandler; +pub use commands::delete_volume::DeleteVolumeHandler; pub use commands::ingest_asset::{IngestAssetCommand, IngestAssetHandler}; pub use commands::register_library_path::{RegisterLibraryPathCommand, RegisterLibraryPathHandler}; pub use commands::register_volume::{RegisterVolumeCommand, RegisterVolumeHandler}; pub use queries::check_quota::{CheckQuotaHandler, CheckQuotaQuery}; +pub use queries::list_all_library_paths::ListAllLibraryPathsHandler; +pub use queries::list_ingest_paths::{ListIngestPathsHandler, ListIngestPathsQuery}; +pub use queries::list_volumes::ListVolumesHandler; diff --git a/crates/application/src/storage/queries/list_all_library_paths.rs b/crates/application/src/storage/queries/list_all_library_paths.rs new file mode 100644 index 0000000..669b531 --- /dev/null +++ b/crates/application/src/storage/queries/list_all_library_paths.rs @@ -0,0 +1,16 @@ +use domain::{entities::LibraryPath, errors::DomainError, ports::LibraryPathRepository}; +use std::sync::Arc; + +pub struct ListAllLibraryPathsHandler { + repo: Arc, +} + +impl ListAllLibraryPathsHandler { + pub fn new(repo: Arc) -> Self { + Self { repo } + } + + pub async fn execute(&self) -> Result, DomainError> { + self.repo.find_all().await + } +} diff --git a/crates/application/src/storage/queries/list_ingest_paths.rs b/crates/application/src/storage/queries/list_ingest_paths.rs new file mode 100644 index 0000000..c2eea9c --- /dev/null +++ b/crates/application/src/storage/queries/list_ingest_paths.rs @@ -0,0 +1,28 @@ +use domain::{ + entities::LibraryPath, + errors::DomainError, + ports::LibraryPathRepository, + value_objects::SystemId, +}; +use std::sync::Arc; + +pub struct ListIngestPathsQuery { + pub user_id: SystemId, +} + +pub struct ListIngestPathsHandler { + repo: Arc, +} + +impl ListIngestPathsHandler { + pub fn new(repo: Arc) -> Self { + Self { repo } + } + + pub async fn execute( + &self, + query: ListIngestPathsQuery, + ) -> Result, DomainError> { + self.repo.find_ingest_destinations(&query.user_id).await + } +} diff --git a/crates/application/src/storage/queries/list_volumes.rs b/crates/application/src/storage/queries/list_volumes.rs new file mode 100644 index 0000000..a0167ae --- /dev/null +++ b/crates/application/src/storage/queries/list_volumes.rs @@ -0,0 +1,16 @@ +use domain::{entities::StorageVolume, errors::DomainError, ports::StorageVolumeRepository}; +use std::sync::Arc; + +pub struct ListVolumesHandler { + repo: Arc, +} + +impl ListVolumesHandler { + pub fn new(repo: Arc) -> Self { + Self { repo } + } + + pub async fn execute(&self) -> Result, DomainError> { + self.repo.find_all().await + } +} diff --git a/crates/application/src/storage/queries/mod.rs b/crates/application/src/storage/queries/mod.rs index 384f573..3822711 100644 --- a/crates/application/src/storage/queries/mod.rs +++ b/crates/application/src/storage/queries/mod.rs @@ -1 +1,4 @@ pub mod check_quota; +pub mod list_all_library_paths; +pub mod list_ingest_paths; +pub mod list_volumes; diff --git a/crates/application/src/testing/repositories.rs b/crates/application/src/testing/repositories.rs index 4a436f0..48d50db 100644 --- a/crates/application/src/testing/repositories.rs +++ b/crates/application/src/testing/repositories.rs @@ -103,6 +103,10 @@ impl UserRepository for InMemoryUserRepository { self.users.lock().await.remove(&id.to_string()); Ok(()) } + + async fn count(&self) -> Result { + Ok(self.users.lock().await.len() as u64) + } } in_memory_repo!(InMemoryAssetRepository, Asset); @@ -173,6 +177,21 @@ impl AssetRepository for InMemoryAssetRepository { self.count_by_owner(owner_id).await } + async fn date_summary( + &self, + owner_id: &SystemId, + ) -> Result, DomainError> { + let data = self.data.lock().await; + let mut map = std::collections::BTreeMap::::new(); + for asset in data.values() { + if &asset.owner_user_id == owner_id { + let date = asset.created_at.as_datetime().date_naive(); + *map.entry(date).or_default() += 1; + } + } + Ok(map.into_iter().rev().collect()) + } + async fn save(&self, asset: &Asset) -> Result<(), DomainError> { self.data .lock() @@ -385,6 +404,10 @@ impl LibraryPathRepository for InMemoryLibraryPathRepository { Ok(self.data.lock().await.get(&id.to_string()).cloned()) } + async fn find_all(&self) -> Result, DomainError> { + Ok(self.data.lock().await.values().cloned().collect()) + } + async fn find_by_volume(&self, volume_id: &SystemId) -> Result, DomainError> { Ok(self .data @@ -918,6 +941,10 @@ impl PluginRepository for InMemoryPluginRepository { Ok(self.data.lock().await.get(&id.to_string()).cloned()) } + async fn find_all(&self) -> Result, DomainError> { + Ok(self.data.lock().await.values().cloned().collect()) + } + async fn find_enabled(&self) -> Result, DomainError> { Ok(self .data @@ -946,6 +973,10 @@ impl PipelineRepository for InMemoryPipelineRepository { Ok(self.data.lock().await.get(&id.to_string()).cloned()) } + async fn find_all(&self) -> Result, DomainError> { + Ok(self.data.lock().await.values().cloned().collect()) + } + async fn find_by_trigger(&self, event: &str) -> Result, DomainError> { Ok(self .data diff --git a/crates/bootstrap/src/config.rs b/crates/bootstrap/src/config.rs index 6db4732..491d4f1 100644 --- a/crates/bootstrap/src/config.rs +++ b/crates/bootstrap/src/config.rs @@ -6,6 +6,7 @@ pub struct Config { pub nats_url: String, pub jwt_secret: String, pub cors_allowed_origins: Vec, + pub max_upload_bytes: usize, } impl Config { @@ -26,6 +27,10 @@ impl Config { .split(',') .map(|s| s.trim().to_string()) .collect(), + max_upload_bytes: std::env::var("MAX_UPLOAD_BYTES") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(256 * 1024 * 1024), } } } diff --git a/crates/bootstrap/src/factory.rs b/crates/bootstrap/src/factory.rs index 8aa6521..ea5eb03 100644 --- a/crates/bootstrap/src/factory.rs +++ b/crates/bootstrap/src/factory.rs @@ -1,5 +1,6 @@ use anyhow::Result; use axum::Router; +use axum::extract::DefaultBodyLimit; use axum::http::HeaderValue; use std::sync::Arc; use tower_http::{ @@ -32,7 +33,8 @@ pub async fn build_app(config: &Config) -> Result { ); let storage_path = std::env::var("STORAGE_PATH").unwrap_or_else(|_| "./data/media".to_string()); - let file_storage: Arc = Arc::new(LocalFileStorage::new(&storage_path)); + let file_storage: Arc = + Arc::new(LocalFileStorage::new(&storage_path)); // Build per-context services let identity = services::identity::build(&pool, &config.jwt_secret); @@ -68,6 +70,7 @@ pub async fn build_app(config: &Config) -> Result { Ok(app_router(&state) .with_state(state) + .layer(DefaultBodyLimit::max(config.max_upload_bytes)) .layer(TraceLayer::new_for_http()) .layer(cors)) } diff --git a/crates/bootstrap/src/services/catalog.rs b/crates/bootstrap/src/services/catalog.rs index dafde0e..d93bdea 100644 --- a/crates/bootstrap/src/services/catalog.rs +++ b/crates/bootstrap/src/services/catalog.rs @@ -5,10 +5,12 @@ use adapters_postgres::{ PostgresDerivativeRepository, PostgresDuplicateRepository, PostgresIngestTransaction, PostgresSidecarRepository, }; -use adapters_storage::LocalFileStorage; +use adapters_storage::LocalVolumeFileResolver; +use domain::ports::FileStoragePort; use application::catalog::{ CreateStackHandler, DeleteAssetHandler, DeleteStackHandler, DetectLivePhotosHandler, - GetAssetHandler, GetStackHandler, GetTimelineHandler, ListDuplicatesHandler, + GetAssetHandler, GetDateSummaryHandler, GetStackHandler, GetTimelineHandler, + ListDuplicatesHandler, ReadAssetFileHandler, ReadDerivativeHandler, RegisterAssetHandler, ResolveDuplicateHandler, SearchAssetsHandler, UpdateMetadataHandler, }; @@ -21,7 +23,7 @@ use super::storage::StorageRepos; pub fn build( pool: &PgPool, storage_repos: &StorageRepos, - file_storage: Arc, + file_storage: Arc, event_publisher: Arc, ) -> CatalogHandlers { let asset_repo = Arc::new(PostgresAssetRepository::new(pool.clone())); @@ -49,15 +51,20 @@ pub fn build( metadata_repo.clone(), )); + let get_date_summary = Arc::new(GetDateSummaryHandler::new(asset_repo.clone())); + let update_metadata = Arc::new(UpdateMetadataHandler::new( asset_repo.clone(), metadata_repo.clone(), event_publisher.clone(), )); + let volume_resolver = Arc::new(LocalVolumeFileResolver::new( + storage_repos.volume_repo.clone(), + )); let read_asset_file = Arc::new(ReadAssetFileHandler::new( asset_repo.clone(), - file_storage.clone(), + volume_resolver, )); let read_derivative = Arc::new(ReadDerivativeHandler::new( @@ -103,6 +110,7 @@ pub fn build( ingest_asset, get_asset, get_timeline, + get_date_summary, update_metadata, read_asset_file, read_derivative, diff --git a/crates/bootstrap/src/services/identity.rs b/crates/bootstrap/src/services/identity.rs index 5f51788..8ef9d86 100644 --- a/crates/bootstrap/src/services/identity.rs +++ b/crates/bootstrap/src/services/identity.rs @@ -26,9 +26,10 @@ pub fn build(pool: &PgPool, jwt_secret: &str) -> IdentityServices { issuer.clone(), refresh_repo.clone(), )); - let get_profile = Arc::new(GetProfileHandler::new(user_repo)); + let get_profile = Arc::new(GetProfileHandler::new(user_repo.clone())); let refresh = Arc::new(RefreshTokenHandler::new( refresh_repo.clone(), + user_repo, issuer.clone(), )); let logout = Arc::new(LogoutHandler::new(refresh_repo.clone())); diff --git a/crates/bootstrap/src/services/processing.rs b/crates/bootstrap/src/services/processing.rs index 80bf9d6..25e7992 100644 --- a/crates/bootstrap/src/services/processing.rs +++ b/crates/bootstrap/src/services/processing.rs @@ -6,7 +6,8 @@ use adapters_postgres::{ }; use application::processing::{ CompleteJobHandler, ConfigurePipelineHandler, EnqueueJobHandler, FailJobHandler, - ListJobsHandler, ManagePluginHandler, ReportBatchProgressHandler, StartJobHandler, + ListJobsHandler, ListPipelinesHandler, ListPluginsHandler, ManagePluginHandler, + ReportBatchProgressHandler, StartJobHandler, }; use domain::ports::EventPublisher; use presentation::state::ProcessingHandlers; @@ -35,6 +36,8 @@ pub fn build(pool: &PgPool, event_publisher: Arc) -> Process let list_jobs = Arc::new(ListJobsHandler::new(job_repo.clone())); let batch_progress = Arc::new(ReportBatchProgressHandler::new(batch_repo, job_repo)); let manage_plugin = Arc::new(ManagePluginHandler::new(plugin_repo.clone())); + let list_plugins = Arc::new(ListPluginsHandler::new(plugin_repo.clone())); + let list_pipelines = Arc::new(ListPipelinesHandler::new(pipeline_repo.clone())); let configure_pipeline = Arc::new(ConfigurePipelineHandler::new(pipeline_repo, plugin_repo)); ProcessingHandlers { @@ -45,6 +48,8 @@ pub fn build(pool: &PgPool, event_publisher: Arc) -> Process list_jobs, batch_progress, manage_plugin, + list_plugins, configure_pipeline, + list_pipelines, } } diff --git a/crates/bootstrap/src/services/storage.rs b/crates/bootstrap/src/services/storage.rs index da16e5a..097d18b 100644 --- a/crates/bootstrap/src/services/storage.rs +++ b/crates/bootstrap/src/services/storage.rs @@ -4,12 +4,17 @@ use adapters_postgres::{ PgPool, PostgresLibraryPathRepository, PostgresQuotaRepository, PostgresStorageVolumeRepository, PostgresUsageLedgerRepository, }; -use application::storage::{CheckQuotaHandler, RegisterLibraryPathHandler, RegisterVolumeHandler}; +use application::storage::{ + CheckQuotaHandler, DeleteLibraryPathHandler, DeleteVolumeHandler, ListAllLibraryPathsHandler, + ListIngestPathsHandler, ListVolumesHandler, RegisterLibraryPathHandler, + RegisterVolumeHandler, +}; use presentation::state::StorageHandlers; /// Shared storage repos needed by other bounded contexts (catalog ingest, etc.). pub struct StorageRepos { - pub path_repo: Arc, + pub path_repo: Arc, + pub volume_repo: Arc, } pub fn build(pool: &PgPool) -> (StorageRepos, StorageHandlers) { @@ -18,20 +23,33 @@ pub fn build(pool: &PgPool) -> (StorageRepos, StorageHandlers) { let quota_repo = Arc::new(PostgresQuotaRepository::new(pool.clone())); let ledger_repo = Arc::new(PostgresUsageLedgerRepository::new(pool.clone())); + let list_volumes = Arc::new(ListVolumesHandler::new(volume_repo.clone())); let register_volume = Arc::new(RegisterVolumeHandler::new(volume_repo.clone())); + let delete_volume = Arc::new(DeleteVolumeHandler::new(volume_repo.clone())); let register_library_path = Arc::new(RegisterLibraryPathHandler::new( - volume_repo, + volume_repo.clone(), path_repo.clone(), )); + let list_ingest_paths = Arc::new(ListIngestPathsHandler::new(path_repo.clone())); + let list_all_library_paths = Arc::new(ListAllLibraryPathsHandler::new(path_repo.clone())); + let delete_library_path = Arc::new(DeleteLibraryPathHandler::new(path_repo.clone())); let check_quota = Arc::new(CheckQuotaHandler::new(quota_repo, ledger_repo)); let handlers = StorageHandlers { register_volume, + delete_volume, + list_volumes, register_library_path, + list_ingest_paths, + list_all_library_paths, + delete_library_path, check_quota, }; - let repos = StorageRepos { path_repo }; + let repos = StorageRepos { + path_repo, + volume_repo, + }; (repos, handlers) } diff --git a/crates/domain/src/catalog/ports.rs b/crates/domain/src/catalog/ports.rs index f4ae1bc..3888ac8 100644 --- a/crates/domain/src/catalog/ports.rs +++ b/crates/domain/src/catalog/ports.rs @@ -32,6 +32,10 @@ pub trait AssetRepository: Send + Sync { owner_id: &SystemId, filters: &AssetFilters, ) -> Result; + async fn date_summary( + &self, + owner_id: &SystemId, + ) -> Result, DomainError>; async fn save(&self, asset: &Asset) -> Result<(), DomainError>; async fn delete(&self, id: &SystemId) -> Result<(), DomainError>; } diff --git a/crates/domain/src/identity/entities.rs b/crates/domain/src/identity/entities.rs index 2b8c680..a68f70d 100644 --- a/crates/domain/src/identity/entities.rs +++ b/crates/domain/src/identity/entities.rs @@ -126,6 +126,7 @@ pub struct User { pub username: String, pub email: Email, pub password_hash: PasswordHash, + pub role: String, pub created_at: DateTime, } @@ -136,9 +137,14 @@ impl User { username: username.into(), email, password_hash, + role: "user".to_string(), created_at: Utc::now(), } } + + pub fn is_admin(&self) -> bool { + self.role == "admin" + } } // --- RefreshToken --- diff --git a/crates/domain/src/identity/ports.rs b/crates/domain/src/identity/ports.rs index a4c6269..6ea53d9 100644 --- a/crates/domain/src/identity/ports.rs +++ b/crates/domain/src/identity/ports.rs @@ -12,6 +12,7 @@ pub trait UserRepository: Send + Sync { async fn find_by_username(&self, username: &str) -> Result, DomainError>; async fn save(&self, user: &User) -> Result<(), DomainError>; async fn delete(&self, id: &SystemId) -> Result<(), DomainError>; + async fn count(&self) -> Result; } // --- RoleRepository --- diff --git a/crates/domain/src/processing/ports.rs b/crates/domain/src/processing/ports.rs index ef178c9..1b9240b 100644 --- a/crates/domain/src/processing/ports.rs +++ b/crates/domain/src/processing/ports.rs @@ -34,6 +34,7 @@ pub trait JobBatchRepository: Send + Sync { #[async_trait] pub trait PluginRepository: Send + Sync { async fn find_by_id(&self, id: &SystemId) -> Result, DomainError>; + async fn find_all(&self) -> Result, DomainError>; async fn find_enabled(&self) -> Result, DomainError>; async fn save(&self, plugin: &Plugin) -> Result<(), DomainError>; } @@ -43,6 +44,7 @@ pub trait PluginRepository: Send + Sync { #[async_trait] pub trait PipelineRepository: Send + Sync { async fn find_by_id(&self, id: &SystemId) -> Result, DomainError>; + async fn find_all(&self) -> Result, DomainError>; async fn find_by_trigger(&self, event: &str) -> Result, DomainError>; async fn save(&self, pipeline: &ProcessingPipeline) -> Result<(), DomainError>; } diff --git a/crates/domain/src/storage/ports.rs b/crates/domain/src/storage/ports.rs index 7397a1f..d630759 100644 --- a/crates/domain/src/storage/ports.rs +++ b/crates/domain/src/storage/ports.rs @@ -23,6 +23,7 @@ pub trait StorageVolumeRepository: Send + Sync { #[async_trait] pub trait LibraryPathRepository: Send + Sync { async fn find_by_id(&self, id: &SystemId) -> Result, DomainError>; + async fn find_all(&self) -> Result, DomainError>; async fn find_by_volume(&self, volume_id: &SystemId) -> Result, DomainError>; async fn find_ingest_destinations( &self, @@ -84,6 +85,23 @@ pub trait IngestTransaction: Send + Sync { async fn record_usage(&self, entry: &UsageLedgerEntry) -> Result<(), DomainError>; } +// --- VolumeFileResolver --- + +#[async_trait] +pub trait VolumeFileResolver: Send + Sync { + async fn open_by_volume( + &self, + volume_id: &SystemId, + relative_path: &str, + ) -> Result<(DataStream, u64), DomainError>; + + async fn read_by_volume( + &self, + volume_id: &SystemId, + relative_path: &str, + ) -> Result; +} + // --- FileStoragePort --- #[derive(Debug, Clone)] diff --git a/crates/presentation/src/handlers/assets.rs b/crates/presentation/src/handlers/assets.rs index fc22e6a..90c9f5a 100644 --- a/crates/presentation/src/handlers/assets.rs +++ b/crates/presentation/src/handlers/assets.rs @@ -7,11 +7,14 @@ use crate::{ }; use api_types::{ requests::{RegisterAssetRequest, TagAssetRequest}, - responses::{AssetResponse, IngestResponse, TagResponse, TimelineResponse}, + responses::{ + AssetResponse, DateCountEntry, DateSummaryResponse, IngestResponse, TagResponse, + TimelineResponse, + }, }; use application::{ catalog::{ - DeleteAssetCommand, GetAssetQuery, GetTimelineQuery, ReadAssetFileQuery, + DeleteAssetCommand, GetAssetQuery, GetDateSummaryQuery, GetTimelineQuery, ReadAssetFileQuery, ReadDerivativeQuery, RegisterAssetCommand, SearchAssetsQuery, UpdateMetadataCommand, }, organization::TagAssetCommand, @@ -225,6 +228,25 @@ pub async fn timeline( })) } +pub async fn date_summary( + State(state): State, + claims: JwtClaims, +) -> Result, AppError> { + let query = GetDateSummaryQuery { + owner_id: claims.user_id, + }; + let entries = state.catalog.get_date_summary.execute(query).await?; + Ok(Json(DateSummaryResponse { + dates: entries + .into_iter() + .map(|e| DateCountEntry { + date: e.date.to_string(), + count: e.count, + }) + .collect(), + })) +} + #[utoipa::path( put, path = "/api/v1/assets/{id}/metadata", request_body = api_types::requests::UpdateMetadataRequest, diff --git a/crates/presentation/src/handlers/auth.rs b/crates/presentation/src/handlers/auth.rs index 0e07700..fdb7dc6 100644 --- a/crates/presentation/src/handlers/auth.rs +++ b/crates/presentation/src/handlers/auth.rs @@ -34,7 +34,7 @@ pub async fn register( let user = state.identity.register.execute(cmd).await?; let token = state .token_issuer - .issue(&user.id, "user") + .issue(&user.id, &user.role) .await .map_err(AppError::from)?; let (refresh_token, _) = diff --git a/crates/presentation/src/handlers/processing.rs b/crates/presentation/src/handlers/processing.rs index bcbc3a7..b644354 100644 --- a/crates/presentation/src/handlers/processing.rs +++ b/crates/presentation/src/handlers/processing.rs @@ -198,6 +198,15 @@ pub async fn batch_progress( Ok(Json(BatchProgressResponse::from_domain(&progress))) } +pub async fn list_plugins( + State(state): State, + claims: JwtClaims, +) -> Result>, AppError> { + super::require_admin(&claims)?; + let plugins = state.processing.list_plugins.execute().await?; + Ok(Json(plugins.iter().map(PluginResponse::from_domain).collect())) +} + #[utoipa::path( post, path = "/api/v1/plugins", request_body = ManagePluginRequest, @@ -251,6 +260,15 @@ pub async fn manage_plugin( )) } +pub async fn list_pipelines( + State(state): State, + claims: JwtClaims, +) -> Result>, AppError> { + super::require_admin(&claims)?; + let pipelines = state.processing.list_pipelines.execute().await?; + Ok(Json(pipelines.iter().map(PipelineResponse::from_domain).collect())) +} + #[utoipa::path( post, path = "/api/v1/pipelines", request_body = ConfigurePipelineRequest, diff --git a/crates/presentation/src/handlers/storage.rs b/crates/presentation/src/handlers/storage.rs index 21ca677..043f635 100644 --- a/crates/presentation/src/handlers/storage.rs +++ b/crates/presentation/src/handlers/storage.rs @@ -3,14 +3,35 @@ use api_types::{ requests::{CheckQuotaParams, RegisterLibraryPathRequest, RegisterVolumeRequest}, responses::{LibraryPathResponse, QuotaCheckResponse, VolumeResponse}, }; -use application::storage::{CheckQuotaQuery, RegisterLibraryPathCommand, RegisterVolumeCommand}; +use application::storage::{ + CheckQuotaQuery, ListIngestPathsQuery, RegisterLibraryPathCommand, RegisterVolumeCommand, +}; use axum::{ Json, - extract::{Query, State}, + extract::{Path, Query, State}, http::StatusCode, }; use domain::value_objects::SystemId; +#[utoipa::path( + get, path = "/api/v1/storage/volumes", + security(("bearer_token" = [])), + responses( + (status = 200, description = "All volumes", body = Vec), + (status = 401, description = "Unauthorized") + ) +)] +pub async fn list_volumes( + State(state): State, + claims: JwtClaims, +) -> Result>, AppError> { + super::require_admin(&claims)?; + let volumes = state.storage.list_volumes.execute().await?; + Ok(Json( + volumes.iter().map(VolumeResponse::from_domain).collect(), + )) +} + #[utoipa::path( post, path = "/api/v1/storage/volumes", request_body = RegisterVolumeRequest, @@ -66,6 +87,75 @@ pub async fn register_library_path( )) } +#[utoipa::path( + get, path = "/api/v1/storage/library-paths", + security(("bearer_token" = [])), + responses( + (status = 200, description = "Ingest destinations", body = Vec), + (status = 401, description = "Unauthorized") + ) +)] +pub async fn list_ingest_paths( + State(state): State, + claims: JwtClaims, +) -> Result>, AppError> { + let query = ListIngestPathsQuery { + user_id: claims.user_id, + }; + let paths = state.storage.list_ingest_paths.execute(query).await?; + Ok(Json( + paths.iter().map(LibraryPathResponse::from_domain).collect(), + )) +} + +#[utoipa::path( + get, path = "/api/v1/storage/library-paths/all", + security(("bearer_token" = [])), + responses( + (status = 200, description = "All library paths", body = Vec), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden") + ) +)] +pub async fn list_all_library_paths( + State(state): State, + claims: JwtClaims, +) -> Result>, AppError> { + super::require_admin(&claims)?; + let paths = state.storage.list_all_library_paths.execute().await?; + Ok(Json( + paths.iter().map(LibraryPathResponse::from_domain).collect(), + )) +} + +pub async fn delete_volume( + State(state): State, + claims: JwtClaims, + Path((id,)): Path<(uuid::Uuid,)>, +) -> Result { + super::require_admin(&claims)?; + state + .storage + .delete_volume + .execute(SystemId::from_uuid(id)) + .await?; + Ok(StatusCode::NO_CONTENT) +} + +pub async fn delete_library_path( + State(state): State, + claims: JwtClaims, + Path((id,)): Path<(uuid::Uuid,)>, +) -> Result { + super::require_admin(&claims)?; + state + .storage + .delete_library_path + .execute(SystemId::from_uuid(id)) + .await?; + Ok(StatusCode::NO_CONTENT) +} + const DEFAULT_QUOTA_USAGE_TYPE: &str = "storage_bytes"; const DEFAULT_QUOTA_AMOUNT: u64 = 0; diff --git a/crates/presentation/src/routes/catalog.rs b/crates/presentation/src/routes/catalog.rs index 5c1d64b..93717d4 100644 --- a/crates/presentation/src/routes/catalog.rs +++ b/crates/presentation/src/routes/catalog.rs @@ -13,6 +13,7 @@ pub fn routes() -> Router { .route("/assets/ingest", post(assets::ingest)) .route("/assets/register", post(assets::register_asset)) .route("/assets/timeline", get(assets::timeline)) + .route("/assets/date-summary", get(assets::date_summary)) .route( "/assets/{id}", get(assets::get_asset).delete(assets::delete_asset), diff --git a/crates/presentation/src/routes/processing.rs b/crates/presentation/src/routes/processing.rs index 69f3538..e2c69c9 100644 --- a/crates/presentation/src/routes/processing.rs +++ b/crates/presentation/src/routes/processing.rs @@ -14,6 +14,6 @@ pub fn routes() -> Router { .route("/jobs/{id}/complete", post(processing::complete_job)) .route("/jobs/{id}/fail", post(processing::fail_job)) .route("/jobs/batches/{id}", get(processing::batch_progress)) - .route("/plugins", post(processing::manage_plugin)) - .route("/pipelines", post(processing::configure_pipeline)) + .route("/plugins", get(processing::list_plugins).post(processing::manage_plugin)) + .route("/pipelines", get(processing::list_pipelines).post(processing::configure_pipeline)) } diff --git a/crates/presentation/src/routes/storage.rs b/crates/presentation/src/routes/storage.rs index 16937b9..bd5c424 100644 --- a/crates/presentation/src/routes/storage.rs +++ b/crates/presentation/src/routes/storage.rs @@ -1,15 +1,27 @@ use crate::{handlers::storage, state::AppState}; use axum::{ Router, - routing::{get, post}, + routing::{delete, get, post}, }; pub fn routes() -> Router { Router::new() - .route("/storage/volumes", post(storage::register_volume)) + .route( + "/storage/volumes", + get(storage::list_volumes).post(storage::register_volume), + ) + .route("/storage/volumes/{id}", delete(storage::delete_volume)) .route( "/storage/library-paths", - post(storage::register_library_path), + get(storage::list_ingest_paths).post(storage::register_library_path), + ) + .route( + "/storage/library-paths/all", + get(storage::list_all_library_paths), + ) + .route( + "/storage/library-paths/{id}", + delete(storage::delete_library_path), ) .route("/storage/quota", get(storage::check_quota)) } diff --git a/crates/presentation/src/state.rs b/crates/presentation/src/state.rs index 0f56b37..880b9cf 100644 --- a/crates/presentation/src/state.rs +++ b/crates/presentation/src/state.rs @@ -3,9 +3,10 @@ use std::sync::Arc; use application::{ catalog::{ CreateStackHandler, DeleteAssetHandler, DeleteStackHandler, DetectLivePhotosHandler, - GetAssetHandler, GetStackHandler, GetTimelineHandler, ListDuplicatesHandler, - ListStacksHandler, ReadAssetFileHandler, ReadDerivativeHandler, RegisterAssetHandler, - ResolveDuplicateHandler, SearchAssetsHandler, UpdateMetadataHandler, + GetAssetHandler, GetDateSummaryHandler, GetStackHandler, GetTimelineHandler, + ListDuplicatesHandler, ListStacksHandler, ReadAssetFileHandler, ReadDerivativeHandler, + RegisterAssetHandler, ResolveDuplicateHandler, SearchAssetsHandler, + UpdateMetadataHandler, }, identity::{ GetProfileHandler, LoginUserHandler, LogoutHandler, RefreshTokenHandler, @@ -17,7 +18,8 @@ use application::{ }, processing::{ CompleteJobHandler, ConfigurePipelineHandler, EnqueueJobHandler, FailJobHandler, - ListJobsHandler, ManagePluginHandler, ReportBatchProgressHandler, StartJobHandler, + ListJobsHandler, ListPipelinesHandler, ListPluginsHandler, ManagePluginHandler, + ReportBatchProgressHandler, StartJobHandler, }, sharing::{ AccessSharedResourceHandler, GenerateShareLinkHandler, RevokeShareHandler, @@ -28,7 +30,9 @@ use application::{ ImportSidecarHandler, ResolveConflictHandler, }, storage::{ - CheckQuotaHandler, IngestAssetHandler, RegisterLibraryPathHandler, RegisterVolumeHandler, + CheckQuotaHandler, DeleteLibraryPathHandler, DeleteVolumeHandler, IngestAssetHandler, + ListAllLibraryPathsHandler, ListIngestPathsHandler, ListVolumesHandler, + RegisterLibraryPathHandler, RegisterVolumeHandler, }, }; use domain::ports::{RefreshTokenRepository, TokenIssuer}; @@ -48,6 +52,7 @@ pub struct CatalogHandlers { pub ingest_asset: Arc, pub get_asset: Arc, pub get_timeline: Arc, + pub get_date_summary: Arc, pub update_metadata: Arc, pub read_asset_file: Arc, pub read_derivative: Arc, @@ -76,7 +81,12 @@ pub struct OrganizationHandlers { #[derive(Clone)] pub struct StorageHandlers { pub register_volume: Arc, + pub delete_volume: Arc, + pub list_volumes: Arc, pub register_library_path: Arc, + pub list_ingest_paths: Arc, + pub list_all_library_paths: Arc, + pub delete_library_path: Arc, pub check_quota: Arc, } @@ -107,7 +117,9 @@ pub struct ProcessingHandlers { pub list_jobs: Arc, pub batch_progress: Arc, pub manage_plugin: Arc, + pub list_plugins: Arc, pub configure_pipeline: Arc, + pub list_pipelines: Arc, } #[derive(Clone)] diff --git a/crates/worker/src/factories/plugins.rs b/crates/worker/src/factories/plugins.rs index b1e77e8..34b42fd 100644 --- a/crates/worker/src/factories/plugins.rs +++ b/crates/worker/src/factories/plugins.rs @@ -3,6 +3,7 @@ use crate::plugins::{ DirectoryScannerPlugin, MetadataExtractorPlugin, NoOpPlugin, SidecarSyncPlugin, ThumbnailGeneratorPlugin, }; +use adapters_storage::LocalVolumeFileResolver; use application::catalog::RegisterAssetHandler; use domain::ports::{ EventPublisher, MetadataExtractorPort, SidecarWriterPort, ThumbnailGeneratorPort, @@ -21,15 +22,18 @@ pub fn build_plugin_registry( ) -> InMemoryPluginRegistry { let mut registry = InMemoryPluginRegistry::new(); + let volume_resolver = Arc::new(LocalVolumeFileResolver::new(repos.volume.clone())); + registry.register(Arc::new(NoOpPlugin)); registry.register(Arc::new(MetadataExtractorPlugin::new( repos.asset.clone(), - file_storage.clone(), + volume_resolver.clone(), repos.metadata.clone(), extractor, ))); registry.register(Arc::new(ThumbnailGeneratorPlugin::new( repos.asset.clone(), + volume_resolver, file_storage.clone(), repos.derivative.clone(), thumbnail_gen, @@ -43,7 +47,6 @@ pub fn build_plugin_registry( registry.register(Arc::new(DirectoryScannerPlugin::new( repos.volume.clone(), repos.library_path.clone(), - file_storage.clone(), register_handler, ))); diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index f5c340e..05cb24f 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -13,6 +13,8 @@ mod sweep; #[tokio::main] async fn main() -> anyhow::Result<()> { + dotenvy::dotenv().ok(); + tracing_subscriber::fmt() .with_env_filter( tracing_subscriber::EnvFilter::from_default_env().add_directive("worker=info".parse()?), diff --git a/crates/worker/src/plugins/directory_scanner.rs b/crates/worker/src/plugins/directory_scanner.rs index 4e628e4..4e29bfc 100644 --- a/crates/worker/src/plugins/directory_scanner.rs +++ b/crates/worker/src/plugins/directory_scanner.rs @@ -3,17 +3,17 @@ use async_trait::async_trait; use domain::{ catalog::entities::AssetType, errors::DomainError, - ports::{FileStoragePort, LibraryPathRepository, PluginExecutor, StorageVolumeRepository}, + ports::{LibraryPathRepository, PluginExecutor, StorageVolumeRepository}, value_objects::{MetadataValue, StructuredData, SystemId}, }; use sha2::{Digest, Sha256}; +use std::path::PathBuf; use std::sync::Arc; use tracing::{info, warn}; pub struct DirectoryScannerPlugin { volume_repo: Arc, path_repo: Arc, - file_storage: Arc, register_handler: Arc, } @@ -21,13 +21,11 @@ impl DirectoryScannerPlugin { pub fn new( volume_repo: Arc, path_repo: Arc, - file_storage: Arc, register_handler: Arc, ) -> Self { Self { volume_repo, path_repo, - file_storage, register_handler, } } @@ -55,7 +53,7 @@ fn classify(filename: &str) -> Option<(AssetType, &'static str)> { #[async_trait] impl PluginExecutor for DirectoryScannerPlugin { fn plugin_name(&self) -> &str { - "directory_scanner" + "scan_directory" } async fn execute( @@ -92,8 +90,14 @@ impl PluginExecutor for DirectoryScannerPlugin { DomainError::Validation(format!("LibraryPath {} has no designated owner", path_id)) })?; + let volume_base = volume + .uri_prefix + .strip_prefix("file://") + .unwrap_or(&volume.uri_prefix); + let volume_root = PathBuf::from(volume_base); + let scan_root = &library_path.relative_path; - info!(path = scan_root, volume = %volume.volume_name, "scanning directory"); + info!(path = scan_root, volume = %volume.volume_name, base = %volume_root.display(), "scanning directory"); let mut found = 0u64; let mut registered = 0u64; @@ -101,29 +105,40 @@ impl PluginExecutor for DirectoryScannerPlugin { let mut dirs_to_scan = vec![scan_root.to_string()]; while let Some(dir) = dirs_to_scan.pop() { - let entries = match self.file_storage.list_directory(&dir).await { - Ok(e) => e, + let abs_dir = if dir.is_empty() { + volume_root.clone() + } else { + volume_root.join(&dir) + }; + + let mut read_dir = match tokio::fs::read_dir(&abs_dir).await { + Ok(r) => r, Err(e) => { - warn!(dir = dir, error = %e, "failed to list directory, skipping"); + warn!(dir = %abs_dir.display(), error = %e, "failed to list directory, skipping"); continue; } }; - for entry in entries { - let full_path = if dir.is_empty() { - entry.path.clone() + while let Ok(Some(entry)) = read_dir.next_entry().await { + let meta = match entry.metadata().await { + Ok(m) => m, + Err(_) => continue, + }; + let name = entry.file_name().to_string_lossy().to_string(); + let relative = if dir.is_empty() { + name.clone() } else { - format!("{}/{}", dir, entry.path) + format!("{}/{}", dir, name) }; - if entry.is_directory { - dirs_to_scan.push(full_path); + if meta.is_dir() { + dirs_to_scan.push(relative); continue; } found += 1; - let (asset_type, mime_type) = match classify(&entry.path) { + let (asset_type, mime_type) = match classify(&name) { Some(c) => c, None => { skipped += 1; @@ -131,10 +146,11 @@ impl PluginExecutor for DirectoryScannerPlugin { } }; - let data = match self.file_storage.read_file(&full_path).await { + let abs_path = volume_root.join(&relative); + let data = match tokio::fs::read(&abs_path).await { Ok(d) => d, Err(e) => { - warn!(path = full_path, error = %e, "failed to read file, skipping"); + warn!(path = %abs_path.display(), error = %e, "failed to read file, skipping"); skipped += 1; continue; } @@ -144,7 +160,7 @@ impl PluginExecutor for DirectoryScannerPlugin { let cmd = RegisterAssetCommand { volume_id: library_path.volume_id, - relative_path: full_path.clone(), + relative_path: relative.clone(), checksum, asset_type, mime_type: mime_type.to_string(), @@ -156,11 +172,11 @@ impl PluginExecutor for DirectoryScannerPlugin { Ok((asset, dup)) => { registered += 1; if dup.is_some() { - info!(path = full_path, asset_id = %asset.asset_id, "registered (duplicate detected)"); + info!(path = relative, asset_id = %asset.asset_id, "registered (duplicate detected)"); } } Err(e) => { - warn!(path = full_path, error = %e, "failed to register asset"); + warn!(path = relative, error = %e, "failed to register asset"); skipped += 1; } } diff --git a/crates/worker/src/plugins/metadata_extractor.rs b/crates/worker/src/plugins/metadata_extractor.rs index aeb410c..252a427 100644 --- a/crates/worker/src/plugins/metadata_extractor.rs +++ b/crates/worker/src/plugins/metadata_extractor.rs @@ -3,8 +3,8 @@ use domain::{ entities::{AssetMetadata, MetadataSource}, errors::DomainError, ports::{ - AssetMetadataRepository, AssetRepository, FileStoragePort, MetadataExtractorPort, - PluginExecutor, + AssetMetadataRepository, AssetRepository, MetadataExtractorPort, PluginExecutor, + VolumeFileResolver, }, value_objects::{MetadataValue, StructuredData, SystemId}, }; @@ -13,7 +13,7 @@ use tracing::info; pub struct MetadataExtractorPlugin { asset_repo: Arc, - file_storage: Arc, + volume_resolver: Arc, metadata_repo: Arc, extractor: Arc, } @@ -21,13 +21,13 @@ pub struct MetadataExtractorPlugin { impl MetadataExtractorPlugin { pub fn new( asset_repo: Arc, - file_storage: Arc, + volume_resolver: Arc, metadata_repo: Arc, extractor: Arc, ) -> Self { Self { asset_repo, - file_storage, + volume_resolver, metadata_repo, extractor, } @@ -56,8 +56,13 @@ impl PluginExecutor for MetadataExtractorPlugin { .await? .ok_or_else(|| DomainError::NotFound(format!("Asset {} not found", asset_id)))?; - let path = &asset.source_reference.relative_path; - let data = self.file_storage.read_file(path).await?; + let data = self + .volume_resolver + .read_by_volume( + &asset.source_reference.volume_id, + &asset.source_reference.relative_path, + ) + .await?; let mut extracted = self.extractor.extract(&data)?; extracted.insert("file_size_bytes", MetadataValue::Integer(data.len() as i64)); diff --git a/crates/worker/src/plugins/thumbnail_generator.rs b/crates/worker/src/plugins/thumbnail_generator.rs index 7b32919..a268007 100644 --- a/crates/worker/src/plugins/thumbnail_generator.rs +++ b/crates/worker/src/plugins/thumbnail_generator.rs @@ -4,7 +4,7 @@ use domain::{ errors::DomainError, ports::{ AssetRepository, DerivativeRepository, FileStoragePort, PluginExecutor, - ThumbnailGeneratorPort, + ThumbnailGeneratorPort, VolumeFileResolver, }, value_objects::{MetadataValue, StructuredData, SystemId}, }; @@ -13,6 +13,7 @@ use tracing::info; pub struct ThumbnailGeneratorPlugin { asset_repo: Arc, + volume_resolver: Arc, file_storage: Arc, derivative_repo: Arc, thumbnail_gen: Arc, @@ -21,12 +22,14 @@ pub struct ThumbnailGeneratorPlugin { impl ThumbnailGeneratorPlugin { pub fn new( asset_repo: Arc, + volume_resolver: Arc, file_storage: Arc, derivative_repo: Arc, thumbnail_gen: Arc, ) -> Self { Self { asset_repo, + volume_resolver, file_storage, derivative_repo, thumbnail_gen, @@ -92,8 +95,11 @@ impl PluginExecutor for ThumbnailGeneratorPlugin { } let source_bytes = self - .file_storage - .read_file(&asset.source_reference.relative_path) + .volume_resolver + .read_by_volume( + &asset.source_reference.volume_id, + &asset.source_reference.relative_path, + ) .await?; let output = self diff --git a/k-photos-frontend/app/(app)/admin/duplicates/page.tsx b/k-photos-frontend/app/(app)/admin/duplicates/page.tsx new file mode 100644 index 0000000..06819cf --- /dev/null +++ b/k-photos-frontend/app/(app)/admin/duplicates/page.tsx @@ -0,0 +1,131 @@ +"use client" + +import { useEffect, useState } from "react" +import { useDuplicates, useResolveDuplicate } from "@/hooks/use-duplicates" +import { getTokens } from "@/lib/auth" +import { Badge } from "@/components/ui/badge" +import { Button } from "@/components/ui/button" +import { Skeleton } from "@/components/ui/skeleton" +import { + Card, + CardContent, + CardHeader, + CardTitle, +} from "@/components/ui/card" +import { Spinner } from "@/components/ui/spinner" +import { toast } from "sonner" + +function AssetThumb({ assetId }: { assetId: string }) { + const [src, setSrc] = useState(null) + + useEffect(() => { + let revoke: string | null = null + const { access } = getTokens() + const headers: HeadersInit = access ? { Authorization: `Bearer ${access}` } : {} + fetch(`/api/v1/assets/${assetId}/derivatives/thumbnail_square`, { headers }) + .then((r) => (r.ok ? r.blob() : Promise.reject())) + .catch(() => + fetch(`/api/v1/assets/${assetId}/file`, { headers }).then((r) => + r.ok ? r.blob() : Promise.reject(), + ), + ) + .then((blob) => { + revoke = URL.createObjectURL(blob) + setSrc(revoke) + }) + .catch(() => {}) + return () => { + if (revoke) URL.revokeObjectURL(revoke) + } + }, [assetId]) + + return src ? ( + + ) : ( + + ) +} + +export default function DuplicatesPage() { + const { data: groups, isLoading } = useDuplicates() + const resolve = useResolveDuplicate() + + return ( +
+

Duplicate Resolution

+ + {isLoading ? ( + + ) : (groups ?? []).length === 0 ? ( +

+ No duplicate groups found. +

+ ) : ( + (groups ?? []).map((group) => ( + + + + + {group.group_id.slice(0, 8)}... + + {group.detection_method} + + {group.status} + + + + +
+ {group.candidates.map((c) => ( +
+ +
+
+

+ {c.asset_id.slice(0, 12)}... +

+

+ {(c.similarity_score * 100).toFixed(1)}% match +

+
+ +
+
+ ))} +
+
+
+ )) + )} +
+ ) +} diff --git a/k-photos-frontend/app/(app)/admin/jobs/page.tsx b/k-photos-frontend/app/(app)/admin/jobs/page.tsx new file mode 100644 index 0000000..681eb5b --- /dev/null +++ b/k-photos-frontend/app/(app)/admin/jobs/page.tsx @@ -0,0 +1,275 @@ +"use client" + +import { useState } from "react" +import { + useJobs, + useStartJob, + useFailJob, + useCompleteJob, + JOBS_PAGE_SIZE, +} from "@/hooks/use-jobs" +import { Badge } from "@/components/ui/badge" +import { Button } from "@/components/ui/button" +import { Tabs, TabsList, TabsTrigger } from "@/components/ui/tabs" +import { + Collapsible, + CollapsibleContent, + CollapsibleTrigger, +} from "@/components/ui/collapsible" +import { + Card, + CardContent, + CardHeader, + CardTitle, +} from "@/components/ui/card" +import { + Table, + TableBody, + TableCell, + TableHead, + TableHeader, + TableRow, +} from "@/components/ui/table" +import { Spinner } from "@/components/ui/spinner" +import { toast } from "sonner" +import { + ChevronDownIcon, + ChevronLeftIcon, + ChevronRightIcon, + PlayIcon, + CheckIcon, + XIcon, +} from "lucide-react" + +const STATUS_FILTERS = [ + { value: undefined, label: "All" }, + { value: "queued", label: "Queued" }, + { value: "running", label: "Running" }, + { value: "completed", label: "Completed" }, + { value: "failed", label: "Failed" }, +] + +function statusVariant(status: string) { + switch (status.toLowerCase()) { + case "queued": + return "secondary" as const + case "running": + return "default" as const + case "completed": + return "default" as const + case "failed": + return "destructive" as const + default: + return "secondary" as const + } +} + +export default function JobsPage() { + const [filter, setFilter] = useState(undefined) + const [offset, setOffset] = useState(0) + const jobs = useJobs(filter, offset) + const startJob = useStartJob() + const failJob = useFailJob() + const completeJob = useCompleteJob() + + const total = jobs.data?.total ?? 0 + const page = Math.floor(offset / JOBS_PAGE_SIZE) + 1 + const totalPages = Math.ceil(total / JOBS_PAGE_SIZE) + + const handleFilterChange = (v: string) => { + setFilter(v === "all" ? undefined : v) + setOffset(0) + } + + return ( +
+
+

Job Queue

+ {total > 0 && ( + {total} total + )} +
+ + + + {STATUS_FILTERS.map((f) => ( + + {f.label} + + ))} + + + + + + Jobs + + + {jobs.isLoading ? ( + + ) : ( + <> + + + + + ID + Type + Status + Priority + Created + Actions + + + + {(jobs.data?.jobs ?? []).map((job) => ( + + <> + + + {job.error_message && ( + + + + )} + + + {job.job_id.slice(0, 8)}... + + + {job.job_type} + + + + {job.status} + + + {job.priority} + + {new Date(job.created_at).toLocaleString()} + + +
+ {job.status.toLowerCase() === "queued" && ( + + )} + {job.status.toLowerCase() === "running" && ( + <> + + + + )} +
+
+
+ {job.error_message && ( + +
+ + + + )} + + + ))} + +
+ +
+                                  {job.error_message}
+                                
+
+ + {totalPages > 1 && ( +
+ + Page {page} of {totalPages} + +
+ + +
+
+ )} + + )} +
+
+
+ ) +} diff --git a/k-photos-frontend/app/(app)/admin/layout.tsx b/k-photos-frontend/app/(app)/admin/layout.tsx new file mode 100644 index 0000000..e91de73 --- /dev/null +++ b/k-photos-frontend/app/(app)/admin/layout.tsx @@ -0,0 +1,22 @@ +"use client" + +import { useAuth } from "@/hooks/use-auth" +import { useRouter } from "next/navigation" +import { useEffect } from "react" + +export default function AdminLayout({ + children, +}: { + children: React.ReactNode +}) { + const { isAdmin, isLoading } = useAuth() + const router = useRouter() + + useEffect(() => { + if (!isLoading && !isAdmin) router.replace("/") + }, [isLoading, isAdmin, router]) + + if (isLoading || !isAdmin) return null + + return <>{children} +} diff --git a/k-photos-frontend/app/(app)/admin/pipelines/page.tsx b/k-photos-frontend/app/(app)/admin/pipelines/page.tsx new file mode 100644 index 0000000..acc521e --- /dev/null +++ b/k-photos-frontend/app/(app)/admin/pipelines/page.tsx @@ -0,0 +1,128 @@ +"use client" + +import { useState } from "react" +import { usePipelines, useConfigurePipeline } from "@/hooks/use-pipelines" +import { Button } from "@/components/ui/button" +import { Input } from "@/components/ui/input" +import { Textarea } from "@/components/ui/textarea" +import { Label } from "@/components/ui/label" +import { Badge } from "@/components/ui/badge" +import { + Card, + CardContent, + CardDescription, + CardHeader, + CardTitle, +} from "@/components/ui/card" +import { + Table, + TableBody, + TableCell, + TableHead, + TableHeader, + TableRow, +} from "@/components/ui/table" +import { Spinner } from "@/components/ui/spinner" +import { toast } from "sonner" + +export default function PipelinesPage() { + const { data: pipelines, isLoading } = usePipelines() + const configure = useConfigurePipeline() + const [triggerEvent, setTriggerEvent] = useState("") + const [stepsJson, setStepsJson] = useState("[]") + + const handleSubmit = async (e: React.FormEvent) => { + e.preventDefault() + try { + const steps = JSON.parse(stepsJson) + await configure.mutateAsync({ trigger_event: triggerEvent, steps }) + toast.success("Pipeline configured") + } catch { + toast.error("Failed — check JSON syntax") + } + } + + return ( +
+

Pipeline Configuration

+ + + + Pipelines + + + {isLoading ? ( + + ) : ( + + + + Trigger Event + Steps + ID + + + + {(pipelines ?? []).map((p) => ( + + + {p.trigger_event} + + + {p.steps_count} + + + {p.pipeline_id.slice(0, 8)}... + + + ))} + +
+ )} +
+
+ + + + Configure Pipeline + + Define a processing pipeline triggered by an event + + + +
+
+ + setTriggerEvent(e.target.value)} + placeholder="asset.ingested" + className="h-8" + /> +
+
+ +