diff --git a/k-tv-backend/api/src/poller.rs b/k-tv-backend/api/src/poller.rs index a4f6075..0867cf7 100644 --- a/k-tv-backend/api/src/poller.rs +++ b/k-tv-backend/api/src/poller.rs @@ -139,8 +139,8 @@ mod tests { use chrono::{DateTime, Duration, Utc}; use domain::value_objects::{ChannelId, ContentType, UserId}; use domain::{ - Channel, ChannelRepository, Collection, DomainResult, GeneratedSchedule, IProviderRegistry, - MediaFilter, MediaItem, MediaItemId, PlaybackRecord, ProviderCapabilities, + BlockId, Channel, ChannelRepository, Collection, DomainResult, GeneratedSchedule, + IProviderRegistry, MediaFilter, MediaItem, MediaItemId, PlaybackRecord, ProviderCapabilities, ScheduleEngineService, ScheduleRepository, SeriesSummary, StreamQuality, }; use tokio::sync::broadcast; @@ -207,6 +207,12 @@ mod tests { async fn save_playback_record(&self, _record: &PlaybackRecord) -> DomainResult<()> { Ok(()) } + async fn find_last_slot_per_block( + &self, + _channel_id: ChannelId, + ) -> DomainResult<HashMap<BlockId, MediaItemId>> { + Ok(HashMap::new()) + } } struct MockRegistry; @@ -425,6 +431,12 @@ mod tests { async fn save_playback_record(&self, _: &PlaybackRecord) -> DomainResult<()> { Ok(()) } + async fn find_last_slot_per_block( + &self, + _: ChannelId, + ) -> DomainResult<HashMap<BlockId, MediaItemId>> { + Ok(HashMap::new()) + } } let now = Utc::now(); diff --git a/k-tv-backend/api/src/scheduler.rs b/k-tv-backend/api/src/scheduler.rs index d957531..6a7bda7 100644 --- a/k-tv-backend/api/src/scheduler.rs +++ b/k-tv-backend/api/src/scheduler.rs @@ -93,9 +93,10 @@ mod tests { use async_trait::async_trait; use chrono::{DateTime, Duration, Utc}; use domain::value_objects::{ChannelId, ContentType, UserId}; + use std::collections::HashMap; use domain::{ - Channel, ChannelRepository, Collection, DomainResult, GeneratedSchedule, IProviderRegistry, - MediaFilter, MediaItem, MediaItemId, PlaybackRecord, ProviderCapabilities, + BlockId, Channel, ChannelRepository, Collection, DomainResult, GeneratedSchedule, + IProviderRegistry, MediaFilter, 
MediaItem, MediaItemId, PlaybackRecord, ProviderCapabilities, ScheduleEngineService, ScheduleRepository, SeriesSummary, StreamQuality, }; use uuid::Uuid; @@ -161,6 +162,12 @@ mod tests { async fn save_playback_record(&self, _record: &PlaybackRecord) -> DomainResult<()> { Ok(()) } + async fn find_last_slot_per_block( + &self, + _channel_id: ChannelId, + ) -> DomainResult<HashMap<BlockId, MediaItemId>> { + Ok(HashMap::new()) + } } struct MockRegistry; diff --git a/k-tv-backend/domain/src/repositories.rs b/k-tv-backend/domain/src/repositories.rs index e8f0ede..ce81387 100644 --- a/k-tv-backend/domain/src/repositories.rs +++ b/k-tv-backend/domain/src/repositories.rs @@ -3,6 +3,8 @@ //! These traits define the interface for data persistence. //! Implementations live in the infra layer. +use std::collections::HashMap; + use async_trait::async_trait; use chrono::DateTime; use chrono::Utc; @@ -10,7 +12,7 @@ use uuid::Uuid; use crate::entities::{Channel, GeneratedSchedule, PlaybackRecord, User}; use crate::errors::DomainResult; -use crate::value_objects::{ChannelId, UserId}; +use crate::value_objects::{BlockId, ChannelId, MediaItemId, UserId}; /// An in-app activity event stored in the database for the admin log view. #[derive(Debug, Clone)] @@ -98,6 +100,13 @@ pub trait ScheduleRepository: Send + Sync { ) -> DomainResult<Vec<PlaybackRecord>>; async fn save_playback_record(&self, record: &PlaybackRecord) -> DomainResult<()>; + + /// Return the most recent slot per block_id across ALL schedules for a channel. + /// Resilient to any single generation having empty slots for a block. + async fn find_last_slot_per_block( + &self, + channel_id: ChannelId, + ) -> DomainResult<HashMap<BlockId, MediaItemId>>; } /// Repository port for activity log persistence. 
diff --git a/k-tv-backend/domain/src/services/schedule/fill.rs b/k-tv-backend/domain/src/services/schedule/fill.rs index a69a61d..b371d6b 100644 --- a/k-tv-backend/domain/src/services/schedule/fill.rs +++ b/k-tv-backend/domain/src/services/schedule/fill.rs @@ -127,12 +127,22 @@ pub(super) fn fill_sequential<'a>( }; // Greedily fill the block's time budget in episode order. + // Stop at the first episode that doesn't fit — skipping would break ordering. let mut remaining = target_secs; let mut result = Vec::new(); - for item in ordered { + for item in &ordered { if item.duration_secs <= remaining { remaining -= item.duration_secs; - result.push(item); + result.push(*item); + } else { + break; + } + } + // Edge case: if the very first episode is longer than the entire block, + // still include it — the slot builder clips it to block end via .min(end). + if result.is_empty() { + if let Some(&first) = ordered.first() { + result.push(first); } } result diff --git a/k-tv-backend/domain/src/services/schedule/mod.rs b/k-tv-backend/domain/src/services/schedule/mod.rs index 6e82d60..ed21a98 100644 --- a/k-tv-backend/domain/src/services/schedule/mod.rs +++ b/k-tv-backend/domain/src/services/schedule/mod.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::sync::Arc; use chrono::{DateTime, Duration, TimeZone, Utc}; @@ -91,18 +90,15 @@ impl ScheduleEngineService { .map(|s| s.generation + 1) .unwrap_or(1); - // Build the initial per-block continuity map from the previous generation's - // last slot per block. The map is updated as each block occurrence is resolved - // within this generation so that the second day of a 48h schedule continues - // from where the first day ended. 
- let mut block_continuity: HashMap<BlockId, MediaItemId> = latest_schedule - .iter() - .flat_map(|s| &s.slots) - .fold(HashMap::new(), |mut map, slot| { - // keep only the *last* slot per block (slots are sorted ascending) - map.insert(slot.source_block_id, slot.item.id.clone()); - map - }); + // Build the initial per-block continuity map from the most recent slot per + // block across ALL schedules. This is resilient to any single generation + // having empty slots for a block (e.g. provider returned nothing transiently). + // The map is updated as each block occurrence is resolved within this + // generation so the second day of a 48h schedule continues from here. + let mut block_continuity = self + .schedule_repo + .find_last_slot_per_block(channel_id) + .await?; let valid_from = from; let valid_until = from + Duration::hours(48); diff --git a/k-tv-backend/infra/src/jellyfin/provider.rs b/k-tv-backend/infra/src/jellyfin/provider.rs index 1efe5e3..c1307df 100644 --- a/k-tv-backend/infra/src/jellyfin/provider.rs +++ b/k-tv-backend/infra/src/jellyfin/provider.rs @@ -73,6 +73,10 @@ impl JellyfinMediaProvider { // requested — season first, then episode within the season. params.push(("SortBy", "ParentIndexNumber,IndexNumber".into())); params.push(("SortOrder", "Ascending".into())); + // Prevent Jellyfin from returning Season/Series container items. + if filter.content_type.is_none() { + params.push(("IncludeItemTypes", "Episode".into())); + } } else { // No series filter — scope to the collection (library) if one is set. 
if let Some(parent_id) = filter.collections.first() { diff --git a/k-tv-backend/infra/src/schedule_repository/mapping.rs b/k-tv-backend/infra/src/schedule_repository/mapping.rs index 25c0c1f..d8147cf 100644 --- a/k-tv-backend/infra/src/schedule_repository/mapping.rs +++ b/k-tv-backend/infra/src/schedule_repository/mapping.rs @@ -29,6 +29,12 @@ pub(super) struct SlotRow { pub source_block_id: String, } +#[derive(Debug, FromRow)] +pub(super) struct LastSlotRow { + pub source_block_id: String, + pub item: String, +} + #[derive(Debug, FromRow)] pub(super) struct PlaybackRecordRow { pub id: String, diff --git a/k-tv-backend/infra/src/schedule_repository/postgres.rs b/k-tv-backend/infra/src/schedule_repository/postgres.rs index 0cc93b2..c8be8af 100644 --- a/k-tv-backend/infra/src/schedule_repository/postgres.rs +++ b/k-tv-backend/infra/src/schedule_repository/postgres.rs @@ -1,9 +1,11 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; -use domain::{ChannelId, DomainError, DomainResult, GeneratedSchedule, PlaybackRecord, ScheduleRepository}; +use std::collections::HashMap; -use super::mapping::{map_schedule, PlaybackRecordRow, ScheduleRow, SlotRow}; +use domain::{BlockId, ChannelId, DomainError, DomainResult, GeneratedSchedule, MediaItemId, PlaybackRecord, ScheduleRepository}; + +use super::mapping::{map_schedule, LastSlotRow, PlaybackRecordRow, ScheduleRow, SlotRow}; pub struct PostgresScheduleRepository { pool: sqlx::Pool<sqlx::Postgres>, } @@ -143,6 +145,41 @@ impl ScheduleRepository for PostgresScheduleRepository { rows.into_iter().map(PlaybackRecord::try_from).collect() } + async fn find_last_slot_per_block( + &self, + channel_id: ChannelId, + ) -> DomainResult<HashMap<BlockId, MediaItemId>> { + let channel_id_str = channel_id.to_string(); + let rows: Vec<LastSlotRow> = sqlx::query_as( + "SELECT ss.source_block_id, ss.item \ + FROM scheduled_slots ss \ + INNER JOIN generated_schedules gs ON gs.id = ss.schedule_id \ + WHERE gs.channel_id = $1 \ + AND ss.start_at = ( \ + SELECT MAX(ss2.start_at) \ + FROM 
scheduled_slots ss2 \ + INNER JOIN generated_schedules gs2 ON gs2.id = ss2.schedule_id \ + WHERE ss2.source_block_id = ss.source_block_id \ + AND gs2.channel_id = $2 \ + )", + ) + .bind(&channel_id_str) + .bind(&channel_id_str) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + let mut map = HashMap::new(); + for row in rows { + let block_id = uuid::Uuid::parse_str(&row.source_block_id) + .map_err(|e| DomainError::RepositoryError(format!("Invalid block UUID: {}", e)))?; + let item: domain::MediaItem = serde_json::from_str(&row.item) + .map_err(|e| DomainError::RepositoryError(format!("Invalid slot item JSON: {}", e)))?; + map.insert(block_id, item.id); + } + Ok(map) + } + async fn save_playback_record(&self, record: &PlaybackRecord) -> DomainResult<()> { sqlx::query( r#" diff --git a/k-tv-backend/infra/src/schedule_repository/sqlite.rs b/k-tv-backend/infra/src/schedule_repository/sqlite.rs index 9fec40d..2f2dff1 100644 --- a/k-tv-backend/infra/src/schedule_repository/sqlite.rs +++ b/k-tv-backend/infra/src/schedule_repository/sqlite.rs @@ -1,9 +1,11 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; -use domain::{ChannelId, DomainError, DomainResult, GeneratedSchedule, PlaybackRecord, ScheduleRepository}; +use std::collections::HashMap; -use super::mapping::{map_schedule, PlaybackRecordRow, ScheduleRow, SlotRow}; +use domain::{BlockId, ChannelId, DomainError, DomainResult, GeneratedSchedule, MediaItemId, PlaybackRecord, ScheduleRepository}; + +use super::mapping::{map_schedule, LastSlotRow, PlaybackRecordRow, ScheduleRow, SlotRow}; pub struct SqliteScheduleRepository { pool: sqlx::SqlitePool, } @@ -146,6 +148,41 @@ impl ScheduleRepository for SqliteScheduleRepository { rows.into_iter().map(PlaybackRecord::try_from).collect() } + async fn find_last_slot_per_block( + &self, + channel_id: ChannelId, + ) -> DomainResult<HashMap<BlockId, MediaItemId>> { + let channel_id_str = channel_id.to_string(); + let rows: Vec<LastSlotRow> = sqlx::query_as( + "SELECT 
ss.source_block_id, ss.item \ + FROM scheduled_slots ss \ + INNER JOIN generated_schedules gs ON gs.id = ss.schedule_id \ + WHERE gs.channel_id = ? \ + AND ss.start_at = ( \ + SELECT MAX(ss2.start_at) \ + FROM scheduled_slots ss2 \ + INNER JOIN generated_schedules gs2 ON gs2.id = ss2.schedule_id \ + WHERE ss2.source_block_id = ss.source_block_id \ + AND gs2.channel_id = ? \ + )", + ) + .bind(&channel_id_str) + .bind(&channel_id_str) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + let mut map = HashMap::new(); + for row in rows { + let block_id = uuid::Uuid::parse_str(&row.source_block_id) + .map_err(|e| DomainError::RepositoryError(format!("Invalid block UUID: {}", e)))?; + let item: domain::MediaItem = serde_json::from_str(&row.item) + .map_err(|e| DomainError::RepositoryError(format!("Invalid slot item JSON: {}", e)))?; + map.insert(block_id, item.id); + } + Ok(map) + } + async fn save_playback_record(&self, record: &PlaybackRecord) -> DomainResult<()> { sqlx::query( r#"