refactor: extract pg_repo macro and MapDomainError trait to reduce postgres adapter boilerplate

This commit is contained in:
2026-05-31 18:24:16 +02:00
parent 2fe0a4c245
commit c16c9d4581
9 changed files with 144 additions and 249 deletions

View File

@@ -1,4 +1,4 @@
use crate::db::PgPool; use crate::helpers::{pg_repo, MapDomainError};
use async_trait::async_trait; use async_trait::async_trait;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use domain::{ use domain::{
@@ -67,15 +67,7 @@ impl TryFrom<AssetRow> for Asset {
} }
} }
pub struct PostgresAssetRepository { pg_repo!(PostgresAssetRepository);
pool: PgPool,
}
impl PostgresAssetRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait] #[async_trait]
impl AssetRepository for PostgresAssetRepository { impl AssetRepository for PostgresAssetRepository {
@@ -88,7 +80,7 @@ impl AssetRepository for PostgresAssetRepository {
.bind(*id.as_uuid()) .bind(*id.as_uuid())
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
row.map(TryInto::try_into).transpose() row.map(TryInto::try_into).transpose()
} }
@@ -102,7 +94,7 @@ impl AssetRepository for PostgresAssetRepository {
.bind(checksum.as_str()) .bind(checksum.as_str())
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
rows.into_iter().map(TryInto::try_into).collect() rows.into_iter().map(TryInto::try_into).collect()
} }
@@ -125,7 +117,7 @@ impl AssetRepository for PostgresAssetRepository {
.bind(offset as i64) .bind(offset as i64)
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
rows.into_iter().map(TryInto::try_into).collect() rows.into_iter().map(TryInto::try_into).collect()
} }
@@ -157,7 +149,7 @@ impl AssetRepository for PostgresAssetRepository {
.bind(asset.created_at.as_datetime()) .bind(asset.created_at.as_datetime())
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
@@ -166,7 +158,7 @@ impl AssetRepository for PostgresAssetRepository {
.bind(*id.as_uuid()) .bind(*id.as_uuid())
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
} }
@@ -253,15 +245,7 @@ impl From<AssetMetadataRow> for AssetMetadata {
} }
} }
pub struct PostgresAssetMetadataRepository { pg_repo!(PostgresAssetMetadataRepository);
pool: PgPool,
}
impl PostgresAssetMetadataRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait] #[async_trait]
impl AssetMetadataRepository for PostgresAssetMetadataRepository { impl AssetMetadataRepository for PostgresAssetMetadataRepository {
@@ -273,7 +257,7 @@ impl AssetMetadataRepository for PostgresAssetMetadataRepository {
.bind(*asset_id.as_uuid()) .bind(*asset_id.as_uuid())
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(rows.into_iter().map(Into::into).collect()) Ok(rows.into_iter().map(Into::into).collect())
} }
@@ -291,7 +275,7 @@ impl AssetMetadataRepository for PostgresAssetMetadataRepository {
.bind(source_to_str(&source)) .bind(source_to_str(&source))
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(row.map(Into::into)) Ok(row.map(Into::into))
} }
@@ -310,7 +294,7 @@ impl AssetMetadataRepository for PostgresAssetMetadataRepository {
.bind(metadata.updated_at.as_datetime()) .bind(metadata.updated_at.as_datetime())
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
@@ -324,7 +308,7 @@ impl AssetMetadataRepository for PostgresAssetMetadataRepository {
.bind(source_to_str(&source)) .bind(source_to_str(&source))
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
} }
@@ -407,15 +391,7 @@ impl From<GroupRow> for DuplicateGroup {
} }
} }
pub struct PostgresDuplicateRepository { pg_repo!(PostgresDuplicateRepository);
pool: PgPool,
}
impl PostgresDuplicateRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait] #[async_trait]
impl DuplicateRepository for PostgresDuplicateRepository { impl DuplicateRepository for PostgresDuplicateRepository {
@@ -427,7 +403,7 @@ impl DuplicateRepository for PostgresDuplicateRepository {
.bind(*id.as_uuid()) .bind(*id.as_uuid())
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(row.map(Into::into)) Ok(row.map(Into::into))
} }
@@ -439,7 +415,7 @@ impl DuplicateRepository for PostgresDuplicateRepository {
) )
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(rows.into_iter().map(Into::into).collect()) Ok(rows.into_iter().map(Into::into).collect())
} }
@@ -452,7 +428,7 @@ impl DuplicateRepository for PostgresDuplicateRepository {
.bind(serde_json::json!([{"asset_id": asset_id.as_uuid()}])) .bind(serde_json::json!([{"asset_id": asset_id.as_uuid()}]))
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(rows.into_iter().map(Into::into).collect()) Ok(rows.into_iter().map(Into::into).collect())
} }
@@ -472,7 +448,7 @@ impl DuplicateRepository for PostgresDuplicateRepository {
.bind(candidates_to_json(&group.candidates)) .bind(candidates_to_json(&group.candidates))
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
} }

View File

@@ -0,0 +1,36 @@
use domain::errors::DomainError;
/// Extension trait for converting `sqlx::Error` into `DomainError`.
pub trait MapDomainError<T> {
fn map_pg(self) -> Result<T, DomainError>;
}
impl<T> MapDomainError<T> for Result<T, sqlx::Error> {
fn map_pg(self) -> Result<T, DomainError> {
self.map_err(|e| DomainError::Internal(e.to_string()))
}
}
/// Generates a Postgres repository struct with a `PgPool` field and a `new` constructor.
///
/// ```ignore
/// pg_repo!(PostgresFooRepository);
/// // expands to:
/// // pub struct PostgresFooRepository { pool: PgPool }
/// // impl PostgresFooRepository { pub fn new(pool: PgPool) -> Self { Self { pool } } }
/// ```
macro_rules! pg_repo {
($name:ident) => {
pub struct $name {
pool: crate::db::PgPool,
}
impl $name {
pub fn new(pool: crate::db::PgPool) -> Self {
Self { pool }
}
}
};
}
pub(crate) use pg_repo;

View File

@@ -1,4 +1,4 @@
use crate::db::PgPool; use crate::helpers::{pg_repo, MapDomainError};
use async_trait::async_trait; use async_trait::async_trait;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use domain::{ use domain::{
@@ -30,15 +30,7 @@ impl TryFrom<UserRow> for domain::entities::User {
} }
} }
pub struct PostgresUserRepository { pg_repo!(PostgresUserRepository);
pool: PgPool,
}
impl PostgresUserRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait] #[async_trait]
impl UserRepository for PostgresUserRepository { impl UserRepository for PostgresUserRepository {
@@ -52,7 +44,7 @@ impl UserRepository for PostgresUserRepository {
.bind(*id.as_uuid()) .bind(*id.as_uuid())
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
row.map(TryInto::try_into).transpose() row.map(TryInto::try_into).transpose()
} }
@@ -67,7 +59,7 @@ impl UserRepository for PostgresUserRepository {
.bind(email.as_str()) .bind(email.as_str())
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
row.map(TryInto::try_into).transpose() row.map(TryInto::try_into).transpose()
} }
@@ -82,7 +74,7 @@ impl UserRepository for PostgresUserRepository {
.bind(username) .bind(username)
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
row.map(TryInto::try_into).transpose() row.map(TryInto::try_into).transpose()
} }
@@ -104,7 +96,7 @@ impl UserRepository for PostgresUserRepository {
.bind(user.created_at) .bind(user.created_at)
.fetch_one(&self.pool) .fetch_one(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
@@ -113,7 +105,7 @@ impl UserRepository for PostgresUserRepository {
.bind(*id.as_uuid()) .bind(*id.as_uuid())
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
} }

View File

@@ -1,4 +1,5 @@
pub mod db; pub mod db;
mod helpers;
pub mod catalog; pub mod catalog;
pub mod identity; pub mod identity;

View File

@@ -1,4 +1,4 @@
use crate::db::PgPool; use crate::helpers::{pg_repo, MapDomainError};
use async_trait::async_trait; use async_trait::async_trait;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use domain::{ use domain::{
@@ -60,15 +60,9 @@ fn album_from_row(r: AlbumRow, entries: Vec<AlbumEntry>) -> Album {
} }
} }
pub struct PostgresAlbumRepository { pg_repo!(PostgresAlbumRepository);
pool: PgPool,
}
impl PostgresAlbumRepository { impl PostgresAlbumRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
async fn load_entries(&self, album_id: &Uuid) -> Result<Vec<AlbumEntry>, DomainError> { async fn load_entries(&self, album_id: &Uuid) -> Result<Vec<AlbumEntry>, DomainError> {
let rows = sqlx::query_as::<_, AlbumEntryRow>( let rows = sqlx::query_as::<_, AlbumEntryRow>(
"SELECT album_id, asset_id, sort_order, added_at, added_by_user_id "SELECT album_id, asset_id, sort_order, added_at, added_by_user_id
@@ -78,7 +72,7 @@ impl PostgresAlbumRepository {
.bind(album_id) .bind(album_id)
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(rows.into_iter().map(Into::into).collect()) Ok(rows.into_iter().map(Into::into).collect())
} }
@@ -95,7 +89,7 @@ impl AlbumRepository for PostgresAlbumRepository {
.bind(*id.as_uuid()) .bind(*id.as_uuid())
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
let Some(r) = row else { let Some(r) = row else {
return Ok(None); return Ok(None);
@@ -114,7 +108,7 @@ impl AlbumRepository for PostgresAlbumRepository {
.bind(*creator_id.as_uuid()) .bind(*creator_id.as_uuid())
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
let mut albums = Vec::with_capacity(rows.len()); let mut albums = Vec::with_capacity(rows.len());
for r in rows { for r in rows {
@@ -146,14 +140,14 @@ impl AlbumRepository for PostgresAlbumRepository {
.bind(album.created_at.as_datetime()) .bind(album.created_at.as_datetime())
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
// Sync entries: delete all then re-insert // Sync entries: delete all then re-insert
sqlx::query("DELETE FROM album_entries WHERE album_id = $1") sqlx::query("DELETE FROM album_entries WHERE album_id = $1")
.bind(*album.album_id.as_uuid()) .bind(*album.album_id.as_uuid())
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
for entry in &album.entries { for entry in &album.entries {
sqlx::query( sqlx::query(
@@ -167,7 +161,7 @@ impl AlbumRepository for PostgresAlbumRepository {
.bind(*entry.added_by_user_id.as_uuid()) .bind(*entry.added_by_user_id.as_uuid())
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
} }
Ok(()) Ok(())
@@ -179,7 +173,7 @@ impl AlbumRepository for PostgresAlbumRepository {
.bind(*id.as_uuid()) .bind(*id.as_uuid())
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
} }
@@ -240,15 +234,7 @@ impl From<AssetTagRow> for AssetTag {
} }
} }
pub struct PostgresTagRepository { pg_repo!(PostgresTagRepository);
pool: PgPool,
}
impl PostgresTagRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait] #[async_trait]
impl TagRepository for PostgresTagRepository { impl TagRepository for PostgresTagRepository {
@@ -259,7 +245,7 @@ impl TagRepository for PostgresTagRepository {
.bind(*id.as_uuid()) .bind(*id.as_uuid())
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(row.map(Into::into)) Ok(row.map(Into::into))
} }
@@ -271,7 +257,7 @@ impl TagRepository for PostgresTagRepository {
.bind(name) .bind(name)
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(row.map(Into::into)) Ok(row.map(Into::into))
} }
@@ -288,7 +274,7 @@ impl TagRepository for PostgresTagRepository {
.bind(*asset_id.as_uuid()) .bind(*asset_id.as_uuid())
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
let at_rows = sqlx::query_as::<_, AssetTagRow>( let at_rows = sqlx::query_as::<_, AssetTagRow>(
"SELECT asset_id, tag_id, tagged_by_user_id, confidence "SELECT asset_id, tag_id, tagged_by_user_id, confidence
@@ -297,7 +283,7 @@ impl TagRepository for PostgresTagRepository {
.bind(*asset_id.as_uuid()) .bind(*asset_id.as_uuid())
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
let tags: Vec<Tag> = rows.into_iter().map(Into::into).collect(); let tags: Vec<Tag> = rows.into_iter().map(Into::into).collect();
let asset_tags: Vec<AssetTag> = at_rows.into_iter().map(Into::into).collect(); let asset_tags: Vec<AssetTag> = at_rows.into_iter().map(Into::into).collect();
@@ -316,7 +302,7 @@ impl TagRepository for PostgresTagRepository {
.bind(tag_source_to_str(&tag.tag_source)) .bind(tag_source_to_str(&tag.tag_source))
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
@@ -334,7 +320,7 @@ impl TagRepository for PostgresTagRepository {
.bind(asset_tag.confidence) .bind(asset_tag.confidence)
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
@@ -348,7 +334,7 @@ impl TagRepository for PostgresTagRepository {
.bind(*tag_id.as_uuid()) .bind(*tag_id.as_uuid())
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
} }

View File

@@ -1,4 +1,4 @@
use crate::db::PgPool; use crate::helpers::{pg_repo, MapDomainError};
use async_trait::async_trait; use async_trait::async_trait;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use domain::{ use domain::{
@@ -119,15 +119,7 @@ impl From<JobRow> for Job {
} }
} }
pub struct PostgresJobRepository { pg_repo!(PostgresJobRepository);
pool: PgPool,
}
impl PostgresJobRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait] #[async_trait]
impl JobRepository for PostgresJobRepository { impl JobRepository for PostgresJobRepository {
@@ -141,7 +133,7 @@ impl JobRepository for PostgresJobRepository {
.bind(*id.as_uuid()) .bind(*id.as_uuid())
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(row.map(Into::into)) Ok(row.map(Into::into))
} }
@@ -157,7 +149,7 @@ impl JobRepository for PostgresJobRepository {
) )
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(row.map(Into::into)) Ok(row.map(Into::into))
} }
@@ -173,7 +165,7 @@ impl JobRepository for PostgresJobRepository {
.bind(*batch_id.as_uuid()) .bind(*batch_id.as_uuid())
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(rows.into_iter().map(Into::into).collect()) Ok(rows.into_iter().map(Into::into).collect())
} }
@@ -210,7 +202,7 @@ impl JobRepository for PostgresJobRepository {
.bind(&job.error_message) .bind(&job.error_message)
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
} }
@@ -261,15 +253,7 @@ impl From<BatchRow> for JobBatch {
} }
} }
pub struct PostgresJobBatchRepository { pg_repo!(PostgresJobBatchRepository);
pool: PgPool,
}
impl PostgresJobBatchRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait] #[async_trait]
impl JobBatchRepository for PostgresJobBatchRepository { impl JobBatchRepository for PostgresJobBatchRepository {
@@ -281,7 +265,7 @@ impl JobBatchRepository for PostgresJobBatchRepository {
.bind(*id.as_uuid()) .bind(*id.as_uuid())
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(row.map(Into::into)) Ok(row.map(Into::into))
} }
@@ -304,7 +288,7 @@ impl JobBatchRepository for PostgresJobBatchRepository {
.bind(batch_status_to_str(&batch.status)) .bind(batch_status_to_str(&batch.status))
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
} }
@@ -351,15 +335,7 @@ impl From<PluginRow> for Plugin {
} }
} }
pub struct PostgresPluginRepository { pg_repo!(PostgresPluginRepository);
pool: PgPool,
}
impl PostgresPluginRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait] #[async_trait]
impl PluginRepository for PostgresPluginRepository { impl PluginRepository for PostgresPluginRepository {
@@ -371,7 +347,7 @@ impl PluginRepository for PostgresPluginRepository {
.bind(*id.as_uuid()) .bind(*id.as_uuid())
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(row.map(Into::into)) Ok(row.map(Into::into))
} }
@@ -383,7 +359,7 @@ impl PluginRepository for PostgresPluginRepository {
) )
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(rows.into_iter().map(Into::into).collect()) Ok(rows.into_iter().map(Into::into).collect())
} }
@@ -405,7 +381,7 @@ impl PluginRepository for PostgresPluginRepository {
.bind(structured_to_json(&plugin.configuration)) .bind(structured_to_json(&plugin.configuration))
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
} }
@@ -475,15 +451,7 @@ impl From<PipelineRow> for ProcessingPipeline {
} }
} }
pub struct PostgresPipelineRepository { pg_repo!(PostgresPipelineRepository);
pool: PgPool,
}
impl PostgresPipelineRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait] #[async_trait]
impl PipelineRepository for PostgresPipelineRepository { impl PipelineRepository for PostgresPipelineRepository {
@@ -495,7 +463,7 @@ impl PipelineRepository for PostgresPipelineRepository {
.bind(*id.as_uuid()) .bind(*id.as_uuid())
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(row.map(Into::into)) Ok(row.map(Into::into))
} }
@@ -508,7 +476,7 @@ impl PipelineRepository for PostgresPipelineRepository {
.bind(event) .bind(event)
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(rows.into_iter().map(Into::into).collect()) Ok(rows.into_iter().map(Into::into).collect())
} }
@@ -526,7 +494,7 @@ impl PipelineRepository for PostgresPipelineRepository {
.bind(steps_to_json(&pipeline.steps)) .bind(steps_to_json(&pipeline.steps))
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
} }

View File

@@ -1,4 +1,4 @@
use crate::db::PgPool; use crate::helpers::{pg_repo, MapDomainError};
use async_trait::async_trait; use async_trait::async_trait;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use domain::{ use domain::{
@@ -207,15 +207,7 @@ impl TryFrom<InviteCodeRow> for InviteCode {
} }
} }
pub struct PostgresShareRepository { pg_repo!(PostgresShareRepository);
pool: PgPool,
}
impl PostgresShareRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait] #[async_trait]
impl ShareRepository for PostgresShareRepository { impl ShareRepository for PostgresShareRepository {
@@ -238,7 +230,7 @@ impl ShareRepository for PostgresShareRepository {
.bind(scope.created_at.as_datetime()) .bind(scope.created_at.as_datetime())
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
@@ -250,7 +242,7 @@ impl ShareRepository for PostgresShareRepository {
.bind(*id.as_uuid()) .bind(*id.as_uuid())
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
row.map(TryInto::try_into).transpose() row.map(TryInto::try_into).transpose()
} }
@@ -266,7 +258,7 @@ impl ShareRepository for PostgresShareRepository {
.bind(*resource_id.as_uuid()) .bind(*resource_id.as_uuid())
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
rows.into_iter().map(TryInto::try_into).collect() rows.into_iter().map(TryInto::try_into).collect()
} }
@@ -276,7 +268,7 @@ impl ShareRepository for PostgresShareRepository {
.bind(*id.as_uuid()) .bind(*id.as_uuid())
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
@@ -296,7 +288,7 @@ impl ShareRepository for PostgresShareRepository {
.bind(*target.role_id.as_uuid()) .bind(*target.role_id.as_uuid())
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
@@ -311,7 +303,7 @@ impl ShareRepository for PostgresShareRepository {
.bind(*scope_id.as_uuid()) .bind(*scope_id.as_uuid())
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
rows.into_iter().map(TryInto::try_into).collect() rows.into_iter().map(TryInto::try_into).collect()
} }
@@ -327,7 +319,7 @@ impl ShareRepository for PostgresShareRepository {
.bind(*user_id.as_uuid()) .bind(*user_id.as_uuid())
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
rows.into_iter().map(TryInto::try_into).collect() rows.into_iter().map(TryInto::try_into).collect()
} }
@@ -355,7 +347,7 @@ impl ShareRepository for PostgresShareRepository {
.bind(link.use_count as i32) .bind(link.use_count as i32)
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
@@ -367,7 +359,7 @@ impl ShareRepository for PostgresShareRepository {
.bind(token) .bind(token)
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
row.map(TryInto::try_into).transpose() row.map(TryInto::try_into).transpose()
} }
@@ -392,7 +384,7 @@ impl ShareRepository for PostgresShareRepository {
.bind(*invite.assigned_role_id.as_uuid()) .bind(*invite.assigned_role_id.as_uuid())
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
@@ -404,7 +396,7 @@ impl ShareRepository for PostgresShareRepository {
.bind(*id.as_uuid()) .bind(*id.as_uuid())
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
row.map(TryInto::try_into).transpose() row.map(TryInto::try_into).transpose()
} }
@@ -433,15 +425,7 @@ impl From<VisibilityFilterRow> for VisibilityFilter {
} }
} }
pub struct PostgresVisibilityFilterRepository { pg_repo!(PostgresVisibilityFilterRepository);
pool: PgPool,
}
impl PostgresVisibilityFilterRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait] #[async_trait]
impl VisibilityFilterRepository for PostgresVisibilityFilterRepository { impl VisibilityFilterRepository for PostgresVisibilityFilterRepository {
@@ -458,7 +442,7 @@ impl VisibilityFilterRepository for PostgresVisibilityFilterRepository {
.bind(*role_id.as_uuid()) .bind(*role_id.as_uuid())
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(row.map(Into::into)) Ok(row.map(Into::into))
} }
@@ -476,7 +460,7 @@ impl VisibilityFilterRepository for PostgresVisibilityFilterRepository {
.bind(&filter.hidden_fields) .bind(&filter.hidden_fields)
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
@@ -485,7 +469,7 @@ impl VisibilityFilterRepository for PostgresVisibilityFilterRepository {
.bind(*id.as_uuid()) .bind(*id.as_uuid())
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
} }

View File

@@ -1,4 +1,4 @@
use crate::db::PgPool; use crate::helpers::{pg_repo, MapDomainError};
use async_trait::async_trait; use async_trait::async_trait;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use domain::{ use domain::{
@@ -55,15 +55,7 @@ impl TryFrom<SidecarRow> for SidecarRecord {
} }
} }
pub struct PostgresSidecarRepository { pg_repo!(PostgresSidecarRepository);
pool: PgPool,
}
impl PostgresSidecarRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait] #[async_trait]
impl SidecarRepository for PostgresSidecarRepository { impl SidecarRepository for PostgresSidecarRepository {
@@ -79,7 +71,7 @@ impl SidecarRepository for PostgresSidecarRepository {
.bind(*asset_id.as_uuid()) .bind(*asset_id.as_uuid())
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
row.map(TryInto::try_into).transpose() row.map(TryInto::try_into).transpose()
} }
@@ -93,7 +85,7 @@ impl SidecarRepository for PostgresSidecarRepository {
.bind(sync_status_to_str(&status)) .bind(sync_status_to_str(&status))
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
rows.into_iter().map(TryInto::try_into).collect() rows.into_iter().map(TryInto::try_into).collect()
} }
@@ -118,7 +110,7 @@ impl SidecarRepository for PostgresSidecarRepository {
.bind(&record.error_message) .bind(&record.error_message)
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
@@ -127,7 +119,7 @@ impl SidecarRepository for PostgresSidecarRepository {
.bind(*asset_id.as_uuid()) .bind(*asset_id.as_uuid())
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
} }

View File

@@ -1,4 +1,4 @@
use crate::db::PgPool; use crate::helpers::{pg_repo, MapDomainError};
use async_trait::async_trait; use async_trait::async_trait;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use domain::{ use domain::{
@@ -40,15 +40,7 @@ impl From<StorageVolumeRow> for StorageVolume {
} }
} }
pub struct PostgresStorageVolumeRepository { pg_repo!(PostgresStorageVolumeRepository);
pool: PgPool,
}
impl PostgresStorageVolumeRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait] #[async_trait]
impl StorageVolumeRepository for PostgresStorageVolumeRepository { impl StorageVolumeRepository for PostgresStorageVolumeRepository {
@@ -60,7 +52,7 @@ impl StorageVolumeRepository for PostgresStorageVolumeRepository {
.bind(*id.as_uuid()) .bind(*id.as_uuid())
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(row.map(Into::into)) Ok(row.map(Into::into))
} }
@@ -72,7 +64,7 @@ impl StorageVolumeRepository for PostgresStorageVolumeRepository {
) )
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(rows.into_iter().map(Into::into).collect()) Ok(rows.into_iter().map(Into::into).collect())
} }
@@ -94,7 +86,7 @@ impl StorageVolumeRepository for PostgresStorageVolumeRepository {
.bind(volume.available_bytes as i64) .bind(volume.available_bytes as i64)
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
@@ -103,7 +95,7 @@ impl StorageVolumeRepository for PostgresStorageVolumeRepository {
.bind(*id.as_uuid()) .bind(*id.as_uuid())
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
} }
@@ -151,15 +143,7 @@ impl From<LibraryPathRow> for LibraryPath {
} }
} }
pub struct PostgresLibraryPathRepository { pg_repo!(PostgresLibraryPathRepository);
pool: PgPool,
}
impl PostgresLibraryPathRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait] #[async_trait]
impl LibraryPathRepository for PostgresLibraryPathRepository { impl LibraryPathRepository for PostgresLibraryPathRepository {
@@ -171,7 +155,7 @@ impl LibraryPathRepository for PostgresLibraryPathRepository {
.bind(*id.as_uuid()) .bind(*id.as_uuid())
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(row.map(Into::into)) Ok(row.map(Into::into))
} }
@@ -184,7 +168,7 @@ impl LibraryPathRepository for PostgresLibraryPathRepository {
.bind(*volume_id.as_uuid()) .bind(*volume_id.as_uuid())
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(rows.into_iter().map(Into::into).collect()) Ok(rows.into_iter().map(Into::into).collect())
} }
@@ -201,7 +185,7 @@ impl LibraryPathRepository for PostgresLibraryPathRepository {
.bind(*owner_id.as_uuid()) .bind(*owner_id.as_uuid())
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(rows.into_iter().map(Into::into).collect()) Ok(rows.into_iter().map(Into::into).collect())
} }
@@ -225,7 +209,7 @@ impl LibraryPathRepository for PostgresLibraryPathRepository {
.bind(path.designated_owner_id.as_ref().map(|id| *id.as_uuid())) .bind(path.designated_owner_id.as_ref().map(|id| *id.as_uuid()))
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
@@ -234,7 +218,7 @@ impl LibraryPathRepository for PostgresLibraryPathRepository {
.bind(*id.as_uuid()) .bind(*id.as_uuid())
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
} }
@@ -294,15 +278,7 @@ impl TryFrom<IngestSessionRow> for IngestSession {
} }
} }
pub struct PostgresIngestSessionRepository { pg_repo!(PostgresIngestSessionRepository);
pool: PgPool,
}
impl PostgresIngestSessionRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait] #[async_trait]
impl IngestSessionRepository for PostgresIngestSessionRepository { impl IngestSessionRepository for PostgresIngestSessionRepository {
@@ -315,7 +291,7 @@ impl IngestSessionRepository for PostgresIngestSessionRepository {
.bind(*id.as_uuid()) .bind(*id.as_uuid())
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
row.map(TryInto::try_into).transpose() row.map(TryInto::try_into).transpose()
} }
@@ -329,7 +305,7 @@ impl IngestSessionRepository for PostgresIngestSessionRepository {
.bind(*user_id.as_uuid()) .bind(*user_id.as_uuid())
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
rows.into_iter().map(TryInto::try_into).collect() rows.into_iter().map(TryInto::try_into).collect()
} }
@@ -354,7 +330,7 @@ impl IngestSessionRepository for PostgresIngestSessionRepository {
.bind(session.error_message.as_deref()) .bind(session.error_message.as_deref())
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
} }
@@ -457,15 +433,7 @@ impl From<UsageLedgerRow> for UsageLedgerEntry {
} }
} }
pub struct PostgresQuotaRepository { pg_repo!(PostgresQuotaRepository);
pool: PgPool,
}
impl PostgresQuotaRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait] #[async_trait]
impl QuotaRepository for PostgresQuotaRepository { impl QuotaRepository for PostgresQuotaRepository {
@@ -479,7 +447,7 @@ impl QuotaRepository for PostgresQuotaRepository {
.bind(*owner_id.as_uuid()) .bind(*owner_id.as_uuid())
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
let Some(def) = def_row else { let Some(def) = def_row else {
return Ok(None); return Ok(None);
@@ -492,7 +460,7 @@ impl QuotaRepository for PostgresQuotaRepository {
.bind(def.quota_id) .bind(def.quota_id)
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(Some(QuotaDefinition { Ok(Some(QuotaDefinition {
quota_id: SystemId::from_uuid(def.quota_id), quota_id: SystemId::from_uuid(def.quota_id),
@@ -515,14 +483,14 @@ impl QuotaRepository for PostgresQuotaRepository {
.bind(quota.is_enforced) .bind(quota.is_enforced)
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
// Delete old rules then re-insert // Delete old rules then re-insert
sqlx::query("DELETE FROM quota_rules WHERE quota_id = $1") sqlx::query("DELETE FROM quota_rules WHERE quota_id = $1")
.bind(*quota.quota_id.as_uuid()) .bind(*quota.quota_id.as_uuid())
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
for rule in &quota.rules { for rule in &quota.rules {
sqlx::query( sqlx::query(
@@ -537,7 +505,7 @@ impl QuotaRepository for PostgresQuotaRepository {
.bind(rule.is_unlimited) .bind(rule.is_unlimited)
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
} }
Ok(()) Ok(())
@@ -549,20 +517,12 @@ impl QuotaRepository for PostgresQuotaRepository {
.bind(*id.as_uuid()) .bind(*id.as_uuid())
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
} }
pub struct PostgresUsageLedgerRepository { pg_repo!(PostgresUsageLedgerRepository);
pool: PgPool,
}
impl PostgresUsageLedgerRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait] #[async_trait]
impl UsageLedgerRepository for PostgresUsageLedgerRepository { impl UsageLedgerRepository for PostgresUsageLedgerRepository {
@@ -579,7 +539,7 @@ impl UsageLedgerRepository for PostgresUsageLedgerRepository {
.bind(&entry.context) .bind(&entry.context)
.execute(&self.pool) .execute(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(()) Ok(())
} }
@@ -600,7 +560,7 @@ impl UsageLedgerRepository for PostgresUsageLedgerRepository {
.bind(since_dt) .bind(since_dt)
.fetch_one(&self.pool) .fetch_one(&self.pool)
.await .await
.map_err(|e| DomainError::Internal(e.to_string()))?; .map_pg()?;
Ok(row.total as u64) Ok(row.total as u64)
} }