//! SQLite and PostgreSQL adapters for ScheduleRepository use async_trait::async_trait; use chrono::{DateTime, Utc}; use sqlx::FromRow; use uuid::Uuid; use domain::{ ChannelId, DomainError, DomainResult, GeneratedSchedule, MediaItem, MediaItemId, PlaybackRecord, ScheduleRepository, ScheduledSlot, }; // ============================================================================ // Row types // ============================================================================ #[derive(Debug, FromRow)] struct ScheduleRow { id: String, channel_id: String, valid_from: String, valid_until: String, generation: i64, } #[derive(Debug, FromRow)] struct SlotRow { id: String, // schedule_id selected but only used to drive the JOIN; not needed for domain type #[allow(dead_code)] schedule_id: String, start_at: String, end_at: String, item: String, source_block_id: String, } #[derive(Debug, FromRow)] struct PlaybackRecordRow { id: String, channel_id: String, item_id: String, played_at: String, generation: i64, } // ============================================================================ // Mapping // ============================================================================ fn parse_dt(s: &str) -> Result, DomainError> { DateTime::parse_from_rfc3339(s) .map(|dt| dt.with_timezone(&Utc)) .or_else(|_| { chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").map(|dt| dt.and_utc()) }) .map_err(|e| DomainError::RepositoryError(format!("Invalid datetime '{}': {}", s, e))) } fn map_slot_row(row: SlotRow) -> Result { let id = Uuid::parse_str(&row.id) .map_err(|e| DomainError::RepositoryError(format!("Invalid slot UUID: {}", e)))?; let source_block_id = Uuid::parse_str(&row.source_block_id) .map_err(|e| DomainError::RepositoryError(format!("Invalid block UUID: {}", e)))?; let item: MediaItem = serde_json::from_str(&row.item) .map_err(|e| DomainError::RepositoryError(format!("Invalid slot item JSON: {}", e)))?; Ok(ScheduledSlot { id, start_at: parse_dt(&row.start_at)?, end_at: parse_dt(&row.end_at)?, item, source_block_id, }) } fn map_schedule(row: ScheduleRow, slot_rows: Vec) -> Result { let id = Uuid::parse_str(&row.id) .map_err(|e| DomainError::RepositoryError(format!("Invalid schedule UUID: {}", e)))?; let channel_id = Uuid::parse_str(&row.channel_id) .map_err(|e| DomainError::RepositoryError(format!("Invalid channel UUID: {}", e)))?; let slots: Result, _> = slot_rows.into_iter().map(map_slot_row).collect(); Ok(GeneratedSchedule { id, channel_id, valid_from: parse_dt(&row.valid_from)?, valid_until: parse_dt(&row.valid_until)?, generation: row.generation as u32, slots: slots?, }) } impl TryFrom for PlaybackRecord { type Error = DomainError; fn try_from(row: PlaybackRecordRow) -> Result { let id = Uuid::parse_str(&row.id) .map_err(|e| DomainError::RepositoryError(format!("Invalid UUID: {}", e)))?; let channel_id = Uuid::parse_str(&row.channel_id) .map_err(|e| DomainError::RepositoryError(format!("Invalid channel UUID: {}", e)))?; Ok(PlaybackRecord { id, channel_id, item_id: MediaItemId::new(row.item_id), played_at: parse_dt(&row.played_at)?, generation: row.generation as u32, }) } } // ============================================================================ // SQLite adapter // ============================================================================ #[cfg(feature = "sqlite")] pub struct SqliteScheduleRepository { pool: sqlx::SqlitePool, } #[cfg(feature = "sqlite")] impl SqliteScheduleRepository { pub fn new(pool: sqlx::SqlitePool) -> Self { Self { pool } } async fn fetch_slots(&self, schedule_id: &str) -> DomainResult> { sqlx::query_as( "SELECT id, schedule_id, start_at, end_at, item, source_block_id \ FROM scheduled_slots WHERE schedule_id = ? ORDER BY start_at", ) .bind(schedule_id) .fetch_all(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string())) } } #[cfg(feature = "sqlite")] #[async_trait] impl ScheduleRepository for SqliteScheduleRepository { async fn find_active( &self, channel_id: ChannelId, at: DateTime, ) -> DomainResult> { let at_str = at.to_rfc3339(); let row: Option = sqlx::query_as( "SELECT id, channel_id, valid_from, valid_until, generation \ FROM generated_schedules \ WHERE channel_id = ? AND valid_from <= ? AND valid_until > ? \ LIMIT 1", ) .bind(channel_id.to_string()) .bind(&at_str) .bind(&at_str) .fetch_optional(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; match row { None => Ok(None), Some(r) => { let slots = self.fetch_slots(&r.id).await?; Some(map_schedule(r, slots)).transpose() } } } async fn find_latest(&self, channel_id: ChannelId) -> DomainResult> { let row: Option = sqlx::query_as( "SELECT id, channel_id, valid_from, valid_until, generation \ FROM generated_schedules \ WHERE channel_id = ? ORDER BY valid_from DESC LIMIT 1", ) .bind(channel_id.to_string()) .fetch_optional(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; match row { None => Ok(None), Some(r) => { let slots = self.fetch_slots(&r.id).await?; Some(map_schedule(r, slots)).transpose() } } } async fn save(&self, schedule: &GeneratedSchedule) -> DomainResult<()> { // Upsert the schedule header sqlx::query( r#" INSERT INTO generated_schedules (id, channel_id, valid_from, valid_until, generation) VALUES (?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET valid_from = excluded.valid_from, valid_until = excluded.valid_until, generation = excluded.generation "#, ) .bind(schedule.id.to_string()) .bind(schedule.channel_id.to_string()) .bind(schedule.valid_from.to_rfc3339()) .bind(schedule.valid_until.to_rfc3339()) .bind(schedule.generation as i64) .execute(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; // Replace all slots (delete-then-insert is safe here; schedule saves are // infrequent and atomic within a single-writer SQLite connection) sqlx::query("DELETE FROM scheduled_slots WHERE schedule_id = ?") .bind(schedule.id.to_string()) .execute(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; for slot in &schedule.slots { let item_json = serde_json::to_string(&slot.item).map_err(|e| { DomainError::RepositoryError(format!("Failed to serialize slot item: {}", e)) })?; sqlx::query( r#" INSERT INTO scheduled_slots (id, schedule_id, start_at, end_at, item, source_block_id) VALUES (?, ?, ?, ?, ?, ?) "#, ) .bind(slot.id.to_string()) .bind(schedule.id.to_string()) .bind(slot.start_at.to_rfc3339()) .bind(slot.end_at.to_rfc3339()) .bind(&item_json) .bind(slot.source_block_id.to_string()) .execute(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; } Ok(()) } async fn find_playback_history( &self, channel_id: ChannelId, ) -> DomainResult> { let rows: Vec = sqlx::query_as( "SELECT id, channel_id, item_id, played_at, generation \ FROM playback_records WHERE channel_id = ? ORDER BY played_at DESC", ) .bind(channel_id.to_string()) .fetch_all(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; rows.into_iter().map(PlaybackRecord::try_from).collect() } async fn save_playback_record(&self, record: &PlaybackRecord) -> DomainResult<()> { sqlx::query( r#" INSERT INTO playback_records (id, channel_id, item_id, played_at, generation) VALUES (?, ?, ?, ?, ?) ON CONFLICT(id) DO NOTHING "#, ) .bind(record.id.to_string()) .bind(record.channel_id.to_string()) .bind(record.item_id.as_ref()) .bind(record.played_at.to_rfc3339()) .bind(record.generation as i64) .execute(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; Ok(()) } } // ============================================================================ // PostgreSQL adapter // ============================================================================ #[cfg(feature = "postgres")] pub struct PostgresScheduleRepository { pool: sqlx::Pool, } #[cfg(feature = "postgres")] impl PostgresScheduleRepository { pub fn new(pool: sqlx::Pool) -> Self { Self { pool } } async fn fetch_slots(&self, schedule_id: &str) -> DomainResult> { sqlx::query_as( "SELECT id, schedule_id, start_at, end_at, item, source_block_id \ FROM scheduled_slots WHERE schedule_id = $1 ORDER BY start_at", ) .bind(schedule_id) .fetch_all(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string())) } } #[cfg(feature = "postgres")] #[async_trait] impl ScheduleRepository for PostgresScheduleRepository { async fn find_active( &self, channel_id: ChannelId, at: DateTime, ) -> DomainResult> { let at_str = at.to_rfc3339(); let row: Option = sqlx::query_as( "SELECT id, channel_id, valid_from, valid_until, generation \ FROM generated_schedules \ WHERE channel_id = $1 AND valid_from <= $2 AND valid_until > $3 \ LIMIT 1", ) .bind(channel_id.to_string()) .bind(&at_str) .bind(&at_str) .fetch_optional(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; match row { None => Ok(None), Some(r) => { let slots = self.fetch_slots(&r.id).await?; Some(map_schedule(r, slots)).transpose() } } } async fn find_latest(&self, channel_id: ChannelId) -> DomainResult> { let row: Option = sqlx::query_as( "SELECT id, channel_id, valid_from, valid_until, generation \ FROM generated_schedules \ WHERE channel_id = $1 ORDER BY valid_from DESC LIMIT 1", ) .bind(channel_id.to_string()) .fetch_optional(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; match row { None => Ok(None), Some(r) => { let slots = self.fetch_slots(&r.id).await?; Some(map_schedule(r, slots)).transpose() } } } async fn save(&self, schedule: &GeneratedSchedule) -> DomainResult<()> { sqlx::query( r#" INSERT INTO generated_schedules (id, channel_id, valid_from, valid_until, generation) VALUES ($1, $2, $3, $4, $5) ON CONFLICT(id) DO UPDATE SET valid_from = EXCLUDED.valid_from, valid_until = EXCLUDED.valid_until, generation = EXCLUDED.generation "#, ) .bind(schedule.id.to_string()) .bind(schedule.channel_id.to_string()) .bind(schedule.valid_from.to_rfc3339()) .bind(schedule.valid_until.to_rfc3339()) .bind(schedule.generation as i64) .execute(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; sqlx::query("DELETE FROM scheduled_slots WHERE schedule_id = $1") .bind(schedule.id.to_string()) .execute(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; for slot in &schedule.slots { let item_json = serde_json::to_string(&slot.item).map_err(|e| { DomainError::RepositoryError(format!("Failed to serialize slot item: {}", e)) })?; sqlx::query( r#" INSERT INTO scheduled_slots (id, schedule_id, start_at, end_at, item, source_block_id) VALUES ($1, $2, $3, $4, $5, $6) "#, ) .bind(slot.id.to_string()) .bind(schedule.id.to_string()) .bind(slot.start_at.to_rfc3339()) .bind(slot.end_at.to_rfc3339()) .bind(&item_json) .bind(slot.source_block_id.to_string()) .execute(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; } Ok(()) } async fn find_playback_history( &self, channel_id: ChannelId, ) -> DomainResult> { let rows: Vec = sqlx::query_as( "SELECT id, channel_id, item_id, played_at, generation \ FROM playback_records WHERE channel_id = $1 ORDER BY played_at DESC", ) .bind(channel_id.to_string()) .fetch_all(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; rows.into_iter().map(PlaybackRecord::try_from).collect() } async fn save_playback_record(&self, record: &PlaybackRecord) -> DomainResult<()> { sqlx::query( r#" INSERT INTO playback_records (id, channel_id, item_id, played_at, generation) VALUES ($1, $2, $3, $4, $5) ON CONFLICT(id) DO NOTHING "#, ) .bind(record.id.to_string()) .bind(record.channel_id.to_string()) .bind(record.item_id.as_ref()) .bind(record.played_at.to_rfc3339()) .bind(record.generation as i64) .execute(&self.pool) .await .map_err(|e| DomainError::RepositoryError(e.to_string()))?; Ok(()) } }