Files
k-photos/crates/adapters/postgres/src/storage/mod.rs

616 lines
20 KiB
Rust

use crate::helpers::{pg_repo, MapDomainError};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use domain::{
entities::{
Asset, IngestSession, IngestStatus, LibraryPath, OwnershipPolicy, QuotaDefinition,
QuotaRule, StorageVolume, TimePeriod, UsageLedgerEntry, UsageType,
},
errors::DomainError,
ports::{
IngestSessionRepository, IngestTransaction, LibraryPathRepository, QuotaRepository,
StorageVolumeRepository, UsageLedgerRepository,
},
value_objects::{Checksum, DateTimeStamp, SystemId},
};
use uuid::Uuid;
// ──────────────────────────────────────────────
// StorageVolume
// ──────────────────────────────────────────────
/// Flat row shape used by the `storage_volumes` SELECT queries in this module.
///
/// Decoded by `sqlx::FromRow` and turned into the domain `StorageVolume`
/// through the `From<StorageVolumeRow>` conversion.
#[derive(sqlx::FromRow)]
struct StorageVolumeRow {
volume_id: Uuid,
volume_name: String,
uri_prefix: String,
is_writable: bool,
// Held as `i64` here; the domain conversion applies a plain `as u64` cast.
available_bytes: i64,
}
impl From<StorageVolumeRow> for StorageVolume {
fn from(r: StorageVolumeRow) -> Self {
Self {
volume_id: SystemId::from_uuid(r.volume_id),
volume_name: r.volume_name,
uri_prefix: r.uri_prefix,
is_writable: r.is_writable,
available_bytes: r.available_bytes as u64,
}
}
}
pg_repo!(PostgresStorageVolumeRepository);
#[async_trait]
impl StorageVolumeRepository for PostgresStorageVolumeRepository {
async fn find_by_id(&self, id: &SystemId) -> Result<Option<StorageVolume>, DomainError> {
let row = sqlx::query_as::<_, StorageVolumeRow>(
"SELECT volume_id, volume_name, uri_prefix, is_writable, available_bytes
FROM storage_volumes WHERE volume_id = $1",
)
.bind(*id.as_uuid())
.fetch_optional(&self.pool)
.await
.map_pg()?;
Ok(row.map(Into::into))
}
async fn find_all(&self) -> Result<Vec<StorageVolume>, DomainError> {
let rows = sqlx::query_as::<_, StorageVolumeRow>(
"SELECT volume_id, volume_name, uri_prefix, is_writable, available_bytes
FROM storage_volumes",
)
.fetch_all(&self.pool)
.await
.map_pg()?;
Ok(rows.into_iter().map(Into::into).collect())
}
async fn save(&self, volume: &StorageVolume) -> Result<(), DomainError> {
sqlx::query(
"INSERT INTO storage_volumes (volume_id, volume_name, uri_prefix, is_writable, available_bytes)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (volume_id) DO UPDATE SET
volume_name = EXCLUDED.volume_name,
uri_prefix = EXCLUDED.uri_prefix,
is_writable = EXCLUDED.is_writable,
available_bytes = EXCLUDED.available_bytes",
)
.bind(*volume.volume_id.as_uuid())
.bind(&volume.volume_name)
.bind(&volume.uri_prefix)
.bind(volume.is_writable)
.bind(volume.available_bytes as i64)
.execute(&self.pool)
.await
.map_pg()?;
Ok(())
}
async fn delete(&self, id: &SystemId) -> Result<(), DomainError> {
sqlx::query("DELETE FROM storage_volumes WHERE volume_id = $1")
.bind(*id.as_uuid())
.execute(&self.pool)
.await
.map_pg()?;
Ok(())
}
}
// ──────────────────────────────────────────────
// LibraryPath
// ──────────────────────────────────────────────
/// Flat row shape used by the `library_paths` SELECT queries in this module.
///
/// Decoded by `sqlx::FromRow` and converted into the domain `LibraryPath`.
#[derive(sqlx::FromRow)]
struct LibraryPathRow {
path_id: Uuid,
volume_id: Uuid,
relative_path: String,
is_ingest_destination: bool,
// Stored as text; decoded with `policy_from_str`.
ownership_policy: String,
// Nullable column: `None` when the row has no designated owner.
designated_owner_id: Option<Uuid>,
}
/// Decodes the `ownership_policy` column text into an `OwnershipPolicy`.
///
/// Any value other than `"user_owned"` or `"group_owned"` (including
/// `"unassigned"` and unrecognised text) yields `OwnershipPolicy::Unassigned`.
fn policy_from_str(s: &str) -> OwnershipPolicy {
    if s == "user_owned" {
        OwnershipPolicy::UserOwned
    } else if s == "group_owned" {
        OwnershipPolicy::GroupOwned
    } else {
        OwnershipPolicy::Unassigned
    }
}
fn policy_to_str(p: &OwnershipPolicy) -> &'static str {
match p {
OwnershipPolicy::UserOwned => "user_owned",
OwnershipPolicy::GroupOwned => "group_owned",
OwnershipPolicy::Unassigned => "unassigned",
}
}
impl From<LibraryPathRow> for LibraryPath {
fn from(r: LibraryPathRow) -> Self {
Self {
path_id: SystemId::from_uuid(r.path_id),
volume_id: SystemId::from_uuid(r.volume_id),
relative_path: r.relative_path,
is_ingest_destination: r.is_ingest_destination,
ownership_policy: policy_from_str(&r.ownership_policy),
designated_owner_id: r.designated_owner_id.map(SystemId::from_uuid),
}
}
}
pg_repo!(PostgresLibraryPathRepository);
#[async_trait]
impl LibraryPathRepository for PostgresLibraryPathRepository {
async fn find_by_id(&self, id: &SystemId) -> Result<Option<LibraryPath>, DomainError> {
let row = sqlx::query_as::<_, LibraryPathRow>(
"SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id
FROM library_paths WHERE path_id = $1",
)
.bind(*id.as_uuid())
.fetch_optional(&self.pool)
.await
.map_pg()?;
Ok(row.map(Into::into))
}
async fn find_by_volume(&self, volume_id: &SystemId) -> Result<Vec<LibraryPath>, DomainError> {
let rows = sqlx::query_as::<_, LibraryPathRow>(
"SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id
FROM library_paths WHERE volume_id = $1",
)
.bind(*volume_id.as_uuid())
.fetch_all(&self.pool)
.await
.map_pg()?;
Ok(rows.into_iter().map(Into::into).collect())
}
async fn find_ingest_destinations(
&self,
owner_id: &SystemId,
) -> Result<Vec<LibraryPath>, DomainError> {
let rows = sqlx::query_as::<_, LibraryPathRow>(
"SELECT path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id
FROM library_paths
WHERE is_ingest_destination = true AND designated_owner_id = $1",
)
.bind(*owner_id.as_uuid())
.fetch_all(&self.pool)
.await
.map_pg()?;
Ok(rows.into_iter().map(Into::into).collect())
}
async fn save(&self, path: &LibraryPath) -> Result<(), DomainError> {
sqlx::query(
"INSERT INTO library_paths (path_id, volume_id, relative_path, is_ingest_destination, ownership_policy, designated_owner_id)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (path_id) DO UPDATE SET
volume_id = EXCLUDED.volume_id,
relative_path = EXCLUDED.relative_path,
is_ingest_destination = EXCLUDED.is_ingest_destination,
ownership_policy = EXCLUDED.ownership_policy,
designated_owner_id = EXCLUDED.designated_owner_id",
)
.bind(*path.path_id.as_uuid())
.bind(*path.volume_id.as_uuid())
.bind(&path.relative_path)
.bind(path.is_ingest_destination)
.bind(policy_to_str(&path.ownership_policy))
.bind(path.designated_owner_id.as_ref().map(|id| *id.as_uuid()))
.execute(&self.pool)
.await
.map_pg()?;
Ok(())
}
async fn delete(&self, id: &SystemId) -> Result<(), DomainError> {
sqlx::query("DELETE FROM library_paths WHERE path_id = $1")
.bind(*id.as_uuid())
.execute(&self.pool)
.await
.map_pg()?;
Ok(())
}
}
// ──────────────────────────────────────────────
// IngestSession
// ──────────────────────────────────────────────
/// Flat row shape used by the `ingest_sessions` SELECT queries in this module.
///
/// Decoded by `sqlx::FromRow`; the conversion into `IngestSession` is fallible
/// because `client_checksum` is validated through `Checksum::new`.
#[derive(sqlx::FromRow)]
struct IngestSessionRow {
session_id: Uuid,
uploader_user_id: Uuid,
client_device_id: String,
original_filename: String,
client_checksum: String,
target_library_path_id: Uuid,
// Stored as text; decoded with `ingest_status_from_str`.
status: String,
created_at: DateTime<Utc>,
// Nullable column.
error_message: Option<String>,
}
/// Decodes the `status` column text into an `IngestStatus`.
///
/// NOTE(review): unrecognised text is silently treated as `Uploading`, the
/// same result as the literal `"uploading"`.
fn ingest_status_from_str(s: &str) -> IngestStatus {
    match s {
        "failed" => IngestStatus::Failed,
        "completed" => IngestStatus::Completed,
        "processing" => IngestStatus::Processing,
        "awaiting_processing" => IngestStatus::AwaitingProcessing,
        // Covers "uploading" as well as every unknown value.
        _ => IngestStatus::Uploading,
    }
}
fn ingest_status_to_str(s: &IngestStatus) -> &'static str {
match s {
IngestStatus::Uploading => "uploading",
IngestStatus::AwaitingProcessing => "awaiting_processing",
IngestStatus::Processing => "processing",
IngestStatus::Completed => "completed",
IngestStatus::Failed => "failed",
}
}
impl TryFrom<IngestSessionRow> for IngestSession {
type Error = DomainError;
fn try_from(r: IngestSessionRow) -> Result<Self, Self::Error> {
Ok(Self {
session_id: SystemId::from_uuid(r.session_id),
uploader_user_id: SystemId::from_uuid(r.uploader_user_id),
client_device_id: r.client_device_id,
original_filename: r.original_filename,
client_checksum: Checksum::new(r.client_checksum)?,
target_library_path_id: SystemId::from_uuid(r.target_library_path_id),
status: ingest_status_from_str(&r.status),
created_at: DateTimeStamp::from_datetime(r.created_at),
error_message: r.error_message,
})
}
}
pg_repo!(PostgresIngestSessionRepository);
#[async_trait]
impl IngestSessionRepository for PostgresIngestSessionRepository {
async fn find_by_id(&self, id: &SystemId) -> Result<Option<IngestSession>, DomainError> {
let row = sqlx::query_as::<_, IngestSessionRow>(
"SELECT session_id, uploader_user_id, client_device_id, original_filename,
client_checksum, target_library_path_id, status, created_at, error_message
FROM ingest_sessions WHERE session_id = $1",
)
.bind(*id.as_uuid())
.fetch_optional(&self.pool)
.await
.map_pg()?;
row.map(TryInto::try_into).transpose()
}
async fn find_by_user(&self, user_id: &SystemId) -> Result<Vec<IngestSession>, DomainError> {
let rows = sqlx::query_as::<_, IngestSessionRow>(
"SELECT session_id, uploader_user_id, client_device_id, original_filename,
client_checksum, target_library_path_id, status, created_at, error_message
FROM ingest_sessions WHERE uploader_user_id = $1",
)
.bind(*user_id.as_uuid())
.fetch_all(&self.pool)
.await
.map_pg()?;
rows.into_iter().map(TryInto::try_into).collect()
}
async fn save(&self, session: &IngestSession) -> Result<(), DomainError> {
sqlx::query(
"INSERT INTO ingest_sessions (session_id, uploader_user_id, client_device_id, original_filename,
client_checksum, target_library_path_id, status, created_at, error_message)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (session_id) DO UPDATE SET
status = EXCLUDED.status,
error_message = EXCLUDED.error_message",
)
.bind(*session.session_id.as_uuid())
.bind(*session.uploader_user_id.as_uuid())
.bind(&session.client_device_id)
.bind(&session.original_filename)
.bind(session.client_checksum.as_str())
.bind(*session.target_library_path_id.as_uuid())
.bind(ingest_status_to_str(&session.status))
.bind(session.created_at.as_datetime())
.bind(session.error_message.as_deref())
.execute(&self.pool)
.await
.map_pg()?;
Ok(())
}
}
// ──────────────────────────────────────────────
// Quota + UsageLedger
// ──────────────────────────────────────────────
/// Decodes stored usage-type text into a `UsageType`.
///
/// NOTE(review): unrecognised text is silently treated as `StorageBytes`, the
/// same result as the literal `"storage_bytes"`.
fn usage_type_from_str(s: &str) -> UsageType {
    match s {
        "indexing_size" => UsageType::IndexingSize,
        "api_calls" => UsageType::ApiCalls,
        "process_jobs" => UsageType::ProcessJobs,
        // Covers "storage_bytes" as well as every unknown value.
        _ => UsageType::StorageBytes,
    }
}
fn usage_type_to_str(t: &UsageType) -> &'static str {
match t {
UsageType::StorageBytes => "storage_bytes",
UsageType::ProcessJobs => "process_jobs",
UsageType::ApiCalls => "api_calls",
UsageType::IndexingSize => "indexing_size",
}
}
/// Decodes stored time-period text into a `TimePeriod`.
///
/// Anything other than `"daily"` or `"monthly"` (including `"lifetime"` and
/// unrecognised text) yields `TimePeriod::Lifetime`.
fn time_period_from_str(s: &str) -> TimePeriod {
    if s == "daily" {
        TimePeriod::Daily
    } else if s == "monthly" {
        TimePeriod::Monthly
    } else {
        TimePeriod::Lifetime
    }
}
fn time_period_to_str(p: &TimePeriod) -> &'static str {
match p {
TimePeriod::Daily => "daily",
TimePeriod::Monthly => "monthly",
TimePeriod::Lifetime => "lifetime",
}
}
/// Row shape for the `quota_definitions` SELECT in `find_by_owner`.
///
/// Carries only the definition header; the rules are loaded by a separate
/// query against `quota_rules`.
#[derive(sqlx::FromRow)]
struct QuotaDefRow {
quota_id: Uuid,
owner_scope: Uuid,
is_enforced: bool,
}
/// Row shape for the `quota_rules` SELECT in `find_by_owner`.
#[derive(sqlx::FromRow)]
// `quota_id` is selected and decoded but never read by the conversion into
// `QuotaRule`; presumably that is why the dead-code lint is silenced here.
#[allow(dead_code)]
struct QuotaRuleRow {
rule_id: Uuid,
quota_id: Uuid,
// Stored as text; decoded with `usage_type_from_str`.
dimension: String,
// Held as `i64` here; the domain conversion applies a plain `as u64` cast.
limit_value: i64,
// Stored as text; decoded with `time_period_from_str`.
time_period: String,
is_unlimited: bool,
}
/// Row shape mirroring the `usage_ledger` columns written by `record`.
///
/// Converted into the domain `UsageLedgerEntry` by a `From` impl. No query in
/// the visible part of this file selects into it.
#[derive(sqlx::FromRow)]
struct UsageLedgerRow {
entry_id: Uuid,
user_id: Uuid,
// Stored as text; decoded with `usage_type_from_str`.
usage_type: String,
// Held as `i64` here; the domain conversion applies a plain `as u64` cast.
consumed_amount: i64,
timestamp: DateTime<Utc>,
context: String,
}
/// Single-column row for the aggregate query in `sum_usage`; `total` matches
/// the `as total` alias in that SELECT.
#[derive(sqlx::FromRow)]
struct SumRow {
total: i64,
}
impl From<QuotaRuleRow> for QuotaRule {
fn from(r: QuotaRuleRow) -> Self {
Self {
rule_id: SystemId::from_uuid(r.rule_id),
dimension: usage_type_from_str(&r.dimension),
limit_value: r.limit_value as u64,
time_period: time_period_from_str(&r.time_period),
is_unlimited: r.is_unlimited,
}
}
}
impl From<UsageLedgerRow> for UsageLedgerEntry {
fn from(r: UsageLedgerRow) -> Self {
Self {
entry_id: SystemId::from_uuid(r.entry_id),
user_id: SystemId::from_uuid(r.user_id),
usage_type: usage_type_from_str(&r.usage_type),
consumed_amount: r.consumed_amount as u64,
timestamp: DateTimeStamp::from_datetime(r.timestamp),
context: r.context,
}
}
}
// Declares the repository type through the `pg_repo!` helper; the methods below
// rely on it exposing a `pool` field.
pg_repo!(PostgresQuotaRepository);

/// `QuotaRepository` implemented with sqlx queries on `quota_definitions`
/// and `quota_rules`.
#[async_trait]
impl QuotaRepository for PostgresQuotaRepository {
    /// Loads the quota definition whose `owner_scope` equals `owner_id`,
    /// together with all of its rules.
    ///
    /// Returns `Ok(None)` when no definition exists for that owner. The
    /// definition and its rules are read by two separate statements.
    async fn find_by_owner(
        &self,
        owner_id: &SystemId,
    ) -> Result<Option<QuotaDefinition>, DomainError> {
        let def_row = sqlx::query_as::<_, QuotaDefRow>(
            "SELECT quota_id, owner_scope, is_enforced FROM quota_definitions WHERE owner_scope = $1",
        )
        .bind(*owner_id.as_uuid())
        .fetch_optional(&self.pool)
        .await
        .map_pg()?;

        let Some(def) = def_row else {
            return Ok(None);
        };

        let rule_rows = sqlx::query_as::<_, QuotaRuleRow>(
            "SELECT rule_id, quota_id, dimension, limit_value, time_period, is_unlimited
FROM quota_rules WHERE quota_id = $1",
        )
        .bind(def.quota_id)
        .fetch_all(&self.pool)
        .await
        .map_pg()?;

        Ok(Some(QuotaDefinition {
            quota_id: SystemId::from_uuid(def.quota_id),
            owner_scope: SystemId::from_uuid(def.owner_scope),
            is_enforced: def.is_enforced,
            rules: rule_rows.into_iter().map(Into::into).collect(),
        }))
    }

    /// Persists `quota` and replaces its complete rule set.
    ///
    /// The definition upsert, the removal of the old rules and the insertion
    /// of the new ones run inside one database transaction. If any statement
    /// fails, the early return drops the transaction, which rolls it back, so
    /// a quota is never left with its rules deleted or only partly rewritten.
    async fn save(&self, quota: &QuotaDefinition) -> Result<(), DomainError> {
        let quota_uuid = *quota.quota_id.as_uuid();
        let mut tx = self.pool.begin().await.map_pg()?;

        sqlx::query(
            "INSERT INTO quota_definitions (quota_id, owner_scope, is_enforced)
VALUES ($1, $2, $3)
ON CONFLICT (quota_id) DO UPDATE SET
owner_scope = EXCLUDED.owner_scope,
is_enforced = EXCLUDED.is_enforced",
        )
        .bind(quota_uuid)
        .bind(*quota.owner_scope.as_uuid())
        .bind(quota.is_enforced)
        .execute(&mut *tx)
        .await
        .map_pg()?;

        // Rules are replaced wholesale: delete the old set, then re-insert.
        sqlx::query("DELETE FROM quota_rules WHERE quota_id = $1")
            .bind(quota_uuid)
            .execute(&mut *tx)
            .await
            .map_pg()?;

        for rule in &quota.rules {
            sqlx::query(
                "INSERT INTO quota_rules (rule_id, quota_id, dimension, limit_value, time_period, is_unlimited)
VALUES ($1, $2, $3, $4, $5, $6)",
            )
            .bind(*rule.rule_id.as_uuid())
            .bind(quota_uuid)
            .bind(usage_type_to_str(&rule.dimension))
            .bind(rule.limit_value as i64)
            .bind(time_period_to_str(&rule.time_period))
            .bind(rule.is_unlimited)
            .execute(&mut *tx)
            .await
            .map_pg()?;
        }

        // Nothing becomes visible to other connections until this succeeds.
        tx.commit().await.map_pg()?;
        Ok(())
    }

    /// Deletes the quota definition with the given `quota_id`.
    ///
    /// Only `quota_definitions` is touched here.
    /// NOTE(review): removal of the matching `quota_rules` rows relies on a
    /// cascading foreign key in the schema — confirm against the migrations.
    async fn delete(&self, id: &SystemId) -> Result<(), DomainError> {
        // Rules cascade-delete
        sqlx::query("DELETE FROM quota_definitions WHERE quota_id = $1")
            .bind(*id.as_uuid())
            .execute(&self.pool)
            .await
            .map_pg()?;
        Ok(())
    }
}
// Declares the repository type through the `pg_repo!` helper; the methods below
// rely on it exposing a `pool` field.
pg_repo!(PostgresUsageLedgerRepository);

/// `UsageLedgerRepository` implemented with sqlx queries on `usage_ledger`.
#[async_trait]
impl UsageLedgerRepository for PostgresUsageLedgerRepository {
    /// Appends `entry` to `usage_ledger`.
    ///
    /// This is a plain INSERT with no conflict clause, so recording an
    /// `entry_id` that violates a uniqueness constraint surfaces as an error.
    async fn record(&self, entry: &UsageLedgerEntry) -> Result<(), DomainError> {
        sqlx::query(
            "INSERT INTO usage_ledger (entry_id, user_id, usage_type, consumed_amount, timestamp, context)
VALUES ($1, $2, $3, $4, $5, $6)",
        )
        .bind(*entry.entry_id.as_uuid())
        .bind(*entry.user_id.as_uuid())
        .bind(usage_type_to_str(&entry.usage_type))
        .bind(entry.consumed_amount as i64)
        .bind(entry.timestamp.as_datetime())
        .bind(&entry.context)
        .execute(&self.pool)
        .await
        .map_pg()?;
        Ok(())
    }

    /// Sums `consumed_amount` for one user and usage type.
    ///
    /// When `since` is `Some`, only entries with `timestamp >= since` are
    /// counted; when it is `None`, all entries are counted. Returns 0 when no
    /// entry matches.
    async fn sum_usage(
        &self,
        user_id: &SystemId,
        usage_type: UsageType,
        since: Option<DateTimeStamp>,
    ) -> Result<u64, DomainError> {
        let since_dt: Option<DateTime<Utc>> = since.map(|s| *s.as_datetime());
        // PostgreSQL's SUM over a BIGINT column yields NUMERIC, which sqlx does
        // not decode into `i64`. The explicit cast keeps the result type in
        // line with `SumRow.total`; it is a no-op if the sum is already BIGINT.
        let row = sqlx::query_as::<_, SumRow>(
            "SELECT COALESCE(SUM(consumed_amount), 0)::BIGINT as total
FROM usage_ledger
WHERE user_id = $1 AND usage_type = $2 AND ($3::timestamptz IS NULL OR timestamp >= $3)",
        )
        .bind(*user_id.as_uuid())
        .bind(usage_type_to_str(&usage_type))
        .bind(since_dt)
        .fetch_one(&self.pool)
        .await
        .map_pg()?;
        Ok(row.total as u64)
    }
}
// ──────────────────────────────────────────────
// IngestTransaction (composite port)
// ──────────────────────────────────────────────
// Declares the adapter type through the `pg_repo!` helper; the methods below
// rely on it exposing a `pool` field.
pg_repo!(PostgresIngestTransaction);

/// `IngestTransaction` implemented by delegating to the individual repositories.
///
/// Every method builds a fresh repository over a clone of this adapter's pool
/// and forwards the call. NOTE(review): no database transaction is opened in
/// this block, so the calls run as independent statements.
#[async_trait]
impl IngestTransaction for PostgresIngestTransaction {
    /// Forwards to `PostgresAssetRepository::save`.
    async fn save_asset(&self, asset: &Asset) -> Result<(), DomainError> {
        use domain::ports::AssetRepository;

        let assets = crate::PostgresAssetRepository::new(self.pool.clone());
        assets.save(asset).await
    }

    /// Forwards to `PostgresIngestSessionRepository::save`.
    async fn save_session(&self, session: &IngestSession) -> Result<(), DomainError> {
        let sessions = PostgresIngestSessionRepository::new(self.pool.clone());
        sessions.save(session).await
    }

    /// Forwards to `PostgresQuotaRepository::find_by_owner`.
    async fn find_quota(
        &self,
        owner_id: &SystemId,
    ) -> Result<Option<QuotaDefinition>, DomainError> {
        let quotas = PostgresQuotaRepository::new(self.pool.clone());
        quotas.find_by_owner(owner_id).await
    }

    /// Forwards to `PostgresUsageLedgerRepository::sum_usage`.
    async fn sum_usage(
        &self,
        user_id: &SystemId,
        usage_type: UsageType,
        since: Option<DateTimeStamp>,
    ) -> Result<u64, DomainError> {
        let ledger = PostgresUsageLedgerRepository::new(self.pool.clone());
        ledger.sum_usage(user_id, usage_type, since).await
    }

    /// Forwards to `PostgresUsageLedgerRepository::record`.
    async fn record_usage(&self, entry: &UsageLedgerEntry) -> Result<(), DomainError> {
        let ledger = PostgresUsageLedgerRepository::new(self.pool.clone());
        ledger.record(entry).await
    }
}