refactor: group postgres adapters by bounded context
This commit is contained in:
478
crates/adapters/postgres/src/catalog/mod.rs
Normal file
478
crates/adapters/postgres/src/catalog/mod.rs
Normal file
@@ -0,0 +1,478 @@
|
||||
use crate::db::PgPool;
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use domain::{
|
||||
entities::{
|
||||
Asset, AssetMetadata, AssetType, DetectionMethod, DuplicateCandidate, DuplicateGroup,
|
||||
DuplicateStatus, MetadataSource, SourceReference,
|
||||
},
|
||||
errors::DomainError,
|
||||
ports::{AssetMetadataRepository, AssetRepository, DuplicateRepository},
|
||||
value_objects::{Checksum, DateTimeStamp, MetadataValue, StructuredData, SystemId},
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
// ──────────────────────────────────────────────
|
||||
// Asset
|
||||
// ──────────────────────────────────────────────
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct AssetRow {
|
||||
asset_id: Uuid,
|
||||
volume_id: Uuid,
|
||||
relative_path: String,
|
||||
checksum: String,
|
||||
asset_type: String,
|
||||
mime_type: String,
|
||||
file_size: i64,
|
||||
is_processed: bool,
|
||||
owner_user_id: Uuid,
|
||||
created_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
fn asset_type_from_str(s: &str) -> AssetType {
|
||||
match s {
|
||||
"image" => AssetType::Image,
|
||||
"video" => AssetType::Video,
|
||||
"live_photo" => AssetType::LivePhoto,
|
||||
_ => AssetType::Image,
|
||||
}
|
||||
}
|
||||
|
||||
fn asset_type_to_str(t: &AssetType) -> &'static str {
|
||||
match t {
|
||||
AssetType::Image => "image",
|
||||
AssetType::Video => "video",
|
||||
AssetType::LivePhoto => "live_photo",
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<AssetRow> for Asset {
|
||||
type Error = DomainError;
|
||||
fn try_from(r: AssetRow) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
asset_id: SystemId::from_uuid(r.asset_id),
|
||||
source_reference: SourceReference {
|
||||
volume_id: SystemId::from_uuid(r.volume_id),
|
||||
relative_path: r.relative_path,
|
||||
checksum: Checksum::new(r.checksum)?,
|
||||
},
|
||||
asset_type: asset_type_from_str(&r.asset_type),
|
||||
mime_type: r.mime_type,
|
||||
file_size: r.file_size as u64,
|
||||
is_processed: r.is_processed,
|
||||
owner_user_id: SystemId::from_uuid(r.owner_user_id),
|
||||
created_at: DateTimeStamp::from_datetime(r.created_at),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PostgresAssetRepository {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PostgresAssetRepository {
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AssetRepository for PostgresAssetRepository {
|
||||
async fn find_by_id(&self, id: &SystemId) -> Result<Option<Asset>, DomainError> {
|
||||
let row = sqlx::query_as::<_, AssetRow>(
|
||||
"SELECT asset_id, volume_id, relative_path, checksum, asset_type, mime_type,
|
||||
file_size, is_processed, owner_user_id, created_at
|
||||
FROM assets WHERE asset_id = $1",
|
||||
)
|
||||
.bind(*id.as_uuid())
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
row.map(TryInto::try_into).transpose()
|
||||
}
|
||||
|
||||
async fn find_by_checksum(&self, checksum: &Checksum) -> Result<Vec<Asset>, DomainError> {
|
||||
let rows = sqlx::query_as::<_, AssetRow>(
|
||||
"SELECT asset_id, volume_id, relative_path, checksum, asset_type, mime_type,
|
||||
file_size, is_processed, owner_user_id, created_at
|
||||
FROM assets WHERE checksum = $1",
|
||||
)
|
||||
.bind(checksum.as_str())
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
rows.into_iter().map(TryInto::try_into).collect()
|
||||
}
|
||||
|
||||
async fn find_by_owner(
|
||||
&self,
|
||||
owner_id: &SystemId,
|
||||
limit: u32,
|
||||
offset: u32,
|
||||
) -> Result<Vec<Asset>, DomainError> {
|
||||
let rows = sqlx::query_as::<_, AssetRow>(
|
||||
"SELECT asset_id, volume_id, relative_path, checksum, asset_type, mime_type,
|
||||
file_size, is_processed, owner_user_id, created_at
|
||||
FROM assets WHERE owner_user_id = $1
|
||||
ORDER BY created_at DESC
|
||||
LIMIT $2 OFFSET $3",
|
||||
)
|
||||
.bind(*owner_id.as_uuid())
|
||||
.bind(limit as i64)
|
||||
.bind(offset as i64)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
rows.into_iter().map(TryInto::try_into).collect()
|
||||
}
|
||||
|
||||
async fn save(&self, asset: &Asset) -> Result<(), DomainError> {
|
||||
sqlx::query(
|
||||
"INSERT INTO assets (asset_id, volume_id, relative_path, checksum, asset_type,
|
||||
mime_type, file_size, is_processed, owner_user_id, created_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
||||
ON CONFLICT (asset_id) DO UPDATE SET
|
||||
volume_id = EXCLUDED.volume_id,
|
||||
relative_path = EXCLUDED.relative_path,
|
||||
checksum = EXCLUDED.checksum,
|
||||
asset_type = EXCLUDED.asset_type,
|
||||
mime_type = EXCLUDED.mime_type,
|
||||
file_size = EXCLUDED.file_size,
|
||||
is_processed = EXCLUDED.is_processed,
|
||||
owner_user_id = EXCLUDED.owner_user_id",
|
||||
)
|
||||
.bind(*asset.asset_id.as_uuid())
|
||||
.bind(*asset.source_reference.volume_id.as_uuid())
|
||||
.bind(&asset.source_reference.relative_path)
|
||||
.bind(asset.source_reference.checksum.as_str())
|
||||
.bind(asset_type_to_str(&asset.asset_type))
|
||||
.bind(&asset.mime_type)
|
||||
.bind(asset.file_size as i64)
|
||||
.bind(asset.is_processed)
|
||||
.bind(*asset.owner_user_id.as_uuid())
|
||||
.bind(asset.created_at.as_datetime())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete(&self, id: &SystemId) -> Result<(), DomainError> {
|
||||
sqlx::query("DELETE FROM assets WHERE asset_id = $1")
|
||||
.bind(*id.as_uuid())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// ──────────────────────────────────────────────
|
||||
// AssetMetadata
|
||||
// ──────────────────────────────────────────────
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct AssetMetadataRow {
|
||||
asset_id: Uuid,
|
||||
metadata_source: String,
|
||||
data: serde_json::Value,
|
||||
updated_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
fn source_from_str(s: &str) -> MetadataSource {
|
||||
match s {
|
||||
"exif_extracted" => MetadataSource::ExifExtracted,
|
||||
"ai_generated" => MetadataSource::AiGenerated,
|
||||
"user_edited" => MetadataSource::UserEdited,
|
||||
_ => MetadataSource::ExifExtracted,
|
||||
}
|
||||
}
|
||||
|
||||
fn source_to_str(s: &MetadataSource) -> &'static str {
|
||||
match s {
|
||||
MetadataSource::ExifExtracted => "exif_extracted",
|
||||
MetadataSource::AiGenerated => "ai_generated",
|
||||
MetadataSource::UserEdited => "user_edited",
|
||||
}
|
||||
}
|
||||
|
||||
fn json_to_structured_data(v: serde_json::Value) -> StructuredData {
|
||||
let mut sd = StructuredData::new();
|
||||
if let serde_json::Value::Object(map) = v {
|
||||
for (key, val) in map {
|
||||
let mv = match val {
|
||||
serde_json::Value::String(s) => MetadataValue::String(s),
|
||||
serde_json::Value::Number(n) => {
|
||||
if let Some(i) = n.as_i64() {
|
||||
MetadataValue::Integer(i)
|
||||
} else if let Some(f) = n.as_f64() {
|
||||
MetadataValue::Float(f)
|
||||
} else {
|
||||
MetadataValue::Null
|
||||
}
|
||||
}
|
||||
serde_json::Value::Bool(b) => MetadataValue::Boolean(b),
|
||||
serde_json::Value::Null => MetadataValue::Null,
|
||||
_ => MetadataValue::String(val.to_string()),
|
||||
};
|
||||
sd.insert(key, mv);
|
||||
}
|
||||
}
|
||||
sd
|
||||
}
|
||||
|
||||
fn structured_data_to_json(sd: &StructuredData) -> serde_json::Value {
|
||||
let mut map = serde_json::Map::new();
|
||||
for (key, val) in sd.inner() {
|
||||
let jv = match val {
|
||||
MetadataValue::String(s) => serde_json::Value::String(s.clone()),
|
||||
MetadataValue::Integer(i) => serde_json::Value::Number((*i).into()),
|
||||
MetadataValue::Float(f) => serde_json::Number::from_f64(*f)
|
||||
.map(serde_json::Value::Number)
|
||||
.unwrap_or(serde_json::Value::Null),
|
||||
MetadataValue::Boolean(b) => serde_json::Value::Bool(*b),
|
||||
MetadataValue::Null => serde_json::Value::Null,
|
||||
};
|
||||
map.insert(key.clone(), jv);
|
||||
}
|
||||
serde_json::Value::Object(map)
|
||||
}
|
||||
|
||||
impl From<AssetMetadataRow> for AssetMetadata {
|
||||
fn from(r: AssetMetadataRow) -> Self {
|
||||
Self {
|
||||
asset_id: SystemId::from_uuid(r.asset_id),
|
||||
metadata_source: source_from_str(&r.metadata_source),
|
||||
data: json_to_structured_data(r.data),
|
||||
updated_at: DateTimeStamp::from_datetime(r.updated_at),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PostgresAssetMetadataRepository {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PostgresAssetMetadataRepository {
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AssetMetadataRepository for PostgresAssetMetadataRepository {
|
||||
async fn find_by_asset(&self, asset_id: &SystemId) -> Result<Vec<AssetMetadata>, DomainError> {
|
||||
let rows = sqlx::query_as::<_, AssetMetadataRow>(
|
||||
"SELECT asset_id, metadata_source, data, updated_at
|
||||
FROM asset_metadata WHERE asset_id = $1",
|
||||
)
|
||||
.bind(*asset_id.as_uuid())
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
Ok(rows.into_iter().map(Into::into).collect())
|
||||
}
|
||||
|
||||
async fn find_by_asset_and_source(
|
||||
&self,
|
||||
asset_id: &SystemId,
|
||||
source: MetadataSource,
|
||||
) -> Result<Option<AssetMetadata>, DomainError> {
|
||||
let row = sqlx::query_as::<_, AssetMetadataRow>(
|
||||
"SELECT asset_id, metadata_source, data, updated_at
|
||||
FROM asset_metadata WHERE asset_id = $1 AND metadata_source = $2",
|
||||
)
|
||||
.bind(*asset_id.as_uuid())
|
||||
.bind(source_to_str(&source))
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
Ok(row.map(Into::into))
|
||||
}
|
||||
|
||||
async fn save(&self, metadata: &AssetMetadata) -> Result<(), DomainError> {
|
||||
sqlx::query(
|
||||
"INSERT INTO asset_metadata (asset_id, metadata_source, data, updated_at)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
ON CONFLICT (asset_id, metadata_source) DO UPDATE SET
|
||||
data = EXCLUDED.data,
|
||||
updated_at = EXCLUDED.updated_at",
|
||||
)
|
||||
.bind(*metadata.asset_id.as_uuid())
|
||||
.bind(source_to_str(&metadata.metadata_source))
|
||||
.bind(structured_data_to_json(&metadata.data))
|
||||
.bind(metadata.updated_at.as_datetime())
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete_by_asset_and_source(
|
||||
&self,
|
||||
asset_id: &SystemId,
|
||||
source: MetadataSource,
|
||||
) -> Result<(), DomainError> {
|
||||
sqlx::query("DELETE FROM asset_metadata WHERE asset_id = $1 AND metadata_source = $2")
|
||||
.bind(*asset_id.as_uuid())
|
||||
.bind(source_to_str(&source))
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// ──────────────────────────────────────────────
|
||||
// Duplicate
|
||||
// ──────────────────────────────────────────────
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct GroupRow {
|
||||
group_id: Uuid,
|
||||
detection_method: String,
|
||||
status: String,
|
||||
candidates: serde_json::Value,
|
||||
}
|
||||
|
||||
fn detection_from_str(s: &str) -> DetectionMethod {
|
||||
match s {
|
||||
"perceptual_hash" => DetectionMethod::PerceptualHash,
|
||||
_ => DetectionMethod::ExactHash,
|
||||
}
|
||||
}
|
||||
|
||||
fn detection_to_str(d: &DetectionMethod) -> &'static str {
|
||||
match d {
|
||||
DetectionMethod::ExactHash => "exact_hash",
|
||||
DetectionMethod::PerceptualHash => "perceptual_hash",
|
||||
}
|
||||
}
|
||||
|
||||
fn dup_status_from_str(s: &str) -> DuplicateStatus {
|
||||
match s {
|
||||
"resolved" => DuplicateStatus::Resolved,
|
||||
_ => DuplicateStatus::Unresolved,
|
||||
}
|
||||
}
|
||||
|
||||
fn dup_status_to_str(s: &DuplicateStatus) -> &'static str {
|
||||
match s {
|
||||
DuplicateStatus::Unresolved => "unresolved",
|
||||
DuplicateStatus::Resolved => "resolved",
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize)]
|
||||
struct CandidateJson {
|
||||
asset_id: Uuid,
|
||||
similarity_score: f64,
|
||||
}
|
||||
|
||||
fn candidates_from_json(v: serde_json::Value) -> Vec<DuplicateCandidate> {
|
||||
let arr: Vec<CandidateJson> = serde_json::from_value(v).unwrap_or_default();
|
||||
arr.into_iter()
|
||||
.map(|c| DuplicateCandidate {
|
||||
asset_id: SystemId::from_uuid(c.asset_id),
|
||||
similarity_score: c.similarity_score,
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn candidates_to_json(candidates: &[DuplicateCandidate]) -> serde_json::Value {
|
||||
let arr: Vec<CandidateJson> = candidates
|
||||
.iter()
|
||||
.map(|c| CandidateJson {
|
||||
asset_id: *c.asset_id.as_uuid(),
|
||||
similarity_score: c.similarity_score,
|
||||
})
|
||||
.collect();
|
||||
serde_json::to_value(arr).unwrap_or(serde_json::Value::Array(vec![]))
|
||||
}
|
||||
|
||||
impl From<GroupRow> for DuplicateGroup {
|
||||
fn from(r: GroupRow) -> Self {
|
||||
Self {
|
||||
group_id: SystemId::from_uuid(r.group_id),
|
||||
detection_method: detection_from_str(&r.detection_method),
|
||||
status: dup_status_from_str(&r.status),
|
||||
candidates: candidates_from_json(r.candidates),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PostgresDuplicateRepository {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PostgresDuplicateRepository {
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl DuplicateRepository for PostgresDuplicateRepository {
|
||||
async fn find_by_id(&self, id: &SystemId) -> Result<Option<DuplicateGroup>, DomainError> {
|
||||
let row = sqlx::query_as::<_, GroupRow>(
|
||||
"SELECT group_id, detection_method, status, candidates
|
||||
FROM duplicate_groups WHERE group_id = $1",
|
||||
)
|
||||
.bind(*id.as_uuid())
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
Ok(row.map(Into::into))
|
||||
}
|
||||
|
||||
async fn find_unresolved(&self) -> Result<Vec<DuplicateGroup>, DomainError> {
|
||||
let rows = sqlx::query_as::<_, GroupRow>(
|
||||
"SELECT group_id, detection_method, status, candidates
|
||||
FROM duplicate_groups WHERE status = 'unresolved'",
|
||||
)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
Ok(rows.into_iter().map(Into::into).collect())
|
||||
}
|
||||
|
||||
async fn find_by_asset(&self, asset_id: &SystemId) -> Result<Vec<DuplicateGroup>, DomainError> {
|
||||
let rows = sqlx::query_as::<_, GroupRow>(
|
||||
"SELECT group_id, detection_method, status, candidates
|
||||
FROM duplicate_groups WHERE candidates @> $1::jsonb",
|
||||
)
|
||||
.bind(serde_json::json!([{"asset_id": asset_id.as_uuid()}]))
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
Ok(rows.into_iter().map(Into::into).collect())
|
||||
}
|
||||
|
||||
async fn save(&self, group: &DuplicateGroup) -> Result<(), DomainError> {
|
||||
sqlx::query(
|
||||
"INSERT INTO duplicate_groups (group_id, detection_method, status, candidates)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
ON CONFLICT (group_id) DO UPDATE SET
|
||||
detection_method = EXCLUDED.detection_method,
|
||||
status = EXCLUDED.status,
|
||||
candidates = EXCLUDED.candidates",
|
||||
)
|
||||
.bind(*group.group_id.as_uuid())
|
||||
.bind(detection_to_str(&group.detection_method))
|
||||
.bind(dup_status_to_str(&group.status))
|
||||
.bind(candidates_to_json(&group.candidates))
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user