diff --git a/Cargo.lock b/Cargo.lock index a6f97a6..fae8b80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -25,6 +25,7 @@ dependencies = [ "async-trait", "chrono", "domain", + "serde_json", "sqlx", "uuid", ] diff --git a/crates/adapters/postgres/Cargo.toml b/crates/adapters/postgres/Cargo.toml index 0961e66..95179f1 100644 --- a/crates/adapters/postgres/Cargo.toml +++ b/crates/adapters/postgres/Cargo.toml @@ -5,8 +5,9 @@ edition = "2024" [dependencies] domain = { workspace = true } -sqlx = { workspace = true, features = ["postgres"] } +sqlx = { workspace = true, features = ["postgres", "runtime-tokio", "migrate", "uuid", "chrono", "json"] } uuid = { workspace = true } chrono = { workspace = true } anyhow = { workspace = true } async-trait = { workspace = true } +serde_json = { workspace = true } diff --git a/crates/adapters/postgres/migrations/002_update_users.sql b/crates/adapters/postgres/migrations/002_update_users.sql new file mode 100644 index 0000000..a8db204 --- /dev/null +++ b/crates/adapters/postgres/migrations/002_update_users.sql @@ -0,0 +1,3 @@ +ALTER TABLE users ADD COLUMN username TEXT NOT NULL DEFAULT ''; +ALTER TABLE users DROP COLUMN role; +ALTER TABLE users ADD CONSTRAINT users_username_unique UNIQUE (username); diff --git a/crates/adapters/postgres/migrations/003_storage.sql b/crates/adapters/postgres/migrations/003_storage.sql new file mode 100644 index 0000000..0ead62c --- /dev/null +++ b/crates/adapters/postgres/migrations/003_storage.sql @@ -0,0 +1,52 @@ +CREATE TABLE storage_volumes ( + volume_id UUID PRIMARY KEY, + volume_name TEXT NOT NULL, + uri_prefix TEXT NOT NULL, + is_writable BOOLEAN NOT NULL DEFAULT true, + available_bytes BIGINT NOT NULL DEFAULT 0 +); + +CREATE TABLE library_paths ( + path_id UUID PRIMARY KEY, + volume_id UUID NOT NULL REFERENCES storage_volumes(volume_id), + relative_path TEXT NOT NULL, + is_ingest_destination BOOLEAN NOT NULL DEFAULT false, + ownership_policy TEXT NOT NULL DEFAULT 'unassigned', + designated_owner_id UUID +); + +CREATE TABLE quota_definitions ( + quota_id UUID PRIMARY KEY, + owner_scope UUID NOT NULL, + is_enforced BOOLEAN NOT NULL DEFAULT true +); + +CREATE TABLE quota_rules ( + rule_id UUID PRIMARY KEY, + quota_id UUID NOT NULL REFERENCES quota_definitions(quota_id) ON DELETE CASCADE, + dimension TEXT NOT NULL, + limit_value BIGINT NOT NULL, + time_period TEXT NOT NULL, + is_unlimited BOOLEAN NOT NULL DEFAULT false +); + +CREATE TABLE usage_ledger ( + entry_id UUID PRIMARY KEY, + user_id UUID NOT NULL REFERENCES users(id), + usage_type TEXT NOT NULL, + consumed_amount BIGINT NOT NULL, + timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(), + context TEXT NOT NULL DEFAULT '' +); + +CREATE TABLE ingest_sessions ( + session_id UUID PRIMARY KEY, + uploader_user_id UUID NOT NULL REFERENCES users(id), + client_device_id TEXT NOT NULL, + original_filename TEXT NOT NULL, + client_checksum TEXT NOT NULL, + target_library_path_id UUID NOT NULL REFERENCES library_paths(path_id), + status TEXT NOT NULL DEFAULT 'uploading', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + error_message TEXT +); diff --git a/crates/adapters/postgres/migrations/004_catalog.sql b/crates/adapters/postgres/migrations/004_catalog.sql new file mode 100644 index 0000000..28ed8b6 --- /dev/null +++ b/crates/adapters/postgres/migrations/004_catalog.sql @@ -0,0 +1,23 @@ +CREATE TABLE assets ( + asset_id UUID PRIMARY KEY, + volume_id UUID NOT NULL REFERENCES storage_volumes(volume_id), + relative_path TEXT NOT NULL, + checksum TEXT NOT NULL, + asset_type TEXT NOT NULL, + mime_type TEXT NOT NULL, + file_size BIGINT NOT NULL, + is_processed BOOLEAN NOT NULL DEFAULT false, + owner_user_id UUID NOT NULL REFERENCES users(id), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_assets_checksum ON assets(checksum); +CREATE INDEX idx_assets_owner ON assets(owner_user_id); + +CREATE TABLE asset_metadata ( + asset_id UUID NOT NULL REFERENCES assets(asset_id) ON DELETE CASCADE, + metadata_source TEXT NOT NULL, + data JSONB NOT NULL DEFAULT '{}', + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (asset_id, metadata_source) +); diff --git a/crates/adapters/postgres/migrations/005_organization.sql b/crates/adapters/postgres/migrations/005_organization.sql new file mode 100644 index 0000000..74b4c61 --- /dev/null +++ b/crates/adapters/postgres/migrations/005_organization.sql @@ -0,0 +1,33 @@ +CREATE TABLE albums ( + album_id UUID PRIMARY KEY, + title TEXT NOT NULL, + description TEXT NOT NULL DEFAULT '', + creator_user_id UUID NOT NULL REFERENCES users(id), + cover_asset_id UUID, + start_date TIMESTAMPTZ, + end_date TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE album_entries ( + album_id UUID NOT NULL REFERENCES albums(album_id) ON DELETE CASCADE, + asset_id UUID NOT NULL REFERENCES assets(asset_id) ON DELETE CASCADE, + sort_order INTEGER NOT NULL DEFAULT 0, + added_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + added_by_user_id UUID NOT NULL REFERENCES users(id), + PRIMARY KEY (album_id, asset_id) +); + +CREATE TABLE tags ( + tag_id UUID PRIMARY KEY, + name TEXT NOT NULL, + tag_source TEXT NOT NULL DEFAULT 'user_manual' +); + +CREATE TABLE asset_tags ( + asset_id UUID NOT NULL REFERENCES assets(asset_id) ON DELETE CASCADE, + tag_id UUID NOT NULL REFERENCES tags(tag_id) ON DELETE CASCADE, + tagged_by_user_id UUID, + confidence DOUBLE PRECISION NOT NULL DEFAULT 1.0, + PRIMARY KEY (asset_id, tag_id) +); diff --git a/crates/adapters/postgres/src/album_repository.rs b/crates/adapters/postgres/src/album_repository.rs new file mode 100644 index 0000000..ce55f44 --- /dev/null +++ b/crates/adapters/postgres/src/album_repository.rs @@ -0,0 +1,181 @@ +use crate::db::PgPool; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use domain::{ + entities::{Album, AlbumEntry}, + errors::DomainError, + ports::AlbumRepository, + value_objects::{DateTimeStamp, SystemId}, +}; +use uuid::Uuid; + +#[derive(sqlx::FromRow)] +struct AlbumRow { + album_id: Uuid, + title: String, + description: String, + creator_user_id: Uuid, + cover_asset_id: Option, + start_date: Option>, + end_date: Option>, + created_at: DateTime, +} + +#[derive(sqlx::FromRow)] +#[allow(dead_code)] +struct AlbumEntryRow { + album_id: Uuid, + asset_id: Uuid, + sort_order: i32, + added_at: DateTime, + added_by_user_id: Uuid, +} + +impl From for AlbumEntry { + fn from(r: AlbumEntryRow) -> Self { + Self { + asset_id: SystemId::from_uuid(r.asset_id), + sort_order: r.sort_order as u32, + added_at: DateTimeStamp::from_datetime(r.added_at), + added_by_user_id: SystemId::from_uuid(r.added_by_user_id), + } + } +} + +fn album_from_row(r: AlbumRow, entries: Vec) -> Album { + Album { + album_id: SystemId::from_uuid(r.album_id), + title: r.title, + description: r.description, + creator_user_id: SystemId::from_uuid(r.creator_user_id), + cover_asset_id: r.cover_asset_id.map(SystemId::from_uuid), + start_date: r.start_date.map(DateTimeStamp::from_datetime), + end_date: r.end_date.map(DateTimeStamp::from_datetime), + entries, + created_at: DateTimeStamp::from_datetime(r.created_at), + } +} + +pub struct PostgresAlbumRepository { + pool: PgPool, +} + +impl PostgresAlbumRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } + + async fn load_entries(&self, album_id: &Uuid) -> Result, DomainError> { + let rows = sqlx::query_as::<_, AlbumEntryRow>( + "SELECT album_id, asset_id, sort_order, added_at, added_by_user_id + FROM album_entries WHERE album_id = $1 + ORDER BY sort_order ASC", + ) + .bind(album_id) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(rows.into_iter().map(Into::into).collect()) + } +} + +#[async_trait] +impl AlbumRepository for PostgresAlbumRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + let row = sqlx::query_as::<_, AlbumRow>( + "SELECT album_id, title, description, creator_user_id, cover_asset_id, + start_date, end_date, created_at + FROM albums WHERE album_id = $1", + ) + .bind(*id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + let Some(r) = row else { + return Ok(None); + }; + + let entries = self.load_entries(&r.album_id).await?; + Ok(Some(album_from_row(r, entries))) + } + + async fn find_by_creator(&self, creator_id: &SystemId) -> Result, DomainError> { + let rows = sqlx::query_as::<_, AlbumRow>( + "SELECT album_id, title, description, creator_user_id, cover_asset_id, + start_date, end_date, created_at + FROM albums WHERE creator_user_id = $1", + ) + .bind(*creator_id.as_uuid()) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + let mut albums = Vec::with_capacity(rows.len()); + for r in rows { + let entries = self.load_entries(&r.album_id).await?; + albums.push(album_from_row(r, entries)); + } + Ok(albums) + } + + async fn save(&self, album: &Album) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO albums (album_id, title, description, creator_user_id, cover_asset_id, + start_date, end_date, created_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (album_id) DO UPDATE SET + title = EXCLUDED.title, + description = EXCLUDED.description, + cover_asset_id = EXCLUDED.cover_asset_id, + start_date = EXCLUDED.start_date, + end_date = EXCLUDED.end_date", + ) + .bind(*album.album_id.as_uuid()) + .bind(&album.title) + .bind(&album.description) + .bind(*album.creator_user_id.as_uuid()) + .bind(album.cover_asset_id.as_ref().map(|id| *id.as_uuid())) + .bind(album.start_date.as_ref().map(|d| d.as_datetime()).copied()) + .bind(album.end_date.as_ref().map(|d| d.as_datetime()).copied()) + .bind(album.created_at.as_datetime()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + // Sync entries: delete all then re-insert + sqlx::query("DELETE FROM album_entries WHERE album_id = $1") + .bind(*album.album_id.as_uuid()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + for entry in &album.entries { + sqlx::query( + "INSERT INTO album_entries (album_id, asset_id, sort_order, added_at, added_by_user_id) + VALUES ($1, $2, $3, $4, $5)", + ) + .bind(*album.album_id.as_uuid()) + .bind(*entry.asset_id.as_uuid()) + .bind(entry.sort_order as i32) + .bind(entry.added_at.as_datetime()) + .bind(*entry.added_by_user_id.as_uuid()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + } + + Ok(()) + } + + async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { + // Entries cascade-delete + sqlx::query("DELETE FROM albums WHERE album_id = $1") + .bind(*id.as_uuid()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} diff --git a/crates/adapters/postgres/src/asset_metadata_repository.rs b/crates/adapters/postgres/src/asset_metadata_repository.rs new file mode 100644 index 0000000..76a8236 --- /dev/null +++ b/crates/adapters/postgres/src/asset_metadata_repository.rs @@ -0,0 +1,166 @@ +use crate::db::PgPool; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use domain::{ + entities::{AssetMetadata, MetadataSource}, + errors::DomainError, + ports::AssetMetadataRepository, + value_objects::{DateTimeStamp, MetadataValue, StructuredData, SystemId}, +}; +use uuid::Uuid; + +#[derive(sqlx::FromRow)] +struct AssetMetadataRow { + asset_id: Uuid, + metadata_source: String, + data: serde_json::Value, + updated_at: DateTime, +} + +fn source_from_str(s: &str) -> MetadataSource { + match s { + "exif_extracted" => MetadataSource::ExifExtracted, + "ai_generated" => MetadataSource::AiGenerated, + "user_edited" => MetadataSource::UserEdited, + _ => MetadataSource::ExifExtracted, + } +} + +fn source_to_str(s: &MetadataSource) -> &'static str { + match s { + MetadataSource::ExifExtracted => "exif_extracted", + MetadataSource::AiGenerated => "ai_generated", + MetadataSource::UserEdited => "user_edited", + } +} + +fn json_to_structured_data(v: serde_json::Value) -> StructuredData { + let mut sd = StructuredData::new(); + if let serde_json::Value::Object(map) = v { + for (key, val) in map { + let mv = match val { + serde_json::Value::String(s) => MetadataValue::String(s), + serde_json::Value::Number(n) => { + if let Some(i) = n.as_i64() { + MetadataValue::Integer(i) + } else if let Some(f) = n.as_f64() { + MetadataValue::Float(f) + } else { + MetadataValue::Null + } + } + serde_json::Value::Bool(b) => MetadataValue::Boolean(b), + serde_json::Value::Null => MetadataValue::Null, + _ => MetadataValue::String(val.to_string()), + }; + sd.insert(key, mv); + } + } + sd +} + +fn structured_data_to_json(sd: &StructuredData) -> serde_json::Value { + let mut map = serde_json::Map::new(); + for (key, val) in sd.inner() { + let jv = match val { + MetadataValue::String(s) => serde_json::Value::String(s.clone()), + MetadataValue::Integer(i) => serde_json::Value::Number((*i).into()), + MetadataValue::Float(f) => serde_json::Number::from_f64(*f) + .map(serde_json::Value::Number) + .unwrap_or(serde_json::Value::Null), + MetadataValue::Boolean(b) => serde_json::Value::Bool(*b), + MetadataValue::Null => serde_json::Value::Null, + }; + map.insert(key.clone(), jv); + } + serde_json::Value::Object(map) +} + +impl From for AssetMetadata { + fn from(r: AssetMetadataRow) -> Self { + Self { + asset_id: SystemId::from_uuid(r.asset_id), + metadata_source: source_from_str(&r.metadata_source), + data: json_to_structured_data(r.data), + updated_at: DateTimeStamp::from_datetime(r.updated_at), + } + } +} + +pub struct PostgresAssetMetadataRepository { + pool: PgPool, +} + +impl PostgresAssetMetadataRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl AssetMetadataRepository for PostgresAssetMetadataRepository { + async fn find_by_asset(&self, asset_id: &SystemId) -> Result, DomainError> { + let rows = sqlx::query_as::<_, AssetMetadataRow>( + "SELECT asset_id, metadata_source, data, updated_at + FROM asset_metadata WHERE asset_id = $1", + ) + .bind(*asset_id.as_uuid()) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(rows.into_iter().map(Into::into).collect()) + } + + async fn find_by_asset_and_source( + &self, + asset_id: &SystemId, + source: MetadataSource, + ) -> Result, DomainError> { + let row = sqlx::query_as::<_, AssetMetadataRow>( + "SELECT asset_id, metadata_source, data, updated_at + FROM asset_metadata WHERE asset_id = $1 AND metadata_source = $2", + ) + .bind(*asset_id.as_uuid()) + .bind(source_to_str(&source)) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.map(Into::into)) + } + + async fn save(&self, metadata: &AssetMetadata) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO asset_metadata (asset_id, metadata_source, data, updated_at) + VALUES ($1, $2, $3, $4) + ON CONFLICT (asset_id, metadata_source) DO UPDATE SET + data = EXCLUDED.data, + updated_at = EXCLUDED.updated_at", + ) + .bind(*metadata.asset_id.as_uuid()) + .bind(source_to_str(&metadata.metadata_source)) + .bind(structured_data_to_json(&metadata.data)) + .bind(metadata.updated_at.as_datetime()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } + + async fn delete_by_asset_and_source( + &self, + asset_id: &SystemId, + source: MetadataSource, + ) -> Result<(), DomainError> { + sqlx::query( + "DELETE FROM asset_metadata WHERE asset_id = $1 AND metadata_source = $2", + ) + .bind(*asset_id.as_uuid()) + .bind(source_to_str(&source)) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} diff --git a/crates/adapters/postgres/src/asset_repository.rs b/crates/adapters/postgres/src/asset_repository.rs new file mode 100644 index 0000000..851b13e --- /dev/null +++ b/crates/adapters/postgres/src/asset_repository.rs @@ -0,0 +1,165 @@ +use crate::db::PgPool; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use domain::{ + entities::{Asset, AssetType, SourceReference}, + errors::DomainError, + ports::AssetRepository, + value_objects::{Checksum, DateTimeStamp, SystemId}, +}; +use uuid::Uuid; + +#[derive(sqlx::FromRow)] +struct AssetRow { + asset_id: Uuid, + volume_id: Uuid, + relative_path: String, + checksum: String, + asset_type: String, + mime_type: String, + file_size: i64, + is_processed: bool, + owner_user_id: Uuid, + created_at: DateTime, +} + +fn asset_type_from_str(s: &str) -> AssetType { + match s { + "image" => AssetType::Image, + "video" => AssetType::Video, + "live_photo" => AssetType::LivePhoto, + _ => AssetType::Image, + } +} + +fn asset_type_to_str(t: &AssetType) -> &'static str { + match t { + AssetType::Image => "image", + AssetType::Video => "video", + AssetType::LivePhoto => "live_photo", + } +} + +impl TryFrom for Asset { + type Error = DomainError; + fn try_from(r: AssetRow) -> Result { + Ok(Self { + asset_id: SystemId::from_uuid(r.asset_id), + source_reference: SourceReference { + volume_id: SystemId::from_uuid(r.volume_id), + relative_path: r.relative_path, + checksum: Checksum::new(r.checksum)?, + }, + asset_type: asset_type_from_str(&r.asset_type), + mime_type: r.mime_type, + file_size: r.file_size as u64, + is_processed: r.is_processed, + owner_user_id: SystemId::from_uuid(r.owner_user_id), + created_at: DateTimeStamp::from_datetime(r.created_at), + }) + } +} + +pub struct PostgresAssetRepository { + pool: PgPool, +} + +impl PostgresAssetRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl AssetRepository for PostgresAssetRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + let row = sqlx::query_as::<_, AssetRow>( + "SELECT asset_id, volume_id, relative_path, checksum, asset_type, mime_type, + file_size, is_processed, owner_user_id, created_at + FROM assets WHERE asset_id = $1", + ) + .bind(*id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + row.map(TryInto::try_into).transpose() + } + + async fn find_by_checksum(&self, checksum: &Checksum) -> Result, DomainError> { + let rows = sqlx::query_as::<_, AssetRow>( + "SELECT asset_id, volume_id, relative_path, checksum, asset_type, mime_type, + file_size, is_processed, owner_user_id, created_at + FROM assets WHERE checksum = $1", + ) + .bind(checksum.as_str()) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + rows.into_iter().map(TryInto::try_into).collect() + } + + async fn find_by_owner( + &self, + owner_id: &SystemId, + limit: u32, + offset: u32, + ) -> Result, DomainError> { + let rows = sqlx::query_as::<_, AssetRow>( + "SELECT asset_id, volume_id, relative_path, checksum, asset_type, mime_type, + file_size, is_processed, owner_user_id, created_at + FROM assets WHERE owner_user_id = $1 + ORDER BY created_at DESC + LIMIT $2 OFFSET $3", + ) + .bind(*owner_id.as_uuid()) + .bind(limit as i64) + .bind(offset as i64) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + rows.into_iter().map(TryInto::try_into).collect() + } + + async fn save(&self, asset: &Asset) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO assets (asset_id, volume_id, relative_path, checksum, asset_type, + mime_type, file_size, is_processed, owner_user_id, created_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + ON CONFLICT (asset_id) DO UPDATE SET + volume_id = EXCLUDED.volume_id, + relative_path = EXCLUDED.relative_path, + checksum = EXCLUDED.checksum, + asset_type = EXCLUDED.asset_type, + mime_type = EXCLUDED.mime_type, + file_size = EXCLUDED.file_size, + is_processed = EXCLUDED.is_processed, + owner_user_id = EXCLUDED.owner_user_id", + ) + .bind(*asset.asset_id.as_uuid()) + .bind(*asset.source_reference.volume_id.as_uuid()) + .bind(&asset.source_reference.relative_path) + .bind(asset.source_reference.checksum.as_str()) + .bind(asset_type_to_str(&asset.asset_type)) + .bind(&asset.mime_type) + .bind(asset.file_size as i64) + .bind(asset.is_processed) + .bind(*asset.owner_user_id.as_uuid()) + .bind(asset.created_at.as_datetime()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } + + async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { + sqlx::query("DELETE FROM assets WHERE asset_id = $1") + .bind(*id.as_uuid()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} diff --git a/crates/adapters/postgres/src/ingest_session_repository.rs b/crates/adapters/postgres/src/ingest_session_repository.rs new file mode 100644 index 0000000..4659bed --- /dev/null +++ b/crates/adapters/postgres/src/ingest_session_repository.rs @@ -0,0 +1,126 @@ +use crate::db::PgPool; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use domain::{ + entities::{IngestSession, IngestStatus}, + errors::DomainError, + ports::IngestSessionRepository, + value_objects::{Checksum, DateTimeStamp, SystemId}, +}; +use uuid::Uuid; + +#[derive(sqlx::FromRow)] +struct IngestSessionRow { + session_id: Uuid, + uploader_user_id: Uuid, + client_device_id: String, + original_filename: String, + client_checksum: String, + target_library_path_id: Uuid, + status: String, + created_at: DateTime, + error_message: Option, +} + +fn status_from_str(s: &str) -> IngestStatus { + match s { + "uploading" => IngestStatus::Uploading, + "awaiting_processing" => IngestStatus::AwaitingProcessing, + "processing" => IngestStatus::Processing, + "completed" => IngestStatus::Completed, + "failed" => IngestStatus::Failed, + _ => IngestStatus::Uploading, + } +} + +fn status_to_str(s: &IngestStatus) -> &'static str { + match s { + IngestStatus::Uploading => "uploading", + IngestStatus::AwaitingProcessing => "awaiting_processing", + IngestStatus::Processing => "processing", + IngestStatus::Completed => "completed", + IngestStatus::Failed => "failed", + } +} + +impl TryFrom for IngestSession { + type Error = DomainError; + fn try_from(r: IngestSessionRow) -> Result { + Ok(Self { + session_id: SystemId::from_uuid(r.session_id), + uploader_user_id: SystemId::from_uuid(r.uploader_user_id), + client_device_id: r.client_device_id, + original_filename: r.original_filename, + client_checksum: Checksum::new(r.client_checksum)?, + target_library_path_id: SystemId::from_uuid(r.target_library_path_id), + status: status_from_str(&r.status), + created_at: DateTimeStamp::from_datetime(r.created_at), + error_message: r.error_message, + }) + } +} + +pub struct PostgresIngestSessionRepository { + pool: PgPool, +} + +impl PostgresIngestSessionRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl IngestSessionRepository for PostgresIngestSessionRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + let row = sqlx::query_as::<_, IngestSessionRow>( + "SELECT session_id, uploader_user_id, client_device_id, original_filename, + client_checksum, target_library_path_id, status, created_at, error_message + FROM ingest_sessions WHERE session_id = $1", + ) + .bind(*id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + row.map(TryInto::try_into).transpose() + } + + async fn find_by_user(&self, user_id: &SystemId) -> Result, DomainError> { + let rows = sqlx::query_as::<_, IngestSessionRow>( + "SELECT session_id, uploader_user_id, client_device_id, original_filename, + client_checksum, target_library_path_id, status, created_at, error_message + FROM ingest_sessions WHERE uploader_user_id = $1", + ) + .bind(*user_id.as_uuid()) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + rows.into_iter().map(TryInto::try_into).collect() + } + + async fn save(&self, session: &IngestSession) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO ingest_sessions (session_id, uploader_user_id, client_device_id, original_filename, + client_checksum, target_library_path_id, status, created_at, error_message) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT (session_id) DO UPDATE SET + status = EXCLUDED.status, + error_message = EXCLUDED.error_message", + ) + .bind(*session.session_id.as_uuid()) + .bind(*session.uploader_user_id.as_uuid()) + .bind(&session.client_device_id) + .bind(&session.original_filename) + .bind(session.client_checksum.as_str()) + .bind(*session.target_library_path_id.as_uuid()) + .bind(status_to_str(&session.status)) + .bind(session.created_at.as_datetime()) + .bind(session.error_message.as_deref()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs index 46f66b2..2d663bc 100644 --- a/crates/adapters/postgres/src/lib.rs +++ b/crates/adapters/postgres/src/lib.rs @@ -1,5 +1,21 @@ pub mod db; + +pub mod album_repository; +pub mod asset_metadata_repository; +pub mod asset_repository; +pub mod ingest_session_repository; +pub mod library_path_repository; +pub mod quota_repository; +pub mod storage_volume_repository; pub mod user_repository; pub use db::{PgPool, connect, run_migrations}; + +pub use album_repository::PostgresAlbumRepository; +pub use asset_metadata_repository::PostgresAssetMetadataRepository; +pub use asset_repository::PostgresAssetRepository; +pub use ingest_session_repository::PostgresIngestSessionRepository; +pub use library_path_repository::PostgresLibraryPathRepository; +pub use quota_repository::{PostgresQuotaRepository, PostgresUsageLedgerRepository}; +pub use storage_volume_repository::PostgresStorageVolumeRepository; pub use user_repository::PostgresUserRepository; diff --git a/crates/adapters/postgres/src/library_path_repository.rs b/crates/adapters/postgres/src/library_path_repository.rs new file mode 100644 index 0000000..72449e9 --- /dev/null +++ b/crates/adapters/postgres/src/library_path_repository.rs @@ -0,0 +1,136 @@ +use crate::db::PgPool; +use async_trait::async_trait; +use domain::{ + entities::{LibraryPath, OwnershipPolicy}, + errors::DomainError, + ports::LibraryPathRepository, + value_objects::SystemId, +}; +use uuid::Uuid; + +#[derive(sqlx::FromRow)] +struct LibraryPathRow { + path_id: Uuid, + volume_id: Uuid, + relative_path: String, + is_ingest_destination: bool, + ownership_policy: String, + designated_owner_id: Option, +} + +fn policy_from_str(s: &str) -> OwnershipPolicy { + match s { + "user_owned" => OwnershipPolicy::UserOwned, + "group_owned" => OwnershipPolicy::GroupOwned, + _ => OwnershipPolicy::Unassigned, + } +} + +fn policy_to_str(p: &OwnershipPolicy) -> &'static str { + match p { + OwnershipPolicy::UserOwned => "user_owned", + OwnershipPolicy::GroupOwned => "group_owned", + OwnershipPolicy::Unassigned => "unassigned", + } +} + +impl From for LibraryPath { + fn from(r: LibraryPathRow) -> Self { + Self { + path_id: SystemId::from_uuid(r.path_id), + volume_id: SystemId::from_uuid(r.volume_id), + relative_path: r.relative_path, + is_ingest_destination: r.is_ingest_destination, + ownership_policy: policy_from_str(&r.ownership_policy), + designated_owner_id: r.designated_owner_id.map(SystemId::from_uuid), + } + } +} + +pub struct PostgresLibraryPathRepository { + pool: PgPool, +} + +impl PostgresLibraryPathRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl LibraryPathRepository for PostgresLibraryPathRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + let row = sqlx::query_as::<_, LibraryPathRow>( + "SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id + FROM library_paths WHERE path_id = $1", + ) + .bind(*id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.map(Into::into)) + } + + async fn find_by_volume(&self, volume_id: &SystemId) -> Result, DomainError> { + let rows = sqlx::query_as::<_, LibraryPathRow>( + "SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id + FROM library_paths WHERE volume_id = $1", + ) + .bind(*volume_id.as_uuid()) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(rows.into_iter().map(Into::into).collect()) + } + + async fn find_ingest_destinations( + &self, + owner_id: &SystemId, + ) -> Result, DomainError> { + let rows = sqlx::query_as::<_, LibraryPathRow>( + "SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id + FROM library_paths + WHERE is_ingest_destination = true AND designated_owner_id = $1", + ) + .bind(*owner_id.as_uuid()) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(rows.into_iter().map(Into::into).collect()) + } + + async fn save(&self, path: &LibraryPath) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO library_paths (path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (path_id) DO UPDATE SET + volume_id = EXCLUDED.volume_id, + relative_path = EXCLUDED.relative_path, + is_ingest_destination = EXCLUDED.is_ingest_destination, + ownership_policy = EXCLUDED.ownership_policy, + designated_owner_id = EXCLUDED.designated_owner_id", + ) + .bind(*path.path_id.as_uuid()) + .bind(*path.volume_id.as_uuid()) + .bind(&path.relative_path) + .bind(path.is_ingest_destination) + .bind(policy_to_str(&path.ownership_policy)) + .bind(path.designated_owner_id.as_ref().map(|id| *id.as_uuid())) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } + + async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { + sqlx::query("DELETE FROM library_paths WHERE path_id = $1") + .bind(*id.as_uuid()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} diff --git a/crates/adapters/postgres/src/quota_repository.rs b/crates/adapters/postgres/src/quota_repository.rs new file mode 100644 index 0000000..8779330 --- /dev/null +++ b/crates/adapters/postgres/src/quota_repository.rs @@ -0,0 +1,261 @@ +use crate::db::PgPool; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use domain::{ + entities::{QuotaDefinition, QuotaRule, TimePeriod, UsageLedgerEntry, UsageType}, + errors::DomainError, + ports::{QuotaRepository, UsageLedgerRepository}, + value_objects::{DateTimeStamp, SystemId}, +}; +use uuid::Uuid; + +// --- Enum mappings --- + +fn usage_type_from_str(s: &str) -> UsageType { + match s { + "storage_bytes" => UsageType::StorageBytes, + "process_jobs" => UsageType::ProcessJobs, + "api_calls" => UsageType::ApiCalls, + "indexing_size" => UsageType::IndexingSize, + _ => UsageType::StorageBytes, + } +} + +fn usage_type_to_str(t: &UsageType) -> &'static str { + match t { + UsageType::StorageBytes => "storage_bytes", + UsageType::ProcessJobs => "process_jobs", + UsageType::ApiCalls => "api_calls", + UsageType::IndexingSize => "indexing_size", + } +} + +fn time_period_from_str(s: &str) -> TimePeriod { + match s { + "daily" => TimePeriod::Daily, + "monthly" => TimePeriod::Monthly, + "lifetime" => TimePeriod::Lifetime, + _ => TimePeriod::Lifetime, + } +} + +fn time_period_to_str(p: &TimePeriod) -> &'static str { + match p { + TimePeriod::Daily => "daily", + TimePeriod::Monthly => "monthly", + TimePeriod::Lifetime => "lifetime", + } +} + +// --- Row structs --- + +#[derive(sqlx::FromRow)] +struct QuotaDefRow { + quota_id: Uuid, + owner_scope: Uuid, + is_enforced: bool, +} + +#[derive(sqlx::FromRow)] +#[allow(dead_code)] +struct QuotaRuleRow { + rule_id: Uuid, + quota_id: Uuid, + dimension: String, + limit_value: i64, + time_period: String, + is_unlimited: bool, +} + +#[derive(sqlx::FromRow)] +struct UsageLedgerRow { + entry_id: Uuid, + user_id: Uuid, + usage_type: String, + consumed_amount: i64, + timestamp: DateTime, + context: String, +} + +#[derive(sqlx::FromRow)] +struct SumRow { + total: i64, +} + +impl From for QuotaRule { + fn from(r: QuotaRuleRow) -> Self { + Self { + rule_id: SystemId::from_uuid(r.rule_id), + dimension: usage_type_from_str(&r.dimension), + limit_value: r.limit_value as u64, + time_period: time_period_from_str(&r.time_period), + is_unlimited: r.is_unlimited, + } + } +} + +impl From for UsageLedgerEntry { + fn from(r: UsageLedgerRow) -> Self { + Self { + entry_id: SystemId::from_uuid(r.entry_id), + user_id: SystemId::from_uuid(r.user_id), + usage_type: usage_type_from_str(&r.usage_type), + consumed_amount: r.consumed_amount as u64, + timestamp: DateTimeStamp::from_datetime(r.timestamp), + context: r.context, + } + } +} + +// --- PostgresQuotaRepository --- + +pub struct PostgresQuotaRepository { + pool: PgPool, +} + +impl PostgresQuotaRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl QuotaRepository for PostgresQuotaRepository { + async fn find_by_owner( + &self, + owner_id: &SystemId, + ) -> Result, DomainError> { + let def_row = sqlx::query_as::<_, QuotaDefRow>( + "SELECT quota_id, owner_scope, is_enforced FROM quota_definitions WHERE owner_scope = $1", + ) + .bind(*owner_id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + let Some(def) = def_row else { + return Ok(None); + }; + + let rule_rows = sqlx::query_as::<_, QuotaRuleRow>( + "SELECT rule_id, quota_id, dimension, limit_value, time_period, is_unlimited + FROM quota_rules WHERE quota_id = $1", + ) + .bind(def.quota_id) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(Some(QuotaDefinition { + quota_id: SystemId::from_uuid(def.quota_id), + owner_scope: SystemId::from_uuid(def.owner_scope), + is_enforced: def.is_enforced, + rules: rule_rows.into_iter().map(Into::into).collect(), + })) + } + + async fn save(&self, quota: &QuotaDefinition) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO quota_definitions (quota_id, owner_scope, is_enforced) + VALUES ($1, $2, $3) + ON CONFLICT (quota_id) DO UPDATE SET + owner_scope = EXCLUDED.owner_scope, + is_enforced = EXCLUDED.is_enforced", + ) + .bind(*quota.quota_id.as_uuid()) + .bind(*quota.owner_scope.as_uuid()) + .bind(quota.is_enforced) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + // Delete old rules then re-insert + sqlx::query("DELETE FROM quota_rules WHERE quota_id = $1") + .bind(*quota.quota_id.as_uuid()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + for rule in "a.rules { + sqlx::query( + "INSERT INTO quota_rules (rule_id, quota_id, dimension, limit_value, time_period, is_unlimited) + VALUES ($1, $2, $3, $4, $5, $6)", + ) + .bind(*rule.rule_id.as_uuid()) + .bind(*quota.quota_id.as_uuid()) + .bind(usage_type_to_str(&rule.dimension)) + .bind(rule.limit_value as i64) + .bind(time_period_to_str(&rule.time_period)) + .bind(rule.is_unlimited) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + } + + Ok(()) + } + + async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { + // Rules cascade-delete + sqlx::query("DELETE FROM quota_definitions WHERE quota_id = $1") + .bind(*id.as_uuid()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} + +// --- PostgresUsageLedgerRepository --- + +pub struct PostgresUsageLedgerRepository { + pool: PgPool, +} + +impl PostgresUsageLedgerRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl UsageLedgerRepository for PostgresUsageLedgerRepository { + async fn record(&self, entry: &UsageLedgerEntry) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO usage_ledger (entry_id, user_id, usage_type, consumed_amount, timestamp, context) + VALUES ($1, $2, $3, $4, $5, $6)", + ) + .bind(*entry.entry_id.as_uuid()) + .bind(*entry.user_id.as_uuid()) + .bind(usage_type_to_str(&entry.usage_type)) + .bind(entry.consumed_amount as i64) + .bind(entry.timestamp.as_datetime()) + .bind(&entry.context) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } + + async fn sum_usage( + &self, + user_id: &SystemId, + usage_type: UsageType, + since: Option, + ) -> Result { + let since_dt: Option> = since.map(|s| *s.as_datetime()); + let row = sqlx::query_as::<_, SumRow>( + "SELECT COALESCE(SUM(consumed_amount), 0) as total + FROM usage_ledger + WHERE user_id = $1 AND usage_type = $2 AND ($3::timestamptz IS NULL OR timestamp >= $3)", + ) + .bind(*user_id.as_uuid()) + .bind(usage_type_to_str(&usage_type)) + .bind(since_dt) + .fetch_one(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.total as u64) + } +} diff --git a/crates/adapters/postgres/src/storage_volume_repository.rs b/crates/adapters/postgres/src/storage_volume_repository.rs new file mode 100644 index 0000000..beb4a17 --- /dev/null +++ b/crates/adapters/postgres/src/storage_volume_repository.rs @@ -0,0 +1,98 @@ +use crate::db::PgPool; +use async_trait::async_trait; +use domain::{ + entities::StorageVolume, + errors::DomainError, + ports::StorageVolumeRepository, + value_objects::SystemId, +}; +use uuid::Uuid; + +#[derive(sqlx::FromRow)] +struct StorageVolumeRow { + volume_id: Uuid, + volume_name: String, + uri_prefix: String, + is_writable: bool, + available_bytes: i64, +} + +impl From for StorageVolume { + fn from(r: StorageVolumeRow) -> Self { + Self { + volume_id: SystemId::from_uuid(r.volume_id), + volume_name: r.volume_name, + uri_prefix: r.uri_prefix, + is_writable: r.is_writable, + available_bytes: r.available_bytes as u64, + } + } +} + +pub struct PostgresStorageVolumeRepository { + pool: PgPool, +} + +impl PostgresStorageVolumeRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl StorageVolumeRepository for PostgresStorageVolumeRepository { + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + let row = sqlx::query_as::<_, StorageVolumeRow>( + "SELECT volume_id, volume_name, uri_prefix, is_writable, available_bytes + FROM storage_volumes WHERE volume_id = $1", + ) + .bind(*id.as_uuid()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(row.map(Into::into)) + } + + async fn find_all(&self) -> Result, DomainError> { + let rows = sqlx::query_as::<_, StorageVolumeRow>( + "SELECT volume_id, volume_name, uri_prefix, is_writable, available_bytes + FROM storage_volumes", + ) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + Ok(rows.into_iter().map(Into::into).collect()) + } + + async fn save(&self, volume: &StorageVolume) -> Result<(), DomainError> { + sqlx::query( + "INSERT INTO storage_volumes (volume_id, volume_name, uri_prefix, is_writable, available_bytes) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (volume_id) DO UPDATE SET + volume_name = EXCLUDED.volume_name, + uri_prefix = EXCLUDED.uri_prefix, + is_writable = EXCLUDED.is_writable, + available_bytes = EXCLUDED.available_bytes", + ) + .bind(*volume.volume_id.as_uuid()) + .bind(&volume.volume_name) + .bind(&volume.uri_prefix) + .bind(volume.is_writable) + .bind(volume.available_bytes as i64) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } + + async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { + sqlx::query("DELETE FROM storage_volumes WHERE volume_id = $1") + .bind(*id.as_uuid()) + .execute(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + Ok(()) + } +} diff --git a/crates/adapters/postgres/src/user_repository.rs b/crates/adapters/postgres/src/user_repository.rs index 2cb2c76..6950811 100644 --- a/crates/adapters/postgres/src/user_repository.rs +++ b/crates/adapters/postgres/src/user_repository.rs @@ -1,11 +1,34 @@ use crate::db::PgPool; use async_trait::async_trait; +use chrono::{DateTime, Utc}; use domain::{ - entities::User, errors::DomainError, ports::UserRepository, value_objects::{Email, PasswordHash, SystemId}, }; +use uuid::Uuid; + +#[derive(sqlx::FromRow)] +struct UserRow { + id: Uuid, + username: String, + email: String, + password_hash: String, + created_at: DateTime, +} + +impl TryFrom for domain::entities::User { + type Error = DomainError; + fn try_from(r: UserRow) -> Result { + Ok(Self { + id: SystemId::from_uuid(r.id), + username: r.username, + email: Email::new(r.email)?, + password_hash: PasswordHash::from_hash(r.password_hash), + created_at: r.created_at, + }) + } +} pub struct PostgresUserRepository { pool: PgPool, @@ -19,66 +42,66 @@ impl PostgresUserRepository { #[async_trait] impl UserRepository for PostgresUserRepository { - async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { - let row = sqlx::query!( - "SELECT id, email, password_hash, created_at FROM users WHERE id = $1", - *id.as_uuid() + async fn find_by_id(&self, id: &SystemId) -> Result, DomainError> { + let row = sqlx::query_as::<_, UserRow>( + "SELECT id, username, email, password_hash, created_at FROM users WHERE id = $1", ) + .bind(*id.as_uuid()) .fetch_optional(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; - row.map(|r| { - Ok(User { - id: SystemId::from_uuid(r.id), - email: Email::new(r.email)?, - password_hash: PasswordHash::from_hash(r.password_hash), - created_at: r.created_at, - }) - }) - .transpose() + row.map(TryInto::try_into).transpose() } - async fn find_by_email(&self, email: &Email) -> Result, DomainError> { - let row = sqlx::query!( - "SELECT id, email, password_hash, created_at FROM users WHERE email = $1", - email.as_str() + async fn find_by_email(&self, email: &Email) -> Result, DomainError> { + let row = sqlx::query_as::<_, UserRow>( + "SELECT id, username, email, password_hash, created_at FROM users WHERE email = $1", ) + .bind(email.as_str()) .fetch_optional(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; - row.map(|r| { - Ok(User { - id: SystemId::from_uuid(r.id), - email: Email::new(r.email)?, - password_hash: PasswordHash::from_hash(r.password_hash), - created_at: r.created_at, - }) - }) - .transpose() + row.map(TryInto::try_into).transpose() } - async fn save(&self, user: &User) -> Result<(), DomainError> { - sqlx::query!( - "INSERT INTO users (id, email, password_hash, created_at) - VALUES ($1, $2, $3, $4) + async fn find_by_username(&self, username: &str) -> Result, DomainError> { + let row = sqlx::query_as::<_, UserRow>( + "SELECT id, username, email, password_hash, created_at FROM users WHERE username = $1", + ) + .bind(username) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::Internal(e.to_string()))?; + + row.map(TryInto::try_into).transpose() + } + + async fn save(&self, user: &domain::entities::User) -> Result<(), DomainError> { + sqlx::query_as::<_, UserRow>( + "INSERT INTO users (id, username, email, password_hash, created_at) + VALUES ($1, $2, $3, $4, $5) ON CONFLICT (id) DO UPDATE SET + username = EXCLUDED.username, email = EXCLUDED.email, - password_hash = EXCLUDED.password_hash", - *user.id.as_uuid(), - user.email.as_str(), - user.password_hash.as_str(), - user.created_at + password_hash = EXCLUDED.password_hash + RETURNING id, username, email, password_hash, created_at", ) - .execute(&self.pool) + .bind(*user.id.as_uuid()) + .bind(&user.username) + .bind(user.email.as_str()) + .bind(user.password_hash.as_str()) + .bind(user.created_at) + .fetch_one(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?; Ok(()) } async fn delete(&self, id: &SystemId) -> Result<(), DomainError> { - sqlx::query!("DELETE FROM users WHERE id = $1", *id.as_uuid()) + sqlx::query("DELETE FROM users WHERE id = $1") + .bind(*id.as_uuid()) .execute(&self.pool) .await .map_err(|e| DomainError::Internal(e.to_string()))?;