feat: frontend MVP — auth, timeline, upload, albums, admin, image viewer

Backend:
- user roles (DB + JWT + first-user-is-admin)
- volume-aware file resolver (multi-volume asset serving)
- directory scanner uses volume URI directly
- date-summary endpoint (capture date from EXIF)
- timeline ordered by capture date
- list endpoints: volumes, plugins, pipelines, library paths
- delete endpoints: volumes, library paths
- configurable upload body limit (MAX_UPLOAD_BYTES)

Frontend:
- auth: login/register, token refresh, role-based admin gate
- timeline: date-grouped grid, infinite scroll, date scrubber
- image viewer: fullscreen zoom/pan/pinch, metadata sidebar
- upload: drag-drop, sequential upload, progress tracking
- albums: create, add/remove photos, asset picker dialog
- admin: storage (import library), jobs (pagination, error details),
  plugins (list + toggle), pipelines, sidecars, duplicates
- multi-select mode with add-to-album action
- TanStack Query for all data fetching
This commit is contained in:
2026-06-01 01:35:43 +02:00
parent 49f77a78b9
commit 957737ac9b
101 changed files with 4679 additions and 109 deletions

View File

@@ -0,0 +1 @@
ALTER TABLE users ADD COLUMN role TEXT NOT NULL DEFAULT 'user';

View File

@@ -0,0 +1 @@
UPDATE plugins SET name = 'scan_directory' WHERE plugin_id = 'a0000000-0000-4000-8000-000000000005';

View File

@@ -189,10 +189,16 @@ impl AssetRepository for PostgresAssetRepository {
offset: u32,
) -> Result<Vec<Asset>, DomainError> {
let rows = sqlx::query_as::<_, AssetRow>(
"SELECT asset_id, volume_id, relative_path, checksum, asset_type, mime_type,
file_size, is_processed, owner_user_id, created_at
FROM assets WHERE owner_user_id = $1
ORDER BY created_at DESC
"SELECT a.asset_id, a.volume_id, a.relative_path, a.checksum, a.asset_type, a.mime_type,
a.file_size, a.is_processed, a.owner_user_id, a.created_at
FROM assets a
LEFT JOIN asset_metadata am
ON am.asset_id = a.asset_id AND am.metadata_source = 'exif_extracted'
WHERE a.owner_user_id = $1
ORDER BY COALESCE(
(am.data->>'DateTimeOriginal')::timestamptz,
a.created_at
) DESC
LIMIT $2 OFFSET $3",
)
.bind(*owner_id.as_uuid())
@@ -296,6 +302,30 @@ impl AssetRepository for PostgresAssetRepository {
Ok(count as u64)
}
async fn date_summary(
&self,
owner_id: &SystemId,
) -> Result<Vec<(chrono::NaiveDate, u64)>, DomainError> {
let rows: Vec<(chrono::NaiveDate, i64)> = sqlx::query_as(
"SELECT COALESCE(
(am.data->>'DateTimeOriginal')::timestamptz,
a.created_at
)::date AS day,
COUNT(*) AS cnt
FROM assets a
LEFT JOIN asset_metadata am
ON am.asset_id = a.asset_id AND am.metadata_source = 'exif_extracted'
WHERE a.owner_user_id = $1
GROUP BY day ORDER BY day DESC",
)
.bind(*owner_id.as_uuid())
.fetch_all(&self.pool)
.await
.map_pg()?;
Ok(rows.into_iter().map(|(d, c)| (d, c as u64)).collect())
}
async fn save(&self, asset: &Asset) -> Result<(), DomainError> {
sqlx::query(
"INSERT INTO assets (asset_id, volume_id, relative_path, checksum, asset_type,

View File

@@ -15,6 +15,7 @@ struct UserRow {
username: String,
email: String,
password_hash: String,
role: String,
created_at: DateTime<Utc>,
}
@@ -26,6 +27,7 @@ impl TryFrom<UserRow> for domain::entities::User {
username: r.username,
email: Email::new(r.email)?,
password_hash: PasswordHash::from_hash(r.password_hash),
role: r.role,
created_at: r.created_at,
})
}
@@ -40,7 +42,7 @@ impl UserRepository for PostgresUserRepository {
id: &SystemId,
) -> Result<Option<domain::entities::User>, DomainError> {
let row = sqlx::query_as::<_, UserRow>(
"SELECT id, username, email, password_hash, created_at FROM users WHERE id = $1",
"SELECT id, username, email, password_hash, role, created_at FROM users WHERE id = $1",
)
.bind(*id.as_uuid())
.fetch_optional(&self.pool)
@@ -55,7 +57,7 @@ impl UserRepository for PostgresUserRepository {
email: &Email,
) -> Result<Option<domain::entities::User>, DomainError> {
let row = sqlx::query_as::<_, UserRow>(
"SELECT id, username, email, password_hash, created_at FROM users WHERE email = $1",
"SELECT id, username, email, password_hash, role, created_at FROM users WHERE email = $1",
)
.bind(email.as_str())
.fetch_optional(&self.pool)
@@ -70,7 +72,7 @@ impl UserRepository for PostgresUserRepository {
username: &str,
) -> Result<Option<domain::entities::User>, DomainError> {
let row = sqlx::query_as::<_, UserRow>(
"SELECT id, username, email, password_hash, created_at FROM users WHERE username = $1",
"SELECT id, username, email, password_hash, role, created_at FROM users WHERE username = $1",
)
.bind(username)
.fetch_optional(&self.pool)
@@ -82,18 +84,20 @@ impl UserRepository for PostgresUserRepository {
async fn save(&self, user: &domain::entities::User) -> Result<(), DomainError> {
sqlx::query_as::<_, UserRow>(
"INSERT INTO users (id, username, email, password_hash, created_at)
VALUES ($1, $2, $3, $4, $5)
"INSERT INTO users (id, username, email, password_hash, role, created_at)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (id) DO UPDATE SET
username = EXCLUDED.username,
email = EXCLUDED.email,
password_hash = EXCLUDED.password_hash
RETURNING id, username, email, password_hash, created_at",
password_hash = EXCLUDED.password_hash,
role = EXCLUDED.role
RETURNING id, username, email, password_hash, role, created_at",
)
.bind(*user.id.as_uuid())
.bind(&user.username)
.bind(user.email.as_str())
.bind(user.password_hash.as_str())
.bind(&user.role)
.bind(user.created_at)
.fetch_one(&self.pool)
.await
@@ -109,6 +113,14 @@ impl UserRepository for PostgresUserRepository {
.map_pg()?;
Ok(())
}
async fn count(&self) -> Result<u64, DomainError> {
let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM users")
.fetch_one(&self.pool)
.await
.map_pg()?;
Ok(count as u64)
}
}
// --- PostgresRefreshTokenRepository ---

View File

@@ -405,6 +405,17 @@ impl PluginRepository for PostgresPluginRepository {
Ok(row.map(Into::into))
}
async fn find_all(&self) -> Result<Vec<Plugin>, DomainError> {
let rows = sqlx::query_as::<_, PluginRow>(
"SELECT plugin_id, name, plugin_type, is_enabled, configuration FROM plugins",
)
.fetch_all(&self.pool)
.await
.map_pg()?;
Ok(rows.into_iter().map(Into::into).collect())
}
async fn find_enabled(&self) -> Result<Vec<Plugin>, DomainError> {
let rows = sqlx::query_as::<_, PluginRow>(
"SELECT plugin_id, name, plugin_type, is_enabled, configuration
@@ -521,6 +532,17 @@ impl PipelineRepository for PostgresPipelineRepository {
Ok(row.map(Into::into))
}
async fn find_all(&self) -> Result<Vec<ProcessingPipeline>, DomainError> {
let rows = sqlx::query_as::<_, PipelineRow>(
"SELECT pipeline_id, trigger_event, steps FROM processing_pipelines",
)
.fetch_all(&self.pool)
.await
.map_pg()?;
Ok(rows.into_iter().map(Into::into).collect())
}
async fn find_by_trigger(&self, event: &str) -> Result<Vec<ProcessingPipeline>, DomainError> {
let rows = sqlx::query_as::<_, PipelineRow>(
"SELECT pipeline_id, trigger_event, steps

View File

@@ -160,6 +160,18 @@ impl LibraryPathRepository for PostgresLibraryPathRepository {
Ok(row.map(Into::into))
}
async fn find_all(&self) -> Result<Vec<LibraryPath>, DomainError> {
let rows = sqlx::query_as::<_, LibraryPathRow>(
"SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id
FROM library_paths",
)
.fetch_all(&self.pool)
.await
.map_pg()?;
Ok(rows.into_iter().map(Into::into).collect())
}
async fn find_by_volume(&self, volume_id: &SystemId) -> Result<Vec<LibraryPath>, DomainError> {
let rows = sqlx::query_as::<_, LibraryPathRow>(
"SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id
@@ -180,7 +192,7 @@ impl LibraryPathRepository for PostgresLibraryPathRepository {
let rows = sqlx::query_as::<_, LibraryPathRow>(
"SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id
FROM library_paths
WHERE is_ingest_destination = true AND designated_owner_id = $1",
WHERE is_ingest_destination = true AND (designated_owner_id = $1 OR designated_owner_id IS NULL)",
)
.bind(*owner_id.as_uuid())
.fetch_all(&self.pool)

View File

@@ -1,7 +1,9 @@
pub mod adapter;
pub mod config;
pub mod local_file_storage;
pub mod volume_resolver;
pub use adapter::ObjectStorageAdapter;
pub use config::{StorageConfig, build_store};
pub use local_file_storage::LocalFileStorage;
pub use volume_resolver::LocalVolumeFileResolver;

View File

@@ -0,0 +1,91 @@
use async_trait::async_trait;
use bytes::Bytes;
use domain::{
errors::DomainError,
ports::{DataStream, StorageVolumeRepository, VolumeFileResolver},
value_objects::SystemId,
};
use futures::StreamExt;
use std::path::PathBuf;
use std::sync::Arc;
use tokio_util::io::ReaderStream;
pub struct LocalVolumeFileResolver {
volume_repo: Arc<dyn StorageVolumeRepository>,
}
impl LocalVolumeFileResolver {
pub fn new(volume_repo: Arc<dyn StorageVolumeRepository>) -> Self {
Self { volume_repo }
}
async fn resolve_path(
&self,
volume_id: &SystemId,
relative_path: &str,
) -> Result<PathBuf, DomainError> {
let volume = self
.volume_repo
.find_by_id(volume_id)
.await?
.ok_or_else(|| DomainError::NotFound(format!("Volume {} not found", volume_id)))?;
let base = volume
.uri_prefix
.strip_prefix("file://")
.unwrap_or(&volume.uri_prefix);
let full = if relative_path.is_empty() {
PathBuf::from(base)
} else {
PathBuf::from(base).join(relative_path)
};
Ok(full)
}
}
#[async_trait]
impl VolumeFileResolver for LocalVolumeFileResolver {
async fn open_by_volume(
&self,
volume_id: &SystemId,
relative_path: &str,
) -> Result<(DataStream, u64), DomainError> {
let full = self.resolve_path(volume_id, relative_path).await?;
let meta = tokio::fs::metadata(&full)
.await
.map_err(|e| match e.kind() {
std::io::ErrorKind::NotFound => {
DomainError::NotFound(full.display().to_string())
}
_ => DomainError::Internal(format!("Failed to stat file: {e}")),
})?;
let file = tokio::fs::File::open(&full)
.await
.map_err(|e| match e.kind() {
std::io::ErrorKind::NotFound => {
DomainError::NotFound(full.display().to_string())
}
_ => DomainError::Internal(format!("Failed to open file: {e}")),
})?;
let stream = ReaderStream::new(file)
.map(|r| r.map_err(|e| DomainError::Internal(format!("Read error: {e}"))));
Ok((Box::pin(stream), meta.len()))
}
async fn read_by_volume(
&self,
volume_id: &SystemId,
relative_path: &str,
) -> Result<Bytes, DomainError> {
let full = self.resolve_path(volume_id, relative_path).await?;
let data = tokio::fs::read(&full).await.map_err(|e| match e.kind() {
std::io::ErrorKind::NotFound => {
DomainError::NotFound(full.display().to_string())
}
_ => DomainError::Internal(format!("Failed to read file: {e}")),
})?;
Ok(Bytes::from(data))
}
}

View File

@@ -6,6 +6,7 @@ pub struct UserResponse {
pub id: Uuid,
pub username: String,
pub email: String,
pub role: String,
pub created_at: DateTime<Utc>,
}
@@ -22,6 +23,7 @@ impl UserResponse {
id: *user.id.as_uuid(),
username: user.username.clone(),
email: user.email.to_string(),
role: user.role.clone(),
created_at: user.created_at,
}
}
@@ -34,6 +36,7 @@ pub struct AlbumResponse {
pub description: String,
pub creator_id: Uuid,
pub asset_count: usize,
pub asset_ids: Vec<Uuid>,
pub created_at: DateTime<Utc>,
}
@@ -45,6 +48,7 @@ impl AlbumResponse {
description: album.description.clone(),
creator_id: *album.creator_user_id.as_uuid(),
asset_count: album.asset_count(),
asset_ids: album.entries.iter().map(|e| *e.asset_id.as_uuid()).collect(),
created_at: *album.created_at.as_datetime(),
}
}
@@ -84,6 +88,17 @@ impl AssetResponse {
}
}
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
pub struct DateSummaryResponse {
pub dates: Vec<DateCountEntry>,
}
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
pub struct DateCountEntry {
pub date: String,
pub count: u64,
}
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
pub struct TimelineResponse {
pub assets: Vec<AssetResponse>,
@@ -349,6 +364,7 @@ pub struct JobResponse {
pub status: String,
pub priority: u32,
pub created_at: DateTime<Utc>,
pub error_message: Option<String>,
}
impl JobResponse {
@@ -359,6 +375,7 @@ impl JobResponse {
status: format!("{:?}", job.status),
priority: job.priority,
created_at: *job.created_at.as_datetime(),
error_message: job.error_message.clone(),
}
}
}

View File

@@ -13,6 +13,7 @@ pub use commands::resolve_duplicate::{
};
pub use commands::update_metadata::{UpdateMetadataCommand, UpdateMetadataHandler};
pub use queries::get_asset::{GetAssetHandler, GetAssetQuery};
pub use queries::get_date_summary::{DateSummaryEntry, GetDateSummaryHandler, GetDateSummaryQuery};
pub use queries::get_stack::{GetStackHandler, GetStackQuery};
pub use queries::get_timeline::{GetTimelineHandler, GetTimelineQuery, TimelineResult};
pub use queries::list_stacks::{ListStacksHandler, ListStacksQuery};

View File

@@ -0,0 +1,32 @@
use domain::{errors::DomainError, ports::AssetRepository, value_objects::SystemId};
use std::sync::Arc;
pub struct GetDateSummaryQuery {
pub owner_id: SystemId,
}
pub struct DateSummaryEntry {
pub date: chrono::NaiveDate,
pub count: u64,
}
pub struct GetDateSummaryHandler {
asset_repo: Arc<dyn AssetRepository>,
}
impl GetDateSummaryHandler {
pub fn new(asset_repo: Arc<dyn AssetRepository>) -> Self {
Self { asset_repo }
}
pub async fn execute(
&self,
query: GetDateSummaryQuery,
) -> Result<Vec<DateSummaryEntry>, DomainError> {
let rows = self.asset_repo.date_summary(&query.owner_id).await?;
Ok(rows
.into_iter()
.map(|(date, count)| DateSummaryEntry { date, count })
.collect())
}
}

View File

@@ -1,4 +1,5 @@
pub mod get_asset;
pub mod get_date_summary;
pub mod get_stack;
pub mod get_timeline;
pub mod list_stacks;

View File

@@ -1,6 +1,6 @@
use domain::{
errors::DomainError,
ports::{AssetRepository, DataStream, FileStoragePort},
ports::{AssetRepository, DataStream, VolumeFileResolver},
value_objects::SystemId,
};
use std::sync::Arc;
@@ -20,17 +20,17 @@ pub struct AssetFileResult {
pub struct ReadAssetFileHandler {
asset_repo: Arc<dyn AssetRepository>,
file_storage: Arc<dyn FileStoragePort>,
volume_resolver: Arc<dyn VolumeFileResolver>,
}
impl ReadAssetFileHandler {
pub fn new(
asset_repo: Arc<dyn AssetRepository>,
file_storage: Arc<dyn FileStoragePort>,
volume_resolver: Arc<dyn VolumeFileResolver>,
) -> Self {
Self {
asset_repo,
file_storage,
volume_resolver,
}
}
@@ -46,8 +46,11 @@ impl ReadAssetFileHandler {
}
let (stream, size) = self
.file_storage
.open_file(&asset.source_reference.relative_path)
.volume_resolver
.open_by_volume(
&asset.source_reference.volume_id,
&asset.source_reference.relative_path,
)
.await?;
let filename = asset

View File

@@ -134,6 +134,13 @@ impl AssetRepository for VisibilityFilteredAssetRepository {
self.inner.count_search(owner_id, filters).await
}
async fn date_summary(
&self,
owner_id: &SystemId,
) -> Result<Vec<(chrono::NaiveDate, u64)>, DomainError> {
self.inner.date_summary(owner_id).await
}
async fn save(&self, asset: &Asset) -> Result<(), DomainError> {
self.inner.save(asset).await
}

View File

@@ -52,7 +52,7 @@ impl LoginUserHandler {
if !valid {
return Err(DomainError::Unauthorized("Invalid credentials".to_string()));
}
let access_token = self.issuer.issue(&user.id, "user").await?;
let access_token = self.issuer.issue(&user.id, &user.role).await?;
let (raw_refresh, _) = generate_refresh_token(&self.refresh_repo, &user.id).await?;
Ok((user, access_token, raw_refresh))
}

View File

@@ -1,7 +1,7 @@
use super::login_user::generate_refresh_token;
use domain::{
errors::DomainError,
ports::{RefreshTokenRepository, TokenIssuer},
ports::{RefreshTokenRepository, TokenIssuer, UserRepository},
};
use sha2::{Digest, Sha256};
use std::sync::Arc;
@@ -13,16 +13,19 @@ pub struct RefreshTokenCommand {
pub struct RefreshTokenHandler {
refresh_repo: Arc<dyn RefreshTokenRepository>,
user_repo: Arc<dyn UserRepository>,
issuer: Arc<dyn TokenIssuer>,
}
impl RefreshTokenHandler {
pub fn new(
refresh_repo: Arc<dyn RefreshTokenRepository>,
user_repo: Arc<dyn UserRepository>,
issuer: Arc<dyn TokenIssuer>,
) -> Self {
Self {
refresh_repo,
user_repo,
issuer,
}
}
@@ -42,11 +45,16 @@ impl RefreshTokenHandler {
));
}
// Rotation: delete old, issue new pair
let user = self
.user_repo
.find_by_id(&token.user_id)
.await?
.ok_or_else(|| DomainError::NotFound("User not found".to_string()))?;
self.refresh_repo.delete(&token.token_id).await?;
let access_token = self.issuer.issue(&token.user_id, "user").await?;
let (raw_refresh, _) = generate_refresh_token(&self.refresh_repo, &token.user_id).await?;
let access_token = self.issuer.issue(&user.id, &user.role).await?;
let (raw_refresh, _) = generate_refresh_token(&self.refresh_repo, &user.id).await?;
Ok((access_token, raw_refresh))
}

View File

@@ -53,7 +53,11 @@ impl RegisterUserHandler {
)));
}
let hash = self.hasher.hash(&cmd.password).await?;
let user = User::new(&cmd.username, email, hash);
let is_first = self.user_repo.count().await? == 0;
let mut user = User::new(&cmd.username, email, hash);
if is_first {
user.role = "admin".to_string();
}
self.user_repo.save(&user).await?;
Ok(user)
}

View File

@@ -12,6 +12,8 @@ pub use commands::manage_plugin::{ManagePluginCommand, ManagePluginHandler, Plug
pub use commands::process_next_job::{ProcessNextJobCommand, ProcessNextJobHandler};
pub use commands::start_job::{StartJobCommand, StartJobHandler};
pub use queries::list_jobs::{JobListResult, ListJobsHandler, ListJobsQuery};
pub use queries::list_pipelines::ListPipelinesHandler;
pub use queries::list_plugins::ListPluginsHandler;
pub use queries::report_batch_progress::{
BatchProgress, ReportBatchProgressHandler, ReportBatchProgressQuery,
};

View File

@@ -0,0 +1,18 @@
use domain::{
entities::ProcessingPipeline, errors::DomainError, ports::PipelineRepository,
};
use std::sync::Arc;
pub struct ListPipelinesHandler {
repo: Arc<dyn PipelineRepository>,
}
impl ListPipelinesHandler {
pub fn new(repo: Arc<dyn PipelineRepository>) -> Self {
Self { repo }
}
pub async fn execute(&self) -> Result<Vec<ProcessingPipeline>, DomainError> {
self.repo.find_all().await
}
}

View File

@@ -0,0 +1,16 @@
use domain::{entities::Plugin, errors::DomainError, ports::PluginRepository};
use std::sync::Arc;
pub struct ListPluginsHandler {
repo: Arc<dyn PluginRepository>,
}
impl ListPluginsHandler {
pub fn new(repo: Arc<dyn PluginRepository>) -> Self {
Self { repo }
}
pub async fn execute(&self) -> Result<Vec<Plugin>, DomainError> {
self.repo.find_all().await
}
}

View File

@@ -1,2 +1,4 @@
pub mod list_jobs;
pub mod list_pipelines;
pub mod list_plugins;
pub mod report_batch_progress;

View File

@@ -0,0 +1,16 @@
use domain::{errors::DomainError, ports::LibraryPathRepository, value_objects::SystemId};
use std::sync::Arc;
pub struct DeleteLibraryPathHandler {
repo: Arc<dyn LibraryPathRepository>,
}
impl DeleteLibraryPathHandler {
pub fn new(repo: Arc<dyn LibraryPathRepository>) -> Self {
Self { repo }
}
pub async fn execute(&self, id: SystemId) -> Result<(), DomainError> {
self.repo.delete(&id).await
}
}

View File

@@ -0,0 +1,16 @@
use domain::{errors::DomainError, ports::StorageVolumeRepository, value_objects::SystemId};
use std::sync::Arc;
pub struct DeleteVolumeHandler {
repo: Arc<dyn StorageVolumeRepository>,
}
impl DeleteVolumeHandler {
pub fn new(repo: Arc<dyn StorageVolumeRepository>) -> Self {
Self { repo }
}
pub async fn execute(&self, id: SystemId) -> Result<(), DomainError> {
self.repo.delete(&id).await
}
}

View File

@@ -1,3 +1,5 @@
pub mod delete_library_path;
pub mod delete_volume;
pub mod ingest_asset;
pub mod register_library_path;
pub mod register_volume;

View File

@@ -1,7 +1,12 @@
pub mod commands;
pub mod queries;
pub use commands::delete_library_path::DeleteLibraryPathHandler;
pub use commands::delete_volume::DeleteVolumeHandler;
pub use commands::ingest_asset::{IngestAssetCommand, IngestAssetHandler};
pub use commands::register_library_path::{RegisterLibraryPathCommand, RegisterLibraryPathHandler};
pub use commands::register_volume::{RegisterVolumeCommand, RegisterVolumeHandler};
pub use queries::check_quota::{CheckQuotaHandler, CheckQuotaQuery};
pub use queries::list_all_library_paths::ListAllLibraryPathsHandler;
pub use queries::list_ingest_paths::{ListIngestPathsHandler, ListIngestPathsQuery};
pub use queries::list_volumes::ListVolumesHandler;

View File

@@ -0,0 +1,16 @@
use domain::{entities::LibraryPath, errors::DomainError, ports::LibraryPathRepository};
use std::sync::Arc;
pub struct ListAllLibraryPathsHandler {
repo: Arc<dyn LibraryPathRepository>,
}
impl ListAllLibraryPathsHandler {
pub fn new(repo: Arc<dyn LibraryPathRepository>) -> Self {
Self { repo }
}
pub async fn execute(&self) -> Result<Vec<LibraryPath>, DomainError> {
self.repo.find_all().await
}
}

View File

@@ -0,0 +1,28 @@
use domain::{
entities::LibraryPath,
errors::DomainError,
ports::LibraryPathRepository,
value_objects::SystemId,
};
use std::sync::Arc;
pub struct ListIngestPathsQuery {
pub user_id: SystemId,
}
pub struct ListIngestPathsHandler {
repo: Arc<dyn LibraryPathRepository>,
}
impl ListIngestPathsHandler {
pub fn new(repo: Arc<dyn LibraryPathRepository>) -> Self {
Self { repo }
}
pub async fn execute(
&self,
query: ListIngestPathsQuery,
) -> Result<Vec<LibraryPath>, DomainError> {
self.repo.find_ingest_destinations(&query.user_id).await
}
}

View File

@@ -0,0 +1,16 @@
use domain::{entities::StorageVolume, errors::DomainError, ports::StorageVolumeRepository};
use std::sync::Arc;
pub struct ListVolumesHandler {
repo: Arc<dyn StorageVolumeRepository>,
}
impl ListVolumesHandler {
pub fn new(repo: Arc<dyn StorageVolumeRepository>) -> Self {
Self { repo }
}
pub async fn execute(&self) -> Result<Vec<StorageVolume>, DomainError> {
self.repo.find_all().await
}
}

View File

@@ -1 +1,4 @@
pub mod check_quota;
pub mod list_all_library_paths;
pub mod list_ingest_paths;
pub mod list_volumes;

View File

@@ -103,6 +103,10 @@ impl UserRepository for InMemoryUserRepository {
self.users.lock().await.remove(&id.to_string());
Ok(())
}
async fn count(&self) -> Result<u64, DomainError> {
Ok(self.users.lock().await.len() as u64)
}
}
in_memory_repo!(InMemoryAssetRepository, Asset);
@@ -173,6 +177,21 @@ impl AssetRepository for InMemoryAssetRepository {
self.count_by_owner(owner_id).await
}
async fn date_summary(
&self,
owner_id: &SystemId,
) -> Result<Vec<(chrono::NaiveDate, u64)>, DomainError> {
let data = self.data.lock().await;
let mut map = std::collections::BTreeMap::<chrono::NaiveDate, u64>::new();
for asset in data.values() {
if &asset.owner_user_id == owner_id {
let date = asset.created_at.as_datetime().date_naive();
*map.entry(date).or_default() += 1;
}
}
Ok(map.into_iter().rev().collect())
}
async fn save(&self, asset: &Asset) -> Result<(), DomainError> {
self.data
.lock()
@@ -385,6 +404,10 @@ impl LibraryPathRepository for InMemoryLibraryPathRepository {
Ok(self.data.lock().await.get(&id.to_string()).cloned())
}
async fn find_all(&self) -> Result<Vec<LibraryPath>, DomainError> {
Ok(self.data.lock().await.values().cloned().collect())
}
async fn find_by_volume(&self, volume_id: &SystemId) -> Result<Vec<LibraryPath>, DomainError> {
Ok(self
.data
@@ -918,6 +941,10 @@ impl PluginRepository for InMemoryPluginRepository {
Ok(self.data.lock().await.get(&id.to_string()).cloned())
}
async fn find_all(&self) -> Result<Vec<Plugin>, DomainError> {
Ok(self.data.lock().await.values().cloned().collect())
}
async fn find_enabled(&self) -> Result<Vec<Plugin>, DomainError> {
Ok(self
.data
@@ -946,6 +973,10 @@ impl PipelineRepository for InMemoryPipelineRepository {
Ok(self.data.lock().await.get(&id.to_string()).cloned())
}
async fn find_all(&self) -> Result<Vec<ProcessingPipeline>, DomainError> {
Ok(self.data.lock().await.values().cloned().collect())
}
async fn find_by_trigger(&self, event: &str) -> Result<Vec<ProcessingPipeline>, DomainError> {
Ok(self
.data

View File

@@ -6,6 +6,7 @@ pub struct Config {
pub nats_url: String,
pub jwt_secret: String,
pub cors_allowed_origins: Vec<String>,
pub max_upload_bytes: usize,
}
impl Config {
@@ -26,6 +27,10 @@ impl Config {
.split(',')
.map(|s| s.trim().to_string())
.collect(),
max_upload_bytes: std::env::var("MAX_UPLOAD_BYTES")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(256 * 1024 * 1024),
}
}
}

View File

@@ -1,5 +1,6 @@
use anyhow::Result;
use axum::Router;
use axum::extract::DefaultBodyLimit;
use axum::http::HeaderValue;
use std::sync::Arc;
use tower_http::{
@@ -32,7 +33,8 @@ pub async fn build_app(config: &Config) -> Result<Router> {
);
let storage_path = std::env::var("STORAGE_PATH").unwrap_or_else(|_| "./data/media".to_string());
let file_storage: Arc<LocalFileStorage> = Arc::new(LocalFileStorage::new(&storage_path));
let file_storage: Arc<dyn domain::ports::FileStoragePort> =
Arc::new(LocalFileStorage::new(&storage_path));
// Build per-context services
let identity = services::identity::build(&pool, &config.jwt_secret);
@@ -68,6 +70,7 @@ pub async fn build_app(config: &Config) -> Result<Router> {
Ok(app_router(&state)
.with_state(state)
.layer(DefaultBodyLimit::max(config.max_upload_bytes))
.layer(TraceLayer::new_for_http())
.layer(cors))
}

View File

@@ -5,10 +5,12 @@ use adapters_postgres::{
PostgresDerivativeRepository, PostgresDuplicateRepository, PostgresIngestTransaction,
PostgresSidecarRepository,
};
use adapters_storage::LocalFileStorage;
use adapters_storage::LocalVolumeFileResolver;
use domain::ports::FileStoragePort;
use application::catalog::{
CreateStackHandler, DeleteAssetHandler, DeleteStackHandler, DetectLivePhotosHandler,
GetAssetHandler, GetStackHandler, GetTimelineHandler, ListDuplicatesHandler,
GetAssetHandler, GetDateSummaryHandler, GetStackHandler, GetTimelineHandler,
ListDuplicatesHandler,
ReadAssetFileHandler, ReadDerivativeHandler, RegisterAssetHandler, ResolveDuplicateHandler,
SearchAssetsHandler, UpdateMetadataHandler,
};
@@ -21,7 +23,7 @@ use super::storage::StorageRepos;
pub fn build(
pool: &PgPool,
storage_repos: &StorageRepos,
file_storage: Arc<LocalFileStorage>,
file_storage: Arc<dyn FileStoragePort>,
event_publisher: Arc<dyn EventPublisher>,
) -> CatalogHandlers {
let asset_repo = Arc::new(PostgresAssetRepository::new(pool.clone()));
@@ -49,15 +51,20 @@ pub fn build(
metadata_repo.clone(),
));
let get_date_summary = Arc::new(GetDateSummaryHandler::new(asset_repo.clone()));
let update_metadata = Arc::new(UpdateMetadataHandler::new(
asset_repo.clone(),
metadata_repo.clone(),
event_publisher.clone(),
));
let volume_resolver = Arc::new(LocalVolumeFileResolver::new(
storage_repos.volume_repo.clone(),
));
let read_asset_file = Arc::new(ReadAssetFileHandler::new(
asset_repo.clone(),
file_storage.clone(),
volume_resolver,
));
let read_derivative = Arc::new(ReadDerivativeHandler::new(
@@ -103,6 +110,7 @@ pub fn build(
ingest_asset,
get_asset,
get_timeline,
get_date_summary,
update_metadata,
read_asset_file,
read_derivative,

View File

@@ -26,9 +26,10 @@ pub fn build(pool: &PgPool, jwt_secret: &str) -> IdentityServices {
issuer.clone(),
refresh_repo.clone(),
));
let get_profile = Arc::new(GetProfileHandler::new(user_repo));
let get_profile = Arc::new(GetProfileHandler::new(user_repo.clone()));
let refresh = Arc::new(RefreshTokenHandler::new(
refresh_repo.clone(),
user_repo,
issuer.clone(),
));
let logout = Arc::new(LogoutHandler::new(refresh_repo.clone()));

View File

@@ -6,7 +6,8 @@ use adapters_postgres::{
};
use application::processing::{
CompleteJobHandler, ConfigurePipelineHandler, EnqueueJobHandler, FailJobHandler,
ListJobsHandler, ManagePluginHandler, ReportBatchProgressHandler, StartJobHandler,
ListJobsHandler, ListPipelinesHandler, ListPluginsHandler, ManagePluginHandler,
ReportBatchProgressHandler, StartJobHandler,
};
use domain::ports::EventPublisher;
use presentation::state::ProcessingHandlers;
@@ -35,6 +36,8 @@ pub fn build(pool: &PgPool, event_publisher: Arc<dyn EventPublisher>) -> Process
let list_jobs = Arc::new(ListJobsHandler::new(job_repo.clone()));
let batch_progress = Arc::new(ReportBatchProgressHandler::new(batch_repo, job_repo));
let manage_plugin = Arc::new(ManagePluginHandler::new(plugin_repo.clone()));
let list_plugins = Arc::new(ListPluginsHandler::new(plugin_repo.clone()));
let list_pipelines = Arc::new(ListPipelinesHandler::new(pipeline_repo.clone()));
let configure_pipeline = Arc::new(ConfigurePipelineHandler::new(pipeline_repo, plugin_repo));
ProcessingHandlers {
@@ -45,6 +48,8 @@ pub fn build(pool: &PgPool, event_publisher: Arc<dyn EventPublisher>) -> Process
list_jobs,
batch_progress,
manage_plugin,
list_plugins,
configure_pipeline,
list_pipelines,
}
}

View File

@@ -4,12 +4,17 @@ use adapters_postgres::{
PgPool, PostgresLibraryPathRepository, PostgresQuotaRepository,
PostgresStorageVolumeRepository, PostgresUsageLedgerRepository,
};
use application::storage::{CheckQuotaHandler, RegisterLibraryPathHandler, RegisterVolumeHandler};
use application::storage::{
CheckQuotaHandler, DeleteLibraryPathHandler, DeleteVolumeHandler, ListAllLibraryPathsHandler,
ListIngestPathsHandler, ListVolumesHandler, RegisterLibraryPathHandler,
RegisterVolumeHandler,
};
use presentation::state::StorageHandlers;
/// Shared storage repos needed by other bounded contexts (catalog ingest, etc.).
pub struct StorageRepos {
pub path_repo: Arc<PostgresLibraryPathRepository>,
pub path_repo: Arc<dyn domain::ports::LibraryPathRepository>,
pub volume_repo: Arc<dyn domain::ports::StorageVolumeRepository>,
}
pub fn build(pool: &PgPool) -> (StorageRepos, StorageHandlers) {
@@ -18,20 +23,33 @@ pub fn build(pool: &PgPool) -> (StorageRepos, StorageHandlers) {
let quota_repo = Arc::new(PostgresQuotaRepository::new(pool.clone()));
let ledger_repo = Arc::new(PostgresUsageLedgerRepository::new(pool.clone()));
let list_volumes = Arc::new(ListVolumesHandler::new(volume_repo.clone()));
let register_volume = Arc::new(RegisterVolumeHandler::new(volume_repo.clone()));
let delete_volume = Arc::new(DeleteVolumeHandler::new(volume_repo.clone()));
let register_library_path = Arc::new(RegisterLibraryPathHandler::new(
volume_repo,
volume_repo.clone(),
path_repo.clone(),
));
let list_ingest_paths = Arc::new(ListIngestPathsHandler::new(path_repo.clone()));
let list_all_library_paths = Arc::new(ListAllLibraryPathsHandler::new(path_repo.clone()));
let delete_library_path = Arc::new(DeleteLibraryPathHandler::new(path_repo.clone()));
let check_quota = Arc::new(CheckQuotaHandler::new(quota_repo, ledger_repo));
let handlers = StorageHandlers {
register_volume,
delete_volume,
list_volumes,
register_library_path,
list_ingest_paths,
list_all_library_paths,
delete_library_path,
check_quota,
};
let repos = StorageRepos { path_repo };
let repos = StorageRepos {
path_repo,
volume_repo,
};
(repos, handlers)
}

View File

@@ -32,6 +32,10 @@ pub trait AssetRepository: Send + Sync {
owner_id: &SystemId,
filters: &AssetFilters,
) -> Result<u64, DomainError>;
async fn date_summary(
&self,
owner_id: &SystemId,
) -> Result<Vec<(chrono::NaiveDate, u64)>, DomainError>;
async fn save(&self, asset: &Asset) -> Result<(), DomainError>;
async fn delete(&self, id: &SystemId) -> Result<(), DomainError>;
}

View File

@@ -126,6 +126,7 @@ pub struct User {
pub username: String,
pub email: Email,
pub password_hash: PasswordHash,
pub role: String,
pub created_at: DateTime<Utc>,
}
@@ -136,9 +137,14 @@ impl User {
username: username.into(),
email,
password_hash,
role: "user".to_string(),
created_at: Utc::now(),
}
}
pub fn is_admin(&self) -> bool {
self.role == "admin"
}
}
// --- RefreshToken ---

View File

@@ -12,6 +12,7 @@ pub trait UserRepository: Send + Sync {
async fn find_by_username(&self, username: &str) -> Result<Option<User>, DomainError>;
async fn save(&self, user: &User) -> Result<(), DomainError>;
async fn delete(&self, id: &SystemId) -> Result<(), DomainError>;
async fn count(&self) -> Result<u64, DomainError>;
}
// --- RoleRepository ---

View File

@@ -34,6 +34,7 @@ pub trait JobBatchRepository: Send + Sync {
#[async_trait]
pub trait PluginRepository: Send + Sync {
async fn find_by_id(&self, id: &SystemId) -> Result<Option<Plugin>, DomainError>;
async fn find_all(&self) -> Result<Vec<Plugin>, DomainError>;
async fn find_enabled(&self) -> Result<Vec<Plugin>, DomainError>;
async fn save(&self, plugin: &Plugin) -> Result<(), DomainError>;
}
@@ -43,6 +44,7 @@ pub trait PluginRepository: Send + Sync {
#[async_trait]
pub trait PipelineRepository: Send + Sync {
async fn find_by_id(&self, id: &SystemId) -> Result<Option<ProcessingPipeline>, DomainError>;
async fn find_all(&self) -> Result<Vec<ProcessingPipeline>, DomainError>;
async fn find_by_trigger(&self, event: &str) -> Result<Vec<ProcessingPipeline>, DomainError>;
async fn save(&self, pipeline: &ProcessingPipeline) -> Result<(), DomainError>;
}

View File

@@ -23,6 +23,7 @@ pub trait StorageVolumeRepository: Send + Sync {
#[async_trait]
pub trait LibraryPathRepository: Send + Sync {
async fn find_by_id(&self, id: &SystemId) -> Result<Option<LibraryPath>, DomainError>;
async fn find_all(&self) -> Result<Vec<LibraryPath>, DomainError>;
async fn find_by_volume(&self, volume_id: &SystemId) -> Result<Vec<LibraryPath>, DomainError>;
async fn find_ingest_destinations(
&self,
@@ -84,6 +85,23 @@ pub trait IngestTransaction: Send + Sync {
async fn record_usage(&self, entry: &UsageLedgerEntry) -> Result<(), DomainError>;
}
// --- VolumeFileResolver ---
#[async_trait]
pub trait VolumeFileResolver: Send + Sync {
async fn open_by_volume(
&self,
volume_id: &SystemId,
relative_path: &str,
) -> Result<(DataStream, u64), DomainError>;
async fn read_by_volume(
&self,
volume_id: &SystemId,
relative_path: &str,
) -> Result<Bytes, DomainError>;
}
// --- FileStoragePort ---
#[derive(Debug, Clone)]

View File

@@ -7,11 +7,14 @@ use crate::{
};
use api_types::{
requests::{RegisterAssetRequest, TagAssetRequest},
responses::{AssetResponse, IngestResponse, TagResponse, TimelineResponse},
responses::{
AssetResponse, DateCountEntry, DateSummaryResponse, IngestResponse, TagResponse,
TimelineResponse,
},
};
use application::{
catalog::{
DeleteAssetCommand, GetAssetQuery, GetTimelineQuery, ReadAssetFileQuery,
DeleteAssetCommand, GetAssetQuery, GetDateSummaryQuery, GetTimelineQuery, ReadAssetFileQuery,
ReadDerivativeQuery, RegisterAssetCommand, SearchAssetsQuery, UpdateMetadataCommand,
},
organization::TagAssetCommand,
@@ -225,6 +228,25 @@ pub async fn timeline(
}))
}
pub async fn date_summary(
State(state): State<AppState>,
claims: JwtClaims,
) -> Result<Json<DateSummaryResponse>, AppError> {
let query = GetDateSummaryQuery {
owner_id: claims.user_id,
};
let entries = state.catalog.get_date_summary.execute(query).await?;
Ok(Json(DateSummaryResponse {
dates: entries
.into_iter()
.map(|e| DateCountEntry {
date: e.date.to_string(),
count: e.count,
})
.collect(),
}))
}
#[utoipa::path(
put, path = "/api/v1/assets/{id}/metadata",
request_body = api_types::requests::UpdateMetadataRequest,

View File

@@ -34,7 +34,7 @@ pub async fn register(
let user = state.identity.register.execute(cmd).await?;
let token = state
.token_issuer
.issue(&user.id, "user")
.issue(&user.id, &user.role)
.await
.map_err(AppError::from)?;
let (refresh_token, _) =

View File

@@ -198,6 +198,15 @@ pub async fn batch_progress(
Ok(Json(BatchProgressResponse::from_domain(&progress)))
}
pub async fn list_plugins(
State(state): State<AppState>,
claims: JwtClaims,
) -> Result<Json<Vec<PluginResponse>>, AppError> {
super::require_admin(&claims)?;
let plugins = state.processing.list_plugins.execute().await?;
Ok(Json(plugins.iter().map(PluginResponse::from_domain).collect()))
}
#[utoipa::path(
post, path = "/api/v1/plugins",
request_body = ManagePluginRequest,
@@ -251,6 +260,15 @@ pub async fn manage_plugin(
))
}
pub async fn list_pipelines(
State(state): State<AppState>,
claims: JwtClaims,
) -> Result<Json<Vec<PipelineResponse>>, AppError> {
super::require_admin(&claims)?;
let pipelines = state.processing.list_pipelines.execute().await?;
Ok(Json(pipelines.iter().map(PipelineResponse::from_domain).collect()))
}
#[utoipa::path(
post, path = "/api/v1/pipelines",
request_body = ConfigurePipelineRequest,

View File

@@ -3,14 +3,35 @@ use api_types::{
requests::{CheckQuotaParams, RegisterLibraryPathRequest, RegisterVolumeRequest},
responses::{LibraryPathResponse, QuotaCheckResponse, VolumeResponse},
};
use application::storage::{CheckQuotaQuery, RegisterLibraryPathCommand, RegisterVolumeCommand};
use application::storage::{
CheckQuotaQuery, ListIngestPathsQuery, RegisterLibraryPathCommand, RegisterVolumeCommand,
};
use axum::{
Json,
extract::{Query, State},
extract::{Path, Query, State},
http::StatusCode,
};
use domain::value_objects::SystemId;
#[utoipa::path(
get, path = "/api/v1/storage/volumes",
security(("bearer_token" = [])),
responses(
(status = 200, description = "All volumes", body = Vec<VolumeResponse>),
(status = 401, description = "Unauthorized")
)
)]
pub async fn list_volumes(
State(state): State<AppState>,
claims: JwtClaims,
) -> Result<Json<Vec<VolumeResponse>>, AppError> {
super::require_admin(&claims)?;
let volumes = state.storage.list_volumes.execute().await?;
Ok(Json(
volumes.iter().map(VolumeResponse::from_domain).collect(),
))
}
#[utoipa::path(
post, path = "/api/v1/storage/volumes",
request_body = RegisterVolumeRequest,
@@ -66,6 +87,75 @@ pub async fn register_library_path(
))
}
#[utoipa::path(
get, path = "/api/v1/storage/library-paths",
security(("bearer_token" = [])),
responses(
(status = 200, description = "Ingest destinations", body = Vec<LibraryPathResponse>),
(status = 401, description = "Unauthorized")
)
)]
pub async fn list_ingest_paths(
State(state): State<AppState>,
claims: JwtClaims,
) -> Result<Json<Vec<LibraryPathResponse>>, AppError> {
let query = ListIngestPathsQuery {
user_id: claims.user_id,
};
let paths = state.storage.list_ingest_paths.execute(query).await?;
Ok(Json(
paths.iter().map(LibraryPathResponse::from_domain).collect(),
))
}
#[utoipa::path(
get, path = "/api/v1/storage/library-paths/all",
security(("bearer_token" = [])),
responses(
(status = 200, description = "All library paths", body = Vec<LibraryPathResponse>),
(status = 401, description = "Unauthorized"),
(status = 403, description = "Forbidden")
)
)]
pub async fn list_all_library_paths(
State(state): State<AppState>,
claims: JwtClaims,
) -> Result<Json<Vec<LibraryPathResponse>>, AppError> {
super::require_admin(&claims)?;
let paths = state.storage.list_all_library_paths.execute().await?;
Ok(Json(
paths.iter().map(LibraryPathResponse::from_domain).collect(),
))
}
pub async fn delete_volume(
State(state): State<AppState>,
claims: JwtClaims,
Path((id,)): Path<(uuid::Uuid,)>,
) -> Result<StatusCode, AppError> {
super::require_admin(&claims)?;
state
.storage
.delete_volume
.execute(SystemId::from_uuid(id))
.await?;
Ok(StatusCode::NO_CONTENT)
}
pub async fn delete_library_path(
State(state): State<AppState>,
claims: JwtClaims,
Path((id,)): Path<(uuid::Uuid,)>,
) -> Result<StatusCode, AppError> {
super::require_admin(&claims)?;
state
.storage
.delete_library_path
.execute(SystemId::from_uuid(id))
.await?;
Ok(StatusCode::NO_CONTENT)
}
const DEFAULT_QUOTA_USAGE_TYPE: &str = "storage_bytes";
const DEFAULT_QUOTA_AMOUNT: u64 = 0;

View File

@@ -13,6 +13,7 @@ pub fn routes() -> Router<AppState> {
.route("/assets/ingest", post(assets::ingest))
.route("/assets/register", post(assets::register_asset))
.route("/assets/timeline", get(assets::timeline))
.route("/assets/date-summary", get(assets::date_summary))
.route(
"/assets/{id}",
get(assets::get_asset).delete(assets::delete_asset),

View File

@@ -14,6 +14,6 @@ pub fn routes() -> Router<AppState> {
.route("/jobs/{id}/complete", post(processing::complete_job))
.route("/jobs/{id}/fail", post(processing::fail_job))
.route("/jobs/batches/{id}", get(processing::batch_progress))
.route("/plugins", post(processing::manage_plugin))
.route("/pipelines", post(processing::configure_pipeline))
.route("/plugins", get(processing::list_plugins).post(processing::manage_plugin))
.route("/pipelines", get(processing::list_pipelines).post(processing::configure_pipeline))
}

View File

@@ -1,15 +1,27 @@
use crate::{handlers::storage, state::AppState};
use axum::{
Router,
routing::{get, post},
routing::{delete, get, post},
};
pub fn routes() -> Router<AppState> {
Router::new()
.route("/storage/volumes", post(storage::register_volume))
.route(
"/storage/volumes",
get(storage::list_volumes).post(storage::register_volume),
)
.route("/storage/volumes/{id}", delete(storage::delete_volume))
.route(
"/storage/library-paths",
post(storage::register_library_path),
get(storage::list_ingest_paths).post(storage::register_library_path),
)
.route(
"/storage/library-paths/all",
get(storage::list_all_library_paths),
)
.route(
"/storage/library-paths/{id}",
delete(storage::delete_library_path),
)
.route("/storage/quota", get(storage::check_quota))
}

View File

@@ -3,9 +3,10 @@ use std::sync::Arc;
use application::{
catalog::{
CreateStackHandler, DeleteAssetHandler, DeleteStackHandler, DetectLivePhotosHandler,
GetAssetHandler, GetStackHandler, GetTimelineHandler, ListDuplicatesHandler,
ListStacksHandler, ReadAssetFileHandler, ReadDerivativeHandler, RegisterAssetHandler,
ResolveDuplicateHandler, SearchAssetsHandler, UpdateMetadataHandler,
GetAssetHandler, GetDateSummaryHandler, GetStackHandler, GetTimelineHandler,
ListDuplicatesHandler, ListStacksHandler, ReadAssetFileHandler, ReadDerivativeHandler,
RegisterAssetHandler, ResolveDuplicateHandler, SearchAssetsHandler,
UpdateMetadataHandler,
},
identity::{
GetProfileHandler, LoginUserHandler, LogoutHandler, RefreshTokenHandler,
@@ -17,7 +18,8 @@ use application::{
},
processing::{
CompleteJobHandler, ConfigurePipelineHandler, EnqueueJobHandler, FailJobHandler,
ListJobsHandler, ManagePluginHandler, ReportBatchProgressHandler, StartJobHandler,
ListJobsHandler, ListPipelinesHandler, ListPluginsHandler, ManagePluginHandler,
ReportBatchProgressHandler, StartJobHandler,
},
sharing::{
AccessSharedResourceHandler, GenerateShareLinkHandler, RevokeShareHandler,
@@ -28,7 +30,9 @@ use application::{
ImportSidecarHandler, ResolveConflictHandler,
},
storage::{
CheckQuotaHandler, IngestAssetHandler, RegisterLibraryPathHandler, RegisterVolumeHandler,
CheckQuotaHandler, DeleteLibraryPathHandler, DeleteVolumeHandler, IngestAssetHandler,
ListAllLibraryPathsHandler, ListIngestPathsHandler, ListVolumesHandler,
RegisterLibraryPathHandler, RegisterVolumeHandler,
},
};
use domain::ports::{RefreshTokenRepository, TokenIssuer};
@@ -48,6 +52,7 @@ pub struct CatalogHandlers {
pub ingest_asset: Arc<IngestAssetHandler>,
pub get_asset: Arc<GetAssetHandler>,
pub get_timeline: Arc<GetTimelineHandler>,
pub get_date_summary: Arc<GetDateSummaryHandler>,
pub update_metadata: Arc<UpdateMetadataHandler>,
pub read_asset_file: Arc<ReadAssetFileHandler>,
pub read_derivative: Arc<ReadDerivativeHandler>,
@@ -76,7 +81,12 @@ pub struct OrganizationHandlers {
#[derive(Clone)]
pub struct StorageHandlers {
pub register_volume: Arc<RegisterVolumeHandler>,
pub delete_volume: Arc<DeleteVolumeHandler>,
pub list_volumes: Arc<ListVolumesHandler>,
pub register_library_path: Arc<RegisterLibraryPathHandler>,
pub list_ingest_paths: Arc<ListIngestPathsHandler>,
pub list_all_library_paths: Arc<ListAllLibraryPathsHandler>,
pub delete_library_path: Arc<DeleteLibraryPathHandler>,
pub check_quota: Arc<CheckQuotaHandler>,
}
@@ -107,7 +117,9 @@ pub struct ProcessingHandlers {
pub list_jobs: Arc<ListJobsHandler>,
pub batch_progress: Arc<ReportBatchProgressHandler>,
pub manage_plugin: Arc<ManagePluginHandler>,
pub list_plugins: Arc<ListPluginsHandler>,
pub configure_pipeline: Arc<ConfigurePipelineHandler>,
pub list_pipelines: Arc<ListPipelinesHandler>,
}
#[derive(Clone)]

View File

@@ -3,6 +3,7 @@ use crate::plugins::{
DirectoryScannerPlugin, MetadataExtractorPlugin, NoOpPlugin, SidecarSyncPlugin,
ThumbnailGeneratorPlugin,
};
use adapters_storage::LocalVolumeFileResolver;
use application::catalog::RegisterAssetHandler;
use domain::ports::{
EventPublisher, MetadataExtractorPort, SidecarWriterPort, ThumbnailGeneratorPort,
@@ -21,15 +22,18 @@ pub fn build_plugin_registry(
) -> InMemoryPluginRegistry {
let mut registry = InMemoryPluginRegistry::new();
let volume_resolver = Arc::new(LocalVolumeFileResolver::new(repos.volume.clone()));
registry.register(Arc::new(NoOpPlugin));
registry.register(Arc::new(MetadataExtractorPlugin::new(
repos.asset.clone(),
file_storage.clone(),
volume_resolver.clone(),
repos.metadata.clone(),
extractor,
)));
registry.register(Arc::new(ThumbnailGeneratorPlugin::new(
repos.asset.clone(),
volume_resolver,
file_storage.clone(),
repos.derivative.clone(),
thumbnail_gen,
@@ -43,7 +47,6 @@ pub fn build_plugin_registry(
registry.register(Arc::new(DirectoryScannerPlugin::new(
repos.volume.clone(),
repos.library_path.clone(),
file_storage.clone(),
register_handler,
)));

View File

@@ -13,6 +13,8 @@ mod sweep;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::from_default_env().add_directive("worker=info".parse()?),

View File

@@ -3,17 +3,17 @@ use async_trait::async_trait;
use domain::{
catalog::entities::AssetType,
errors::DomainError,
ports::{FileStoragePort, LibraryPathRepository, PluginExecutor, StorageVolumeRepository},
ports::{LibraryPathRepository, PluginExecutor, StorageVolumeRepository},
value_objects::{MetadataValue, StructuredData, SystemId},
};
use sha2::{Digest, Sha256};
use std::path::PathBuf;
use std::sync::Arc;
use tracing::{info, warn};
pub struct DirectoryScannerPlugin {
volume_repo: Arc<dyn StorageVolumeRepository>,
path_repo: Arc<dyn LibraryPathRepository>,
file_storage: Arc<dyn FileStoragePort>,
register_handler: Arc<RegisterAssetHandler>,
}
@@ -21,13 +21,11 @@ impl DirectoryScannerPlugin {
pub fn new(
volume_repo: Arc<dyn StorageVolumeRepository>,
path_repo: Arc<dyn LibraryPathRepository>,
file_storage: Arc<dyn FileStoragePort>,
register_handler: Arc<RegisterAssetHandler>,
) -> Self {
Self {
volume_repo,
path_repo,
file_storage,
register_handler,
}
}
@@ -55,7 +53,7 @@ fn classify(filename: &str) -> Option<(AssetType, &'static str)> {
#[async_trait]
impl PluginExecutor for DirectoryScannerPlugin {
fn plugin_name(&self) -> &str {
"directory_scanner"
"scan_directory"
}
async fn execute(
@@ -92,8 +90,14 @@ impl PluginExecutor for DirectoryScannerPlugin {
DomainError::Validation(format!("LibraryPath {} has no designated owner", path_id))
})?;
let volume_base = volume
.uri_prefix
.strip_prefix("file://")
.unwrap_or(&volume.uri_prefix);
let volume_root = PathBuf::from(volume_base);
let scan_root = &library_path.relative_path;
info!(path = scan_root, volume = %volume.volume_name, "scanning directory");
info!(path = scan_root, volume = %volume.volume_name, base = %volume_root.display(), "scanning directory");
let mut found = 0u64;
let mut registered = 0u64;
@@ -101,29 +105,40 @@ impl PluginExecutor for DirectoryScannerPlugin {
let mut dirs_to_scan = vec![scan_root.to_string()];
while let Some(dir) = dirs_to_scan.pop() {
let entries = match self.file_storage.list_directory(&dir).await {
Ok(e) => e,
let abs_dir = if dir.is_empty() {
volume_root.clone()
} else {
volume_root.join(&dir)
};
let mut read_dir = match tokio::fs::read_dir(&abs_dir).await {
Ok(r) => r,
Err(e) => {
warn!(dir = dir, error = %e, "failed to list directory, skipping");
warn!(dir = %abs_dir.display(), error = %e, "failed to list directory, skipping");
continue;
}
};
for entry in entries {
let full_path = if dir.is_empty() {
entry.path.clone()
while let Ok(Some(entry)) = read_dir.next_entry().await {
let meta = match entry.metadata().await {
Ok(m) => m,
Err(_) => continue,
};
let name = entry.file_name().to_string_lossy().to_string();
let relative = if dir.is_empty() {
name.clone()
} else {
format!("{}/{}", dir, entry.path)
format!("{}/{}", dir, name)
};
if entry.is_directory {
dirs_to_scan.push(full_path);
if meta.is_dir() {
dirs_to_scan.push(relative);
continue;
}
found += 1;
let (asset_type, mime_type) = match classify(&entry.path) {
let (asset_type, mime_type) = match classify(&name) {
Some(c) => c,
None => {
skipped += 1;
@@ -131,10 +146,11 @@ impl PluginExecutor for DirectoryScannerPlugin {
}
};
let data = match self.file_storage.read_file(&full_path).await {
let abs_path = volume_root.join(&relative);
let data = match tokio::fs::read(&abs_path).await {
Ok(d) => d,
Err(e) => {
warn!(path = full_path, error = %e, "failed to read file, skipping");
warn!(path = %abs_path.display(), error = %e, "failed to read file, skipping");
skipped += 1;
continue;
}
@@ -144,7 +160,7 @@ impl PluginExecutor for DirectoryScannerPlugin {
let cmd = RegisterAssetCommand {
volume_id: library_path.volume_id,
relative_path: full_path.clone(),
relative_path: relative.clone(),
checksum,
asset_type,
mime_type: mime_type.to_string(),
@@ -156,11 +172,11 @@ impl PluginExecutor for DirectoryScannerPlugin {
Ok((asset, dup)) => {
registered += 1;
if dup.is_some() {
info!(path = full_path, asset_id = %asset.asset_id, "registered (duplicate detected)");
info!(path = relative, asset_id = %asset.asset_id, "registered (duplicate detected)");
}
}
Err(e) => {
warn!(path = full_path, error = %e, "failed to register asset");
warn!(path = relative, error = %e, "failed to register asset");
skipped += 1;
}
}

View File

@@ -3,8 +3,8 @@ use domain::{
entities::{AssetMetadata, MetadataSource},
errors::DomainError,
ports::{
AssetMetadataRepository, AssetRepository, FileStoragePort, MetadataExtractorPort,
PluginExecutor,
AssetMetadataRepository, AssetRepository, MetadataExtractorPort, PluginExecutor,
VolumeFileResolver,
},
value_objects::{MetadataValue, StructuredData, SystemId},
};
@@ -13,7 +13,7 @@ use tracing::info;
pub struct MetadataExtractorPlugin {
asset_repo: Arc<dyn AssetRepository>,
file_storage: Arc<dyn FileStoragePort>,
volume_resolver: Arc<dyn VolumeFileResolver>,
metadata_repo: Arc<dyn AssetMetadataRepository>,
extractor: Arc<dyn MetadataExtractorPort>,
}
@@ -21,13 +21,13 @@ pub struct MetadataExtractorPlugin {
impl MetadataExtractorPlugin {
pub fn new(
asset_repo: Arc<dyn AssetRepository>,
file_storage: Arc<dyn FileStoragePort>,
volume_resolver: Arc<dyn VolumeFileResolver>,
metadata_repo: Arc<dyn AssetMetadataRepository>,
extractor: Arc<dyn MetadataExtractorPort>,
) -> Self {
Self {
asset_repo,
file_storage,
volume_resolver,
metadata_repo,
extractor,
}
@@ -56,8 +56,13 @@ impl PluginExecutor for MetadataExtractorPlugin {
.await?
.ok_or_else(|| DomainError::NotFound(format!("Asset {} not found", asset_id)))?;
let path = &asset.source_reference.relative_path;
let data = self.file_storage.read_file(path).await?;
let data = self
.volume_resolver
.read_by_volume(
&asset.source_reference.volume_id,
&asset.source_reference.relative_path,
)
.await?;
let mut extracted = self.extractor.extract(&data)?;
extracted.insert("file_size_bytes", MetadataValue::Integer(data.len() as i64));

View File

@@ -4,7 +4,7 @@ use domain::{
errors::DomainError,
ports::{
AssetRepository, DerivativeRepository, FileStoragePort, PluginExecutor,
ThumbnailGeneratorPort,
ThumbnailGeneratorPort, VolumeFileResolver,
},
value_objects::{MetadataValue, StructuredData, SystemId},
};
@@ -13,6 +13,7 @@ use tracing::info;
pub struct ThumbnailGeneratorPlugin {
asset_repo: Arc<dyn AssetRepository>,
volume_resolver: Arc<dyn VolumeFileResolver>,
file_storage: Arc<dyn FileStoragePort>,
derivative_repo: Arc<dyn DerivativeRepository>,
thumbnail_gen: Arc<dyn ThumbnailGeneratorPort>,
@@ -21,12 +22,14 @@ pub struct ThumbnailGeneratorPlugin {
impl ThumbnailGeneratorPlugin {
pub fn new(
asset_repo: Arc<dyn AssetRepository>,
volume_resolver: Arc<dyn VolumeFileResolver>,
file_storage: Arc<dyn FileStoragePort>,
derivative_repo: Arc<dyn DerivativeRepository>,
thumbnail_gen: Arc<dyn ThumbnailGeneratorPort>,
) -> Self {
Self {
asset_repo,
volume_resolver,
file_storage,
derivative_repo,
thumbnail_gen,
@@ -92,8 +95,11 @@ impl PluginExecutor for ThumbnailGeneratorPlugin {
}
let source_bytes = self
.file_storage
.read_file(&asset.source_reference.relative_path)
.volume_resolver
.read_by_volume(
&asset.source_reference.volume_id,
&asset.source_reference.relative_path,
)
.await?;
let output = self