use async_trait::async_trait; use chrono::{DateTime, Utc}; use std::collections::HashMap; use domain::{BlockId, ChannelId, DomainError, DomainResult, GeneratedSchedule, MediaItemId, PlaybackRecord, ScheduleRepository}; use super::mapping::{map_schedule, LastSlotRow, PlaybackRecordRow, ScheduleRow, SlotRow}; pub struct PostgresScheduleRepository { pool: sqlx::Pool, } impl PostgresScheduleRepository { pub fn new(pool: sqlx::Pool) -> Self { Self { pool } } async fn fetch_slots(&self, schedule_id: &str) -> DomainResult> { sqlx::query_as( "SELECT id, schedule_id, start_at, end_at, item, source_block_id \ FROM scheduled_slots WHERE schedule_id = $1 ORDER BY start_at", ) .bind(schedule_id) .fetch_all(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string())) } } #[async_trait] impl ScheduleRepository for PostgresScheduleRepository { async fn find_active( &self, channel_id: ChannelId, at: DateTime, ) -> DomainResult> { let at_str = at.to_rfc3339(); let row: Option = sqlx::query_as( "SELECT id, channel_id, valid_from, valid_until, generation \ FROM generated_schedules \ WHERE channel_id = $1 AND valid_from <= $2 AND valid_until > $3 \ LIMIT 1", ) .bind(channel_id.to_string()) .bind(&at_str) .bind(&at_str) .fetch_optional(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; match row { None => Ok(None), Some(r) => { let slots = self.fetch_slots(&r.id).await?; Some(map_schedule(r, slots)).transpose() } } } async fn find_latest(&self, channel_id: ChannelId) -> DomainResult> { let row: Option = sqlx::query_as( "SELECT id, channel_id, valid_from, valid_until, generation \ FROM generated_schedules \ WHERE channel_id = $1 ORDER BY valid_from DESC LIMIT 1", ) .bind(channel_id.to_string()) .fetch_optional(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; match row { None => Ok(None), Some(r) => { let slots = self.fetch_slots(&r.id).await?; Some(map_schedule(r, slots)).transpose() } } } async fn save(&self, schedule: &GeneratedSchedule) -> DomainResult<()> { sqlx::query( r#" INSERT INTO generated_schedules (id, channel_id, valid_from, valid_until, generation) VALUES ($1, $2, $3, $4, $5) ON CONFLICT(id) DO UPDATE SET valid_from = EXCLUDED.valid_from, valid_until = EXCLUDED.valid_until, generation = EXCLUDED.generation "#, ) .bind(schedule.id.to_string()) .bind(schedule.channel_id.to_string()) .bind(schedule.valid_from.to_rfc3339()) .bind(schedule.valid_until.to_rfc3339()) .bind(schedule.generation as i64) .execute(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; sqlx::query("DELETE FROM scheduled_slots WHERE schedule_id = $1") .bind(schedule.id.to_string()) .execute(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; for slot in &schedule.slots { let item_json = serde_json::to_string(&slot.item).map_err(|e| { DomainError::RepositoryError(format!("Failed to serialize slot item: {}", e)) })?; sqlx::query( r#" INSERT INTO scheduled_slots (id, schedule_id, start_at, end_at, item, source_block_id) VALUES ($1, $2, $3, $4, $5, $6) "#, ) .bind(slot.id.to_string()) .bind(schedule.id.to_string()) .bind(slot.start_at.to_rfc3339()) .bind(slot.end_at.to_rfc3339()) .bind(&item_json) .bind(slot.source_block_id.to_string()) .execute(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; } Ok(()) } async fn find_playback_history( &self, channel_id: ChannelId, ) -> DomainResult> { let rows: Vec = sqlx::query_as( "SELECT id, channel_id, item_id, played_at, generation \ FROM playback_records WHERE channel_id = $1 ORDER BY played_at DESC", ) .bind(channel_id.to_string()) .fetch_all(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; rows.into_iter().map(PlaybackRecord::try_from).collect() } async fn find_last_slot_per_block( &self, channel_id: ChannelId, ) -> DomainResult> { let channel_id_str = channel_id.to_string(); let rows: Vec = sqlx::query_as( "SELECT ss.source_block_id, ss.item \ FROM scheduled_slots ss \ INNER JOIN generated_schedules gs ON gs.id = ss.schedule_id \ WHERE gs.channel_id = $1 \ AND ss.start_at = ( \ SELECT MAX(ss2.start_at) \ FROM scheduled_slots ss2 \ INNER JOIN generated_schedules gs2 ON gs2.id = ss2.schedule_id \ WHERE ss2.source_block_id = ss.source_block_id \ AND gs2.channel_id = $2 \ )", ) .bind(&channel_id_str) .bind(&channel_id_str) .fetch_all(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; let mut map = HashMap::new(); for row in rows { let block_id = uuid::Uuid::parse_str(&row.source_block_id) .map_err(|e| DomainError::RepositoryError(format!("Invalid block UUID: {}", e)))?; let item: domain::MediaItem = serde_json::from_str(&row.item) .map_err(|e| DomainError::RepositoryError(format!("Invalid slot item JSON: {}", e)))?; map.insert(block_id, item.id); } Ok(map) } async fn save_playback_record(&self, record: &PlaybackRecord) -> DomainResult<()> { sqlx::query( r#" INSERT INTO playback_records (id, channel_id, item_id, played_at, generation) VALUES ($1, $2, $3, $4, $5) ON CONFLICT(id) DO NOTHING "#, ) .bind(record.id.to_string()) .bind(record.channel_id.to_string()) .bind(record.item_id.as_ref()) .bind(record.played_at.to_rfc3339()) .bind(record.generation as i64) .execute(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; Ok(()) } }