use async_trait::async_trait; use chrono::{DateTime, Utc}; use sqlx::Row; use uuid::Uuid; use domain::{Channel, ChannelConfigSnapshot, ChannelId, ChannelRepository, DomainError, DomainResult, ScheduleConfig, ScheduleConfigCompat, UserId}; use super::mapping::{ChannelRow, SELECT_COLS}; pub struct SqliteChannelRepository { pool: sqlx::SqlitePool, } impl SqliteChannelRepository { pub fn new(pool: sqlx::SqlitePool) -> Self { Self { pool } } } #[async_trait] impl ChannelRepository for SqliteChannelRepository { async fn find_by_id(&self, id: ChannelId) -> DomainResult> { let sql = format!("SELECT {SELECT_COLS} FROM channels WHERE id = ?"); let row: Option = sqlx::query_as(&sql) .bind(id.to_string()) .fetch_optional(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; row.map(Channel::try_from).transpose() } async fn find_by_owner(&self, owner_id: UserId) -> DomainResult> { let sql = format!( "SELECT {SELECT_COLS} FROM channels WHERE owner_id = ? ORDER BY created_at ASC" ); let rows: Vec = sqlx::query_as(&sql) .bind(owner_id.to_string()) .fetch_all(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; rows.into_iter().map(Channel::try_from).collect() } async fn find_all(&self) -> DomainResult> { let sql = format!("SELECT {SELECT_COLS} FROM channels ORDER BY created_at ASC"); let rows: Vec = sqlx::query_as(&sql) .fetch_all(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; rows.into_iter().map(Channel::try_from).collect() } async fn save(&self, channel: &Channel) -> DomainResult<()> { let schedule_config = serde_json::to_string(&channel.schedule_config).map_err(|e| { DomainError::RepositoryError(format!("Failed to serialize schedule_config: {}", e)) })?; let recycle_policy = serde_json::to_string(&channel.recycle_policy).map_err(|e| { DomainError::RepositoryError(format!("Failed to serialize recycle_policy: {}", e)) })?; let access_mode = serde_json::to_value(&channel.access_mode) .ok() .and_then(|v| v.as_str().map(str::to_owned)) .unwrap_or_else(|| "public".to_owned()); let logo_position = serde_json::to_value(&channel.logo_position) .ok() .and_then(|v| v.as_str().map(str::to_owned)) .unwrap_or_else(|| "top_right".to_owned()); sqlx::query( r#" INSERT INTO channels (id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, access_mode, access_password_hash, logo, logo_position, logo_opacity, webhook_url, webhook_poll_interval_secs, webhook_body_template, webhook_headers, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET name = excluded.name, description = excluded.description, timezone = excluded.timezone, schedule_config = excluded.schedule_config, recycle_policy = excluded.recycle_policy, auto_schedule = excluded.auto_schedule, access_mode = excluded.access_mode, access_password_hash = excluded.access_password_hash, logo = excluded.logo, logo_position = excluded.logo_position, logo_opacity = excluded.logo_opacity, webhook_url = excluded.webhook_url, webhook_poll_interval_secs = excluded.webhook_poll_interval_secs, webhook_body_template = excluded.webhook_body_template, webhook_headers = excluded.webhook_headers, updated_at = excluded.updated_at "#, ) .bind(channel.id.to_string()) .bind(channel.owner_id.to_string()) .bind(&channel.name) .bind(&channel.description) .bind(&channel.timezone) .bind(&schedule_config) .bind(&recycle_policy) .bind(channel.auto_schedule as i64) .bind(&access_mode) .bind(&channel.access_password_hash) .bind(&channel.logo) .bind(&logo_position) .bind(channel.logo_opacity) .bind(&channel.webhook_url) .bind(channel.webhook_poll_interval_secs as i64) .bind(&channel.webhook_body_template) .bind(&channel.webhook_headers) .bind(channel.created_at.to_rfc3339()) .bind(channel.updated_at.to_rfc3339()) .execute(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; Ok(()) } async fn find_auto_schedule_enabled(&self) -> DomainResult> { let sql = format!( "SELECT {SELECT_COLS} FROM channels WHERE auto_schedule = 1 ORDER BY created_at ASC" ); let rows: Vec = sqlx::query_as(&sql) .fetch_all(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; rows.into_iter().map(Channel::try_from).collect() } async fn delete(&self, id: ChannelId) -> DomainResult<()> { sqlx::query("DELETE FROM channels WHERE id = ?") .bind(id.to_string()) .execute(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; Ok(()) } async fn save_config_snapshot( &self, channel_id: ChannelId, config: &ScheduleConfig, label: Option, ) -> DomainResult { let id = Uuid::new_v4(); let now = Utc::now(); let config_json = serde_json::to_string(config) .map_err(|e| DomainError::RepositoryError(e.to_string()))?; let mut tx = self.pool.begin().await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; let version_num: i64 = sqlx::query_scalar( "SELECT COALESCE(MAX(version_num), 0) + 1 FROM channel_config_snapshots WHERE channel_id = ?" ) .bind(channel_id.to_string()) .fetch_one(&mut *tx) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; sqlx::query( "INSERT INTO channel_config_snapshots (id, channel_id, config_json, version_num, label, created_at) VALUES (?, ?, ?, ?, ?, ?)" ) .bind(id.to_string()) .bind(channel_id.to_string()) .bind(&config_json) .bind(version_num) .bind(&label) .bind(now.to_rfc3339()) .execute(&mut *tx) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; tx.commit().await.map_err(|e| DomainError::RepositoryError(e.to_string()))?; Ok(ChannelConfigSnapshot { id, channel_id, config: config.clone(), version_num, label, created_at: now }) } async fn list_config_snapshots( &self, channel_id: ChannelId, ) -> DomainResult> { let rows = sqlx::query( "SELECT id, config_json, version_num, label, created_at FROM channel_config_snapshots WHERE channel_id = ? ORDER BY version_num DESC" ) .bind(channel_id.to_string()) .fetch_all(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; rows.iter().map(|row| { let id: Uuid = row.get::("id").parse() .map_err(|_| DomainError::RepositoryError("bad uuid".into()))?; let config_json: String = row.get("config_json"); let config_compat: ScheduleConfigCompat = serde_json::from_str(&config_json) .map_err(|e| DomainError::RepositoryError(e.to_string()))?; let config: ScheduleConfig = config_compat.into(); let version_num: i64 = row.get("version_num"); let label: Option = row.get("label"); let created_at_str: String = row.get("created_at"); let created_at = created_at_str.parse::>() .map_err(|e| DomainError::RepositoryError(e.to_string()))?; Ok(ChannelConfigSnapshot { id, channel_id, config, version_num, label, created_at }) }).collect() } async fn get_config_snapshot( &self, channel_id: ChannelId, snapshot_id: Uuid, ) -> DomainResult> { let row = sqlx::query( "SELECT id, config_json, version_num, label, created_at FROM channel_config_snapshots WHERE id = ? AND channel_id = ?" ) .bind(snapshot_id.to_string()) .bind(channel_id.to_string()) .fetch_optional(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; match row { None => Ok(None), Some(row) => { let config_json: String = row.get("config_json"); let config_compat: ScheduleConfigCompat = serde_json::from_str(&config_json) .map_err(|e| DomainError::RepositoryError(e.to_string()))?; let config: ScheduleConfig = config_compat.into(); let version_num: i64 = row.get("version_num"); let label: Option = row.get("label"); let created_at_str: String = row.get("created_at"); let created_at = created_at_str.parse::>() .map_err(|e| DomainError::RepositoryError(e.to_string()))?; Ok(Some(ChannelConfigSnapshot { id: snapshot_id, channel_id, config, version_num, label, created_at })) } } } async fn patch_config_snapshot_label( &self, channel_id: ChannelId, snapshot_id: Uuid, label: Option, ) -> DomainResult> { let updated = sqlx::query( "UPDATE channel_config_snapshots SET label = ? WHERE id = ? AND channel_id = ? RETURNING id" ) .bind(&label) .bind(snapshot_id.to_string()) .bind(channel_id.to_string()) .fetch_optional(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; if updated.is_none() { return Ok(None); } self.get_config_snapshot(channel_id, snapshot_id).await } }