feat: add postgres migrations and repository adapters for vertical slice
This commit is contained in:
181
crates/adapters/postgres/src/album_repository.rs
Normal file
181
crates/adapters/postgres/src/album_repository.rs
Normal file
@@ -0,0 +1,181 @@
|
||||
use crate::db::PgPool;
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use domain::{
|
||||
entities::{Album, AlbumEntry},
|
||||
errors::DomainError,
|
||||
ports::AlbumRepository,
|
||||
value_objects::{DateTimeStamp, SystemId},
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct AlbumRow {
|
||||
album_id: Uuid,
|
||||
title: String,
|
||||
description: String,
|
||||
creator_user_id: Uuid,
|
||||
cover_asset_id: Option<Uuid>,
|
||||
start_date: Option<DateTime<Utc>>,
|
||||
end_date: Option<DateTime<Utc>>,
|
||||
created_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
#[allow(dead_code)]
|
||||
struct AlbumEntryRow {
|
||||
album_id: Uuid,
|
||||
asset_id: Uuid,
|
||||
sort_order: i32,
|
||||
added_at: DateTime<Utc>,
|
||||
added_by_user_id: Uuid,
|
||||
}
|
||||
|
||||
impl From<AlbumEntryRow> for AlbumEntry {
|
||||
fn from(r: AlbumEntryRow) -> Self {
|
||||
Self {
|
||||
asset_id: SystemId::from_uuid(r.asset_id),
|
||||
sort_order: r.sort_order as u32,
|
||||
added_at: DateTimeStamp::from_datetime(r.added_at),
|
||||
added_by_user_id: SystemId::from_uuid(r.added_by_user_id),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn album_from_row(r: AlbumRow, entries: Vec<AlbumEntry>) -> Album {
|
||||
Album {
|
||||
album_id: SystemId::from_uuid(r.album_id),
|
||||
title: r.title,
|
||||
description: r.description,
|
||||
creator_user_id: SystemId::from_uuid(r.creator_user_id),
|
||||
cover_asset_id: r.cover_asset_id.map(SystemId::from_uuid),
|
||||
start_date: r.start_date.map(DateTimeStamp::from_datetime),
|
||||
end_date: r.end_date.map(DateTimeStamp::from_datetime),
|
||||
entries,
|
||||
created_at: DateTimeStamp::from_datetime(r.created_at),
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PostgresAlbumRepository {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PostgresAlbumRepository {
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
|
||||
async fn load_entries(&self, album_id: &Uuid) -> Result<Vec<AlbumEntry>, DomainError> {
|
||||
let rows = sqlx::query_as::<_, AlbumEntryRow>(
|
||||
"SELECT album_id, asset_id, sort_order, added_at, added_by_user_id
|
||||
FROM album_entries WHERE album_id = $1
|
||||
ORDER BY sort_order ASC",
|
||||
)
|
||||
.bind(album_id)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
Ok(rows.into_iter().map(Into::into).collect())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AlbumRepository for PostgresAlbumRepository {
|
||||
async fn find_by_id(&self, id: &SystemId) -> Result<Option<Album>, DomainError> {
|
||||
let row = sqlx::query_as::<_, AlbumRow>(
|
||||
"SELECT album_id, title, description, creator_user_id, cover_asset_id,
|
||||
start_date, end_date, created_at
|
||||
FROM albums WHERE album_id = $1",
|
||||
)
|
||||
.bind(*id.as_uuid())
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
let Some(r) = row else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let entries = self.load_entries(&r.album_id).await?;
|
||||
Ok(Some(album_from_row(r, entries)))
|
||||
}
|
||||
|
||||
async fn find_by_creator(&self, creator_id: &SystemId) -> Result<Vec<Album>, DomainError> {
|
||||
let rows = sqlx::query_as::<_, AlbumRow>(
|
||||
"SELECT album_id, title, description, creator_user_id, cover_asset_id,
|
||||
start_date, end_date, created_at
|
||||
FROM albums WHERE creator_user_id = $1",
|
||||
)
|
||||
.bind(*creator_id.as_uuid())
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
let mut albums = Vec::with_capacity(rows.len());
|
||||
for r in rows {
|
||||
let entries = self.load_entries(&r.album_id).await?;
|
||||
albums.push(album_from_row(r, entries));
|
||||
}
|
||||
Ok(albums)
|
||||
}
|
||||
|
||||
async fn save(&self, album: &Album) -> Result<(), DomainError> {
|
||||
sqlx::query(
|
||||
"INSERT INTO albums (album_id, title, description, creator_user_id, cover_asset_id,
|
||||
start_date, end_date, created_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
ON CONFLICT (album_id) DO UPDATE SET
|
||||
title = EXCLUDED.title,
|
||||
description = EXCLUDED.description,
|
||||
cover_asset_id = EXCLUDED.cover_asset_id,
|
||||
start_date = EXCLUDED.start_date,
|
||||
end_date = EXCLUDED.end_date",
|
||||
)
|
||||
.bind(*album.album_id.as_uuid())
|
||||
.bind(&album.title)
|
||||
.bind(&album.description)
|
||||
.bind(*album.creator_user_id.as_uuid())
|
||||
.bind(album.cover_asset_id.as_ref().map(|id| *id.as_uuid()))
|
||||
.bind(album.start_date.as_ref().map(|d| d.as_datetime()).copied())
|
||||
.bind(album.end_date.as_ref().map(|d| d.as_datetime()).copied())
|
||||
.bind(album.created_at.as_datetime())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
// Sync entries: delete all then re-insert
|
||||
sqlx::query("DELETE FROM album_entries WHERE album_id = $1")
|
||||
.bind(*album.album_id.as_uuid())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
for entry in &album.entries {
|
||||
sqlx::query(
|
||||
"INSERT INTO album_entries (album_id, asset_id, sort_order, added_at, added_by_user_id)
|
||||
VALUES ($1, $2, $3, $4, $5)",
|
||||
)
|
||||
.bind(*album.album_id.as_uuid())
|
||||
.bind(*entry.asset_id.as_uuid())
|
||||
.bind(entry.sort_order as i32)
|
||||
.bind(entry.added_at.as_datetime())
|
||||
.bind(*entry.added_by_user_id.as_uuid())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete(&self, id: &SystemId) -> Result<(), DomainError> {
|
||||
// Entries cascade-delete
|
||||
sqlx::query("DELETE FROM albums WHERE album_id = $1")
|
||||
.bind(*id.as_uuid())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
166
crates/adapters/postgres/src/asset_metadata_repository.rs
Normal file
166
crates/adapters/postgres/src/asset_metadata_repository.rs
Normal file
@@ -0,0 +1,166 @@
|
||||
use crate::db::PgPool;
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use domain::{
|
||||
entities::{AssetMetadata, MetadataSource},
|
||||
errors::DomainError,
|
||||
ports::AssetMetadataRepository,
|
||||
value_objects::{DateTimeStamp, MetadataValue, StructuredData, SystemId},
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct AssetMetadataRow {
|
||||
asset_id: Uuid,
|
||||
metadata_source: String,
|
||||
data: serde_json::Value,
|
||||
updated_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
fn source_from_str(s: &str) -> MetadataSource {
|
||||
match s {
|
||||
"exif_extracted" => MetadataSource::ExifExtracted,
|
||||
"ai_generated" => MetadataSource::AiGenerated,
|
||||
"user_edited" => MetadataSource::UserEdited,
|
||||
_ => MetadataSource::ExifExtracted,
|
||||
}
|
||||
}
|
||||
|
||||
fn source_to_str(s: &MetadataSource) -> &'static str {
|
||||
match s {
|
||||
MetadataSource::ExifExtracted => "exif_extracted",
|
||||
MetadataSource::AiGenerated => "ai_generated",
|
||||
MetadataSource::UserEdited => "user_edited",
|
||||
}
|
||||
}
|
||||
|
||||
fn json_to_structured_data(v: serde_json::Value) -> StructuredData {
|
||||
let mut sd = StructuredData::new();
|
||||
if let serde_json::Value::Object(map) = v {
|
||||
for (key, val) in map {
|
||||
let mv = match val {
|
||||
serde_json::Value::String(s) => MetadataValue::String(s),
|
||||
serde_json::Value::Number(n) => {
|
||||
if let Some(i) = n.as_i64() {
|
||||
MetadataValue::Integer(i)
|
||||
} else if let Some(f) = n.as_f64() {
|
||||
MetadataValue::Float(f)
|
||||
} else {
|
||||
MetadataValue::Null
|
||||
}
|
||||
}
|
||||
serde_json::Value::Bool(b) => MetadataValue::Boolean(b),
|
||||
serde_json::Value::Null => MetadataValue::Null,
|
||||
_ => MetadataValue::String(val.to_string()),
|
||||
};
|
||||
sd.insert(key, mv);
|
||||
}
|
||||
}
|
||||
sd
|
||||
}
|
||||
|
||||
fn structured_data_to_json(sd: &StructuredData) -> serde_json::Value {
|
||||
let mut map = serde_json::Map::new();
|
||||
for (key, val) in sd.inner() {
|
||||
let jv = match val {
|
||||
MetadataValue::String(s) => serde_json::Value::String(s.clone()),
|
||||
MetadataValue::Integer(i) => serde_json::Value::Number((*i).into()),
|
||||
MetadataValue::Float(f) => serde_json::Number::from_f64(*f)
|
||||
.map(serde_json::Value::Number)
|
||||
.unwrap_or(serde_json::Value::Null),
|
||||
MetadataValue::Boolean(b) => serde_json::Value::Bool(*b),
|
||||
MetadataValue::Null => serde_json::Value::Null,
|
||||
};
|
||||
map.insert(key.clone(), jv);
|
||||
}
|
||||
serde_json::Value::Object(map)
|
||||
}
|
||||
|
||||
impl From<AssetMetadataRow> for AssetMetadata {
|
||||
fn from(r: AssetMetadataRow) -> Self {
|
||||
Self {
|
||||
asset_id: SystemId::from_uuid(r.asset_id),
|
||||
metadata_source: source_from_str(&r.metadata_source),
|
||||
data: json_to_structured_data(r.data),
|
||||
updated_at: DateTimeStamp::from_datetime(r.updated_at),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PostgresAssetMetadataRepository {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PostgresAssetMetadataRepository {
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AssetMetadataRepository for PostgresAssetMetadataRepository {
|
||||
async fn find_by_asset(&self, asset_id: &SystemId) -> Result<Vec<AssetMetadata>, DomainError> {
|
||||
let rows = sqlx::query_as::<_, AssetMetadataRow>(
|
||||
"SELECT asset_id, metadata_source, data, updated_at
|
||||
FROM asset_metadata WHERE asset_id = $1",
|
||||
)
|
||||
.bind(*asset_id.as_uuid())
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
Ok(rows.into_iter().map(Into::into).collect())
|
||||
}
|
||||
|
||||
async fn find_by_asset_and_source(
|
||||
&self,
|
||||
asset_id: &SystemId,
|
||||
source: MetadataSource,
|
||||
) -> Result<Option<AssetMetadata>, DomainError> {
|
||||
let row = sqlx::query_as::<_, AssetMetadataRow>(
|
||||
"SELECT asset_id, metadata_source, data, updated_at
|
||||
FROM asset_metadata WHERE asset_id = $1 AND metadata_source = $2",
|
||||
)
|
||||
.bind(*asset_id.as_uuid())
|
||||
.bind(source_to_str(&source))
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
Ok(row.map(Into::into))
|
||||
}
|
||||
|
||||
async fn save(&self, metadata: &AssetMetadata) -> Result<(), DomainError> {
|
||||
sqlx::query(
|
||||
"INSERT INTO asset_metadata (asset_id, metadata_source, data, updated_at)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
ON CONFLICT (asset_id, metadata_source) DO UPDATE SET
|
||||
data = EXCLUDED.data,
|
||||
updated_at = EXCLUDED.updated_at",
|
||||
)
|
||||
.bind(*metadata.asset_id.as_uuid())
|
||||
.bind(source_to_str(&metadata.metadata_source))
|
||||
.bind(structured_data_to_json(&metadata.data))
|
||||
.bind(metadata.updated_at.as_datetime())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete_by_asset_and_source(
|
||||
&self,
|
||||
asset_id: &SystemId,
|
||||
source: MetadataSource,
|
||||
) -> Result<(), DomainError> {
|
||||
sqlx::query(
|
||||
"DELETE FROM asset_metadata WHERE asset_id = $1 AND metadata_source = $2",
|
||||
)
|
||||
.bind(*asset_id.as_uuid())
|
||||
.bind(source_to_str(&source))
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
165
crates/adapters/postgres/src/asset_repository.rs
Normal file
165
crates/adapters/postgres/src/asset_repository.rs
Normal file
@@ -0,0 +1,165 @@
|
||||
use crate::db::PgPool;
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use domain::{
|
||||
entities::{Asset, AssetType, SourceReference},
|
||||
errors::DomainError,
|
||||
ports::AssetRepository,
|
||||
value_objects::{Checksum, DateTimeStamp, SystemId},
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct AssetRow {
|
||||
asset_id: Uuid,
|
||||
volume_id: Uuid,
|
||||
relative_path: String,
|
||||
checksum: String,
|
||||
asset_type: String,
|
||||
mime_type: String,
|
||||
file_size: i64,
|
||||
is_processed: bool,
|
||||
owner_user_id: Uuid,
|
||||
created_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
fn asset_type_from_str(s: &str) -> AssetType {
|
||||
match s {
|
||||
"image" => AssetType::Image,
|
||||
"video" => AssetType::Video,
|
||||
"live_photo" => AssetType::LivePhoto,
|
||||
_ => AssetType::Image,
|
||||
}
|
||||
}
|
||||
|
||||
fn asset_type_to_str(t: &AssetType) -> &'static str {
|
||||
match t {
|
||||
AssetType::Image => "image",
|
||||
AssetType::Video => "video",
|
||||
AssetType::LivePhoto => "live_photo",
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<AssetRow> for Asset {
|
||||
type Error = DomainError;
|
||||
fn try_from(r: AssetRow) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
asset_id: SystemId::from_uuid(r.asset_id),
|
||||
source_reference: SourceReference {
|
||||
volume_id: SystemId::from_uuid(r.volume_id),
|
||||
relative_path: r.relative_path,
|
||||
checksum: Checksum::new(r.checksum)?,
|
||||
},
|
||||
asset_type: asset_type_from_str(&r.asset_type),
|
||||
mime_type: r.mime_type,
|
||||
file_size: r.file_size as u64,
|
||||
is_processed: r.is_processed,
|
||||
owner_user_id: SystemId::from_uuid(r.owner_user_id),
|
||||
created_at: DateTimeStamp::from_datetime(r.created_at),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PostgresAssetRepository {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PostgresAssetRepository {
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AssetRepository for PostgresAssetRepository {
|
||||
async fn find_by_id(&self, id: &SystemId) -> Result<Option<Asset>, DomainError> {
|
||||
let row = sqlx::query_as::<_, AssetRow>(
|
||||
"SELECT asset_id, volume_id, relative_path, checksum, asset_type, mime_type,
|
||||
file_size, is_processed, owner_user_id, created_at
|
||||
FROM assets WHERE asset_id = $1",
|
||||
)
|
||||
.bind(*id.as_uuid())
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
row.map(TryInto::try_into).transpose()
|
||||
}
|
||||
|
||||
async fn find_by_checksum(&self, checksum: &Checksum) -> Result<Vec<Asset>, DomainError> {
|
||||
let rows = sqlx::query_as::<_, AssetRow>(
|
||||
"SELECT asset_id, volume_id, relative_path, checksum, asset_type, mime_type,
|
||||
file_size, is_processed, owner_user_id, created_at
|
||||
FROM assets WHERE checksum = $1",
|
||||
)
|
||||
.bind(checksum.as_str())
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
rows.into_iter().map(TryInto::try_into).collect()
|
||||
}
|
||||
|
||||
async fn find_by_owner(
|
||||
&self,
|
||||
owner_id: &SystemId,
|
||||
limit: u32,
|
||||
offset: u32,
|
||||
) -> Result<Vec<Asset>, DomainError> {
|
||||
let rows = sqlx::query_as::<_, AssetRow>(
|
||||
"SELECT asset_id, volume_id, relative_path, checksum, asset_type, mime_type,
|
||||
file_size, is_processed, owner_user_id, created_at
|
||||
FROM assets WHERE owner_user_id = $1
|
||||
ORDER BY created_at DESC
|
||||
LIMIT $2 OFFSET $3",
|
||||
)
|
||||
.bind(*owner_id.as_uuid())
|
||||
.bind(limit as i64)
|
||||
.bind(offset as i64)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
rows.into_iter().map(TryInto::try_into).collect()
|
||||
}
|
||||
|
||||
async fn save(&self, asset: &Asset) -> Result<(), DomainError> {
|
||||
sqlx::query(
|
||||
"INSERT INTO assets (asset_id, volume_id, relative_path, checksum, asset_type,
|
||||
mime_type, file_size, is_processed, owner_user_id, created_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
||||
ON CONFLICT (asset_id) DO UPDATE SET
|
||||
volume_id = EXCLUDED.volume_id,
|
||||
relative_path = EXCLUDED.relative_path,
|
||||
checksum = EXCLUDED.checksum,
|
||||
asset_type = EXCLUDED.asset_type,
|
||||
mime_type = EXCLUDED.mime_type,
|
||||
file_size = EXCLUDED.file_size,
|
||||
is_processed = EXCLUDED.is_processed,
|
||||
owner_user_id = EXCLUDED.owner_user_id",
|
||||
)
|
||||
.bind(*asset.asset_id.as_uuid())
|
||||
.bind(*asset.source_reference.volume_id.as_uuid())
|
||||
.bind(&asset.source_reference.relative_path)
|
||||
.bind(asset.source_reference.checksum.as_str())
|
||||
.bind(asset_type_to_str(&asset.asset_type))
|
||||
.bind(&asset.mime_type)
|
||||
.bind(asset.file_size as i64)
|
||||
.bind(asset.is_processed)
|
||||
.bind(*asset.owner_user_id.as_uuid())
|
||||
.bind(asset.created_at.as_datetime())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete(&self, id: &SystemId) -> Result<(), DomainError> {
|
||||
sqlx::query("DELETE FROM assets WHERE asset_id = $1")
|
||||
.bind(*id.as_uuid())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
126
crates/adapters/postgres/src/ingest_session_repository.rs
Normal file
126
crates/adapters/postgres/src/ingest_session_repository.rs
Normal file
@@ -0,0 +1,126 @@
|
||||
use crate::db::PgPool;
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use domain::{
|
||||
entities::{IngestSession, IngestStatus},
|
||||
errors::DomainError,
|
||||
ports::IngestSessionRepository,
|
||||
value_objects::{Checksum, DateTimeStamp, SystemId},
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct IngestSessionRow {
|
||||
session_id: Uuid,
|
||||
uploader_user_id: Uuid,
|
||||
client_device_id: String,
|
||||
original_filename: String,
|
||||
client_checksum: String,
|
||||
target_library_path_id: Uuid,
|
||||
status: String,
|
||||
created_at: DateTime<Utc>,
|
||||
error_message: Option<String>,
|
||||
}
|
||||
|
||||
fn status_from_str(s: &str) -> IngestStatus {
|
||||
match s {
|
||||
"uploading" => IngestStatus::Uploading,
|
||||
"awaiting_processing" => IngestStatus::AwaitingProcessing,
|
||||
"processing" => IngestStatus::Processing,
|
||||
"completed" => IngestStatus::Completed,
|
||||
"failed" => IngestStatus::Failed,
|
||||
_ => IngestStatus::Uploading,
|
||||
}
|
||||
}
|
||||
|
||||
fn status_to_str(s: &IngestStatus) -> &'static str {
|
||||
match s {
|
||||
IngestStatus::Uploading => "uploading",
|
||||
IngestStatus::AwaitingProcessing => "awaiting_processing",
|
||||
IngestStatus::Processing => "processing",
|
||||
IngestStatus::Completed => "completed",
|
||||
IngestStatus::Failed => "failed",
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<IngestSessionRow> for IngestSession {
|
||||
type Error = DomainError;
|
||||
fn try_from(r: IngestSessionRow) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
session_id: SystemId::from_uuid(r.session_id),
|
||||
uploader_user_id: SystemId::from_uuid(r.uploader_user_id),
|
||||
client_device_id: r.client_device_id,
|
||||
original_filename: r.original_filename,
|
||||
client_checksum: Checksum::new(r.client_checksum)?,
|
||||
target_library_path_id: SystemId::from_uuid(r.target_library_path_id),
|
||||
status: status_from_str(&r.status),
|
||||
created_at: DateTimeStamp::from_datetime(r.created_at),
|
||||
error_message: r.error_message,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PostgresIngestSessionRepository {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PostgresIngestSessionRepository {
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl IngestSessionRepository for PostgresIngestSessionRepository {
|
||||
async fn find_by_id(&self, id: &SystemId) -> Result<Option<IngestSession>, DomainError> {
|
||||
let row = sqlx::query_as::<_, IngestSessionRow>(
|
||||
"SELECT session_id, uploader_user_id, client_device_id, original_filename,
|
||||
client_checksum, target_library_path_id, status, created_at, error_message
|
||||
FROM ingest_sessions WHERE session_id = $1",
|
||||
)
|
||||
.bind(*id.as_uuid())
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
row.map(TryInto::try_into).transpose()
|
||||
}
|
||||
|
||||
async fn find_by_user(&self, user_id: &SystemId) -> Result<Vec<IngestSession>, DomainError> {
|
||||
let rows = sqlx::query_as::<_, IngestSessionRow>(
|
||||
"SELECT session_id, uploader_user_id, client_device_id, original_filename,
|
||||
client_checksum, target_library_path_id, status, created_at, error_message
|
||||
FROM ingest_sessions WHERE uploader_user_id = $1",
|
||||
)
|
||||
.bind(*user_id.as_uuid())
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
rows.into_iter().map(TryInto::try_into).collect()
|
||||
}
|
||||
|
||||
async fn save(&self, session: &IngestSession) -> Result<(), DomainError> {
|
||||
sqlx::query(
|
||||
"INSERT INTO ingest_sessions (session_id, uploader_user_id, client_device_id, original_filename,
|
||||
client_checksum, target_library_path_id, status, created_at, error_message)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
||||
ON CONFLICT (session_id) DO UPDATE SET
|
||||
status = EXCLUDED.status,
|
||||
error_message = EXCLUDED.error_message",
|
||||
)
|
||||
.bind(*session.session_id.as_uuid())
|
||||
.bind(*session.uploader_user_id.as_uuid())
|
||||
.bind(&session.client_device_id)
|
||||
.bind(&session.original_filename)
|
||||
.bind(session.client_checksum.as_str())
|
||||
.bind(*session.target_library_path_id.as_uuid())
|
||||
.bind(status_to_str(&session.status))
|
||||
.bind(session.created_at.as_datetime())
|
||||
.bind(session.error_message.as_deref())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,21 @@
|
||||
pub mod db;
|
||||
|
||||
pub mod album_repository;
|
||||
pub mod asset_metadata_repository;
|
||||
pub mod asset_repository;
|
||||
pub mod ingest_session_repository;
|
||||
pub mod library_path_repository;
|
||||
pub mod quota_repository;
|
||||
pub mod storage_volume_repository;
|
||||
pub mod user_repository;
|
||||
|
||||
pub use db::{PgPool, connect, run_migrations};
|
||||
|
||||
pub use album_repository::PostgresAlbumRepository;
|
||||
pub use asset_metadata_repository::PostgresAssetMetadataRepository;
|
||||
pub use asset_repository::PostgresAssetRepository;
|
||||
pub use ingest_session_repository::PostgresIngestSessionRepository;
|
||||
pub use library_path_repository::PostgresLibraryPathRepository;
|
||||
pub use quota_repository::{PostgresQuotaRepository, PostgresUsageLedgerRepository};
|
||||
pub use storage_volume_repository::PostgresStorageVolumeRepository;
|
||||
pub use user_repository::PostgresUserRepository;
|
||||
|
||||
136
crates/adapters/postgres/src/library_path_repository.rs
Normal file
136
crates/adapters/postgres/src/library_path_repository.rs
Normal file
@@ -0,0 +1,136 @@
|
||||
use crate::db::PgPool;
|
||||
use async_trait::async_trait;
|
||||
use domain::{
|
||||
entities::{LibraryPath, OwnershipPolicy},
|
||||
errors::DomainError,
|
||||
ports::LibraryPathRepository,
|
||||
value_objects::SystemId,
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct LibraryPathRow {
|
||||
path_id: Uuid,
|
||||
volume_id: Uuid,
|
||||
relative_path: String,
|
||||
is_ingest_destination: bool,
|
||||
ownership_policy: String,
|
||||
designated_owner_id: Option<Uuid>,
|
||||
}
|
||||
|
||||
fn policy_from_str(s: &str) -> OwnershipPolicy {
|
||||
match s {
|
||||
"user_owned" => OwnershipPolicy::UserOwned,
|
||||
"group_owned" => OwnershipPolicy::GroupOwned,
|
||||
_ => OwnershipPolicy::Unassigned,
|
||||
}
|
||||
}
|
||||
|
||||
fn policy_to_str(p: &OwnershipPolicy) -> &'static str {
|
||||
match p {
|
||||
OwnershipPolicy::UserOwned => "user_owned",
|
||||
OwnershipPolicy::GroupOwned => "group_owned",
|
||||
OwnershipPolicy::Unassigned => "unassigned",
|
||||
}
|
||||
}
|
||||
|
||||
impl From<LibraryPathRow> for LibraryPath {
|
||||
fn from(r: LibraryPathRow) -> Self {
|
||||
Self {
|
||||
path_id: SystemId::from_uuid(r.path_id),
|
||||
volume_id: SystemId::from_uuid(r.volume_id),
|
||||
relative_path: r.relative_path,
|
||||
is_ingest_destination: r.is_ingest_destination,
|
||||
ownership_policy: policy_from_str(&r.ownership_policy),
|
||||
designated_owner_id: r.designated_owner_id.map(SystemId::from_uuid),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PostgresLibraryPathRepository {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PostgresLibraryPathRepository {
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl LibraryPathRepository for PostgresLibraryPathRepository {
|
||||
async fn find_by_id(&self, id: &SystemId) -> Result<Option<LibraryPath>, DomainError> {
|
||||
let row = sqlx::query_as::<_, LibraryPathRow>(
|
||||
"SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id
|
||||
FROM library_paths WHERE path_id = $1",
|
||||
)
|
||||
.bind(*id.as_uuid())
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
Ok(row.map(Into::into))
|
||||
}
|
||||
|
||||
async fn find_by_volume(&self, volume_id: &SystemId) -> Result<Vec<LibraryPath>, DomainError> {
|
||||
let rows = sqlx::query_as::<_, LibraryPathRow>(
|
||||
"SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id
|
||||
FROM library_paths WHERE volume_id = $1",
|
||||
)
|
||||
.bind(*volume_id.as_uuid())
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
Ok(rows.into_iter().map(Into::into).collect())
|
||||
}
|
||||
|
||||
async fn find_ingest_destinations(
|
||||
&self,
|
||||
owner_id: &SystemId,
|
||||
) -> Result<Vec<LibraryPath>, DomainError> {
|
||||
let rows = sqlx::query_as::<_, LibraryPathRow>(
|
||||
"SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id
|
||||
FROM library_paths
|
||||
WHERE is_ingest_destination = true AND designated_owner_id = $1",
|
||||
)
|
||||
.bind(*owner_id.as_uuid())
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
Ok(rows.into_iter().map(Into::into).collect())
|
||||
}
|
||||
|
||||
async fn save(&self, path: &LibraryPath) -> Result<(), DomainError> {
|
||||
sqlx::query(
|
||||
"INSERT INTO library_paths (path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id)
|
||||
VALUES ($1, $2, $3, $4, $5, $6)
|
||||
ON CONFLICT (path_id) DO UPDATE SET
|
||||
volume_id = EXCLUDED.volume_id,
|
||||
relative_path = EXCLUDED.relative_path,
|
||||
is_ingest_destination = EXCLUDED.is_ingest_destination,
|
||||
ownership_policy = EXCLUDED.ownership_policy,
|
||||
designated_owner_id = EXCLUDED.designated_owner_id",
|
||||
)
|
||||
.bind(*path.path_id.as_uuid())
|
||||
.bind(*path.volume_id.as_uuid())
|
||||
.bind(&path.relative_path)
|
||||
.bind(path.is_ingest_destination)
|
||||
.bind(policy_to_str(&path.ownership_policy))
|
||||
.bind(path.designated_owner_id.as_ref().map(|id| *id.as_uuid()))
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete(&self, id: &SystemId) -> Result<(), DomainError> {
|
||||
sqlx::query("DELETE FROM library_paths WHERE path_id = $1")
|
||||
.bind(*id.as_uuid())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
261
crates/adapters/postgres/src/quota_repository.rs
Normal file
261
crates/adapters/postgres/src/quota_repository.rs
Normal file
@@ -0,0 +1,261 @@
|
||||
use crate::db::PgPool;
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use domain::{
|
||||
entities::{QuotaDefinition, QuotaRule, TimePeriod, UsageLedgerEntry, UsageType},
|
||||
errors::DomainError,
|
||||
ports::{QuotaRepository, UsageLedgerRepository},
|
||||
value_objects::{DateTimeStamp, SystemId},
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
// --- Enum mappings ---
|
||||
|
||||
fn usage_type_from_str(s: &str) -> UsageType {
|
||||
match s {
|
||||
"storage_bytes" => UsageType::StorageBytes,
|
||||
"process_jobs" => UsageType::ProcessJobs,
|
||||
"api_calls" => UsageType::ApiCalls,
|
||||
"indexing_size" => UsageType::IndexingSize,
|
||||
_ => UsageType::StorageBytes,
|
||||
}
|
||||
}
|
||||
|
||||
fn usage_type_to_str(t: &UsageType) -> &'static str {
|
||||
match t {
|
||||
UsageType::StorageBytes => "storage_bytes",
|
||||
UsageType::ProcessJobs => "process_jobs",
|
||||
UsageType::ApiCalls => "api_calls",
|
||||
UsageType::IndexingSize => "indexing_size",
|
||||
}
|
||||
}
|
||||
|
||||
fn time_period_from_str(s: &str) -> TimePeriod {
|
||||
match s {
|
||||
"daily" => TimePeriod::Daily,
|
||||
"monthly" => TimePeriod::Monthly,
|
||||
"lifetime" => TimePeriod::Lifetime,
|
||||
_ => TimePeriod::Lifetime,
|
||||
}
|
||||
}
|
||||
|
||||
fn time_period_to_str(p: &TimePeriod) -> &'static str {
|
||||
match p {
|
||||
TimePeriod::Daily => "daily",
|
||||
TimePeriod::Monthly => "monthly",
|
||||
TimePeriod::Lifetime => "lifetime",
|
||||
}
|
||||
}
|
||||
|
||||
// --- Row structs ---
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct QuotaDefRow {
|
||||
quota_id: Uuid,
|
||||
owner_scope: Uuid,
|
||||
is_enforced: bool,
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
#[allow(dead_code)]
|
||||
struct QuotaRuleRow {
|
||||
rule_id: Uuid,
|
||||
quota_id: Uuid,
|
||||
dimension: String,
|
||||
limit_value: i64,
|
||||
time_period: String,
|
||||
is_unlimited: bool,
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct UsageLedgerRow {
|
||||
entry_id: Uuid,
|
||||
user_id: Uuid,
|
||||
usage_type: String,
|
||||
consumed_amount: i64,
|
||||
timestamp: DateTime<Utc>,
|
||||
context: String,
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct SumRow {
|
||||
total: i64,
|
||||
}
|
||||
|
||||
impl From<QuotaRuleRow> for QuotaRule {
|
||||
fn from(r: QuotaRuleRow) -> Self {
|
||||
Self {
|
||||
rule_id: SystemId::from_uuid(r.rule_id),
|
||||
dimension: usage_type_from_str(&r.dimension),
|
||||
limit_value: r.limit_value as u64,
|
||||
time_period: time_period_from_str(&r.time_period),
|
||||
is_unlimited: r.is_unlimited,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<UsageLedgerRow> for UsageLedgerEntry {
|
||||
fn from(r: UsageLedgerRow) -> Self {
|
||||
Self {
|
||||
entry_id: SystemId::from_uuid(r.entry_id),
|
||||
user_id: SystemId::from_uuid(r.user_id),
|
||||
usage_type: usage_type_from_str(&r.usage_type),
|
||||
consumed_amount: r.consumed_amount as u64,
|
||||
timestamp: DateTimeStamp::from_datetime(r.timestamp),
|
||||
context: r.context,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// --- PostgresQuotaRepository ---
|
||||
|
||||
pub struct PostgresQuotaRepository {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PostgresQuotaRepository {
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl QuotaRepository for PostgresQuotaRepository {
|
||||
async fn find_by_owner(
|
||||
&self,
|
||||
owner_id: &SystemId,
|
||||
) -> Result<Option<QuotaDefinition>, DomainError> {
|
||||
let def_row = sqlx::query_as::<_, QuotaDefRow>(
|
||||
"SELECT quota_id, owner_scope, is_enforced FROM quota_definitions WHERE owner_scope = $1",
|
||||
)
|
||||
.bind(*owner_id.as_uuid())
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
let Some(def) = def_row else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let rule_rows = sqlx::query_as::<_, QuotaRuleRow>(
|
||||
"SELECT rule_id, quota_id, dimension, limit_value, time_period, is_unlimited
|
||||
FROM quota_rules WHERE quota_id = $1",
|
||||
)
|
||||
.bind(def.quota_id)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
Ok(Some(QuotaDefinition {
|
||||
quota_id: SystemId::from_uuid(def.quota_id),
|
||||
owner_scope: SystemId::from_uuid(def.owner_scope),
|
||||
is_enforced: def.is_enforced,
|
||||
rules: rule_rows.into_iter().map(Into::into).collect(),
|
||||
}))
|
||||
}
|
||||
|
||||
async fn save(&self, quota: &QuotaDefinition) -> Result<(), DomainError> {
|
||||
sqlx::query(
|
||||
"INSERT INTO quota_definitions (quota_id, owner_scope, is_enforced)
|
||||
VALUES ($1, $2, $3)
|
||||
ON CONFLICT (quota_id) DO UPDATE SET
|
||||
owner_scope = EXCLUDED.owner_scope,
|
||||
is_enforced = EXCLUDED.is_enforced",
|
||||
)
|
||||
.bind(*quota.quota_id.as_uuid())
|
||||
.bind(*quota.owner_scope.as_uuid())
|
||||
.bind(quota.is_enforced)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
// Delete old rules then re-insert
|
||||
sqlx::query("DELETE FROM quota_rules WHERE quota_id = $1")
|
||||
.bind(*quota.quota_id.as_uuid())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
for rule in "a.rules {
|
||||
sqlx::query(
|
||||
"INSERT INTO quota_rules (rule_id, quota_id, dimension, limit_value, time_period, is_unlimited)
|
||||
VALUES ($1, $2, $3, $4, $5, $6)",
|
||||
)
|
||||
.bind(*rule.rule_id.as_uuid())
|
||||
.bind(*quota.quota_id.as_uuid())
|
||||
.bind(usage_type_to_str(&rule.dimension))
|
||||
.bind(rule.limit_value as i64)
|
||||
.bind(time_period_to_str(&rule.time_period))
|
||||
.bind(rule.is_unlimited)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete(&self, id: &SystemId) -> Result<(), DomainError> {
|
||||
// Rules cascade-delete
|
||||
sqlx::query("DELETE FROM quota_definitions WHERE quota_id = $1")
|
||||
.bind(*id.as_uuid())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// --- PostgresUsageLedgerRepository ---
|
||||
|
||||
pub struct PostgresUsageLedgerRepository {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PostgresUsageLedgerRepository {
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl UsageLedgerRepository for PostgresUsageLedgerRepository {
|
||||
async fn record(&self, entry: &UsageLedgerEntry) -> Result<(), DomainError> {
|
||||
sqlx::query(
|
||||
"INSERT INTO usage_ledger (entry_id, user_id, usage_type, consumed_amount, timestamp, context)
|
||||
VALUES ($1, $2, $3, $4, $5, $6)",
|
||||
)
|
||||
.bind(*entry.entry_id.as_uuid())
|
||||
.bind(*entry.user_id.as_uuid())
|
||||
.bind(usage_type_to_str(&entry.usage_type))
|
||||
.bind(entry.consumed_amount as i64)
|
||||
.bind(entry.timestamp.as_datetime())
|
||||
.bind(&entry.context)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn sum_usage(
|
||||
&self,
|
||||
user_id: &SystemId,
|
||||
usage_type: UsageType,
|
||||
since: Option<DateTimeStamp>,
|
||||
) -> Result<u64, DomainError> {
|
||||
let since_dt: Option<DateTime<Utc>> = since.map(|s| *s.as_datetime());
|
||||
let row = sqlx::query_as::<_, SumRow>(
|
||||
"SELECT COALESCE(SUM(consumed_amount), 0) as total
|
||||
FROM usage_ledger
|
||||
WHERE user_id = $1 AND usage_type = $2 AND ($3::timestamptz IS NULL OR timestamp >= $3)",
|
||||
)
|
||||
.bind(*user_id.as_uuid())
|
||||
.bind(usage_type_to_str(&usage_type))
|
||||
.bind(since_dt)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
Ok(row.total as u64)
|
||||
}
|
||||
}
|
||||
98
crates/adapters/postgres/src/storage_volume_repository.rs
Normal file
98
crates/adapters/postgres/src/storage_volume_repository.rs
Normal file
@@ -0,0 +1,98 @@
|
||||
use crate::db::PgPool;
|
||||
use async_trait::async_trait;
|
||||
use domain::{
|
||||
entities::StorageVolume,
|
||||
errors::DomainError,
|
||||
ports::StorageVolumeRepository,
|
||||
value_objects::SystemId,
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct StorageVolumeRow {
|
||||
volume_id: Uuid,
|
||||
volume_name: String,
|
||||
uri_prefix: String,
|
||||
is_writable: bool,
|
||||
available_bytes: i64,
|
||||
}
|
||||
|
||||
impl From<StorageVolumeRow> for StorageVolume {
|
||||
fn from(r: StorageVolumeRow) -> Self {
|
||||
Self {
|
||||
volume_id: SystemId::from_uuid(r.volume_id),
|
||||
volume_name: r.volume_name,
|
||||
uri_prefix: r.uri_prefix,
|
||||
is_writable: r.is_writable,
|
||||
available_bytes: r.available_bytes as u64,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PostgresStorageVolumeRepository {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PostgresStorageVolumeRepository {
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl StorageVolumeRepository for PostgresStorageVolumeRepository {
|
||||
async fn find_by_id(&self, id: &SystemId) -> Result<Option<StorageVolume>, DomainError> {
|
||||
let row = sqlx::query_as::<_, StorageVolumeRow>(
|
||||
"SELECT volume_id, volume_name, uri_prefix, is_writable, available_bytes
|
||||
FROM storage_volumes WHERE volume_id = $1",
|
||||
)
|
||||
.bind(*id.as_uuid())
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
Ok(row.map(Into::into))
|
||||
}
|
||||
|
||||
async fn find_all(&self) -> Result<Vec<StorageVolume>, DomainError> {
|
||||
let rows = sqlx::query_as::<_, StorageVolumeRow>(
|
||||
"SELECT volume_id, volume_name, uri_prefix, is_writable, available_bytes
|
||||
FROM storage_volumes",
|
||||
)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
Ok(rows.into_iter().map(Into::into).collect())
|
||||
}
|
||||
|
||||
async fn save(&self, volume: &StorageVolume) -> Result<(), DomainError> {
|
||||
sqlx::query(
|
||||
"INSERT INTO storage_volumes (volume_id, volume_name, uri_prefix, is_writable, available_bytes)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT (volume_id) DO UPDATE SET
|
||||
volume_name = EXCLUDED.volume_name,
|
||||
uri_prefix = EXCLUDED.uri_prefix,
|
||||
is_writable = EXCLUDED.is_writable,
|
||||
available_bytes = EXCLUDED.available_bytes",
|
||||
)
|
||||
.bind(*volume.volume_id.as_uuid())
|
||||
.bind(&volume.volume_name)
|
||||
.bind(&volume.uri_prefix)
|
||||
.bind(volume.is_writable)
|
||||
.bind(volume.available_bytes as i64)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete(&self, id: &SystemId) -> Result<(), DomainError> {
|
||||
sqlx::query("DELETE FROM storage_volumes WHERE volume_id = $1")
|
||||
.bind(*id.as_uuid())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -1,11 +1,34 @@
|
||||
use crate::db::PgPool;
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use domain::{
|
||||
entities::User,
|
||||
errors::DomainError,
|
||||
ports::UserRepository,
|
||||
value_objects::{Email, PasswordHash, SystemId},
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct UserRow {
|
||||
id: Uuid,
|
||||
username: String,
|
||||
email: String,
|
||||
password_hash: String,
|
||||
created_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl TryFrom<UserRow> for domain::entities::User {
|
||||
type Error = DomainError;
|
||||
fn try_from(r: UserRow) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
id: SystemId::from_uuid(r.id),
|
||||
username: r.username,
|
||||
email: Email::new(r.email)?,
|
||||
password_hash: PasswordHash::from_hash(r.password_hash),
|
||||
created_at: r.created_at,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PostgresUserRepository {
|
||||
pool: PgPool,
|
||||
@@ -19,66 +42,66 @@ impl PostgresUserRepository {
|
||||
|
||||
#[async_trait]
|
||||
impl UserRepository for PostgresUserRepository {
|
||||
async fn find_by_id(&self, id: &SystemId) -> Result<Option<User>, DomainError> {
|
||||
let row = sqlx::query!(
|
||||
"SELECT id, email, password_hash, created_at FROM users WHERE id = $1",
|
||||
*id.as_uuid()
|
||||
async fn find_by_id(&self, id: &SystemId) -> Result<Option<domain::entities::User>, DomainError> {
|
||||
let row = sqlx::query_as::<_, UserRow>(
|
||||
"SELECT id, username, email, password_hash, created_at FROM users WHERE id = $1",
|
||||
)
|
||||
.bind(*id.as_uuid())
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
row.map(|r| {
|
||||
Ok(User {
|
||||
id: SystemId::from_uuid(r.id),
|
||||
email: Email::new(r.email)?,
|
||||
password_hash: PasswordHash::from_hash(r.password_hash),
|
||||
created_at: r.created_at,
|
||||
})
|
||||
})
|
||||
.transpose()
|
||||
row.map(TryInto::try_into).transpose()
|
||||
}
|
||||
|
||||
async fn find_by_email(&self, email: &Email) -> Result<Option<User>, DomainError> {
|
||||
let row = sqlx::query!(
|
||||
"SELECT id, email, password_hash, created_at FROM users WHERE email = $1",
|
||||
email.as_str()
|
||||
async fn find_by_email(&self, email: &Email) -> Result<Option<domain::entities::User>, DomainError> {
|
||||
let row = sqlx::query_as::<_, UserRow>(
|
||||
"SELECT id, username, email, password_hash, created_at FROM users WHERE email = $1",
|
||||
)
|
||||
.bind(email.as_str())
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
row.map(|r| {
|
||||
Ok(User {
|
||||
id: SystemId::from_uuid(r.id),
|
||||
email: Email::new(r.email)?,
|
||||
password_hash: PasswordHash::from_hash(r.password_hash),
|
||||
created_at: r.created_at,
|
||||
})
|
||||
})
|
||||
.transpose()
|
||||
row.map(TryInto::try_into).transpose()
|
||||
}
|
||||
|
||||
async fn save(&self, user: &User) -> Result<(), DomainError> {
|
||||
sqlx::query!(
|
||||
"INSERT INTO users (id, email, password_hash, created_at)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
async fn find_by_username(&self, username: &str) -> Result<Option<domain::entities::User>, DomainError> {
|
||||
let row = sqlx::query_as::<_, UserRow>(
|
||||
"SELECT id, username, email, password_hash, created_at FROM users WHERE username = $1",
|
||||
)
|
||||
.bind(username)
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
row.map(TryInto::try_into).transpose()
|
||||
}
|
||||
|
||||
async fn save(&self, user: &domain::entities::User) -> Result<(), DomainError> {
|
||||
sqlx::query_as::<_, UserRow>(
|
||||
"INSERT INTO users (id, username, email, password_hash, created_at)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT (id) DO UPDATE SET
|
||||
username = EXCLUDED.username,
|
||||
email = EXCLUDED.email,
|
||||
password_hash = EXCLUDED.password_hash",
|
||||
*user.id.as_uuid(),
|
||||
user.email.as_str(),
|
||||
user.password_hash.as_str(),
|
||||
user.created_at
|
||||
password_hash = EXCLUDED.password_hash
|
||||
RETURNING id, username, email, password_hash, created_at",
|
||||
)
|
||||
.execute(&self.pool)
|
||||
.bind(*user.id.as_uuid())
|
||||
.bind(&user.username)
|
||||
.bind(user.email.as_str())
|
||||
.bind(user.password_hash.as_str())
|
||||
.bind(user.created_at)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete(&self, id: &SystemId) -> Result<(), DomainError> {
|
||||
sqlx::query!("DELETE FROM users WHERE id = $1", *id.as_uuid())
|
||||
sqlx::query("DELETE FROM users WHERE id = $1")
|
||||
.bind(*id.as_uuid())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
Reference in New Issue
Block a user