From c0fb8f69de7b33a7d52b5beaeb663130974d1cc7 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Tue, 17 Mar 2026 14:32:04 +0100 Subject: [PATCH] feat(infra): implement config snapshot repository methods --- .../infra/src/channel_repository/sqlite.rs | 130 +++++++++++++++++- 1 file changed, 129 insertions(+), 1 deletion(-) diff --git a/k-tv-backend/infra/src/channel_repository/sqlite.rs b/k-tv-backend/infra/src/channel_repository/sqlite.rs index 7e2107b..77e9382 100644 --- a/k-tv-backend/infra/src/channel_repository/sqlite.rs +++ b/k-tv-backend/infra/src/channel_repository/sqlite.rs @@ -1,6 +1,9 @@ use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use sqlx::Row; +use uuid::Uuid; -use domain::{Channel, ChannelId, ChannelRepository, DomainError, DomainResult, UserId}; +use domain::{Channel, ChannelConfigSnapshot, ChannelId, ChannelRepository, DomainError, DomainResult, ScheduleConfig, ScheduleConfigCompat, UserId}; use super::mapping::{ChannelRow, SELECT_COLS}; @@ -139,4 +142,129 @@ impl ChannelRepository for SqliteChannelRepository { Ok(()) } + + async fn save_config_snapshot( + &self, + channel_id: ChannelId, + config: &ScheduleConfig, + label: Option, + ) -> DomainResult { + let id = Uuid::new_v4(); + let now = Utc::now(); + let config_json = serde_json::to_string(config) + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + let mut tx = self.pool.begin().await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + let version_num: i64 = sqlx::query_scalar( + "SELECT COALESCE(MAX(version_num), 0) + 1 FROM channel_config_snapshots WHERE channel_id = ?" + ) + .bind(channel_id.to_string()) + .fetch_one(&mut *tx) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + sqlx::query( + "INSERT INTO channel_config_snapshots (id, channel_id, config_json, version_num, label, created_at) + VALUES (?, ?, ?, ?, ?, ?)" + ) + .bind(id.to_string()) + .bind(channel_id.to_string()) + .bind(&config_json) + .bind(version_num) + .bind(&label) + .bind(now.to_rfc3339()) + .execute(&mut *tx) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + tx.commit().await.map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + Ok(ChannelConfigSnapshot { id, channel_id, config: config.clone(), version_num, label, created_at: now }) + } + + async fn list_config_snapshots( + &self, + channel_id: ChannelId, + ) -> DomainResult> { + let rows = sqlx::query( + "SELECT id, config_json, version_num, label, created_at + FROM channel_config_snapshots WHERE channel_id = ? + ORDER BY version_num DESC" + ) + .bind(channel_id.to_string()) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + rows.iter().map(|row| { + let id: Uuid = row.get::("id").parse() + .map_err(|_| DomainError::RepositoryError("bad uuid".into()))?; + let config_json: String = row.get("config_json"); + let config_compat: ScheduleConfigCompat = serde_json::from_str(&config_json) + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + let config: ScheduleConfig = config_compat.into(); + let version_num: i64 = row.get("version_num"); + let label: Option = row.get("label"); + let created_at_str: String = row.get("created_at"); + let created_at = created_at_str.parse::>() + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + Ok(ChannelConfigSnapshot { id, channel_id, config, version_num, label, created_at }) + }).collect() + } + + async fn get_config_snapshot( + &self, + channel_id: ChannelId, + snapshot_id: Uuid, + ) -> DomainResult> { + let row = sqlx::query( + "SELECT id, config_json, version_num, label, created_at + FROM channel_config_snapshots WHERE id = ? AND channel_id = ?" + ) + .bind(snapshot_id.to_string()) + .bind(channel_id.to_string()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + match row { + None => Ok(None), + Some(row) => { + let config_json: String = row.get("config_json"); + let config_compat: ScheduleConfigCompat = serde_json::from_str(&config_json) + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + let config: ScheduleConfig = config_compat.into(); + let version_num: i64 = row.get("version_num"); + let label: Option = row.get("label"); + let created_at_str: String = row.get("created_at"); + let created_at = created_at_str.parse::>() + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + Ok(Some(ChannelConfigSnapshot { id: snapshot_id, channel_id, config, version_num, label, created_at })) + } + } + } + + async fn patch_config_snapshot_label( + &self, + channel_id: ChannelId, + snapshot_id: Uuid, + label: Option, + ) -> DomainResult> { + let updated = sqlx::query( + "UPDATE channel_config_snapshots SET label = ? WHERE id = ? AND channel_id = ? RETURNING id" + ) + .bind(&label) + .bind(snapshot_id.to_string()) + .bind(channel_id.to_string()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + if updated.is_none() { + return Ok(None); + } + self.get_config_snapshot(channel_id, snapshot_id).await + } }