diff --git a/Cargo.lock b/Cargo.lock index 3f25f1c..9cd04f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -815,6 +815,7 @@ dependencies = [ "async-trait", "bytes", "chrono", + "email_address", "futures", "serde", "serde_json", @@ -859,6 +860,15 @@ dependencies = [ "serde", ] +[[package]] +name = "email_address" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e079f19b08ca6239f47f8ba8509c11cf3ea30095831f7fed61441475edd8c449" +dependencies = [ + "serde", +] + [[package]] name = "encoding_rs" version = "0.8.35" @@ -894,6 +904,16 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys 0.61.2", +] + [[package]] name = "etcetera" version = "0.8.0" @@ -2979,6 +2999,16 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + [[package]] name = "signatory" version = "0.27.1" @@ -3452,6 +3482,7 @@ dependencies = [ "libc", "mio", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.61.2", diff --git a/Cargo.toml b/Cargo.toml index 6d21e7f..958376a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ members = [ resolver = "2" [workspace.dependencies] -tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "net", "time", "sync"] } +tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "net", "time", "sync", "signal"] } async-trait = "0.1" futures = "0.3" bytes = "1.0" @@ -28,6 +28,7 @@ anyhow = "1.0" thiserror = "2.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +email_address = "0.2" sha2 = "0.10" uuid = { version = "1.0", features = ["v4", "serde"] } chrono = { version = "0.4", features = ["serde"] } diff --git a/crates/adapters/auth/src/jwt.rs b/crates/adapters/auth/src/jwt.rs index 18c839c..3f9e63a 100644 --- a/crates/adapters/auth/src/jwt.rs +++ b/crates/adapters/auth/src/jwt.rs @@ -19,10 +19,14 @@ pub struct JwtTokenIssuer { impl JwtTokenIssuer { pub fn new(secret: &str) -> Self { + Self::with_expiry(secret, 1) + } + + pub fn with_expiry(secret: &str, expiry_hours: i64) -> Self { Self { encoding_key: EncodingKey::from_secret(secret.as_bytes()), decoding_key: DecodingKey::from_secret(secret.as_bytes()), - expiry_hours: 24, + expiry_hours, } } } diff --git a/crates/adapters/event-payload/src/lib.rs b/crates/adapters/event-payload/src/lib.rs index 9e884b9..7266694 100644 --- a/crates/adapters/event-payload/src/lib.rs +++ b/crates/adapters/event-payload/src/lib.rs @@ -54,6 +54,29 @@ pub enum EventPayload { error: String, timestamp: String, }, + UserCreated { + user_id: String, + timestamp: String, + }, + UserDeleted { + user_id: String, + timestamp: String, + }, + AlbumCreated { + album_id: String, + creator_id: String, + timestamp: String, + }, + TagCreated { + tag_id: String, + asset_id: String, + timestamp: String, + }, + DuplicateDetected { + group_id: String, + asset_ids: Vec, + timestamp: String, + }, } impl EventPayload { @@ -69,6 +92,11 @@ impl EventPayload { Self::JobEnqueued { .. } => "jobs.enqueued", Self::JobCompleted { .. } => "jobs.completed", Self::JobFailed { .. } => "jobs.failed", + Self::UserCreated { .. } => "users.created", + Self::UserDeleted { .. } => "users.deleted", + Self::AlbumCreated { .. } => "albums.created", + Self::TagCreated { .. } => "tags.created", + Self::DuplicateDetected { .. } => "duplicates.detected", } } } @@ -163,6 +191,41 @@ impl From<&DomainEvent> for EventPayload { error: error.clone(), timestamp: timestamp.to_string(), }, + DomainEvent::UserCreated { user_id, timestamp } => Self::UserCreated { + user_id: user_id.to_string(), + timestamp: timestamp.to_string(), + }, + DomainEvent::UserDeleted { user_id, timestamp } => Self::UserDeleted { + user_id: user_id.to_string(), + timestamp: timestamp.to_string(), + }, + DomainEvent::AlbumCreated { + album_id, + creator_id, + timestamp, + } => Self::AlbumCreated { + album_id: album_id.to_string(), + creator_id: creator_id.to_string(), + timestamp: timestamp.to_string(), + }, + DomainEvent::TagCreated { + tag_id, + asset_id, + timestamp, + } => Self::TagCreated { + tag_id: tag_id.to_string(), + asset_id: asset_id.to_string(), + timestamp: timestamp.to_string(), + }, + DomainEvent::DuplicateDetected { + group_id, + asset_ids, + timestamp, + } => Self::DuplicateDetected { + group_id: group_id.to_string(), + asset_ids: asset_ids.iter().map(|id| id.to_string()).collect(), + timestamp: timestamp.to_string(), + }, } } } @@ -273,6 +336,44 @@ impl TryFrom for DomainEvent { error, timestamp: parse_timestamp(×tamp)?, }, + EventPayload::UserCreated { user_id, timestamp } => DomainEvent::UserCreated { + user_id: SystemId::from_uuid(parse_uuid(&user_id, "user_id")?), + timestamp: parse_timestamp(×tamp)?, + }, + EventPayload::UserDeleted { user_id, timestamp } => DomainEvent::UserDeleted { + user_id: SystemId::from_uuid(parse_uuid(&user_id, "user_id")?), + timestamp: parse_timestamp(×tamp)?, + }, + EventPayload::AlbumCreated { + album_id, + creator_id, + timestamp, + } => DomainEvent::AlbumCreated { + album_id: SystemId::from_uuid(parse_uuid(&album_id, "album_id")?), + creator_id: SystemId::from_uuid(parse_uuid(&creator_id, "creator_id")?), + timestamp: parse_timestamp(×tamp)?, + }, + EventPayload::TagCreated { + tag_id, + asset_id, + timestamp, + } => DomainEvent::TagCreated { + tag_id: SystemId::from_uuid(parse_uuid(&tag_id, "tag_id")?), + asset_id: SystemId::from_uuid(parse_uuid(&asset_id, "asset_id")?), + timestamp: parse_timestamp(×tamp)?, + }, + EventPayload::DuplicateDetected { + group_id, + asset_ids, + timestamp, + } => DomainEvent::DuplicateDetected { + group_id: SystemId::from_uuid(parse_uuid(&group_id, "group_id")?), + asset_ids: asset_ids + .iter() + .map(|id| parse_uuid(id, "asset_id").map(SystemId::from_uuid)) + .collect::, _>>()?, + timestamp: parse_timestamp(×tamp)?, + }, }) } } diff --git a/crates/adapters/postgres/migrations/013_asset_stacks.sql b/crates/adapters/postgres/migrations/013_asset_stacks.sql new file mode 100644 index 0000000..f662e38 --- /dev/null +++ b/crates/adapters/postgres/migrations/013_asset_stacks.sql @@ -0,0 +1,9 @@ +CREATE TABLE asset_stacks ( + stack_id UUID PRIMARY KEY, + stack_type TEXT NOT NULL, + primary_asset_id UUID NOT NULL REFERENCES assets(asset_id), + owner_user_id UUID NOT NULL, + members JSONB NOT NULL DEFAULT '[]' +); + +CREATE INDEX idx_stacks_owner ON asset_stacks(owner_user_id); diff --git a/crates/adapters/postgres/migrations/014_refresh_tokens.sql b/crates/adapters/postgres/migrations/014_refresh_tokens.sql new file mode 100644 index 0000000..3b898e4 --- /dev/null +++ b/crates/adapters/postgres/migrations/014_refresh_tokens.sql @@ -0,0 +1,10 @@ +CREATE TABLE refresh_tokens ( + token_id UUID PRIMARY KEY, + user_id UUID NOT NULL REFERENCES users(id), + token_hash TEXT NOT NULL, + expires_at TIMESTAMPTZ NOT NULL, + revoked BOOLEAN NOT NULL DEFAULT FALSE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_refresh_tokens_user ON refresh_tokens(user_id); diff --git a/crates/adapters/postgres/src/catalog/mod.rs b/crates/adapters/postgres/src/catalog/mod.rs index e46f6c6..50a7661 100644 --- a/crates/adapters/postgres/src/catalog/mod.rs +++ b/crates/adapters/postgres/src/catalog/mod.rs @@ -3,12 +3,16 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ entities::{ - Asset, AssetMetadata, AssetType, DerivativeAsset, DerivativeProfile, DetectionMethod, - DuplicateCandidate, DuplicateGroup, DuplicateStatus, GenerationStatus, MetadataSource, - SourceReference, + Asset, AssetFilters, AssetMetadata, AssetStack, AssetStackMember, AssetType, + DerivativeAsset, DerivativeProfile, DetectionMethod, DuplicateCandidate, DuplicateGroup, + DuplicateStatus, GenerationStatus, MetadataSource, SourceReference, StackMemberRole, + StackType, }, errors::DomainError, - ports::{AssetMetadataRepository, AssetRepository, DerivativeRepository, DuplicateRepository}, + ports::{ + AssetMetadataRepository, AssetRepository, AssetStackRepository, DerivativeRepository, + DuplicateRepository, + }, value_objects::{Checksum, DateTimeStamp, MetadataValue, StructuredData, SystemId}, }; use uuid::Uuid; @@ -123,6 +127,75 @@ impl AssetRepository for PostgresAssetRepository { rows.into_iter().map(TryInto::try_into).collect() } + async fn search( + &self, + owner_id: &SystemId, + filters: &AssetFilters, + limit: u32, + offset: u32, + ) -> Result, DomainError> { + let mut sql = String::from( + "SELECT asset_id, volume_id, relative_path, checksum, asset_type, mime_type, + file_size, is_processed, owner_user_id, created_at + FROM assets WHERE owner_user_id = $1", + ); + let mut param_idx = 2u32; + + if filters.asset_type.is_some() { + sql.push_str(&format!(" AND asset_type = ${param_idx}")); + param_idx += 1; + } + if filters.mime_type.is_some() { + sql.push_str(&format!(" AND mime_type = ${param_idx}")); + param_idx += 1; + } + if filters.date_from.is_some() { + sql.push_str(&format!(" AND created_at >= ${param_idx}")); + param_idx += 1; + } + if filters.date_to.is_some() { + sql.push_str(&format!(" AND created_at <= ${param_idx}")); + param_idx += 1; + } + if filters.is_processed.is_some() { + sql.push_str(&format!(" AND is_processed = ${param_idx}")); + param_idx += 1; + } + + sql.push_str(&format!( + " ORDER BY created_at DESC LIMIT ${} OFFSET ${}", + param_idx, + param_idx + 1 + )); + + let mut query = sqlx::query_as::<_, AssetRow>(&sql).bind(*owner_id.as_uuid()); + + if let Some(ref t) = filters.asset_type { + query = query.bind(asset_type_to_str(t)); + } + if let Some(ref m) = filters.mime_type { + query = query.bind(m.as_str()); + } + if let Some(ref d) = filters.date_from { + query = query.bind(d.as_datetime()); + } + if let Some(ref d) = filters.date_to { + query = query.bind(d.as_datetime()); + } + if let Some(p) = filters.is_processed { + query = query.bind(p); + } + + let rows = query + .bind(limit as i64) + .bind(offset as i64) + .fetch_all(&self.pool) + .await + .map_pg()?; + + rows.into_iter().map(TryInto::try_into).collect() + } + async fn save(&self, asset: &Asset) -> Result<(), DomainError> { sqlx::query( "INSERT INTO assets (asset_id, volume_id, relative_path, checksum, asset_type, @@ -597,3 +670,156 @@ impl DerivativeRepository for PostgresDerivativeRepository { Ok(()) } } + +// ── AssetStack ────────────────────────────────────────────────────── + +#[derive(sqlx::FromRow)] +struct StackRow { + stack_id: Uuid, + stack_type: String, + primary_asset_id: Uuid, + owner_user_id: Uuid, + members: serde_json::Value, +} + +fn stack_type_from_str(s: &str) -> StackType { + match s { + "live_photo" => StackType::LivePhoto, + "format_pair" => StackType::FormatPair, + "burst_sequence" => StackType::BurstSequence, + "exposure_bracket" => StackType::ExposureBracket, + "manual_group" => StackType::ManualGroup, + _ => StackType::ManualGroup, + } +} + +fn stack_type_to_str(t: &StackType) -> &'static str { + match t { + StackType::LivePhoto => "live_photo", + StackType::FormatPair => "format_pair", + StackType::BurstSequence => "burst_sequence", + StackType::ExposureBracket => "exposure_bracket", + StackType::ManualGroup => "manual_group", + } +} + +fn member_role_from_str(s: &str) -> StackMemberRole { + match s { + "primary_display" => StackMemberRole::PrimaryDisplay, + "high_res_source" => StackMemberRole::HighResSource, + "motion_clip" => StackMemberRole::MotionClip, + "alternate_frame" => StackMemberRole::AlternateFrame, + _ => StackMemberRole::AlternateFrame, + } +} + +fn member_role_to_str(r: &StackMemberRole) -> &'static str { + match r { + StackMemberRole::PrimaryDisplay => "primary_display", + StackMemberRole::HighResSource => "high_res_source", + StackMemberRole::MotionClip => "motion_clip", + StackMemberRole::AlternateFrame => "alternate_frame", + } +} + +#[derive(serde::Serialize, serde::Deserialize)] +struct MemberJson { + asset_id: Uuid, + role: String, + sort_order: u32, +} + +fn members_from_json(v: serde_json::Value) -> Vec { + let arr: Vec = serde_json::from_value(v).unwrap_or_default(); + arr.into_iter() + .map(|m| AssetStackMember { + asset_id: SystemId::from_uuid(m.asset_id), + role: member_role_from_str(&m.role), + sort_order: m.sort_order, + }) + .collect() +} + +fn members_to_json(members: &[AssetStackMember]) -> serde_json::Value { + let arr: Vec = members + .iter() + .map(|m| MemberJson { + asset_id: *m.asset_id.as_uuid(), + role: member_role_to_str(&m.role).to_string(), + sort_order: m.sort_order, + }) + .collect(); + serde_json::to_value(arr).unwrap_or(serde_json::Value::Array(vec![])) +} + +impl From for AssetStack { + fn from(r: StackRow) -> Self { + Self { + stack_id: SystemId::from_uuid(r.stack_id), + stack_type: stack_type_from_str(&r.stack_type), + primary_asset_id: SystemId::from_uuid(r.primary_asset_id), + owner_user_id: SystemId::from_uuid(r.owner_user_id), + members: members_from_json(r.members), + } + } +} + +pg_repo!(PostgresAssetStackRepository); + +#[async_trait] +impl AssetStackRepository for PostgresAssetStackRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + let row = sqlx::query_as::<_, StackRow>( + "SELECT stack_id, stack_type, primary_asset_id, owner_user_id, members + FROM asset_stacks WHERE stack_id = $1", + ) + .bind(*id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_pg()?; + + Ok(row.map(Into::into)) + } + + async fn find_by_asset(&self, asset_id: &SystemId) -> Result, DomainError> { + let rows = sqlx::query_as::<_, StackRow>( + "SELECT stack_id, stack_type, primary_asset_id, owner_user_id, members + FROM asset_stacks WHERE members @> $1::jsonb", + ) + .bind(serde_json::json!([{"asset_id": asset_id.as_uuid()}])) + .fetch_all(&self.pool) + .await + .map_pg()?; + + Ok(rows.into_iter().map(Into::into).collect()) + } + + async fn save(&self, stack: &AssetStack) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO asset_stacks (stack_id, stack_type, primary_asset_id, owner_user_id, members) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (stack_id) DO UPDATE SET + stack_type = EXCLUDED.stack_type, + primary_asset_id = EXCLUDED.primary_asset_id, + members = EXCLUDED.members", + ) + .bind(*stack.stack_id.as_uuid()) + .bind(stack_type_to_str(&stack.stack_type)) + .bind(*stack.primary_asset_id.as_uuid()) + .bind(*stack.owner_user_id.as_uuid()) + .bind(members_to_json(&stack.members)) + .execute(&self.pool) + .await + .map_pg()?; + Ok(()) + } + + async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { + sqlx::query("DELETE FROM asset_stacks WHERE stack_id = $1") + .bind(*id.as_uuid()) + .execute(&self.pool) + .await + .map_pg()?; + Ok(()) + } +} diff --git a/crates/adapters/postgres/src/event_store.rs b/crates/adapters/postgres/src/event_store.rs index 229417a..11ed693 100644 --- a/crates/adapters/postgres/src/event_store.rs +++ b/crates/adapters/postgres/src/event_store.rs @@ -24,6 +24,16 @@ fn aggregate_id(event: &DomainEvent) -> Uuid { DomainEvent::JobEnqueued { job_id, .. } | DomainEvent::JobCompleted { job_id, .. } | DomainEvent::JobFailed { job_id, .. } => *job_id.as_uuid(), + + DomainEvent::UserCreated { user_id, .. } | DomainEvent::UserDeleted { user_id, .. } => { + *user_id.as_uuid() + } + + DomainEvent::AlbumCreated { album_id, .. } => *album_id.as_uuid(), + + DomainEvent::TagCreated { tag_id, .. } => *tag_id.as_uuid(), + + DomainEvent::DuplicateDetected { group_id, .. } => *group_id.as_uuid(), } } diff --git a/crates/adapters/postgres/src/helpers.rs b/crates/adapters/postgres/src/helpers.rs index 4e54135..67eac5a 100644 --- a/crates/adapters/postgres/src/helpers.rs +++ b/crates/adapters/postgres/src/helpers.rs @@ -1,24 +1,33 @@ use domain::errors::DomainError; -/// Extension trait for converting `sqlx::Error` into `DomainError`. pub trait MapDomainError { fn map_pg(self) -> Result; } impl MapDomainError for Result { fn map_pg(self) -> Result { - self.map_err(|e| DomainError::Internal(e.to_string())) + self.map_err(|e| match &e { + sqlx::Error::Database(db_err) if db_err.code().as_deref() == Some("23505") => { + DomainError::Conflict( + db_err + .constraint() + .map(|c| format!("Duplicate: {c}")) + .unwrap_or_else(|| "Duplicate entry".into()), + ) + } + sqlx::Error::Database(db_err) if db_err.code().as_deref() == Some("23503") => { + DomainError::NotFound( + db_err + .constraint() + .map(|c| format!("Referenced entity not found: {c}")) + .unwrap_or_else(|| "Referenced entity not found".into()), + ) + } + _ => DomainError::Internal(e.to_string()), + }) } } -/// Generates a Postgres repository struct with a `PgPool` field and a `new` constructor. -/// -/// ```ignore -/// pg_repo!(PostgresFooRepository); -/// // expands to: -/// // pub struct PostgresFooRepository { pool: PgPool } -/// // impl PostgresFooRepository { pub fn new(pool: PgPool) -> Self { Self { pool } } } -/// ``` macro_rules! pg_repo { ($name:ident) => { pub struct $name { diff --git a/crates/adapters/postgres/src/identity/mod.rs b/crates/adapters/postgres/src/identity/mod.rs index 1727214..dc031ab 100644 --- a/crates/adapters/postgres/src/identity/mod.rs +++ b/crates/adapters/postgres/src/identity/mod.rs @@ -2,9 +2,10 @@ use crate::helpers::{MapDomainError, pg_repo}; use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ + entities::RefreshToken, errors::DomainError, - ports::UserRepository, - value_objects::{Email, PasswordHash, SystemId}, + ports::{RefreshTokenRepository, UserRepository}, + value_objects::{DateTimeStamp, Email, PasswordHash, SystemId}, }; use uuid::Uuid; @@ -109,3 +110,82 @@ impl UserRepository for PostgresUserRepository { Ok(()) } } + +// --- PostgresRefreshTokenRepository --- + +#[derive(sqlx::FromRow)] +struct RefreshTokenRow { + token_id: Uuid, + user_id: Uuid, + token_hash: String, + expires_at: DateTime, + revoked: bool, + created_at: DateTime, +} + +impl From for RefreshToken { + fn from(r: RefreshTokenRow) -> Self { + Self { + token_id: SystemId::from_uuid(r.token_id), + user_id: SystemId::from_uuid(r.user_id), + token_hash: r.token_hash, + expires_at: DateTimeStamp::from_datetime(r.expires_at), + revoked: r.revoked, + created_at: DateTimeStamp::from_datetime(r.created_at), + } + } +} + +pg_repo!(PostgresRefreshTokenRepository); + +#[async_trait] +impl RefreshTokenRepository for PostgresRefreshTokenRepository { + async fn save(&self, token: &RefreshToken) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO refresh_tokens (token_id, user_id, token_hash, expires_at, revoked, created_at) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (token_id) DO UPDATE SET revoked = EXCLUDED.revoked", + ) + .bind(*token.token_id.as_uuid()) + .bind(*token.user_id.as_uuid()) + .bind(&token.token_hash) + .bind(*token.expires_at.as_datetime()) + .bind(token.revoked) + .bind(*token.created_at.as_datetime()) + .execute(&self.pool) + .await + .map_pg()?; + Ok(()) + } + + async fn find_by_hash(&self, token_hash: &str) -> Result, DomainError> { + let row = sqlx::query_as::<_, RefreshTokenRow>( + "SELECT token_id, user_id, token_hash, expires_at, revoked, created_at + FROM refresh_tokens WHERE token_hash = $1", + ) + .bind(token_hash) + .fetch_optional(&self.pool) + .await + .map_pg()?; + + Ok(row.map(Into::into)) + } + + async fn delete_by_user(&self, user_id: &SystemId) -> Result<(), DomainError> { + sqlx::query("DELETE FROM refresh_tokens WHERE user_id = $1") + .bind(*user_id.as_uuid()) + .execute(&self.pool) + .await + .map_pg()?; + Ok(()) + } + + async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { + sqlx::query("DELETE FROM refresh_tokens WHERE token_id = $1") + .bind(*id.as_uuid()) + .execute(&self.pool) + .await + .map_pg()?; + Ok(()) + } +} diff --git a/crates/adapters/postgres/src/processing/mod.rs b/crates/adapters/postgres/src/processing/mod.rs index 0c4ada4..7cf6845 100644 --- a/crates/adapters/postgres/src/processing/mod.rs +++ b/crates/adapters/postgres/src/processing/mod.rs @@ -154,6 +154,59 @@ impl JobRepository for PostgresJobRepository { Ok(row.map(Into::into)) } + async fn find_all( + &self, + status: Option<&str>, + limit: u32, + offset: u32, + ) -> Result, DomainError> { + let rows = match status { + Some(s) => sqlx::query_as::<_, JobRow>( + "SELECT job_id, job_type, target_asset_id, batch_id, status, priority, + payload, result_data, retry_count, max_retries, created_at, + started_at, completed_at, error_message + FROM jobs WHERE status = $1 + ORDER BY created_at DESC + LIMIT $2 OFFSET $3", + ) + .bind(s) + .bind(limit as i64) + .bind(offset as i64) + .fetch_all(&self.pool) + .await + .map_pg()?, + None => sqlx::query_as::<_, JobRow>( + "SELECT job_id, job_type, target_asset_id, batch_id, status, priority, + payload, result_data, retry_count, max_retries, created_at, + started_at, completed_at, error_message + FROM jobs + ORDER BY created_at DESC + LIMIT $1 OFFSET $2", + ) + .bind(limit as i64) + .bind(offset as i64) + .fetch_all(&self.pool) + .await + .map_pg()?, + }; + Ok(rows.into_iter().map(Into::into).collect()) + } + + async fn count(&self, status: Option<&str>) -> Result { + let count: (i64,) = match status { + Some(s) => sqlx::query_as("SELECT COUNT(*) FROM jobs WHERE status = $1") + .bind(s) + .fetch_one(&self.pool) + .await + .map_pg()?, + None => sqlx::query_as("SELECT COUNT(*) FROM jobs") + .fetch_one(&self.pool) + .await + .map_pg()?, + }; + Ok(count.0 as u64) + } + async fn find_by_batch(&self, batch_id: &SystemId) -> Result, DomainError> { let rows = sqlx::query_as::<_, JobRow>( "SELECT job_id, job_type, target_asset_id, batch_id, status, priority, diff --git a/crates/api-types/src/requests.rs b/crates/api-types/src/requests.rs index 76bc3e2..4678cc8 100644 --- a/crates/api-types/src/requests.rs +++ b/crates/api-types/src/requests.rs @@ -11,6 +11,11 @@ pub struct LoginRequest { pub password: String, } +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +pub struct RefreshTokenRequest { + pub refresh_token: String, +} + #[derive(Debug, serde::Deserialize, utoipa::ToSchema)] pub struct CreateAlbumRequest { pub title: String, @@ -86,6 +91,28 @@ pub struct RegisterAssetRequest { pub file_size: u64, } +// --- Stacks --- + +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +pub struct CreateStackRequest { + pub stack_type: String, + pub primary_asset_id: uuid::Uuid, + pub members: Vec, +} + +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +pub struct StackMemberRequest { + pub asset_id: uuid::Uuid, + pub role: String, +} + +// --- Duplicates --- + +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +pub struct ResolveDuplicateRequest { + pub keep_asset_id: uuid::Uuid, +} + // --- Sidecar --- #[derive(Debug, serde::Deserialize, utoipa::ToSchema)] diff --git a/crates/api-types/src/responses.rs b/crates/api-types/src/responses.rs index b5eb4ae..d3e93c3 100644 --- a/crates/api-types/src/responses.rs +++ b/crates/api-types/src/responses.rs @@ -12,6 +12,7 @@ pub struct UserResponse { #[derive(Debug, serde::Serialize, utoipa::ToSchema)] pub struct AuthResponse { pub token: String, + pub refresh_token: String, pub user: UserResponse, } @@ -267,6 +268,78 @@ pub struct SidecarImportResponse { pub status: String, } +// --- Stacks --- + +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +pub struct StackResponse { + pub stack_id: Uuid, + pub stack_type: String, + pub primary_asset_id: Uuid, + pub owner_user_id: Uuid, + pub members: Vec, +} + +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +pub struct StackMemberResponse { + pub asset_id: Uuid, + pub role: String, + pub sort_order: u32, +} + +impl StackResponse { + pub fn from_domain(stack: &domain::entities::AssetStack) -> Self { + Self { + stack_id: *stack.stack_id.as_uuid(), + stack_type: format!("{:?}", stack.stack_type), + primary_asset_id: *stack.primary_asset_id.as_uuid(), + owner_user_id: *stack.owner_user_id.as_uuid(), + members: stack + .members + .iter() + .map(|m| StackMemberResponse { + asset_id: *m.asset_id.as_uuid(), + role: format!("{:?}", m.role), + sort_order: m.sort_order, + }) + .collect(), + } + } +} + +// --- Duplicates --- + +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +pub struct DuplicateGroupResponse { + pub group_id: Uuid, + pub detection_method: String, + pub status: String, + pub candidates: Vec, +} + +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +pub struct DuplicateCandidateResponse { + pub asset_id: Uuid, + pub similarity_score: f64, +} + +impl DuplicateGroupResponse { + pub fn from_domain(group: &domain::entities::DuplicateGroup) -> Self { + Self { + group_id: *group.group_id.as_uuid(), + detection_method: format!("{:?}", group.detection_method), + status: format!("{:?}", group.status), + candidates: group + .candidates + .iter() + .map(|c| DuplicateCandidateResponse { + asset_id: *c.asset_id.as_uuid(), + similarity_score: c.similarity_score, + }) + .collect(), + } + } +} + // --- Processing --- #[derive(Debug, serde::Serialize, utoipa::ToSchema)] @@ -290,6 +363,12 @@ impl JobResponse { } } +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +pub struct JobListResponse { + pub jobs: Vec, + pub total: u64, +} + #[derive(Debug, serde::Serialize, utoipa::ToSchema)] pub struct BatchProgressResponse { pub batch_id: Uuid, diff --git a/crates/application/Cargo.toml b/crates/application/Cargo.toml index e787932..f1f503b 100644 --- a/crates/application/Cargo.toml +++ b/crates/application/Cargo.toml @@ -15,5 +15,7 @@ serde = { workspace = true } serde_json = { workspace = true } sha2 = { workspace = true } +[dependencies.chrono] +workspace = true + [dev-dependencies] -chrono = { workspace = true } diff --git a/crates/application/src/catalog/commands/create_stack.rs b/crates/application/src/catalog/commands/create_stack.rs new file mode 100644 index 0000000..8eaa8bf --- /dev/null +++ b/crates/application/src/catalog/commands/create_stack.rs @@ -0,0 +1,81 @@ +use std::sync::Arc; + +use domain::{ + entities::{AssetStack, StackMemberRole, StackType}, + errors::DomainError, + ports::{AssetRepository, AssetStackRepository}, + value_objects::SystemId, +}; + +pub struct CreateStackCommand { + pub stack_type: StackType, + pub primary_asset_id: SystemId, + pub additional_asset_ids: Vec<(SystemId, StackMemberRole)>, + pub owner_id: SystemId, +} + +pub struct CreateStackHandler { + asset_repo: Arc, + stack_repo: Arc, +} + +impl CreateStackHandler { + pub fn new( + asset_repo: Arc, + stack_repo: Arc, + ) -> Self { + Self { + asset_repo, + stack_repo, + } + } + + pub async fn execute(&self, cmd: CreateStackCommand) -> Result { + self.asset_repo + .find_by_id(&cmd.primary_asset_id) + .await? + .ok_or_else(|| DomainError::NotFound("Primary asset not found".into()))?; + + let mut stack = AssetStack::new(cmd.stack_type, cmd.primary_asset_id, cmd.owner_id); + + for (asset_id, role) in cmd.additional_asset_ids { + self.asset_repo + .find_by_id(&asset_id) + .await? + .ok_or_else(|| { + DomainError::NotFound(format!("Asset {} not found", asset_id.as_uuid())) + })?; + stack.add_member(asset_id, role)?; + } + + self.stack_repo.save(&stack).await?; + Ok(stack) + } +} + +pub struct DeleteStackCommand { + pub stack_id: SystemId, + pub caller_id: SystemId, +} + +pub struct DeleteStackHandler { + stack_repo: Arc, +} + +impl DeleteStackHandler { + pub fn new(stack_repo: Arc) -> Self { + Self { stack_repo } + } + + pub async fn execute(&self, cmd: DeleteStackCommand) -> Result<(), DomainError> { + let stack = self + .stack_repo + .find_by_id(&cmd.stack_id) + .await? + .ok_or_else(|| DomainError::NotFound("Stack not found".into()))?; + if stack.owner_user_id != cmd.caller_id { + return Err(DomainError::Forbidden("Not your stack".into())); + } + self.stack_repo.delete(&cmd.stack_id).await + } +} diff --git a/crates/application/src/catalog/commands/delete_asset.rs b/crates/application/src/catalog/commands/delete_asset.rs new file mode 100644 index 0000000..26dcff9 --- /dev/null +++ b/crates/application/src/catalog/commands/delete_asset.rs @@ -0,0 +1,84 @@ +use std::sync::Arc; + +use domain::{ + errors::DomainError, + events::DomainEvent, + ports::{ + AssetRepository, DerivativeRepository, EventPublisher, FileStoragePort, SidecarRepository, + }, + value_objects::{DateTimeStamp, SystemId}, +}; + +pub struct DeleteAssetCommand { + pub asset_id: SystemId, + pub deleted_by: SystemId, +} + +pub struct DeleteAssetHandler { + asset_repo: Arc, + derivative_repo: Arc, + sidecar_repo: Arc, + file_storage: Arc, + event_publisher: Arc, +} + +impl DeleteAssetHandler { + pub fn new( + asset_repo: Arc, + derivative_repo: Arc, + sidecar_repo: Arc, + file_storage: Arc, + event_publisher: Arc, + ) -> Self { + Self { + asset_repo, + derivative_repo, + sidecar_repo, + file_storage, + event_publisher, + } + } + + pub async fn execute(&self, cmd: DeleteAssetCommand) -> Result<(), DomainError> { + let asset = self + .asset_repo + .find_by_id(&cmd.asset_id) + .await? + .ok_or_else(|| DomainError::NotFound("Asset not found".into()))?; + + // Delete derivative files + DB records + let derivatives = self.derivative_repo.find_by_asset(&cmd.asset_id).await?; + for d in &derivatives { + let _ = self.file_storage.delete_file(&d.storage_path).await; + self.derivative_repo.delete(&d.derivative_id).await?; + } + + // Delete sidecar file + DB record + if let Some(sidecar) = self.sidecar_repo.find_by_asset(&cmd.asset_id).await? { + let _ = self + .file_storage + .delete_file(&sidecar.sidecar_storage_path) + .await; + self.sidecar_repo.delete(&cmd.asset_id).await?; + } + + // Delete asset file + let _ = self + .file_storage + .delete_file(&asset.source_reference.relative_path) + .await; + + // Delete asset DB record + self.asset_repo.delete(&cmd.asset_id).await?; + + self.event_publisher + .publish(&DomainEvent::AssetDeleted { + asset_id: cmd.asset_id, + deleted_by: cmd.deleted_by, + timestamp: DateTimeStamp::now(), + }) + .await?; + + Ok(()) + } +} diff --git a/crates/application/src/catalog/commands/detect_live_photos.rs b/crates/application/src/catalog/commands/detect_live_photos.rs new file mode 100644 index 0000000..1179834 --- /dev/null +++ b/crates/application/src/catalog/commands/detect_live_photos.rs @@ -0,0 +1,87 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use domain::{ + entities::{AssetStack, AssetType, StackMemberRole, StackType}, + errors::DomainError, + ports::{AssetRepository, AssetStackRepository}, + value_objects::SystemId, +}; + +pub struct DetectLivePhotosCommand { + pub owner_id: SystemId, +} + +pub struct DetectLivePhotosHandler { + asset_repo: Arc, + stack_repo: Arc, +} + +impl DetectLivePhotosHandler { + pub fn new( + asset_repo: Arc, + stack_repo: Arc, + ) -> Self { + Self { + asset_repo, + stack_repo, + } + } + + pub async fn execute( + &self, + cmd: DetectLivePhotosCommand, + ) -> Result, DomainError> { + let assets = self + .asset_repo + .find_by_owner(&cmd.owner_id, 10_000, 0) + .await?; + + let mut by_basename: HashMap> = HashMap::new(); + + for asset in &assets { + let path = &asset.source_reference.relative_path; + if let Some(stem) = std::path::Path::new(path) + .file_stem() + .and_then(|s| s.to_str()) + { + let key = stem.to_lowercase(); + by_basename.entry(key).or_default().push(( + asset.asset_id, + asset.asset_type, + asset.mime_type.clone(), + )); + } + } + + let mut created = Vec::new(); + + for group in by_basename.values() { + if group.len() < 2 { + continue; + } + + let image = group.iter().find(|(_, t, _)| *t == AssetType::Image); + let video = group + .iter() + .find(|(_, t, m)| *t == AssetType::Video || m.starts_with("video/")); + + if let (Some((img_id, _, _)), Some((vid_id, _, _))) = (image, video) { + let existing = self.stack_repo.find_by_asset(img_id).await?; + if existing + .iter() + .any(|s| s.stack_type == StackType::LivePhoto) + { + continue; + } + + let mut stack = AssetStack::new(StackType::LivePhoto, *img_id, cmd.owner_id); + stack.add_member(*vid_id, StackMemberRole::MotionClip)?; + self.stack_repo.save(&stack).await?; + created.push(stack); + } + } + + Ok(created) + } +} diff --git a/crates/application/src/catalog/commands/mod.rs b/crates/application/src/catalog/commands/mod.rs index 62db3fd..b6e38d3 100644 --- a/crates/application/src/catalog/commands/mod.rs +++ b/crates/application/src/catalog/commands/mod.rs @@ -1,2 +1,6 @@ +pub mod create_stack; +pub mod delete_asset; +pub mod detect_live_photos; pub mod register_asset; +pub mod resolve_duplicate; pub mod update_metadata; diff --git a/crates/application/src/catalog/commands/resolve_duplicate.rs b/crates/application/src/catalog/commands/resolve_duplicate.rs new file mode 100644 index 0000000..bd377e2 --- /dev/null +++ b/crates/application/src/catalog/commands/resolve_duplicate.rs @@ -0,0 +1,86 @@ +use std::sync::Arc; + +use domain::{errors::DomainError, ports::DuplicateRepository, value_objects::SystemId}; + +use super::delete_asset::{DeleteAssetCommand, DeleteAssetHandler}; + +pub struct ResolveDuplicateCommand { + pub group_id: SystemId, + pub keep_asset_id: SystemId, + pub resolved_by: SystemId, +} + +pub struct ResolveDuplicateHandler { + duplicate_repo: Arc, + delete_handler: Arc, +} + +impl ResolveDuplicateHandler { + pub fn new( + duplicate_repo: Arc, + delete_handler: Arc, + ) -> Self { + Self { + duplicate_repo, + delete_handler, + } + } + + pub async fn execute(&self, cmd: ResolveDuplicateCommand) -> Result<(), DomainError> { + let mut group = self + .duplicate_repo + .find_by_id(&cmd.group_id) + .await? + .ok_or_else(|| DomainError::NotFound("Duplicate group not found".into()))?; + + if !group + .candidates + .iter() + .any(|c| c.asset_id == cmd.keep_asset_id) + { + return Err(DomainError::Validation( + "keep_asset_id not in duplicate group".into(), + )); + } + + let to_delete: Vec = group + .candidates + .iter() + .filter(|c| c.asset_id != cmd.keep_asset_id) + .map(|c| c.asset_id) + .collect(); + + for asset_id in to_delete { + self.delete_handler + .execute(DeleteAssetCommand { + asset_id, + deleted_by: cmd.resolved_by, + }) + .await?; + } + + group.resolve(); + self.duplicate_repo.save(&group).await?; + + Ok(()) + } +} + +pub struct ListDuplicatesQuery; + +pub struct ListDuplicatesHandler { + duplicate_repo: Arc, +} + +impl ListDuplicatesHandler { + pub fn new(duplicate_repo: Arc) -> Self { + Self { duplicate_repo } + } + + pub async fn execute( + &self, + _query: ListDuplicatesQuery, + ) -> Result, DomainError> { + self.duplicate_repo.find_unresolved().await + } +} diff --git a/crates/application/src/catalog/mod.rs b/crates/application/src/catalog/mod.rs index b20beae..e57707b 100644 --- a/crates/application/src/catalog/mod.rs +++ b/crates/application/src/catalog/mod.rs @@ -2,12 +2,22 @@ pub mod commands; pub mod queries; pub mod visibility; +pub use commands::create_stack::{ + CreateStackCommand, CreateStackHandler, DeleteStackCommand, DeleteStackHandler, +}; +pub use commands::delete_asset::{DeleteAssetCommand, DeleteAssetHandler}; +pub use commands::detect_live_photos::{DetectLivePhotosCommand, DetectLivePhotosHandler}; pub use commands::register_asset::{RegisterAssetCommand, RegisterAssetHandler}; +pub use commands::resolve_duplicate::{ + ListDuplicatesHandler, ListDuplicatesQuery, ResolveDuplicateCommand, ResolveDuplicateHandler, +}; pub use commands::update_metadata::{UpdateMetadataCommand, UpdateMetadataHandler}; pub use queries::get_asset::{GetAssetHandler, GetAssetQuery}; +pub use queries::get_stack::{GetStackHandler, GetStackQuery}; pub use queries::get_timeline::{GetTimelineHandler, GetTimelineQuery}; pub use queries::read_asset_file::{AssetFileResult, ReadAssetFileHandler, ReadAssetFileQuery}; pub use queries::read_derivative::{ DerivativeFileResult, ReadDerivativeHandler, ReadDerivativeQuery, }; +pub use queries::search_assets::{SearchAssetsHandler, SearchAssetsQuery}; pub use visibility::VisibilityFilteredAssetRepository; diff --git a/crates/application/src/catalog/queries/get_stack.rs b/crates/application/src/catalog/queries/get_stack.rs new file mode 100644 index 0000000..19c229a --- /dev/null +++ b/crates/application/src/catalog/queries/get_stack.rs @@ -0,0 +1,31 @@ +use domain::{ + entities::AssetStack, errors::DomainError, ports::AssetStackRepository, value_objects::SystemId, +}; +use std::sync::Arc; + +pub struct GetStackQuery { + pub stack_id: SystemId, + pub caller_id: SystemId, +} + +pub struct GetStackHandler { + stack_repo: Arc, +} + +impl GetStackHandler { + pub fn new(stack_repo: Arc) -> Self { + Self { stack_repo } + } + + pub async fn execute(&self, query: GetStackQuery) -> Result { + let stack = self + .stack_repo + .find_by_id(&query.stack_id) + .await? + .ok_or_else(|| DomainError::NotFound("Stack not found".into()))?; + if stack.owner_user_id != query.caller_id { + return Err(DomainError::Forbidden("Not your stack".into())); + } + Ok(stack) + } +} diff --git a/crates/application/src/catalog/queries/mod.rs b/crates/application/src/catalog/queries/mod.rs index 1263d02..5db004e 100644 --- a/crates/application/src/catalog/queries/mod.rs +++ b/crates/application/src/catalog/queries/mod.rs @@ -1,4 +1,6 @@ pub mod get_asset; +pub mod get_stack; pub mod get_timeline; pub mod read_asset_file; pub mod read_derivative; +pub mod search_assets; diff --git a/crates/application/src/catalog/queries/read_asset_file.rs b/crates/application/src/catalog/queries/read_asset_file.rs index dc14404..36bd156 100644 --- a/crates/application/src/catalog/queries/read_asset_file.rs +++ b/crates/application/src/catalog/queries/read_asset_file.rs @@ -9,6 +9,7 @@ use std::sync::Arc; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct ReadAssetFileQuery { pub asset_id: SystemId, + pub caller_id: SystemId, } pub struct AssetFileResult { @@ -40,6 +41,10 @@ impl ReadAssetFileHandler { .await? .ok_or_else(|| DomainError::NotFound(format!("Asset {} not found", query.asset_id)))?; + if asset.owner_user_id != query.caller_id { + return Err(DomainError::Forbidden("Access denied".into())); + } + let data = self .file_storage .read_file(&asset.source_reference.relative_path) diff --git a/crates/application/src/catalog/queries/read_derivative.rs b/crates/application/src/catalog/queries/read_derivative.rs index 9d96901..2f231e2 100644 --- a/crates/application/src/catalog/queries/read_derivative.rs +++ b/crates/application/src/catalog/queries/read_derivative.rs @@ -10,6 +10,7 @@ use std::sync::Arc; pub struct ReadDerivativeQuery { pub asset_id: SystemId, pub profile: DerivativeProfile, + pub caller_id: SystemId, } pub struct DerivativeFileResult { @@ -19,16 +20,19 @@ pub struct DerivativeFileResult { pub struct ReadDerivativeHandler { derivative_repo: Arc, + asset_repo: Arc, file_storage: Arc, } impl ReadDerivativeHandler { pub fn new( derivative_repo: Arc, + asset_repo: Arc, file_storage: Arc, ) -> Self { Self { derivative_repo, + asset_repo, file_storage, } } @@ -37,6 +41,15 @@ impl ReadDerivativeHandler { &self, query: ReadDerivativeQuery, ) -> Result { + let asset = self + .asset_repo + .find_by_id(&query.asset_id) + .await? + .ok_or_else(|| DomainError::NotFound("Asset not found".into()))?; + if asset.owner_user_id != query.caller_id { + return Err(DomainError::Forbidden("Access denied".into())); + } + let derivative = self .derivative_repo .find_by_asset_and_profile(&query.asset_id, query.profile) diff --git a/crates/application/src/catalog/queries/search_assets.rs b/crates/application/src/catalog/queries/search_assets.rs new file mode 100644 index 0000000..24e2b6b --- /dev/null +++ b/crates/application/src/catalog/queries/search_assets.rs @@ -0,0 +1,31 @@ +use std::sync::Arc; + +use domain::{ + entities::{Asset, AssetFilters}, + errors::DomainError, + ports::AssetRepository, + value_objects::SystemId, +}; + +pub struct SearchAssetsQuery { + pub owner_id: SystemId, + pub filters: AssetFilters, + pub limit: u32, + pub offset: u32, +} + +pub struct SearchAssetsHandler { + asset_repo: Arc, +} + +impl SearchAssetsHandler { + pub fn new(asset_repo: Arc) -> Self { + Self { asset_repo } + } + + pub async fn execute(&self, query: SearchAssetsQuery) -> Result, DomainError> { + self.asset_repo + .search(&query.owner_id, &query.filters, query.limit, query.offset) + .await + } +} diff --git a/crates/application/src/catalog/visibility.rs b/crates/application/src/catalog/visibility.rs index c1a50df..fcce4c9 100644 --- a/crates/application/src/catalog/visibility.rs +++ b/crates/application/src/catalog/visibility.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; use domain::{ - catalog::entities::Asset, + catalog::entities::{Asset, AssetFilters}, errors::DomainError, ports::{AssetRepository, ShareRepository}, value_objects::{Checksum, SystemId}, @@ -112,6 +112,27 @@ impl AssetRepository for VisibilityFilteredAssetRepository { Ok(visible) } + async fn search( + &self, + owner_id: &SystemId, + filters: &AssetFilters, + limit: u32, + offset: u32, + ) -> Result, DomainError> { + if owner_id == &self.caller_id { + return self.inner.search(owner_id, filters, limit, offset).await; + } + + let assets = self.inner.search(owner_id, filters, limit, offset).await?; + let mut visible = Vec::with_capacity(assets.len()); + for asset in assets { + if self.caller_can_access(&asset).await? { + visible.push(asset); + } + } + Ok(visible) + } + async fn save(&self, asset: &Asset) -> Result<(), DomainError> { self.inner.save(asset).await } diff --git a/crates/application/src/identity/commands/login_user.rs b/crates/application/src/identity/commands/login_user.rs index 34b7b85..1908354 100644 --- a/crates/application/src/identity/commands/login_user.rs +++ b/crates/application/src/identity/commands/login_user.rs @@ -1,9 +1,10 @@ use domain::{ - entities::User, + entities::{RefreshToken, User}, errors::DomainError, - ports::{PasswordHasher, TokenIssuer, UserRepository}, - value_objects::Email, + ports::{PasswordHasher, RefreshTokenRepository, TokenIssuer, UserRepository}, + value_objects::{DateTimeStamp, Email}, }; +use sha2::{Digest, Sha256}; use std::sync::Arc; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -16,6 +17,7 @@ pub struct LoginUserHandler { repo: Arc, hasher: Arc, issuer: Arc, + refresh_repo: Arc, } impl LoginUserHandler { @@ -23,15 +25,20 @@ impl LoginUserHandler { repo: Arc, hasher: Arc, issuer: Arc, + refresh_repo: Arc, ) -> Self { Self { repo, hasher, issuer, + refresh_repo, } } - pub async fn execute(&self, cmd: LoginUserCommand) -> Result<(User, String), DomainError> { + pub async fn execute( + &self, + cmd: LoginUserCommand, + ) -> Result<(User, String, String), DomainError> { let email = Email::new(&cmd.email)?; let user = self .repo @@ -45,7 +52,21 @@ impl LoginUserHandler { if !valid { return Err(DomainError::Unauthorized("Invalid credentials".to_string())); } - let token = self.issuer.issue(&user.id, "user").await?; - Ok((user, token)) + let access_token = self.issuer.issue(&user.id, "user").await?; + let (raw_refresh, _) = generate_refresh_token(&self.refresh_repo, &user.id).await?; + Ok((user, access_token, raw_refresh)) } } + +pub async fn generate_refresh_token( + repo: &Arc, + user_id: &domain::value_objects::SystemId, +) -> Result<(String, domain::value_objects::SystemId), DomainError> { + let raw = uuid::Uuid::new_v4().to_string(); + let hash = format!("{:x}", Sha256::digest(raw.as_bytes())); + let expires_at = DateTimeStamp::from_datetime(chrono::Utc::now() + chrono::Duration::days(30)); + let token = RefreshToken::new(*user_id, hash, expires_at); + let token_id = token.token_id; + repo.save(&token).await?; + Ok((raw, token_id)) +} diff --git a/crates/application/src/identity/commands/logout.rs b/crates/application/src/identity/commands/logout.rs new file mode 100644 index 0000000..7f7d71f --- /dev/null +++ b/crates/application/src/identity/commands/logout.rs @@ -0,0 +1,16 @@ +use domain::{errors::DomainError, ports::RefreshTokenRepository, value_objects::SystemId}; +use std::sync::Arc; + +pub struct LogoutHandler { + refresh_repo: Arc, +} + +impl LogoutHandler { + pub fn new(refresh_repo: Arc) -> Self { + Self { refresh_repo } + } + + pub async fn execute(&self, user_id: &SystemId) -> Result<(), DomainError> { + self.refresh_repo.delete_by_user(user_id).await + } +} diff --git a/crates/application/src/identity/commands/mod.rs b/crates/application/src/identity/commands/mod.rs index 2e2b705..4defe43 100644 --- a/crates/application/src/identity/commands/mod.rs +++ b/crates/application/src/identity/commands/mod.rs @@ -1,5 +1,9 @@ pub mod login_user; +pub mod logout; +pub mod refresh_token; pub mod register_user; pub use login_user::{LoginUserCommand, LoginUserHandler}; +pub use logout::LogoutHandler; +pub use refresh_token::{RefreshTokenCommand, RefreshTokenHandler}; pub use register_user::{RegisterUserCommand, RegisterUserHandler}; diff --git a/crates/application/src/identity/commands/refresh_token.rs b/crates/application/src/identity/commands/refresh_token.rs new file mode 100644 index 0000000..72c6f8d --- /dev/null +++ b/crates/application/src/identity/commands/refresh_token.rs @@ -0,0 +1,53 @@ +use super::login_user::generate_refresh_token; +use domain::{ + errors::DomainError, + ports::{RefreshTokenRepository, TokenIssuer}, +}; +use sha2::{Digest, Sha256}; +use std::sync::Arc; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct RefreshTokenCommand { + pub refresh_token: String, +} + +pub struct RefreshTokenHandler { + refresh_repo: Arc, + issuer: Arc, +} + +impl RefreshTokenHandler { + pub fn new( + refresh_repo: Arc, + issuer: Arc, + ) -> Self { + Self { + refresh_repo, + issuer, + } + } + + pub async fn execute(&self, cmd: RefreshTokenCommand) -> Result<(String, String), DomainError> { + let hash = format!("{:x}", Sha256::digest(cmd.refresh_token.as_bytes())); + + let token = self + .refresh_repo + .find_by_hash(&hash) + .await? + .ok_or_else(|| DomainError::Unauthorized("Invalid refresh token".to_string()))?; + + if !token.is_valid() { + return Err(DomainError::Unauthorized( + "Refresh token expired or revoked".to_string(), + )); + } + + // Rotation: delete old, issue new pair + self.refresh_repo.delete(&token.token_id).await?; + + let access_token = self.issuer.issue(&token.user_id, "user").await?; + let (raw_refresh, _) = generate_refresh_token(&self.refresh_repo, &token.user_id).await?; + + Ok((access_token, raw_refresh)) + } +} diff --git a/crates/application/src/identity/mod.rs b/crates/application/src/identity/mod.rs index 1e5adf2..bd9a0f9 100644 --- a/crates/application/src/identity/mod.rs +++ b/crates/application/src/identity/mod.rs @@ -1,5 +1,8 @@ pub mod commands; pub mod queries; -pub use commands::{LoginUserCommand, LoginUserHandler, RegisterUserCommand, RegisterUserHandler}; +pub use commands::{ + LoginUserCommand, LoginUserHandler, LogoutHandler, RefreshTokenCommand, RefreshTokenHandler, + RegisterUserCommand, RegisterUserHandler, login_user::generate_refresh_token, +}; pub use queries::{GetProfileHandler, GetProfileQuery}; diff --git a/crates/application/src/processing/mod.rs b/crates/application/src/processing/mod.rs index 5cc1323..4614268 100644 --- a/crates/application/src/processing/mod.rs +++ b/crates/application/src/processing/mod.rs @@ -11,6 +11,7 @@ pub use commands::fail_job::{FailJobCommand, FailJobHandler}; pub use commands::manage_plugin::{ManagePluginCommand, ManagePluginHandler, PluginAction}; pub use commands::process_next_job::{ProcessNextJobCommand, ProcessNextJobHandler}; pub use commands::start_job::{StartJobCommand, StartJobHandler}; +pub use queries::list_jobs::{JobListResult, ListJobsHandler, ListJobsQuery}; pub use queries::report_batch_progress::{ BatchProgress, ReportBatchProgressHandler, ReportBatchProgressQuery, }; diff --git a/crates/application/src/processing/queries/list_jobs.rs b/crates/application/src/processing/queries/list_jobs.rs new file mode 100644 index 0000000..31ebfdd --- /dev/null +++ b/crates/application/src/processing/queries/list_jobs.rs @@ -0,0 +1,34 @@ +use std::sync::Arc; + +use domain::{entities::Job, errors::DomainError, ports::JobRepository}; + +pub struct ListJobsQuery { + pub status: Option, + pub limit: u32, + pub offset: u32, +} + +pub struct JobListResult { + pub jobs: Vec, + pub total: u64, +} + +pub struct ListJobsHandler { + job_repo: Arc, +} + +impl ListJobsHandler { + pub fn new(job_repo: Arc) -> Self { + Self { job_repo } + } + + pub async fn execute(&self, query: ListJobsQuery) -> Result { + let status_ref = query.status.as_deref(); + let jobs = self + .job_repo + .find_all(status_ref, query.limit, query.offset) + .await?; + let total = self.job_repo.count(status_ref).await?; + Ok(JobListResult { jobs, total }) + } +} diff --git a/crates/application/src/processing/queries/mod.rs b/crates/application/src/processing/queries/mod.rs index affaceb..14fb23b 100644 --- a/crates/application/src/processing/queries/mod.rs +++ b/crates/application/src/processing/queries/mod.rs @@ -1 +1,2 @@ +pub mod list_jobs; pub mod report_batch_progress; diff --git a/crates/application/src/testing/repositories.rs b/crates/application/src/testing/repositories.rs index cc49b92..a7cdbcb 100644 --- a/crates/application/src/testing/repositories.rs +++ b/crates/application/src/testing/repositories.rs @@ -1,24 +1,46 @@ use async_trait::async_trait; use domain::{ entities::{ - Album, Asset, AssetMetadata, AssetTag, DuplicateGroup, DuplicateStatus, Group, - IngestSession, InviteCode, Job, JobBatch, JobStatus, LibraryPath, MetadataSource, Plugin, - ProcessingPipeline, QuotaDefinition, Role, ShareLink, ShareScope, ShareTarget, - SidecarRecord, StorageVolume, SyncStatus, Tag, UsageLedgerEntry, UsageType, User, + Album, Asset, AssetFilters, AssetMetadata, AssetTag, DuplicateGroup, DuplicateStatus, + Group, IngestSession, InviteCode, Job, JobBatch, JobStatus, LibraryPath, MetadataSource, + Plugin, ProcessingPipeline, QuotaDefinition, RefreshToken, Role, ShareLink, ShareScope, + ShareTarget, SidecarRecord, StorageVolume, SyncStatus, Tag, UsageLedgerEntry, UsageType, + User, }, errors::DomainError, ports::{ AlbumRepository, AssetMetadataRepository, AssetRepository, DuplicateRepository, GroupRepository, IngestSessionRepository, IngestTransaction, JobBatchRepository, JobRepository, LibraryPathRepository, PipelineRepository, PluginRepository, - QuotaRepository, RoleRepository, ShareRepository, SidecarRepository, - StorageVolumeRepository, TagRepository, UsageLedgerRepository, UserRepository, + QuotaRepository, RefreshTokenRepository, RoleRepository, ShareRepository, + SidecarRepository, StorageVolumeRepository, TagRepository, UsageLedgerRepository, + UserRepository, }, value_objects::{Checksum, DateTimeStamp, Email, SystemId}, }; use std::collections::HashMap; use tokio::sync::Mutex; +macro_rules! in_memory_repo { + ($name:ident, $entity:ty) => { + pub struct $name { + data: Mutex>, + } + impl $name { + pub fn new() -> Self { + Self { + data: Mutex::new(HashMap::new()), + } + } + } + impl Default for $name { + fn default() -> Self { + Self::new() + } + } + }; +} + // --- InMemoryUserRepository --- pub struct InMemoryUserRepository { @@ -83,25 +105,7 @@ impl UserRepository for InMemoryUserRepository { } } -// --- InMemoryAssetRepository --- - -pub struct InMemoryAssetRepository { - data: Mutex>, -} - -impl InMemoryAssetRepository { - pub fn new() -> Self { - Self { - data: Mutex::new(HashMap::new()), - } - } -} - -impl Default for InMemoryAssetRepository { - fn default() -> Self { - Self::new() - } -} +in_memory_repo!(InMemoryAssetRepository, Asset); #[async_trait] impl AssetRepository for InMemoryAssetRepository { @@ -141,6 +145,16 @@ impl AssetRepository for InMemoryAssetRepository { .collect()) } + async fn search( + &self, + owner_id: &SystemId, + _filters: &AssetFilters, + limit: u32, + offset: u32, + ) -> Result, DomainError> { + self.find_by_owner(owner_id, limit, offset).await + } + async fn save(&self, asset: &Asset) -> Result<(), DomainError> { self.data .lock() @@ -155,25 +169,7 @@ impl AssetRepository for InMemoryAssetRepository { } } -// --- InMemoryAlbumRepository --- - -pub struct InMemoryAlbumRepository { - data: Mutex>, -} - -impl InMemoryAlbumRepository { - pub fn new() -> Self { - Self { - data: Mutex::new(HashMap::new()), - } - } -} - -impl Default for InMemoryAlbumRepository { - fn default() -> Self { - Self::new() - } -} +in_memory_repo!(InMemoryAlbumRepository, Album); #[async_trait] impl AlbumRepository for InMemoryAlbumRepository { @@ -206,25 +202,7 @@ impl AlbumRepository for InMemoryAlbumRepository { } } -// --- InMemoryJobRepository --- - -pub struct InMemoryJobRepository { - data: Mutex>, -} - -impl InMemoryJobRepository { - pub fn new() -> Self { - Self { - data: Mutex::new(HashMap::new()), - } - } -} - -impl Default for InMemoryJobRepository { - fn default() -> Self { - Self::new() - } -} +in_memory_repo!(InMemoryJobRepository, Job); #[async_trait] impl JobRepository for InMemoryJobRepository { @@ -252,6 +230,24 @@ impl JobRepository for InMemoryJobRepository { .collect()) } + async fn find_all( + &self, + _status: Option<&str>, + limit: u32, + offset: u32, + ) -> Result, DomainError> { + let all: Vec = self.data.lock().await.values().cloned().collect(); + Ok(all + .into_iter() + .skip(offset as usize) + .take(limit as usize) + .collect()) + } + + async fn count(&self, _status: Option<&str>) -> Result { + Ok(self.data.lock().await.len() as u64) + } + async fn save(&self, job: &Job) -> Result<(), DomainError> { self.data .lock() @@ -261,25 +257,7 @@ impl JobRepository for InMemoryJobRepository { } } -// --- InMemoryRoleRepository --- - -pub struct InMemoryRoleRepository { - data: Mutex>, -} - -impl InMemoryRoleRepository { - pub fn new() -> Self { - Self { - data: Mutex::new(HashMap::new()), - } - } -} - -impl Default for InMemoryRoleRepository { - fn default() -> Self { - Self::new() - } -} +in_memory_repo!(InMemoryRoleRepository, Role); #[async_trait] impl RoleRepository for InMemoryRoleRepository { @@ -322,25 +300,7 @@ impl RoleRepository for InMemoryRoleRepository { } } -// --- InMemoryGroupRepository --- - -pub struct InMemoryGroupRepository { - data: Mutex>, -} - -impl InMemoryGroupRepository { - pub fn new() -> Self { - Self { - data: Mutex::new(HashMap::new()), - } - } -} - -impl Default for InMemoryGroupRepository { - fn default() -> Self { - Self::new() - } -} +in_memory_repo!(InMemoryGroupRepository, Group); #[async_trait] impl GroupRepository for InMemoryGroupRepository { @@ -373,25 +333,7 @@ impl GroupRepository for InMemoryGroupRepository { } } -// --- InMemoryStorageVolumeRepository --- - -pub struct InMemoryStorageVolumeRepository { - data: Mutex>, -} - -impl InMemoryStorageVolumeRepository { - pub fn new() -> Self { - Self { - data: Mutex::new(HashMap::new()), - } - } -} - -impl Default for InMemoryStorageVolumeRepository { - fn default() -> Self { - Self::new() - } -} +in_memory_repo!(InMemoryStorageVolumeRepository, StorageVolume); #[async_trait] impl StorageVolumeRepository for InMemoryStorageVolumeRepository { @@ -417,25 +359,7 @@ impl StorageVolumeRepository for InMemoryStorageVolumeRepository { } } -// --- InMemoryLibraryPathRepository --- - -pub struct InMemoryLibraryPathRepository { - data: Mutex>, -} - -impl InMemoryLibraryPathRepository { - pub fn new() -> Self { - Self { - data: Mutex::new(HashMap::new()), - } - } -} - -impl Default for InMemoryLibraryPathRepository { - fn default() -> Self { - Self::new() - } -} +in_memory_repo!(InMemoryLibraryPathRepository, LibraryPath); #[async_trait] impl LibraryPathRepository for InMemoryLibraryPathRepository { @@ -482,25 +406,7 @@ impl LibraryPathRepository for InMemoryLibraryPathRepository { } } -// --- InMemoryIngestSessionRepository --- - -pub struct InMemoryIngestSessionRepository { - data: Mutex>, -} - -impl InMemoryIngestSessionRepository { - pub fn new() -> Self { - Self { - data: Mutex::new(HashMap::new()), - } - } -} - -impl Default for InMemoryIngestSessionRepository { - fn default() -> Self { - Self::new() - } -} +in_memory_repo!(InMemoryIngestSessionRepository, IngestSession); #[async_trait] impl IngestSessionRepository for InMemoryIngestSessionRepository { @@ -528,25 +434,7 @@ impl IngestSessionRepository for InMemoryIngestSessionRepository { } } -// --- InMemoryQuotaRepository --- - -pub struct InMemoryQuotaRepository { - data: Mutex>, -} - -impl InMemoryQuotaRepository { - pub fn new() -> Self { - Self { - data: Mutex::new(HashMap::new()), - } - } -} - -impl Default for InMemoryQuotaRepository { - fn default() -> Self { - Self::new() - } -} +in_memory_repo!(InMemoryQuotaRepository, QuotaDefinition); #[async_trait] impl QuotaRepository for InMemoryQuotaRepository { @@ -889,25 +777,7 @@ impl TagRepository for InMemoryTagRepository { } } -// --- InMemoryDuplicateRepository --- - -pub struct InMemoryDuplicateRepository { - data: Mutex>, -} - -impl InMemoryDuplicateRepository { - pub fn new() -> Self { - Self { - data: Mutex::new(HashMap::new()), - } - } -} - -impl Default for InMemoryDuplicateRepository { - fn default() -> Self { - Self::new() - } -} +in_memory_repo!(InMemoryDuplicateRepository, DuplicateGroup); #[async_trait] impl DuplicateRepository for InMemoryDuplicateRepository { @@ -946,25 +816,7 @@ impl DuplicateRepository for InMemoryDuplicateRepository { } } -// --- InMemorySidecarRepository --- - -pub struct InMemorySidecarRepository { - data: Mutex>, -} - -impl InMemorySidecarRepository { - pub fn new() -> Self { - Self { - data: Mutex::new(HashMap::new()), - } - } -} - -impl Default for InMemorySidecarRepository { - fn default() -> Self { - Self::new() - } -} +in_memory_repo!(InMemorySidecarRepository, SidecarRecord); #[async_trait] impl SidecarRepository for InMemorySidecarRepository { @@ -1000,25 +852,7 @@ impl SidecarRepository for InMemorySidecarRepository { } } -// --- InMemoryJobBatchRepository --- - -pub struct InMemoryJobBatchRepository { - data: Mutex>, -} - -impl InMemoryJobBatchRepository { - pub fn new() -> Self { - Self { - data: Mutex::new(HashMap::new()), - } - } -} - -impl Default for InMemoryJobBatchRepository { - fn default() -> Self { - Self::new() - } -} +in_memory_repo!(InMemoryJobBatchRepository, JobBatch); #[async_trait] impl JobBatchRepository for InMemoryJobBatchRepository { @@ -1035,25 +869,7 @@ impl JobBatchRepository for InMemoryJobBatchRepository { } } -// --- InMemoryPluginRepository --- - -pub struct InMemoryPluginRepository { - data: Mutex>, -} - -impl InMemoryPluginRepository { - pub fn new() -> Self { - Self { - data: Mutex::new(HashMap::new()), - } - } -} - -impl Default for InMemoryPluginRepository { - fn default() -> Self { - Self::new() - } -} +in_memory_repo!(InMemoryPluginRepository, Plugin); #[async_trait] impl PluginRepository for InMemoryPluginRepository { @@ -1081,25 +897,7 @@ impl PluginRepository for InMemoryPluginRepository { } } -// --- InMemoryPipelineRepository --- - -pub struct InMemoryPipelineRepository { - data: Mutex>, -} - -impl InMemoryPipelineRepository { - pub fn new() -> Self { - Self { - data: Mutex::new(HashMap::new()), - } - } -} - -impl Default for InMemoryPipelineRepository { - fn default() -> Self { - Self::new() - } -} +in_memory_repo!(InMemoryPipelineRepository, ProcessingPipeline); #[async_trait] impl PipelineRepository for InMemoryPipelineRepository { @@ -1216,3 +1014,36 @@ impl IngestTransaction for InMemoryIngestTransaction { Ok(()) } } + +in_memory_repo!(InMemoryRefreshTokenRepository, RefreshToken); + +#[async_trait] +impl RefreshTokenRepository for InMemoryRefreshTokenRepository { + async fn save(&self, token: &RefreshToken) -> Result<(), DomainError> { + self.data + .lock() + .await + .insert(token.token_id.to_string(), token.clone()); + Ok(()) + } + + async fn find_by_hash(&self, token_hash: &str) -> Result, DomainError> { + Ok(self + .data + .lock() + .await + .values() + .find(|t| t.token_hash == token_hash) + .cloned()) + } + + async fn delete_by_user(&self, user_id: &SystemId) -> Result<(), DomainError> { + self.data.lock().await.retain(|_, t| &t.user_id != user_id); + Ok(()) + } + + async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { + self.data.lock().await.remove(&id.to_string()); + Ok(()) + } +} diff --git a/crates/application/tests/catalog/queries/read_asset_file.rs b/crates/application/tests/catalog/queries/read_asset_file.rs index 31c1669..5a264ae 100644 --- a/crates/application/tests/catalog/queries/read_asset_file.rs +++ b/crates/application/tests/catalog/queries/read_asset_file.rs @@ -17,7 +17,8 @@ async fn reads_file_successfully() { relative_path: "photos/inbox/cat.jpg".into(), checksum: Checksum::new("a".repeat(64)).unwrap(), }; - let asset = Asset::new(source, AssetType::Image, "image/jpeg", 512, SystemId::new()); + let owner_id = SystemId::new(); + let asset = Asset::new(source, AssetType::Image, "image/jpeg", 512, owner_id); asset_repo.save(&asset).await.unwrap(); let file_data = Bytes::from(vec![0xFFu8; 512]); @@ -30,6 +31,7 @@ async fn reads_file_successfully() { let result = handler .execute(ReadAssetFileQuery { asset_id: asset.asset_id, + caller_id: owner_id, }) .await .unwrap(); @@ -48,6 +50,7 @@ async fn rejects_nonexistent_asset() { let result = handler .execute(ReadAssetFileQuery { asset_id: SystemId::new(), + caller_id: SystemId::new(), }) .await; diff --git a/crates/bootstrap/src/factory.rs b/crates/bootstrap/src/factory.rs index 466e6b5..8aa6521 100644 --- a/crates/bootstrap/src/factory.rs +++ b/crates/bootstrap/src/factory.rs @@ -66,7 +66,7 @@ pub async fn build_app(config: &Config) -> Result { .allow_methods(Any) .allow_headers(Any); - Ok(app_router() + Ok(app_router(&state) .with_state(state) .layer(TraceLayer::new_for_http()) .layer(cors)) diff --git a/crates/bootstrap/src/services/catalog.rs b/crates/bootstrap/src/services/catalog.rs index 3138fff..f144a51 100644 --- a/crates/bootstrap/src/services/catalog.rs +++ b/crates/bootstrap/src/services/catalog.rs @@ -1,13 +1,16 @@ use std::sync::Arc; use adapters_postgres::{ - PgPool, PostgresAssetMetadataRepository, PostgresAssetRepository, PostgresDerivativeRepository, - PostgresDuplicateRepository, PostgresIngestTransaction, + PgPool, PostgresAssetMetadataRepository, PostgresAssetRepository, PostgresAssetStackRepository, + PostgresDerivativeRepository, PostgresDuplicateRepository, PostgresIngestTransaction, + PostgresSidecarRepository, }; use adapters_storage::LocalFileStorage; use application::catalog::{ - GetAssetHandler, GetTimelineHandler, ReadAssetFileHandler, ReadDerivativeHandler, - RegisterAssetHandler, UpdateMetadataHandler, + CreateStackHandler, DeleteAssetHandler, DeleteStackHandler, DetectLivePhotosHandler, + GetAssetHandler, GetStackHandler, GetTimelineHandler, ListDuplicatesHandler, + ReadAssetFileHandler, ReadDerivativeHandler, RegisterAssetHandler, ResolveDuplicateHandler, + SearchAssetsHandler, UpdateMetadataHandler, }; use application::storage::IngestAssetHandler; use domain::ports::EventPublisher; @@ -25,6 +28,8 @@ pub fn build( let metadata_repo = Arc::new(PostgresAssetMetadataRepository::new(pool.clone())); let derivative_repo = Arc::new(PostgresDerivativeRepository::new(pool.clone())); let duplicate_repo = Arc::new(PostgresDuplicateRepository::new(pool.clone())); + let sidecar_repo = Arc::new(PostgresSidecarRepository::new(pool.clone())); + let stack_repo = Arc::new(PostgresAssetStackRepository::new(pool.clone())); let ingest_tx = Arc::new(PostgresIngestTransaction::new(pool.clone())); let ingest_asset = Arc::new(IngestAssetHandler::new( @@ -55,7 +60,35 @@ pub fn build( file_storage.clone(), )); - let read_derivative = Arc::new(ReadDerivativeHandler::new(derivative_repo, file_storage)); + let read_derivative = Arc::new(ReadDerivativeHandler::new( + derivative_repo.clone(), + asset_repo.clone(), + file_storage.clone(), + )); + + let search_assets = Arc::new(SearchAssetsHandler::new(asset_repo.clone())); + + let delete_asset = Arc::new(DeleteAssetHandler::new( + asset_repo.clone(), + derivative_repo.clone(), + sidecar_repo, + file_storage.clone(), + event_publisher.clone(), + )); + + let list_duplicates = Arc::new(ListDuplicatesHandler::new(duplicate_repo.clone())); + let resolve_duplicate = Arc::new(ResolveDuplicateHandler::new( + duplicate_repo.clone(), + delete_asset.clone(), + )); + + let create_stack = Arc::new(CreateStackHandler::new( + asset_repo.clone(), + stack_repo.clone(), + )); + let get_stack = Arc::new(GetStackHandler::new(stack_repo.clone())); + let delete_stack = Arc::new(DeleteStackHandler::new(stack_repo.clone())); + let detect_live_photos = Arc::new(DetectLivePhotosHandler::new(asset_repo.clone(), stack_repo)); let register_asset = Arc::new(RegisterAssetHandler::new( asset_repo, @@ -71,5 +104,13 @@ pub fn build( read_asset_file, read_derivative, register_asset, + delete_asset, + search_assets, + list_duplicates, + resolve_duplicate, + create_stack, + get_stack, + delete_stack, + detect_live_photos, } } diff --git a/crates/bootstrap/src/services/identity.rs b/crates/bootstrap/src/services/identity.rs index 2bc38c8..5f51788 100644 --- a/crates/bootstrap/src/services/identity.rs +++ b/crates/bootstrap/src/services/identity.rs @@ -1,8 +1,10 @@ use std::sync::Arc; use adapters_auth::{BcryptPasswordHasher, JwtTokenIssuer}; -use adapters_postgres::{PgPool, PostgresUserRepository}; -use application::identity::{GetProfileHandler, LoginUserHandler, RegisterUserHandler}; +use adapters_postgres::{PgPool, PostgresRefreshTokenRepository, PostgresUserRepository}; +use application::identity::{ + GetProfileHandler, LoginUserHandler, LogoutHandler, RefreshTokenHandler, RegisterUserHandler, +}; use domain::ports::TokenIssuer; use presentation::state::IdentityHandlers; @@ -15,20 +17,30 @@ pub fn build(pool: &PgPool, jwt_secret: &str) -> IdentityServices { let user_repo = Arc::new(PostgresUserRepository::new(pool.clone())); let hasher = Arc::new(BcryptPasswordHasher); let issuer: Arc = Arc::new(JwtTokenIssuer::new(jwt_secret)); + let refresh_repo = Arc::new(PostgresRefreshTokenRepository::new(pool.clone())); let register = Arc::new(RegisterUserHandler::new(user_repo.clone(), hasher.clone())); let login = Arc::new(LoginUserHandler::new( user_repo.clone(), hasher, issuer.clone(), + refresh_repo.clone(), )); let get_profile = Arc::new(GetProfileHandler::new(user_repo)); + let refresh = Arc::new(RefreshTokenHandler::new( + refresh_repo.clone(), + issuer.clone(), + )); + let logout = Arc::new(LogoutHandler::new(refresh_repo.clone())); IdentityServices { handlers: IdentityHandlers { register, login, get_profile, + refresh, + logout, + refresh_token_repo: refresh_repo, }, token_issuer: issuer, } diff --git a/crates/bootstrap/src/services/processing.rs b/crates/bootstrap/src/services/processing.rs index e941798..80bf9d6 100644 --- a/crates/bootstrap/src/services/processing.rs +++ b/crates/bootstrap/src/services/processing.rs @@ -6,7 +6,7 @@ use adapters_postgres::{ }; use application::processing::{ CompleteJobHandler, ConfigurePipelineHandler, EnqueueJobHandler, FailJobHandler, - ManagePluginHandler, ReportBatchProgressHandler, StartJobHandler, + ListJobsHandler, ManagePluginHandler, ReportBatchProgressHandler, StartJobHandler, }; use domain::ports::EventPublisher; use presentation::state::ProcessingHandlers; @@ -32,6 +32,7 @@ pub fn build(pool: &PgPool, event_publisher: Arc) -> Process batch_repo.clone(), event_publisher, )); + let list_jobs = Arc::new(ListJobsHandler::new(job_repo.clone())); let batch_progress = Arc::new(ReportBatchProgressHandler::new(batch_repo, job_repo)); let manage_plugin = Arc::new(ManagePluginHandler::new(plugin_repo.clone())); let configure_pipeline = Arc::new(ConfigurePipelineHandler::new(pipeline_repo, plugin_repo)); @@ -41,6 +42,7 @@ pub fn build(pool: &PgPool, event_publisher: Arc) -> Process start_job, complete_job, fail_job, + list_jobs, batch_progress, manage_plugin, configure_pipeline, diff --git a/crates/domain/Cargo.toml b/crates/domain/Cargo.toml index 3e91bc8..2b6cc4a 100644 --- a/crates/domain/Cargo.toml +++ b/crates/domain/Cargo.toml @@ -8,6 +8,7 @@ uuid = { workspace = true } chrono = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +email_address = { workspace = true } thiserror = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } diff --git a/crates/domain/src/catalog/entities.rs b/crates/domain/src/catalog/entities.rs index 64aecde..1000181 100644 --- a/crates/domain/src/catalog/entities.rs +++ b/crates/domain/src/catalog/entities.rs @@ -54,6 +54,17 @@ impl Asset { } } +// --- AssetFilters --- + +#[derive(Default)] +pub struct AssetFilters { + pub asset_type: Option, + pub mime_type: Option, + pub date_from: Option, + pub date_to: Option, + pub is_processed: Option, +} + // --- AssetMetadata --- #[derive( diff --git a/crates/domain/src/catalog/ports.rs b/crates/domain/src/catalog/ports.rs index 3ba913b..9662e94 100644 --- a/crates/domain/src/catalog/ports.rs +++ b/crates/domain/src/catalog/ports.rs @@ -1,6 +1,6 @@ use super::entities::{ - Asset, AssetMetadata, AssetStack, DerivativeAsset, DerivativeProfile, DuplicateGroup, - MetadataSource, + Asset, AssetFilters, AssetMetadata, AssetStack, DerivativeAsset, DerivativeProfile, + DuplicateGroup, MetadataSource, }; use crate::common::errors::DomainError; use crate::common::value_objects::{Checksum, StructuredData, SystemId}; @@ -19,6 +19,13 @@ pub trait AssetRepository: Send + Sync { limit: u32, offset: u32, ) -> Result, DomainError>; + async fn search( + &self, + owner_id: &SystemId, + filters: &AssetFilters, + limit: u32, + offset: u32, + ) -> Result, DomainError>; async fn save(&self, asset: &Asset) -> Result<(), DomainError>; async fn delete(&self, id: &SystemId) -> Result<(), DomainError>; } diff --git a/crates/domain/src/common/errors.rs b/crates/domain/src/common/errors.rs index 14822ab..7791217 100644 --- a/crates/domain/src/common/errors.rs +++ b/crates/domain/src/common/errors.rs @@ -15,3 +15,13 @@ pub enum DomainError { #[error("Internal error: {0}")] Internal(String), } + +pub trait OptionExt { + fn or_not_found(self, entity: &str) -> Result; +} + +impl OptionExt for Option { + fn or_not_found(self, entity: &str) -> Result { + self.ok_or_else(|| DomainError::NotFound(format!("{entity} not found"))) + } +} diff --git a/crates/domain/src/common/events.rs b/crates/domain/src/common/events.rs index 84c8b00..df6997b 100644 --- a/crates/domain/src/common/events.rs +++ b/crates/domain/src/common/events.rs @@ -59,4 +59,27 @@ pub enum DomainEvent { error: String, timestamp: DateTimeStamp, }, + UserCreated { + user_id: SystemId, + timestamp: DateTimeStamp, + }, + UserDeleted { + user_id: SystemId, + timestamp: DateTimeStamp, + }, + AlbumCreated { + album_id: SystemId, + creator_id: SystemId, + timestamp: DateTimeStamp, + }, + TagCreated { + tag_id: SystemId, + asset_id: SystemId, + timestamp: DateTimeStamp, + }, + DuplicateDetected { + group_id: SystemId, + asset_ids: Vec, + timestamp: DateTimeStamp, + }, } diff --git a/crates/domain/src/common/value_objects/email.rs b/crates/domain/src/common/value_objects/email.rs index c4dd9fb..a7582a4 100644 --- a/crates/domain/src/common/value_objects/email.rs +++ b/crates/domain/src/common/value_objects/email.rs @@ -6,8 +6,8 @@ pub struct Email(String); impl Email { pub fn new(value: impl Into) -> Result { let value = value.into().trim().to_lowercase(); - if value.is_empty() || !value.contains('@') { - return Err(DomainError::Validation("Invalid email address".to_string())); + if !email_address::EmailAddress::is_valid(&value) { + return Err(DomainError::Validation("Invalid email address".into())); } Ok(Self(value)) } diff --git a/crates/domain/src/common/value_objects/mime_type.rs b/crates/domain/src/common/value_objects/mime_type.rs new file mode 100644 index 0000000..5123668 --- /dev/null +++ b/crates/domain/src/common/value_objects/mime_type.rs @@ -0,0 +1,26 @@ +use crate::common::errors::DomainError; + +#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] +pub struct MimeType(String); + +impl MimeType { + pub fn new(value: impl Into) -> Result { + let value = value.into(); + if !value.contains('/') || value.len() < 3 { + return Err(DomainError::Validation(format!( + "Invalid MIME type: {value}" + ))); + } + Ok(Self(value)) + } + + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl std::fmt::Display for MimeType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} diff --git a/crates/domain/src/common/value_objects/mod.rs b/crates/domain/src/common/value_objects/mod.rs index d5a3f26..7e08620 100644 --- a/crates/domain/src/common/value_objects/mod.rs +++ b/crates/domain/src/common/value_objects/mod.rs @@ -2,14 +2,20 @@ mod checksum; mod date_time_stamp; mod email; pub mod filter_criteria; +mod mime_type; mod password; +mod relative_path; mod structured_data; mod system_id; +mod username; pub use checksum::Checksum; pub use date_time_stamp::DateTimeStamp; pub use email::Email; pub use filter_criteria::{FilterCondition, FilterCriteria, FilterOperator}; +pub use mime_type::MimeType; pub use password::PasswordHash; +pub use relative_path::RelativePath; pub use structured_data::{MetadataValue, StructuredData}; pub use system_id::SystemId; +pub use username::Username; diff --git a/crates/domain/src/common/value_objects/relative_path.rs b/crates/domain/src/common/value_objects/relative_path.rs new file mode 100644 index 0000000..f14a035 --- /dev/null +++ b/crates/domain/src/common/value_objects/relative_path.rs @@ -0,0 +1,31 @@ +use crate::common::errors::DomainError; + +#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] +pub struct RelativePath(String); + +impl RelativePath { + pub fn new(value: impl Into) -> Result { + let value = value.into(); + if value.is_empty() { + return Err(DomainError::Validation("Path must not be empty".into())); + } + if value.contains("..") { + return Err(DomainError::Validation("Path must not contain '..'".into())); + } + Ok(Self(value)) + } + + pub fn as_str(&self) -> &str { + &self.0 + } + + pub fn filename(&self) -> &str { + self.0.rsplit('/').next().unwrap_or(&self.0) + } +} + +impl std::fmt::Display for RelativePath { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} diff --git a/crates/domain/src/common/value_objects/username.rs b/crates/domain/src/common/value_objects/username.rs new file mode 100644 index 0000000..1a9d88d --- /dev/null +++ b/crates/domain/src/common/value_objects/username.rs @@ -0,0 +1,34 @@ +use crate::common::errors::DomainError; + +#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] +pub struct Username(String); + +impl Username { + pub fn new(value: impl Into) -> Result { + let value = value.into(); + if value.len() < 2 || value.len() > 64 { + return Err(DomainError::Validation( + "Username must be 2-64 characters".into(), + )); + } + if !value + .chars() + .all(|c| c.is_alphanumeric() || c == '_' || c == '-' || c == '.') + { + return Err(DomainError::Validation( + "Username may only contain alphanumeric, underscore, dash, or dot".into(), + )); + } + Ok(Self(value)) + } + + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl std::fmt::Display for Username { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} diff --git a/crates/domain/src/identity/entities.rs b/crates/domain/src/identity/entities.rs index a94526e..2b8c680 100644 --- a/crates/domain/src/identity/entities.rs +++ b/crates/domain/src/identity/entities.rs @@ -1,5 +1,5 @@ use crate::common::errors::DomainError; -use crate::common::value_objects::{Email, PasswordHash, SystemId}; +use crate::common::value_objects::{DateTimeStamp, Email, PasswordHash, SystemId}; use chrono::{DateTime, Utc}; use std::collections::HashSet; @@ -141,6 +141,35 @@ impl User { } } +// --- RefreshToken --- + +#[derive(Debug, Clone)] +pub struct RefreshToken { + pub token_id: SystemId, + pub user_id: SystemId, + pub token_hash: String, + pub expires_at: DateTimeStamp, + pub revoked: bool, + pub created_at: DateTimeStamp, +} + +impl RefreshToken { + pub fn new(user_id: SystemId, token_hash: String, expires_at: DateTimeStamp) -> Self { + Self { + token_id: SystemId::new(), + user_id, + token_hash, + expires_at, + revoked: false, + created_at: DateTimeStamp::now(), + } + } + + pub fn is_valid(&self) -> bool { + !self.revoked && *self.expires_at.as_datetime() > Utc::now() + } +} + // --- Group --- #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] diff --git a/crates/domain/src/identity/ports.rs b/crates/domain/src/identity/ports.rs index fe9a4d4..a4c6269 100644 --- a/crates/domain/src/identity/ports.rs +++ b/crates/domain/src/identity/ports.rs @@ -1,4 +1,4 @@ -use super::entities::{Group, Role, User}; +use super::entities::{Group, RefreshToken, Role, User}; use crate::common::errors::DomainError; use crate::common::value_objects::{Email, PasswordHash, SystemId}; use async_trait::async_trait; @@ -35,6 +35,16 @@ pub trait GroupRepository: Send + Sync { async fn delete(&self, id: &SystemId) -> Result<(), DomainError>; } +// --- RefreshTokenRepository --- + +#[async_trait] +pub trait RefreshTokenRepository: Send + Sync { + async fn save(&self, token: &RefreshToken) -> Result<(), DomainError>; + async fn find_by_hash(&self, token_hash: &str) -> Result, DomainError>; + async fn delete_by_user(&self, user_id: &SystemId) -> Result<(), DomainError>; + async fn delete(&self, id: &SystemId) -> Result<(), DomainError>; +} + // --- Auth --- #[async_trait] diff --git a/crates/domain/src/processing/ports.rs b/crates/domain/src/processing/ports.rs index 859334b..ef178c9 100644 --- a/crates/domain/src/processing/ports.rs +++ b/crates/domain/src/processing/ports.rs @@ -11,6 +11,13 @@ pub trait JobRepository: Send + Sync { async fn find_by_id(&self, id: &SystemId) -> Result, DomainError>; async fn find_next_queued(&self) -> Result, DomainError>; async fn find_by_batch(&self, batch_id: &SystemId) -> Result, DomainError>; + async fn find_all( + &self, + status: Option<&str>, + limit: u32, + offset: u32, + ) -> Result, DomainError>; + async fn count(&self, status: Option<&str>) -> Result; async fn save(&self, job: &Job) -> Result<(), DomainError>; } diff --git a/crates/presentation/src/extractors/auth.rs b/crates/presentation/src/extractors/auth.rs index 4fe2a42..7ec2413 100644 --- a/crates/presentation/src/extractors/auth.rs +++ b/crates/presentation/src/extractors/auth.rs @@ -1,12 +1,6 @@ -use crate::state::AppState; -use axum::{ - Json, - extract::FromRequestParts, - http::{StatusCode, request::Parts}, - response::{IntoResponse, Response}, -}; +use crate::{middleware::auth::extract_bearer_token, state::AppState}; +use axum::{extract::FromRequestParts, http::request::Parts, response::Response}; use domain::value_objects::SystemId; -use serde_json::json; pub struct JwtClaims { pub user_id: SystemId, @@ -20,30 +14,13 @@ impl FromRequestParts for JwtClaims { parts: &mut Parts, state: &AppState, ) -> Result { - let auth_header = parts - .headers - .get(axum::http::header::AUTHORIZATION) - .and_then(|v| v.to_str().ok()) - .ok_or_else(|| { - ( - StatusCode::UNAUTHORIZED, - Json(json!({ "error": "Missing Authorization header" })), - ) - .into_response() - })?; - - let token = auth_header.strip_prefix("Bearer ").ok_or_else(|| { - ( - StatusCode::UNAUTHORIZED, - Json(json!({ "error": "Invalid Authorization format" })), - ) - .into_response() - })?; + let token = extract_bearer_token(&parts.headers)?; let (user_id, role) = state.token_issuer.verify(token).await.map_err(|_| { + use axum::{Json, http::StatusCode, response::IntoResponse}; ( StatusCode::UNAUTHORIZED, - Json(json!({ "error": "Invalid or expired token" })), + Json(serde_json::json!({ "error": "Invalid or expired token" })), ) .into_response() })?; diff --git a/crates/presentation/src/handlers/albums.rs b/crates/presentation/src/handlers/albums.rs index 93722ef..080f0c3 100644 --- a/crates/presentation/src/handlers/albums.rs +++ b/crates/presentation/src/handlers/albums.rs @@ -56,7 +56,10 @@ pub async fn add_entry( user_id: claims.user_id, }; let album = state.organization.manage_album_entries.execute(cmd).await?; - Ok((StatusCode::OK, Json(AlbumResponse::from_domain(&album)))) + Ok(( + StatusCode::CREATED, + Json(AlbumResponse::from_domain(&album)), + )) } pub async fn remove_entry( diff --git a/crates/presentation/src/handlers/assets.rs b/crates/presentation/src/handlers/assets.rs index 3880676..0931895 100644 --- a/crates/presentation/src/handlers/assets.rs +++ b/crates/presentation/src/handlers/assets.rs @@ -2,6 +2,7 @@ use crate::{ constants::{DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE}, errors::AppError, extractors::{JwtClaims, UploadedAsset}, + parsers, state::AppState, }; use api_types::{ @@ -10,8 +11,8 @@ use api_types::{ }; use application::{ catalog::{ - GetAssetQuery, GetTimelineQuery, ReadAssetFileQuery, ReadDerivativeQuery, - RegisterAssetCommand, UpdateMetadataCommand, + DeleteAssetCommand, GetAssetQuery, GetTimelineQuery, ReadAssetFileQuery, + ReadDerivativeQuery, RegisterAssetCommand, SearchAssetsQuery, UpdateMetadataCommand, }, organization::TagAssetCommand, storage::IngestAssetCommand, @@ -24,9 +25,9 @@ use axum::{ response::Response, }; use domain::{ - catalog::entities::AssetType, + catalog::entities::AssetFilters, errors::DomainError, - value_objects::{MetadataValue, StructuredData, SystemId}, + value_objects::{DateTimeStamp, MetadataValue, StructuredData, SystemId}, }; #[derive(Debug, serde::Deserialize)] @@ -35,6 +36,79 @@ pub struct TimelineParams { pub offset: Option, } +#[derive(Debug, serde::Deserialize)] +pub struct SearchParams { + #[serde(rename = "type")] + pub asset_type: Option, + pub mime_type: Option, + pub date_from: Option, + pub date_to: Option, + pub is_processed: Option, + pub limit: Option, + pub offset: Option, +} + +pub async fn search_assets( + State(state): State, + claims: JwtClaims, + Query(params): Query, +) -> Result, AppError> { + let asset_type = params + .asset_type + .as_deref() + .map(parsers::asset_type) + .transpose()?; + + let date_from = params + .date_from + .as_deref() + .map(|s| { + let d = chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") + .map_err(|_| AppError::from(DomainError::Validation("Invalid date_from".into())))?; + d.and_hms_opt(0, 0, 0) + .map(|dt| DateTimeStamp::from_datetime(dt.and_utc())) + .ok_or_else(|| AppError::from(DomainError::Validation("Invalid date_from".into()))) + }) + .transpose()?; + + let date_to = params + .date_to + .as_deref() + .map(|s| { + let d = chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") + .map_err(|_| AppError::from(DomainError::Validation("Invalid date_to".into())))?; + d.and_hms_opt(23, 59, 59) + .map(|dt| DateTimeStamp::from_datetime(dt.and_utc())) + .ok_or_else(|| AppError::from(DomainError::Validation("Invalid date_to".into()))) + }) + .transpose()?; + + let filters = AssetFilters { + asset_type, + mime_type: params.mime_type, + date_from, + date_to, + is_processed: params.is_processed, + }; + + let limit = params.limit.unwrap_or(DEFAULT_PAGE_SIZE).min(MAX_PAGE_SIZE); + let offset = params.offset.unwrap_or(0); + + let query = SearchAssetsQuery { + owner_id: claims.user_id, + filters, + limit, + offset, + }; + let results = state.catalog.search_assets.execute(query).await?; + let total = results.len(); + let assets = results + .iter() + .map(|a| AssetResponse::from_domain(a, &StructuredData::new())) + .collect(); + Ok(Json(TimelineResponse { assets, total })) +} + pub async fn ingest( State(state): State, claims: JwtClaims, @@ -117,11 +191,12 @@ pub async fn update_metadata( pub async fn serve_file( State(state): State, - _claims: JwtClaims, + claims: JwtClaims, Path((asset_id,)): Path<(uuid::Uuid,)>, ) -> Result { let query = ReadAssetFileQuery { asset_id: SystemId::from_uuid(asset_id), + caller_id: claims.user_id, }; let result = state.catalog.read_asset_file.execute(query).await?; @@ -152,15 +227,29 @@ pub async fn tag_asset( Ok((StatusCode::CREATED, Json(TagResponse::from_domain(&tag)))) } +pub async fn delete_asset( + State(state): State, + claims: JwtClaims, + Path((asset_id,)): Path<(uuid::Uuid,)>, +) -> Result { + let cmd = DeleteAssetCommand { + asset_id: SystemId::from_uuid(asset_id), + deleted_by: claims.user_id, + }; + state.catalog.delete_asset.execute(cmd).await?; + Ok(StatusCode::NO_CONTENT) +} + pub async fn serve_derivative( State(state): State, - _claims: JwtClaims, + claims: JwtClaims, Path((asset_id, profile)): Path<(uuid::Uuid, String)>, ) -> Result { - let profile = parse_derivative_profile(&profile)?; + let profile = parsers::derivative_profile(&profile)?; let query = ReadDerivativeQuery { asset_id: SystemId::from_uuid(asset_id), profile, + caller_id: claims.user_id, }; let result = state.catalog.read_derivative.execute(query).await?; @@ -173,36 +262,12 @@ pub async fn serve_derivative( .map_err(|e| AppError::from(DomainError::Internal(e.to_string()))) } -fn parse_derivative_profile(s: &str) -> Result { - use domain::entities::DerivativeProfile; - match s { - "thumbnail" | "thumbnail_square" => Ok(DerivativeProfile::ThumbnailSquare), - "thumbnail_large" => Ok(DerivativeProfile::ThumbnailLarge), - "web" | "web_optimized" => Ok(DerivativeProfile::WebOptimized), - "video_sd" => Ok(DerivativeProfile::VideoSd), - _ => Err(AppError::from(DomainError::Validation(format!( - "Unknown derivative profile: {s}" - )))), - } -} - -fn parse_asset_type(s: &str) -> Result { - match s { - "image" => Ok(AssetType::Image), - "video" => Ok(AssetType::Video), - "live_photo" => Ok(AssetType::LivePhoto), - _ => Err(AppError::from(DomainError::Validation(format!( - "Invalid asset type: {s}" - )))), - } -} - pub async fn register_asset( State(state): State, claims: JwtClaims, Json(req): Json, ) -> Result<(StatusCode, Json), AppError> { - let asset_type = parse_asset_type(&req.asset_type)?; + let asset_type = parsers::asset_type(&req.asset_type)?; let cmd = RegisterAssetCommand { volume_id: SystemId::from_uuid(req.volume_id), relative_path: req.relative_path, diff --git a/crates/presentation/src/handlers/auth.rs b/crates/presentation/src/handlers/auth.rs index d7c86d1..6789211 100644 --- a/crates/presentation/src/handlers/auth.rs +++ b/crates/presentation/src/handlers/auth.rs @@ -4,10 +4,13 @@ use crate::{ state::AppState, }; use api_types::{ - requests::{LoginRequest, RegisterRequest}, + requests::{LoginRequest, RefreshTokenRequest, RegisterRequest}, responses::{AuthResponse, UserResponse}, }; -use application::identity::{GetProfileQuery, LoginUserCommand, RegisterUserCommand}; +use application::identity::{ + GetProfileQuery, LoginUserCommand, RefreshTokenCommand, RegisterUserCommand, + generate_refresh_token, +}; use axum::{Json, extract::State, http::StatusCode}; #[utoipa::path( @@ -34,10 +37,13 @@ pub async fn register( .issue(&user.id, "user") .await .map_err(AppError::from)?; + let (refresh_token, _) = + generate_refresh_token(&state.identity.refresh_token_repo, &user.id).await?; Ok(( StatusCode::CREATED, Json(AuthResponse { token, + refresh_token, user: UserResponse::from_domain(&user), }), )) @@ -59,9 +65,10 @@ pub async fn login( email: req.email, password: req.password, }; - let (user, token) = state.identity.login.execute(cmd).await?; + let (user, token, refresh_token) = state.identity.login.execute(cmd).await?; Ok(Json(AuthResponse { token, + refresh_token, user: UserResponse::from_domain(&user), })) } @@ -84,3 +91,32 @@ pub async fn me( let user = state.identity.get_profile.execute(query).await?; Ok(Json(UserResponse::from_domain(&user))) } + +pub async fn refresh( + State(state): State, + ValidatedJson(req): ValidatedJson, +) -> Result, AppError> { + let cmd = RefreshTokenCommand { + refresh_token: req.refresh_token, + }; + let (access_token, refresh_token) = state.identity.refresh.execute(cmd).await?; + let (user_id, _) = state.token_issuer.verify(&access_token).await?; + let user = state + .identity + .get_profile + .execute(GetProfileQuery { user_id }) + .await?; + Ok(Json(AuthResponse { + token: access_token, + refresh_token, + user: UserResponse::from_domain(&user), + })) +} + +pub async fn logout( + State(state): State, + claims: JwtClaims, +) -> Result { + state.identity.logout.execute(&claims.user_id).await?; + Ok(StatusCode::NO_CONTENT) +} diff --git a/crates/presentation/src/handlers/duplicates.rs b/crates/presentation/src/handlers/duplicates.rs new file mode 100644 index 0000000..f35394f --- /dev/null +++ b/crates/presentation/src/handlers/duplicates.rs @@ -0,0 +1,41 @@ +use crate::{errors::AppError, extractors::JwtClaims, state::AppState}; +use api_types::{requests::ResolveDuplicateRequest, responses::DuplicateGroupResponse}; +use application::catalog::{ListDuplicatesQuery, ResolveDuplicateCommand}; +use axum::{ + Json, + extract::{Path, State}, + http::StatusCode, +}; +use domain::value_objects::SystemId; + +pub async fn list_duplicates( + State(state): State, + claims: JwtClaims, +) -> Result>, AppError> { + super::require_admin(&claims)?; + let groups = state + .catalog + .list_duplicates + .execute(ListDuplicatesQuery) + .await?; + let resp = groups + .iter() + .map(DuplicateGroupResponse::from_domain) + .collect(); + Ok(Json(resp)) +} + +pub async fn resolve_duplicate( + State(state): State, + claims: JwtClaims, + Path((group_id,)): Path<(uuid::Uuid,)>, + Json(req): Json, +) -> Result { + let cmd = ResolveDuplicateCommand { + group_id: SystemId::from_uuid(group_id), + keep_asset_id: SystemId::from_uuid(req.keep_asset_id), + resolved_by: claims.user_id, + }; + state.catalog.resolve_duplicate.execute(cmd).await?; + Ok(StatusCode::OK) +} diff --git a/crates/presentation/src/handlers/mod.rs b/crates/presentation/src/handlers/mod.rs index 5d2c2cf..378d7ba 100644 --- a/crates/presentation/src/handlers/mod.rs +++ b/crates/presentation/src/handlers/mod.rs @@ -1,8 +1,22 @@ pub mod albums; pub mod assets; pub mod auth; +pub mod duplicates; pub mod health; pub mod processing; pub mod sharing; pub mod sidecar; +pub mod stacks; pub mod storage; + +use crate::{errors::AppError, extractors::JwtClaims}; +use domain::errors::DomainError; + +pub(crate) fn require_admin(claims: &JwtClaims) -> Result<(), AppError> { + if claims.role != "admin" { + return Err(AppError::from(DomainError::Forbidden( + "Admin access required".into(), + ))); + } + Ok(()) +} diff --git a/crates/presentation/src/handlers/processing.rs b/crates/presentation/src/handlers/processing.rs index 0af6e1a..fe9b46a 100644 --- a/crates/presentation/src/handlers/processing.rs +++ b/crates/presentation/src/handlers/processing.rs @@ -1,49 +1,28 @@ -use crate::{errors::AppError, extractors::JwtClaims, state::AppState}; +use crate::{errors::AppError, extractors::JwtClaims, parsers, state::AppState}; use api_types::{ requests::{ CompleteJobRequest, ConfigurePipelineRequest, EnqueueJobRequest, FailJobRequest, ManagePluginRequest, }, - responses::{BatchProgressResponse, JobResponse, PipelineResponse, PluginResponse}, + responses::{ + BatchProgressResponse, JobListResponse, JobResponse, PipelineResponse, PluginResponse, + }, }; use application::processing::{ - CompleteJobCommand, ConfigurePipelineCommand, EnqueueJobCommand, FailJobCommand, + CompleteJobCommand, ConfigurePipelineCommand, EnqueueJobCommand, FailJobCommand, ListJobsQuery, ManagePluginCommand, PipelineStepConfig, PluginAction, ReportBatchProgressQuery, StartJobCommand, }; use axum::{ Json, - extract::{Path, State}, + extract::{Path, Query, State}, http::StatusCode, }; use domain::{ - entities::{JobType, PluginType}, errors::DomainError, value_objects::{MetadataValue, StructuredData, SystemId}, }; -fn parse_job_type(s: &str) -> JobType { - match s { - "scan_directory" => JobType::ScanDirectory, - "extract_metadata" => JobType::ExtractMetadata, - "generate_derivative" => JobType::GenerateDerivative, - "sync_sidecar" => JobType::SyncSidecar, - "detect_duplicates" => JobType::DetectDuplicates, - other => JobType::Custom(other.to_string()), - } -} - -fn parse_plugin_type(s: &str) -> Result { - match s { - "media_processor" => Ok(PluginType::MediaProcessor), - "scheduled_task" => Ok(PluginType::ScheduledTask), - "sidecar_writer" => Ok(PluginType::SidecarWriter), - _ => Err(AppError::from(DomainError::Validation(format!( - "Invalid plugin type: {s}" - )))), - } -} - fn hashmap_to_structured( map: &std::collections::HashMap, ) -> StructuredData { @@ -54,11 +33,38 @@ fn hashmap_to_structured( sd } +#[derive(Debug, serde::Deserialize)] +pub struct ListJobsParams { + pub status: Option, + pub limit: Option, + pub offset: Option, +} + +pub async fn list_jobs( + State(state): State, + claims: JwtClaims, + Query(params): Query, +) -> Result, AppError> { + super::require_admin(&claims)?; + let query = ListJobsQuery { + status: params.status, + limit: params.limit.unwrap_or(20).min(100), + offset: params.offset.unwrap_or(0), + }; + let result = state.processing.list_jobs.execute(query).await?; + let jobs = result.jobs.iter().map(JobResponse::from_domain).collect(); + Ok(Json(JobListResponse { + jobs, + total: result.total, + })) +} + pub async fn enqueue_job( State(state): State, - _claims: JwtClaims, + claims: JwtClaims, Json(req): Json, ) -> Result<(StatusCode, Json), AppError> { + super::require_admin(&claims)?; let payload = req .payload .as_ref() @@ -66,7 +72,7 @@ pub async fn enqueue_job( .unwrap_or_default(); let cmd = EnqueueJobCommand { - job_type: parse_job_type(&req.job_type), + job_type: parsers::job_type(&req.job_type), priority: req.priority.unwrap_or(0), payload, target_asset_id: req.target_asset_id.map(SystemId::from_uuid), @@ -78,9 +84,10 @@ pub async fn enqueue_job( pub async fn start_job( State(state): State, - _claims: JwtClaims, + claims: JwtClaims, Path((job_id,)): Path<(uuid::Uuid,)>, ) -> Result, AppError> { + super::require_admin(&claims)?; let cmd = StartJobCommand { job_id: SystemId::from_uuid(job_id), }; @@ -90,10 +97,11 @@ pub async fn start_job( pub async fn complete_job( State(state): State, - _claims: JwtClaims, + claims: JwtClaims, Path((job_id,)): Path<(uuid::Uuid,)>, Json(req): Json, ) -> Result, AppError> { + super::require_admin(&claims)?; let cmd = CompleteJobCommand { job_id: SystemId::from_uuid(job_id), result: hashmap_to_structured(&req.result), @@ -104,10 +112,11 @@ pub async fn complete_job( pub async fn fail_job( State(state): State, - _claims: JwtClaims, + claims: JwtClaims, Path((job_id,)): Path<(uuid::Uuid,)>, Json(req): Json, ) -> Result, AppError> { + super::require_admin(&claims)?; let cmd = FailJobCommand { job_id: SystemId::from_uuid(job_id), error: req.error, @@ -118,9 +127,10 @@ pub async fn fail_job( pub async fn batch_progress( State(state): State, - _claims: JwtClaims, + claims: JwtClaims, Path((batch_id,)): Path<(uuid::Uuid,)>, ) -> Result, AppError> { + super::require_admin(&claims)?; let query = ReportBatchProgressQuery { batch_id: SystemId::from_uuid(batch_id), }; @@ -130,16 +140,17 @@ pub async fn batch_progress( pub async fn manage_plugin( State(state): State, - _claims: JwtClaims, + claims: JwtClaims, Json(req): Json, ) -> Result<(StatusCode, Json), AppError> { + super::require_admin(&claims)?; let action = match req.action.as_str() { "create" => { let name = req.name.ok_or_else(|| { AppError::from(DomainError::Validation("name required for create".into())) })?; let pt = req.plugin_type.as_deref().unwrap_or("media_processor"); - let plugin_type = parse_plugin_type(pt)?; + let plugin_type = parsers::plugin_type(pt)?; let config = req .config .as_ref() @@ -173,9 +184,10 @@ pub async fn manage_plugin( pub async fn configure_pipeline( State(state): State, - _claims: JwtClaims, + claims: JwtClaims, Json(req): Json, ) -> Result<(StatusCode, Json), AppError> { + super::require_admin(&claims)?; let steps = req .steps .iter() diff --git a/crates/presentation/src/handlers/sharing.rs b/crates/presentation/src/handlers/sharing.rs index 6886a4f..0b3889c 100644 --- a/crates/presentation/src/handlers/sharing.rs +++ b/crates/presentation/src/handlers/sharing.rs @@ -1,4 +1,4 @@ -use crate::{errors::AppError, extractors::JwtClaims, state::AppState}; +use crate::{errors::AppError, extractors::JwtClaims, parsers, state::AppState}; use api_types::{ requests::{GenerateShareLinkRequest, ShareResourceRequest}, responses::{ShareLinkResponse, ShareScopeResponse, SharedResourceResponse}, @@ -11,53 +11,17 @@ use axum::{ extract::{Path, State}, http::StatusCode, }; -use domain::{ - entities::{LinkAccessLevel, ShareableType, TargetType}, - errors::DomainError, - value_objects::{DateTimeStamp, SystemId}, -}; +use domain::value_objects::{DateTimeStamp, SystemId}; const DEFAULT_ACCESS_LEVEL: &str = "view_only"; -fn parse_shareable_type(s: &str) -> Result { - match s { - "asset" => Ok(ShareableType::Asset), - "album" => Ok(ShareableType::Album), - "collection" => Ok(ShareableType::Collection), - "directory" => Ok(ShareableType::Directory), - _ => Err(AppError::from(DomainError::Validation(format!( - "Invalid shareable type: {s}" - )))), - } -} - -fn parse_target_type(s: &str) -> Result { - match s { - "user" => Ok(TargetType::User), - "group" => Ok(TargetType::Group), - _ => Err(AppError::from(DomainError::Validation(format!( - "Invalid target type: {s}" - )))), - } -} - -fn parse_access_level(s: &str) -> Result { - match s { - "view_only" => Ok(LinkAccessLevel::ViewOnly), - "limited_search" => Ok(LinkAccessLevel::LimitedSearch), - _ => Err(AppError::from(DomainError::Validation(format!( - "Invalid access level: {s}" - )))), - } -} - pub async fn share_resource( State(state): State, claims: JwtClaims, Json(req): Json, ) -> Result<(StatusCode, Json), AppError> { - let shareable_type = parse_shareable_type(&req.shareable_type)?; - let target_type = parse_target_type(&req.target_type)?; + let shareable_type = parsers::shareable_type(&req.shareable_type)?; + let target_type = parsers::target_type(&req.target_type)?; let cmd = ShareResourceCommand { shareable_type, @@ -79,9 +43,8 @@ pub async fn generate_link( claims: JwtClaims, Json(req): Json, ) -> Result<(StatusCode, Json), AppError> { - let shareable_type = parse_shareable_type(&req.shareable_type)?; - let access_level = - parse_access_level(req.access_level.as_deref().unwrap_or(DEFAULT_ACCESS_LEVEL))?; + let shareable_type = parsers::shareable_type(&req.shareable_type)?; + let al = parsers::access_level(req.access_level.as_deref().unwrap_or(DEFAULT_ACCESS_LEVEL))?; let expires_at = req.expires_in_hours.map(|h| { DateTimeStamp::from_datetime(chrono::Utc::now() + chrono::Duration::hours(h as i64)) @@ -90,7 +53,7 @@ pub async fn generate_link( let cmd = GenerateShareLinkCommand { shareable_type, shareable_id: SystemId::from_uuid(req.shareable_id), - access_level, + access_level: al, created_by: claims.user_id, expires_at, max_uses: req.max_uses, diff --git a/crates/presentation/src/handlers/sidecar.rs b/crates/presentation/src/handlers/sidecar.rs index c6e717b..7a6f836 100644 --- a/crates/presentation/src/handlers/sidecar.rs +++ b/crates/presentation/src/handlers/sidecar.rs @@ -1,4 +1,4 @@ -use crate::{errors::AppError, extractors::JwtClaims, state::AppState}; +use crate::{errors::AppError, extractors::JwtClaims, parsers, state::AppState}; use api_types::responses::{DetectChangesResponse, SidecarExportResponse, SidecarImportResponse}; use application::sidecar::{ DetectExternalChangesCommand, ExportSidecarCommand, FullExportCommand, FullImportCommand, @@ -8,23 +8,14 @@ use axum::{ Json, extract::{Path, State}, }; -use domain::{entities::ConflictPolicy, errors::DomainError, value_objects::SystemId}; - -fn parse_conflict_policy(s: &str) -> Result { - match s { - "db_wins" => Ok(ConflictPolicy::DbWins), - "file_wins" => Ok(ConflictPolicy::FileWins), - _ => Err(AppError::from(DomainError::Validation(format!( - "Invalid conflict policy: {s}. Use db_wins or file_wins" - )))), - } -} +use domain::value_objects::SystemId; pub async fn export_sidecar( State(state): State, - _claims: JwtClaims, + claims: JwtClaims, Path((asset_id,)): Path<(uuid::Uuid,)>, ) -> Result, AppError> { + super::require_admin(&claims)?; let cmd = ExportSidecarCommand { asset_id: SystemId::from_uuid(asset_id), }; @@ -34,8 +25,9 @@ pub async fn export_sidecar( pub async fn detect_changes( State(state): State, - _claims: JwtClaims, + claims: JwtClaims, ) -> Result, AppError> { + super::require_admin(&claims)?; let count = state .sidecar .detect_changes @@ -48,9 +40,10 @@ pub async fn detect_changes( pub async fn import_sidecar( State(state): State, - _claims: JwtClaims, + claims: JwtClaims, Path((asset_id,)): Path<(uuid::Uuid,)>, ) -> Result, AppError> { + super::require_admin(&claims)?; let cmd = ImportSidecarCommand { asset_id: SystemId::from_uuid(asset_id), }; @@ -63,11 +56,12 @@ pub async fn import_sidecar( pub async fn resolve_conflict( State(state): State, - _claims: JwtClaims, + claims: JwtClaims, Path((asset_id,)): Path<(uuid::Uuid,)>, Json(req): Json, ) -> Result, AppError> { - let policy = parse_conflict_policy(&req.policy)?; + super::require_admin(&claims)?; + let policy = parsers::conflict_policy(&req.policy)?; let cmd = ResolveConflictCommand { asset_id: SystemId::from_uuid(asset_id), policy, diff --git a/crates/presentation/src/handlers/stacks.rs b/crates/presentation/src/handlers/stacks.rs new file mode 100644 index 0000000..974215d --- /dev/null +++ b/crates/presentation/src/handlers/stacks.rs @@ -0,0 +1,77 @@ +use crate::{errors::AppError, extractors::JwtClaims, parsers, state::AppState}; +use api_types::{requests::CreateStackRequest, responses::StackResponse}; +use application::catalog::{ + CreateStackCommand, DeleteStackCommand, DetectLivePhotosCommand, GetStackQuery, +}; +use axum::{ + Json, + extract::{Path, State}, + http::StatusCode, +}; +use domain::value_objects::SystemId; + +pub async fn create_stack( + State(state): State, + claims: JwtClaims, + Json(req): Json, +) -> Result<(StatusCode, Json), AppError> { + let stack_type = parsers::stack_type(&req.stack_type)?; + let members = req + .members + .iter() + .map(|m| { + let role = parsers::member_role(&m.role)?; + Ok((SystemId::from_uuid(m.asset_id), role)) + }) + .collect::, AppError>>()?; + + let cmd = CreateStackCommand { + stack_type, + primary_asset_id: SystemId::from_uuid(req.primary_asset_id), + additional_asset_ids: members, + owner_id: claims.user_id, + }; + let stack = state.catalog.create_stack.execute(cmd).await?; + Ok(( + StatusCode::CREATED, + Json(StackResponse::from_domain(&stack)), + )) +} + +pub async fn get_stack( + State(state): State, + claims: JwtClaims, + Path((stack_id,)): Path<(uuid::Uuid,)>, +) -> Result, AppError> { + let query = GetStackQuery { + stack_id: SystemId::from_uuid(stack_id), + caller_id: claims.user_id, + }; + let stack = state.catalog.get_stack.execute(query).await?; + Ok(Json(StackResponse::from_domain(&stack))) +} + +pub async fn delete_stack( + State(state): State, + claims: JwtClaims, + Path((stack_id,)): Path<(uuid::Uuid,)>, +) -> Result { + let cmd = DeleteStackCommand { + stack_id: SystemId::from_uuid(stack_id), + caller_id: claims.user_id, + }; + state.catalog.delete_stack.execute(cmd).await?; + Ok(StatusCode::NO_CONTENT) +} + +pub async fn detect_live_photos( + State(state): State, + claims: JwtClaims, +) -> Result>, AppError> { + let cmd = DetectLivePhotosCommand { + owner_id: claims.user_id, + }; + let stacks = state.catalog.detect_live_photos.execute(cmd).await?; + let resp = stacks.iter().map(StackResponse::from_domain).collect(); + Ok(Json(resp)) +} diff --git a/crates/presentation/src/handlers/storage.rs b/crates/presentation/src/handlers/storage.rs index de87fda..eba34ec 100644 --- a/crates/presentation/src/handlers/storage.rs +++ b/crates/presentation/src/handlers/storage.rs @@ -1,4 +1,4 @@ -use crate::{errors::AppError, extractors::JwtClaims, state::AppState}; +use crate::{errors::AppError, extractors::JwtClaims, parsers, state::AppState}; use api_types::{ requests::{CheckQuotaParams, RegisterLibraryPathRequest, RegisterVolumeRequest}, responses::{LibraryPathResponse, QuotaCheckResponse, VolumeResponse}, @@ -9,13 +9,14 @@ use axum::{ extract::{Query, State}, http::StatusCode, }; -use domain::{entities::UsageType, errors::DomainError, value_objects::SystemId}; +use domain::value_objects::SystemId; pub async fn register_volume( State(state): State, - _claims: JwtClaims, + claims: JwtClaims, Json(req): Json, ) -> Result<(StatusCode, Json), AppError> { + super::require_admin(&claims)?; let cmd = RegisterVolumeCommand { volume_name: req.volume_name, uri_prefix: req.uri_prefix, @@ -30,9 +31,10 @@ pub async fn register_volume( pub async fn register_library_path( State(state): State, - _claims: JwtClaims, + claims: JwtClaims, Json(req): Json, ) -> Result<(StatusCode, Json), AppError> { + super::require_admin(&claims)?; let cmd = RegisterLibraryPathCommand { volume_id: SystemId::from_uuid(req.volume_id), relative_path: req.relative_path, @@ -49,24 +51,12 @@ pub async fn register_library_path( const DEFAULT_QUOTA_USAGE_TYPE: &str = "storage_bytes"; const DEFAULT_QUOTA_AMOUNT: u64 = 0; -fn parse_usage_type(s: &str) -> Result { - match s { - "storage_bytes" => Ok(UsageType::StorageBytes), - "process_jobs" => Ok(UsageType::ProcessJobs), - "api_calls" => Ok(UsageType::ApiCalls), - "indexing_size" => Ok(UsageType::IndexingSize), - _ => Err(AppError::from(DomainError::Validation(format!( - "Invalid usage type: {s}" - )))), - } -} - pub async fn check_quota( State(state): State, claims: JwtClaims, Query(params): Query, ) -> Result, AppError> { - let usage_type = parse_usage_type( + let usage_type = parsers::usage_type( params .usage_type .as_deref() diff --git a/crates/presentation/src/lib.rs b/crates/presentation/src/lib.rs index 719f9b2..d191204 100644 --- a/crates/presentation/src/lib.rs +++ b/crates/presentation/src/lib.rs @@ -2,6 +2,8 @@ pub mod constants; pub mod errors; pub mod extractors; pub mod handlers; +pub mod middleware; pub mod openapi; +pub mod parsers; pub mod routes; pub mod state; diff --git a/crates/presentation/src/middleware/auth.rs b/crates/presentation/src/middleware/auth.rs new file mode 100644 index 0000000..e24c435 --- /dev/null +++ b/crates/presentation/src/middleware/auth.rs @@ -0,0 +1,53 @@ +use crate::state::AppState; +use axum::{ + Json, + body::Body, + extract::State, + http::{Request, StatusCode}, + middleware::Next, + response::{IntoResponse, Response}, +}; +use serde_json::json; + +#[allow(clippy::result_large_err)] +pub fn extract_bearer_token(headers: &axum::http::HeaderMap) -> Result<&str, Response> { + let header = headers + .get(axum::http::header::AUTHORIZATION) + .and_then(|v| v.to_str().ok()) + .ok_or_else(|| { + ( + StatusCode::UNAUTHORIZED, + Json(json!({ "error": "Missing Authorization header" })), + ) + .into_response() + })?; + + header.strip_prefix("Bearer ").ok_or_else(|| { + ( + StatusCode::UNAUTHORIZED, + Json(json!({ "error": "Invalid Authorization format" })), + ) + .into_response() + }) +} + +pub async fn require_auth( + State(state): State, + req: Request, + next: Next, +) -> Response { + let token = match extract_bearer_token(req.headers()) { + Ok(t) => t.to_string(), + Err(r) => return r, + }; + + if state.token_issuer.verify(&token).await.is_err() { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({ "error": "Invalid or expired token" })), + ) + .into_response(); + } + + next.run(req).await +} diff --git a/crates/presentation/src/middleware/mod.rs b/crates/presentation/src/middleware/mod.rs new file mode 100644 index 0000000..0e4a05d --- /dev/null +++ b/crates/presentation/src/middleware/mod.rs @@ -0,0 +1 @@ +pub mod auth; diff --git a/crates/presentation/src/parsers.rs b/crates/presentation/src/parsers.rs new file mode 100644 index 0000000..dde8cff --- /dev/null +++ b/crates/presentation/src/parsers.rs @@ -0,0 +1,116 @@ +use crate::errors::AppError; +use domain::{ + entities::{ + AssetType, ConflictPolicy, DerivativeProfile, JobType, LinkAccessLevel, PluginType, + ShareableType, StackMemberRole, StackType, TargetType, UsageType, + }, + errors::DomainError, +}; + +fn parse_err(kind: &str, value: &str) -> AppError { + AppError::from(DomainError::Validation(format!("Invalid {kind}: {value}"))) +} + +pub fn asset_type(s: &str) -> Result { + match s { + "image" => Ok(AssetType::Image), + "video" => Ok(AssetType::Video), + "live_photo" => Ok(AssetType::LivePhoto), + _ => Err(parse_err("asset type", s)), + } +} + +pub fn derivative_profile(s: &str) -> Result { + match s { + "thumbnail" | "thumbnail_square" => Ok(DerivativeProfile::ThumbnailSquare), + "thumbnail_large" => Ok(DerivativeProfile::ThumbnailLarge), + "web" | "web_optimized" => Ok(DerivativeProfile::WebOptimized), + "video_sd" => Ok(DerivativeProfile::VideoSd), + _ => Err(parse_err("derivative profile", s)), + } +} + +pub fn stack_type(s: &str) -> Result { + match s { + "live_photo" => Ok(StackType::LivePhoto), + "format_pair" => Ok(StackType::FormatPair), + "burst_sequence" => Ok(StackType::BurstSequence), + "exposure_bracket" => Ok(StackType::ExposureBracket), + "manual_group" => Ok(StackType::ManualGroup), + _ => Err(parse_err("stack type", s)), + } +} + +pub fn member_role(s: &str) -> Result { + match s { + "primary_display" => Ok(StackMemberRole::PrimaryDisplay), + "high_res_source" => Ok(StackMemberRole::HighResSource), + "motion_clip" => Ok(StackMemberRole::MotionClip), + "alternate_frame" => Ok(StackMemberRole::AlternateFrame), + _ => Err(parse_err("member role", s)), + } +} + +pub fn shareable_type(s: &str) -> Result { + match s { + "asset" => Ok(ShareableType::Asset), + "album" => Ok(ShareableType::Album), + "collection" => Ok(ShareableType::Collection), + "directory" => Ok(ShareableType::Directory), + _ => Err(parse_err("shareable type", s)), + } +} + +pub fn target_type(s: &str) -> Result { + match s { + "user" => Ok(TargetType::User), + "group" => Ok(TargetType::Group), + _ => Err(parse_err("target type", s)), + } +} + +pub fn access_level(s: &str) -> Result { + match s { + "view_only" => Ok(LinkAccessLevel::ViewOnly), + "limited_search" => Ok(LinkAccessLevel::LimitedSearch), + _ => Err(parse_err("access level", s)), + } +} + +pub fn job_type(s: &str) -> JobType { + match s { + "scan_directory" => JobType::ScanDirectory, + "extract_metadata" => JobType::ExtractMetadata, + "generate_derivative" => JobType::GenerateDerivative, + "sync_sidecar" => JobType::SyncSidecar, + "detect_duplicates" => JobType::DetectDuplicates, + other => JobType::Custom(other.to_string()), + } +} + +pub fn plugin_type(s: &str) -> Result { + match s { + "media_processor" => Ok(PluginType::MediaProcessor), + "scheduled_task" => Ok(PluginType::ScheduledTask), + "sidecar_writer" => Ok(PluginType::SidecarWriter), + _ => Err(parse_err("plugin type", s)), + } +} + +pub fn usage_type(s: &str) -> Result { + match s { + "storage_bytes" => Ok(UsageType::StorageBytes), + "process_jobs" => Ok(UsageType::ProcessJobs), + "api_calls" => Ok(UsageType::ApiCalls), + "indexing_size" => Ok(UsageType::IndexingSize), + _ => Err(parse_err("usage type", s)), + } +} + +pub fn conflict_policy(s: &str) -> Result { + match s { + "db_wins" => Ok(ConflictPolicy::DbWins), + "file_wins" => Ok(ConflictPolicy::FileWins), + _ => Err(parse_err("conflict policy", s)), + } +} diff --git a/crates/presentation/src/routes.rs b/crates/presentation/src/routes.rs index a4c6bd9..f6d0c11 100644 --- a/crates/presentation/src/routes.rs +++ b/crates/presentation/src/routes.rs @@ -1,19 +1,30 @@ use crate::{ - handlers::{albums, assets, auth, health, processing, sharing, sidecar, storage}, + handlers::{ + albums, assets, auth, duplicates, health, processing, sharing, sidecar, stacks, storage, + }, + middleware::auth::require_auth, openapi::openapi_router, state::AppState, }; use axum::{ Router, + middleware::from_fn_with_state, routing::{delete, get, post, put}, }; -pub fn api_v1_router() -> Router { +fn public_routes() -> Router { Router::new() - // auth .route("/auth/register", post(auth::register)) .route("/auth/login", post(auth::login)) + .route("/auth/refresh", post(auth::refresh)) + .route("/sharing/access/{token}", get(sharing::access_by_token)) +} + +fn protected_routes(state: &AppState) -> Router { + Router::new() + // auth .route("/auth/me", get(auth::me)) + .route("/auth/logout", post(auth::logout)) // albums .route("/albums", post(albums::create_album)) .route("/albums/{id}", get(albums::get_album)) @@ -23,10 +34,14 @@ pub fn api_v1_router() -> Router { delete(albums::remove_entry), ) // assets + .route("/assets", get(assets::search_assets)) .route("/assets/ingest", post(assets::ingest)) .route("/assets/register", post(assets::register_asset)) .route("/assets/timeline", get(assets::timeline)) - .route("/assets/{id}", get(assets::get_asset)) + .route( + "/assets/{id}", + get(assets::get_asset).delete(assets::delete_asset), + ) .route("/assets/{id}/metadata", put(assets::update_metadata)) .route("/assets/{id}/file", get(assets::serve_file)) .route( @@ -34,11 +49,26 @@ pub fn api_v1_router() -> Router { get(assets::serve_derivative), ) .route("/assets/{id}/tags", post(assets::tag_asset)) + // stacks + .route("/stacks", post(stacks::create_stack)) + .route( + "/stacks/detect-live-photos", + post(stacks::detect_live_photos), + ) + .route( + "/stacks/{id}", + get(stacks::get_stack).delete(stacks::delete_stack), + ) + // duplicates + .route("/duplicates", get(duplicates::list_duplicates)) + .route( + "/duplicates/{id}/resolve", + post(duplicates::resolve_duplicate), + ) // sharing .route("/sharing", post(sharing::share_resource)) .route("/sharing/links", post(sharing::generate_link)) .route("/sharing/{id}", delete(sharing::revoke)) - .route("/sharing/access/{token}", get(sharing::access_by_token)) // storage .route("/storage/volumes", post(storage::register_volume)) .route( @@ -57,18 +87,22 @@ pub fn api_v1_router() -> Router { .route("/sidecar/full-export", post(sidecar::full_export)) .route("/sidecar/full-import", post(sidecar::full_import)) // processing - .route("/jobs", post(processing::enqueue_job)) + .route( + "/jobs", + get(processing::list_jobs).post(processing::enqueue_job), + ) .route("/jobs/{id}/start", post(processing::start_job)) .route("/jobs/{id}/complete", post(processing::complete_job)) .route("/jobs/{id}/fail", post(processing::fail_job)) .route("/jobs/batches/{id}", get(processing::batch_progress)) .route("/plugins", post(processing::manage_plugin)) .route("/pipelines", post(processing::configure_pipeline)) + .route_layer(from_fn_with_state(state.clone(), require_auth)) } -pub fn app_router() -> Router { +pub fn app_router(state: &AppState) -> Router { Router::new() .route("/health", get(health::health)) - .nest("/api/v1", api_v1_router()) + .nest("/api/v1", public_routes().merge(protected_routes(state))) .merge(openapi_router()) } diff --git a/crates/presentation/src/state.rs b/crates/presentation/src/state.rs index c7eb042..c1761c9 100644 --- a/crates/presentation/src/state.rs +++ b/crates/presentation/src/state.rs @@ -2,16 +2,21 @@ use std::sync::Arc; use application::{ catalog::{ - GetAssetHandler, GetTimelineHandler, ReadAssetFileHandler, ReadDerivativeHandler, - RegisterAssetHandler, UpdateMetadataHandler, + CreateStackHandler, DeleteAssetHandler, DeleteStackHandler, DetectLivePhotosHandler, + GetAssetHandler, GetStackHandler, GetTimelineHandler, ListDuplicatesHandler, + ReadAssetFileHandler, ReadDerivativeHandler, RegisterAssetHandler, ResolveDuplicateHandler, + SearchAssetsHandler, UpdateMetadataHandler, + }, + identity::{ + GetProfileHandler, LoginUserHandler, LogoutHandler, RefreshTokenHandler, + RegisterUserHandler, }, - identity::{GetProfileHandler, LoginUserHandler, RegisterUserHandler}, organization::{ CreateAlbumHandler, GetAlbumHandler, ManageAlbumEntriesHandler, TagAssetHandler, }, processing::{ CompleteJobHandler, ConfigurePipelineHandler, EnqueueJobHandler, FailJobHandler, - ManagePluginHandler, ReportBatchProgressHandler, StartJobHandler, + ListJobsHandler, ManagePluginHandler, ReportBatchProgressHandler, StartJobHandler, }, sharing::{ AccessSharedResourceHandler, GenerateShareLinkHandler, RevokeShareHandler, @@ -25,13 +30,16 @@ use application::{ CheckQuotaHandler, IngestAssetHandler, RegisterLibraryPathHandler, RegisterVolumeHandler, }, }; -use domain::ports::TokenIssuer; +use domain::ports::{RefreshTokenRepository, TokenIssuer}; #[derive(Clone)] pub struct IdentityHandlers { pub register: Arc, pub login: Arc, pub get_profile: Arc, + pub refresh: Arc, + pub logout: Arc, + pub refresh_token_repo: Arc, } #[derive(Clone)] @@ -43,6 +51,14 @@ pub struct CatalogHandlers { pub read_asset_file: Arc, pub read_derivative: Arc, pub register_asset: Arc, + pub delete_asset: Arc, + pub search_assets: Arc, + pub list_duplicates: Arc, + pub resolve_duplicate: Arc, + pub create_stack: Arc, + pub get_stack: Arc, + pub delete_stack: Arc, + pub detect_live_photos: Arc, } #[derive(Clone)] @@ -84,6 +100,7 @@ pub struct ProcessingHandlers { pub start_job: Arc, pub complete_job: Arc, pub fail_job: Arc, + pub list_jobs: Arc, pub batch_progress: Arc, pub manage_plugin: Arc, pub configure_pipeline: Arc, diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 5a1859c..f5573d3 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use std::time::Duration; use futures::StreamExt; +use tokio::sync::watch; use tracing::{error, info, warn}; use application::processing::{EnqueueJobCommand, ProcessNextJobCommand}; @@ -44,7 +45,6 @@ async fn main() -> anyhow::Result<()> { let sidecar_writer: Arc = Arc::new(adapters_sidecar::XmpSidecarWriter); - // Publisher transport consumes a client clone; the consumer gets another. let pub_transport = adapters_nats::NatsTransport::new(nats_client.clone()); let nats_publisher: Arc = Arc::new( adapters_event_transport::EventPublisherAdapter::new(pub_transport), @@ -72,16 +72,45 @@ async fn main() -> anyhow::Result<()> { )); let enqueue = Arc::new(build_enqueue_handler(&repos, event_pub)); + // ── Shutdown signal ─────────────────────────────────────────────── + let (shutdown_tx, shutdown_rx) = watch::channel(false); + + tokio::spawn(async move { + let ctrl_c = tokio::signal::ctrl_c(); + #[cfg(unix)] + { + use tokio::signal::unix::{SignalKind, signal}; + let mut sigterm = signal(SignalKind::terminate()).expect("SIGTERM handler"); + tokio::select! { + _ = ctrl_c => {}, + _ = sigterm.recv() => {}, + } + } + #[cfg(not(unix))] + { + ctrl_c.await.ok(); + } + info!("shutdown signal received"); + shutdown_tx.send(true).ok(); + }); + // ── Fallback sweep task ──────────────────────────────────────────── let sweep_interval = Duration::from_secs(config.fallback_sweep_secs); let sweep_handler = Arc::clone(&process_next); + let mut sweep_shutdown = shutdown_rx.clone(); tokio::spawn(async move { info!( every_secs = config.fallback_sweep_secs, "fallback sweep task started" ); loop { - tokio::time::sleep(sweep_interval).await; + tokio::select! { + _ = sweep_shutdown.changed() => { + info!("sweep task: shutting down"); + break; + } + _ = tokio::time::sleep(sweep_interval) => {} + } info!("fallback sweep: draining queued jobs"); loop { match sweep_handler.execute(ProcessNextJobCommand).await { @@ -104,69 +133,79 @@ async fn main() -> anyhow::Result<()> { info!("event loop: listening for NATS events"); let mut stream = event_consumer.consume(); + let mut event_shutdown = shutdown_rx.clone(); - while let Some(result) = stream.next().await { - let envelope = match result { - Ok(env) => env, - Err(e) => { - error!(error = %e, "event loop: consumer error"); - continue; + loop { + tokio::select! { + _ = event_shutdown.changed() => { + info!("event loop: shutting down"); + break; } - }; - - match &envelope.event { - DomainEvent::AssetIngested { asset_id, .. } => { - info!(asset_id = %asset_id, "event loop: AssetIngested → enqueue ExtractMetadata"); - (envelope.ack)(); - let cmd = EnqueueJobCommand { - job_type: JobType::ExtractMetadata, - priority: 10, - payload: StructuredData::new(), - target_asset_id: Some(*asset_id), - batch_id: None, - }; - if let Err(e) = enqueue.execute(cmd).await { - error!(error = %e, "event loop: failed to enqueue ExtractMetadata"); - } - } - DomainEvent::SidecarSyncRequested { asset_id, .. } => { - info!(asset_id = %asset_id, "event loop: SidecarSyncRequested → enqueue SyncSidecar"); - (envelope.ack)(); - let cmd = EnqueueJobCommand { - job_type: JobType::SyncSidecar, - priority: 5, - payload: StructuredData::new(), - target_asset_id: Some(*asset_id), - batch_id: None, - }; - if let Err(e) = enqueue.execute(cmd).await { - error!(error = %e, "event loop: failed to enqueue SyncSidecar"); - } - } - DomainEvent::JobEnqueued { - job_id, job_type, .. - } => { - info!(job_id = %job_id, job_type = %job_type, "event loop: JobEnqueued → process"); - (envelope.ack)(); - match process_next.execute(ProcessNextJobCommand).await { - Ok(Some(job)) => { - info!(job_id = %job.job_id, status = ?job.status, "event loop: processed job"); - } - Ok(None) => { - warn!("event loop: JobEnqueued but no queued job found"); - } + msg = stream.next() => { + let Some(result) = msg else { break }; + let envelope = match result { + Ok(env) => env, Err(e) => { - error!(error = %e, "event loop: error processing job"); + error!(error = %e, "event loop: consumer error"); + continue; + } + }; + + match &envelope.event { + DomainEvent::AssetIngested { asset_id, .. } => { + info!(asset_id = %asset_id, "event loop: AssetIngested → enqueue ExtractMetadata"); + (envelope.ack)(); + let cmd = EnqueueJobCommand { + job_type: JobType::ExtractMetadata, + priority: 10, + payload: StructuredData::new(), + target_asset_id: Some(*asset_id), + batch_id: None, + }; + if let Err(e) = enqueue.execute(cmd).await { + error!(error = %e, "event loop: failed to enqueue ExtractMetadata"); + } + } + DomainEvent::SidecarSyncRequested { asset_id, .. } => { + info!(asset_id = %asset_id, "event loop: SidecarSyncRequested → enqueue SyncSidecar"); + (envelope.ack)(); + let cmd = EnqueueJobCommand { + job_type: JobType::SyncSidecar, + priority: 5, + payload: StructuredData::new(), + target_asset_id: Some(*asset_id), + batch_id: None, + }; + if let Err(e) = enqueue.execute(cmd).await { + error!(error = %e, "event loop: failed to enqueue SyncSidecar"); + } + } + DomainEvent::JobEnqueued { + job_id, job_type, .. + } => { + info!(job_id = %job_id, job_type = %job_type, "event loop: JobEnqueued → process"); + (envelope.ack)(); + match process_next.execute(ProcessNextJobCommand).await { + Ok(Some(job)) => { + info!(job_id = %job.job_id, status = ?job.status, "event loop: processed job"); + } + Ok(None) => { + warn!("event loop: JobEnqueued but no queued job found"); + } + Err(e) => { + error!(error = %e, "event loop: error processing job"); + } + } + } + other => { + (envelope.ack)(); + tracing::debug!(event = ?other, "event loop: unhandled event, acked"); } } } - other => { - (envelope.ack)(); - tracing::debug!(event = ?other, "event loop: unhandled event, acked"); - } } } - error!("event loop: NATS stream ended unexpectedly"); + info!("worker shutdown complete"); Ok(()) }