From c16c9d45819b4227925e820583b42360cbd84c99 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Sun, 31 May 2026 18:24:16 +0200 Subject: [PATCH] refactor: extract pg_repo macro and MapDomainError trait to reduce postgres adapter boilerplate --- crates/adapters/postgres/src/catalog/mod.rs | 58 ++++-------- crates/adapters/postgres/src/helpers.rs | 36 ++++++++ crates/adapters/postgres/src/identity/mod.rs | 22 ++--- crates/adapters/postgres/src/lib.rs | 1 + .../adapters/postgres/src/organization/mod.rs | 48 ++++------ .../adapters/postgres/src/processing/mod.rs | 66 ++++--------- crates/adapters/postgres/src/sharing/mod.rs | 50 ++++------ crates/adapters/postgres/src/sidecar/mod.rs | 20 ++-- crates/adapters/postgres/src/storage/mod.rs | 92 ++++++------------- 9 files changed, 144 insertions(+), 249 deletions(-) create mode 100644 crates/adapters/postgres/src/helpers.rs diff --git a/crates/adapters/postgres/src/catalog/mod.rs b/crates/adapters/postgres/src/catalog/mod.rs index d33634f..55068c8 100644 --- a/crates/adapters/postgres/src/catalog/mod.rs +++ b/crates/adapters/postgres/src/catalog/mod.rs @@ -1,4 +1,4 @@ -use crate::db::PgPool; +use crate::helpers::{pg_repo, MapDomainError}; use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ @@ -67,15 +67,7 @@ impl TryFrom for Asset { } } -pub struct PostgresAssetRepository { - pool: PgPool, -} - -impl PostgresAssetRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} +pg_repo!(PostgresAssetRepository); #[async_trait] impl AssetRepository for PostgresAssetRepository { @@ -88,7 +80,7 @@ impl AssetRepository for PostgresAssetRepository { .bind(*id.as_uuid()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; row.map(TryInto::try_into).transpose() } @@ -102,7 +94,7 @@ impl AssetRepository for PostgresAssetRepository { .bind(checksum.as_str()) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; rows.into_iter().map(TryInto::try_into).collect() } @@ -125,7 +117,7 @@ impl AssetRepository for PostgresAssetRepository { .bind(offset as i64) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; rows.into_iter().map(TryInto::try_into).collect() } @@ -157,7 +149,7 @@ impl AssetRepository for PostgresAssetRepository { .bind(asset.created_at.as_datetime()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } @@ -166,7 +158,7 @@ impl AssetRepository for PostgresAssetRepository { .bind(*id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } } @@ -253,15 +245,7 @@ impl From for AssetMetadata { } } -pub struct PostgresAssetMetadataRepository { - pool: PgPool, -} - -impl PostgresAssetMetadataRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} +pg_repo!(PostgresAssetMetadataRepository); #[async_trait] impl AssetMetadataRepository for PostgresAssetMetadataRepository { @@ -273,7 +257,7 @@ impl AssetMetadataRepository for PostgresAssetMetadataRepository { .bind(*asset_id.as_uuid()) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(rows.into_iter().map(Into::into).collect()) } @@ -291,7 +275,7 @@ impl AssetMetadataRepository for PostgresAssetMetadataRepository { .bind(source_to_str(&source)) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(row.map(Into::into)) } @@ -310,7 +294,7 @@ impl AssetMetadataRepository for PostgresAssetMetadataRepository { .bind(metadata.updated_at.as_datetime()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } @@ -324,7 +308,7 @@ impl AssetMetadataRepository for PostgresAssetMetadataRepository { .bind(source_to_str(&source)) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } } @@ -407,15 +391,7 @@ impl From for DuplicateGroup { } } -pub struct PostgresDuplicateRepository { - pool: PgPool, -} - -impl PostgresDuplicateRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} +pg_repo!(PostgresDuplicateRepository); #[async_trait] impl DuplicateRepository for PostgresDuplicateRepository { @@ -427,7 +403,7 @@ impl DuplicateRepository for PostgresDuplicateRepository { .bind(*id.as_uuid()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(row.map(Into::into)) } @@ -439,7 +415,7 @@ impl DuplicateRepository for PostgresDuplicateRepository { ) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(rows.into_iter().map(Into::into).collect()) } @@ -452,7 +428,7 @@ impl DuplicateRepository for PostgresDuplicateRepository { .bind(serde_json::json!([{"asset_id": asset_id.as_uuid()}])) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(rows.into_iter().map(Into::into).collect()) } @@ -472,7 +448,7 @@ impl DuplicateRepository for PostgresDuplicateRepository { .bind(candidates_to_json(&group.candidates)) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } } diff --git a/crates/adapters/postgres/src/helpers.rs b/crates/adapters/postgres/src/helpers.rs new file mode 100644 index 0000000..4e54135 --- /dev/null +++ b/crates/adapters/postgres/src/helpers.rs @@ -0,0 +1,36 @@ +use domain::errors::DomainError; + +/// Extension trait for converting `sqlx::Error` into `DomainError`. +pub trait MapDomainError { + fn map_pg(self) -> Result; +} + +impl MapDomainError for Result { + fn map_pg(self) -> Result { + self.map_err(|e| DomainError::Internal(e.to_string())) + } +} + +/// Generates a Postgres repository struct with a `PgPool` field and a `new` constructor. +/// +/// ```ignore +/// pg_repo!(PostgresFooRepository); +/// // expands to: +/// // pub struct PostgresFooRepository { pool: PgPool } +/// // impl PostgresFooRepository { pub fn new(pool: PgPool) -> Self { Self { pool } } } +/// ``` +macro_rules! pg_repo { + ($name:ident) => { + pub struct $name { + pool: crate::db::PgPool, + } + + impl $name { + pub fn new(pool: crate::db::PgPool) -> Self { + Self { pool } + } + } + }; +} + +pub(crate) use pg_repo; diff --git a/crates/adapters/postgres/src/identity/mod.rs b/crates/adapters/postgres/src/identity/mod.rs index c02a02a..3463d4e 100644 --- a/crates/adapters/postgres/src/identity/mod.rs +++ b/crates/adapters/postgres/src/identity/mod.rs @@ -1,4 +1,4 @@ -use crate::db::PgPool; +use crate::helpers::{pg_repo, MapDomainError}; use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ @@ -30,15 +30,7 @@ impl TryFrom for domain::entities::User { } } -pub struct PostgresUserRepository { - pool: PgPool, -} - -impl PostgresUserRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} +pg_repo!(PostgresUserRepository); #[async_trait] impl UserRepository for PostgresUserRepository { @@ -52,7 +44,7 @@ impl UserRepository for PostgresUserRepository { .bind(*id.as_uuid()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; row.map(TryInto::try_into).transpose() } @@ -67,7 +59,7 @@ impl UserRepository for PostgresUserRepository { .bind(email.as_str()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; row.map(TryInto::try_into).transpose() } @@ -82,7 +74,7 @@ impl UserRepository for PostgresUserRepository { .bind(username) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; row.map(TryInto::try_into).transpose() } @@ -104,7 +96,7 @@ impl UserRepository for PostgresUserRepository { .bind(user.created_at) .fetch_one(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } @@ -113,7 +105,7 @@ impl UserRepository for PostgresUserRepository { .bind(*id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } } diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs index c047e0c..ce1d970 100644 --- a/crates/adapters/postgres/src/lib.rs +++ b/crates/adapters/postgres/src/lib.rs @@ -1,4 +1,5 @@ pub mod db; +mod helpers; pub mod catalog; pub mod identity; diff --git a/crates/adapters/postgres/src/organization/mod.rs b/crates/adapters/postgres/src/organization/mod.rs index fa340f5..8760b1f 100644 --- a/crates/adapters/postgres/src/organization/mod.rs +++ b/crates/adapters/postgres/src/organization/mod.rs @@ -1,4 +1,4 @@ -use crate::db::PgPool; +use crate::helpers::{pg_repo, MapDomainError}; use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ @@ -60,15 +60,9 @@ fn album_from_row(r: AlbumRow, entries: Vec) -> Album { } } -pub struct PostgresAlbumRepository { - pool: PgPool, -} +pg_repo!(PostgresAlbumRepository); impl PostgresAlbumRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } - async fn load_entries(&self, album_id: &Uuid) -> Result, DomainError> { let rows = sqlx::query_as::<_, AlbumEntryRow>( "SELECT album_id, asset_id, sort_order, added_at, added_by_user_id @@ -78,7 +72,7 @@ impl PostgresAlbumRepository { .bind(album_id) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(rows.into_iter().map(Into::into).collect()) } @@ -95,7 +89,7 @@ impl AlbumRepository for PostgresAlbumRepository { .bind(*id.as_uuid()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; let Some(r) = row else { return Ok(None); @@ -114,7 +108,7 @@ impl AlbumRepository for PostgresAlbumRepository { .bind(*creator_id.as_uuid()) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; let mut albums = Vec::with_capacity(rows.len()); for r in rows { @@ -146,14 +140,14 @@ impl AlbumRepository for PostgresAlbumRepository { .bind(album.created_at.as_datetime()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; // Sync entries: delete all then re-insert sqlx::query("DELETE FROM album_entries WHERE album_id = $1") .bind(*album.album_id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; for entry in &album.entries { sqlx::query( @@ -167,7 +161,7 @@ impl AlbumRepository for PostgresAlbumRepository { .bind(*entry.added_by_user_id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; } Ok(()) @@ -179,7 +173,7 @@ impl AlbumRepository for PostgresAlbumRepository { .bind(*id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } } @@ -240,15 +234,7 @@ impl From for AssetTag { } } -pub struct PostgresTagRepository { - pool: PgPool, -} - -impl PostgresTagRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} +pg_repo!(PostgresTagRepository); #[async_trait] impl TagRepository for PostgresTagRepository { @@ -259,7 +245,7 @@ impl TagRepository for PostgresTagRepository { .bind(*id.as_uuid()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(row.map(Into::into)) } @@ -271,7 +257,7 @@ impl TagRepository for PostgresTagRepository { .bind(name) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(row.map(Into::into)) } @@ -288,7 +274,7 @@ impl TagRepository for PostgresTagRepository { .bind(*asset_id.as_uuid()) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; let at_rows = sqlx::query_as::<_, AssetTagRow>( "SELECT asset_id, tag_id, tagged_by_user_id, confidence @@ -297,7 +283,7 @@ impl TagRepository for PostgresTagRepository { .bind(*asset_id.as_uuid()) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; let tags: Vec = rows.into_iter().map(Into::into).collect(); let asset_tags: Vec = at_rows.into_iter().map(Into::into).collect(); @@ -316,7 +302,7 @@ impl TagRepository for PostgresTagRepository { .bind(tag_source_to_str(&tag.tag_source)) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } @@ -334,7 +320,7 @@ impl TagRepository for PostgresTagRepository { .bind(asset_tag.confidence) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } @@ -348,7 +334,7 @@ impl TagRepository for PostgresTagRepository { .bind(*tag_id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } } diff --git a/crates/adapters/postgres/src/processing/mod.rs b/crates/adapters/postgres/src/processing/mod.rs index ccdf4bb..4465470 100644 --- a/crates/adapters/postgres/src/processing/mod.rs +++ b/crates/adapters/postgres/src/processing/mod.rs @@ -1,4 +1,4 @@ -use crate::db::PgPool; +use crate::helpers::{pg_repo, MapDomainError}; use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ @@ -119,15 +119,7 @@ impl From for Job { } } -pub struct PostgresJobRepository { - pool: PgPool, -} - -impl PostgresJobRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} +pg_repo!(PostgresJobRepository); #[async_trait] impl JobRepository for PostgresJobRepository { @@ -141,7 +133,7 @@ impl JobRepository for PostgresJobRepository { .bind(*id.as_uuid()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(row.map(Into::into)) } @@ -157,7 +149,7 @@ impl JobRepository for PostgresJobRepository { ) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(row.map(Into::into)) } @@ -173,7 +165,7 @@ impl JobRepository for PostgresJobRepository { .bind(*batch_id.as_uuid()) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(rows.into_iter().map(Into::into).collect()) } @@ -210,7 +202,7 @@ impl JobRepository for PostgresJobRepository { .bind(&job.error_message) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } } @@ -261,15 +253,7 @@ impl From for JobBatch { } } -pub struct PostgresJobBatchRepository { - pool: PgPool, -} - -impl PostgresJobBatchRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} +pg_repo!(PostgresJobBatchRepository); #[async_trait] impl JobBatchRepository for PostgresJobBatchRepository { @@ -281,7 +265,7 @@ impl JobBatchRepository for PostgresJobBatchRepository { .bind(*id.as_uuid()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(row.map(Into::into)) } @@ -304,7 +288,7 @@ impl JobBatchRepository for PostgresJobBatchRepository { .bind(batch_status_to_str(&batch.status)) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } } @@ -351,15 +335,7 @@ impl From for Plugin { } } -pub struct PostgresPluginRepository { - pool: PgPool, -} - -impl PostgresPluginRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} +pg_repo!(PostgresPluginRepository); #[async_trait] impl PluginRepository for PostgresPluginRepository { @@ -371,7 +347,7 @@ impl PluginRepository for PostgresPluginRepository { .bind(*id.as_uuid()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(row.map(Into::into)) } @@ -383,7 +359,7 @@ impl PluginRepository for PostgresPluginRepository { ) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(rows.into_iter().map(Into::into).collect()) } @@ -405,7 +381,7 @@ impl PluginRepository for PostgresPluginRepository { .bind(structured_to_json(&plugin.configuration)) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } } @@ -475,15 +451,7 @@ impl From for ProcessingPipeline { } } -pub struct PostgresPipelineRepository { - pool: PgPool, -} - -impl PostgresPipelineRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} +pg_repo!(PostgresPipelineRepository); #[async_trait] impl PipelineRepository for PostgresPipelineRepository { @@ -495,7 +463,7 @@ impl PipelineRepository for PostgresPipelineRepository { .bind(*id.as_uuid()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(row.map(Into::into)) } @@ -508,7 +476,7 @@ impl PipelineRepository for PostgresPipelineRepository { .bind(event) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(rows.into_iter().map(Into::into).collect()) } @@ -526,7 +494,7 @@ impl PipelineRepository for PostgresPipelineRepository { .bind(steps_to_json(&pipeline.steps)) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } } diff --git a/crates/adapters/postgres/src/sharing/mod.rs b/crates/adapters/postgres/src/sharing/mod.rs index eee9f75..50ca08e 100644 --- a/crates/adapters/postgres/src/sharing/mod.rs +++ b/crates/adapters/postgres/src/sharing/mod.rs @@ -1,4 +1,4 @@ -use crate::db::PgPool; +use crate::helpers::{pg_repo, MapDomainError}; use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ @@ -207,15 +207,7 @@ impl TryFrom for InviteCode { } } -pub struct PostgresShareRepository { - pool: PgPool, -} - -impl PostgresShareRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} +pg_repo!(PostgresShareRepository); #[async_trait] impl ShareRepository for PostgresShareRepository { @@ -238,7 +230,7 @@ impl ShareRepository for PostgresShareRepository { .bind(scope.created_at.as_datetime()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } @@ -250,7 +242,7 @@ impl ShareRepository for PostgresShareRepository { .bind(*id.as_uuid()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; row.map(TryInto::try_into).transpose() } @@ -266,7 +258,7 @@ impl ShareRepository for PostgresShareRepository { .bind(*resource_id.as_uuid()) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; rows.into_iter().map(TryInto::try_into).collect() } @@ -276,7 +268,7 @@ impl ShareRepository for PostgresShareRepository { .bind(*id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } @@ -296,7 +288,7 @@ impl ShareRepository for PostgresShareRepository { .bind(*target.role_id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } @@ -311,7 +303,7 @@ impl ShareRepository for PostgresShareRepository { .bind(*scope_id.as_uuid()) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; rows.into_iter().map(TryInto::try_into).collect() } @@ -327,7 +319,7 @@ impl ShareRepository for PostgresShareRepository { .bind(*user_id.as_uuid()) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; rows.into_iter().map(TryInto::try_into).collect() } @@ -355,7 +347,7 @@ impl ShareRepository for PostgresShareRepository { .bind(link.use_count as i32) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } @@ -367,7 +359,7 @@ impl ShareRepository for PostgresShareRepository { .bind(token) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; row.map(TryInto::try_into).transpose() } @@ -392,7 +384,7 @@ impl ShareRepository for PostgresShareRepository { .bind(*invite.assigned_role_id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } @@ -404,7 +396,7 @@ impl ShareRepository for PostgresShareRepository { .bind(*id.as_uuid()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; row.map(TryInto::try_into).transpose() } @@ -433,15 +425,7 @@ impl From for VisibilityFilter { } } -pub struct PostgresVisibilityFilterRepository { - pool: PgPool, -} - -impl PostgresVisibilityFilterRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} +pg_repo!(PostgresVisibilityFilterRepository); #[async_trait] impl VisibilityFilterRepository for PostgresVisibilityFilterRepository { @@ -458,7 +442,7 @@ impl VisibilityFilterRepository for PostgresVisibilityFilterRepository { .bind(*role_id.as_uuid()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(row.map(Into::into)) } @@ -476,7 +460,7 @@ impl VisibilityFilterRepository for PostgresVisibilityFilterRepository { .bind(&filter.hidden_fields) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } @@ -485,7 +469,7 @@ impl VisibilityFilterRepository for PostgresVisibilityFilterRepository { .bind(*id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } } diff --git a/crates/adapters/postgres/src/sidecar/mod.rs b/crates/adapters/postgres/src/sidecar/mod.rs index 88f17a8..6fda3eb 100644 --- a/crates/adapters/postgres/src/sidecar/mod.rs +++ b/crates/adapters/postgres/src/sidecar/mod.rs @@ -1,4 +1,4 @@ -use crate::db::PgPool; +use crate::helpers::{pg_repo, MapDomainError}; use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ @@ -55,15 +55,7 @@ impl TryFrom for SidecarRecord { } } -pub struct PostgresSidecarRepository { - pool: PgPool, -} - -impl PostgresSidecarRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} +pg_repo!(PostgresSidecarRepository); #[async_trait] impl SidecarRepository for PostgresSidecarRepository { @@ -79,7 +71,7 @@ impl SidecarRepository for PostgresSidecarRepository { .bind(*asset_id.as_uuid()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; row.map(TryInto::try_into).transpose() } @@ -93,7 +85,7 @@ impl SidecarRepository for PostgresSidecarRepository { .bind(sync_status_to_str(&status)) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; rows.into_iter().map(TryInto::try_into).collect() } @@ -118,7 +110,7 @@ impl SidecarRepository for PostgresSidecarRepository { .bind(&record.error_message) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } @@ -127,7 +119,7 @@ impl SidecarRepository for PostgresSidecarRepository { .bind(*asset_id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } } diff --git a/crates/adapters/postgres/src/storage/mod.rs b/crates/adapters/postgres/src/storage/mod.rs index 0dd2cb5..cf3a33f 100644 --- a/crates/adapters/postgres/src/storage/mod.rs +++ b/crates/adapters/postgres/src/storage/mod.rs @@ -1,4 +1,4 @@ -use crate::db::PgPool; +use crate::helpers::{pg_repo, MapDomainError}; use async_trait::async_trait; use chrono::{DateTime, Utc}; use domain::{ @@ -40,15 +40,7 @@ impl From for StorageVolume { } } -pub struct PostgresStorageVolumeRepository { - pool: PgPool, -} - -impl PostgresStorageVolumeRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} +pg_repo!(PostgresStorageVolumeRepository); #[async_trait] impl StorageVolumeRepository for PostgresStorageVolumeRepository { @@ -60,7 +52,7 @@ impl StorageVolumeRepository for PostgresStorageVolumeRepository { .bind(*id.as_uuid()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(row.map(Into::into)) } @@ -72,7 +64,7 @@ impl StorageVolumeRepository for PostgresStorageVolumeRepository { ) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(rows.into_iter().map(Into::into).collect()) } @@ -94,7 +86,7 @@ impl StorageVolumeRepository for PostgresStorageVolumeRepository { .bind(volume.available_bytes as i64) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } @@ -103,7 +95,7 @@ impl StorageVolumeRepository for PostgresStorageVolumeRepository { .bind(*id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } } @@ -151,15 +143,7 @@ impl From for LibraryPath { } } -pub struct PostgresLibraryPathRepository { - pool: PgPool, -} - -impl PostgresLibraryPathRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} +pg_repo!(PostgresLibraryPathRepository); #[async_trait] impl LibraryPathRepository for PostgresLibraryPathRepository { @@ -171,7 +155,7 @@ impl LibraryPathRepository for PostgresLibraryPathRepository { .bind(*id.as_uuid()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(row.map(Into::into)) } @@ -184,7 +168,7 @@ impl LibraryPathRepository for PostgresLibraryPathRepository { .bind(*volume_id.as_uuid()) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(rows.into_iter().map(Into::into).collect()) } @@ -201,7 +185,7 @@ impl LibraryPathRepository for PostgresLibraryPathRepository { .bind(*owner_id.as_uuid()) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(rows.into_iter().map(Into::into).collect()) } @@ -225,7 +209,7 @@ impl LibraryPathRepository for PostgresLibraryPathRepository { .bind(path.designated_owner_id.as_ref().map(|id| *id.as_uuid())) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } @@ -234,7 +218,7 @@ impl LibraryPathRepository for PostgresLibraryPathRepository { .bind(*id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } } @@ -294,15 +278,7 @@ impl TryFrom for IngestSession { } } -pub struct PostgresIngestSessionRepository { - pool: PgPool, -} - -impl PostgresIngestSessionRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} +pg_repo!(PostgresIngestSessionRepository); #[async_trait] impl IngestSessionRepository for PostgresIngestSessionRepository { @@ -315,7 +291,7 @@ impl IngestSessionRepository for PostgresIngestSessionRepository { .bind(*id.as_uuid()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; row.map(TryInto::try_into).transpose() } @@ -329,7 +305,7 @@ impl IngestSessionRepository for PostgresIngestSessionRepository { .bind(*user_id.as_uuid()) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; rows.into_iter().map(TryInto::try_into).collect() } @@ -354,7 +330,7 @@ impl IngestSessionRepository for PostgresIngestSessionRepository { .bind(session.error_message.as_deref()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } } @@ -457,15 +433,7 @@ impl From for UsageLedgerEntry { } } -pub struct PostgresQuotaRepository { - pool: PgPool, -} - -impl PostgresQuotaRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} +pg_repo!(PostgresQuotaRepository); #[async_trait] impl QuotaRepository for PostgresQuotaRepository { @@ -479,7 +447,7 @@ impl QuotaRepository for PostgresQuotaRepository { .bind(*owner_id.as_uuid()) .fetch_optional(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; let Some(def) = def_row else { return Ok(None); @@ -492,7 +460,7 @@ impl QuotaRepository for PostgresQuotaRepository { .bind(def.quota_id) .fetch_all(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(Some(QuotaDefinition { quota_id: SystemId::from_uuid(def.quota_id), @@ -515,14 +483,14 @@ impl QuotaRepository for PostgresQuotaRepository { .bind(quota.is_enforced) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; // Delete old rules then re-insert sqlx::query("DELETE FROM quota_rules WHERE quota_id = $1") .bind(*quota.quota_id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; for rule in "a.rules { sqlx::query( @@ -537,7 +505,7 @@ impl QuotaRepository for PostgresQuotaRepository { .bind(rule.is_unlimited) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; } Ok(()) @@ -549,20 +517,12 @@ impl QuotaRepository for PostgresQuotaRepository { .bind(*id.as_uuid()) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } } -pub struct PostgresUsageLedgerRepository { - pool: PgPool, -} - -impl PostgresUsageLedgerRepository { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} +pg_repo!(PostgresUsageLedgerRepository); #[async_trait] impl UsageLedgerRepository for PostgresUsageLedgerRepository { @@ -579,7 +539,7 @@ impl UsageLedgerRepository for PostgresUsageLedgerRepository { .bind(&entry.context) .execute(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(()) } @@ -600,7 +560,7 @@ impl UsageLedgerRepository for PostgresUsageLedgerRepository { .bind(since_dt) .fetch_one(&self.pool) .await - .map_err(|e| DomainError::Internal(e.to_string()))?; + .map_pg()?; Ok(row.total as u64) }