feat: enhance schedule generation with series continuity for sequential blocks

This commit is contained in:
2026-03-11 22:42:44 +01:00
parent ee64fc0b8a
commit 20aed753d8
2 changed files with 100 additions and 16 deletions

View File

@@ -2,7 +2,7 @@
//! //!
//! Services contain the business logic of the application. //! Services contain the business logic of the application.
use std::collections::HashSet; use std::collections::{HashMap, HashSet};
use std::sync::Arc; use std::sync::Arc;
use chrono::{DateTime, Duration, TimeZone, Utc}; use chrono::{DateTime, Duration, TimeZone, Utc};
@@ -185,7 +185,9 @@ impl ScheduleEngineService {
/// 2. For each `ProgrammingBlock`, compute its UTC wall-clock interval for that day. /// 2. For each `ProgrammingBlock`, compute its UTC wall-clock interval for that day.
/// 3. Clip the interval to `[from, from + 48h)`. /// 3. Clip the interval to `[from, from + 48h)`.
/// 4. Resolve the block content via the media provider, applying the recycle policy. /// 4. Resolve the block content via the media provider, applying the recycle policy.
/// 5. Record every played item in the playback history. /// 5. For `Sequential` blocks, resume from where the previous generation left off
/// (series continuity — see `fill_sequential`).
/// 6. Record every played item in the playback history.
/// ///
/// Gaps between blocks are left empty — clients render them as a no-signal state. /// Gaps between blocks are left empty — clients render them as a no-signal state.
pub async fn generate_schedule( pub async fn generate_schedule(
@@ -209,13 +211,29 @@ impl ScheduleEngineService {
.find_playback_history(channel_id) .find_playback_history(channel_id)
.await?; .await?;
let generation = self // Load the most recent schedule for two purposes:
.schedule_repo // 1. Derive the next generation number.
.find_latest(channel_id) // 2. Know where each Sequential block left off (series continuity).
.await? let latest_schedule = self.schedule_repo.find_latest(channel_id).await?;
let generation = latest_schedule
.as_ref()
.map(|s| s.generation + 1) .map(|s| s.generation + 1)
.unwrap_or(1); .unwrap_or(1);
// Build the initial per-block continuity map from the previous generation's
// last slot per block. The map is updated as each block occurrence is resolved
// within this generation so that the second day of a 48h schedule continues
// from where the first day ended.
let mut block_continuity: HashMap<BlockId, MediaItemId> = latest_schedule
.iter()
.flat_map(|s| &s.slots)
.fold(HashMap::new(), |mut map, slot| {
// keep only the *last* slot per block (slots are sorted ascending)
map.insert(slot.source_block_id, slot.item.id.clone());
map
});
let valid_from = from; let valid_from = from;
let valid_until = from + Duration::hours(48); let valid_until = from + Duration::hours(48);
@@ -247,6 +265,9 @@ impl ScheduleEngineService {
continue; continue;
} }
// For Sequential blocks: resume from the last item aired in this block.
let last_item_id = block_continuity.get(&block.id);
let mut block_slots = self let mut block_slots = self
.resolve_block( .resolve_block(
block, block,
@@ -255,9 +276,16 @@ impl ScheduleEngineService {
&history, &history,
&channel.recycle_policy, &channel.recycle_policy,
generation, generation,
last_item_id,
) )
.await?; .await?;
// Update continuity so the next occurrence of this block (same
// generation, next calendar day) continues from here.
if let Some(last_slot) = block_slots.last() {
block_continuity.insert(block.id, last_slot.item.id.clone());
}
slots.append(&mut block_slots); slots.append(&mut block_slots);
} }
@@ -347,6 +375,7 @@ impl ScheduleEngineService {
history: &[PlaybackRecord], history: &[PlaybackRecord],
policy: &RecyclePolicy, policy: &RecyclePolicy,
generation: u32, generation: u32,
last_item_id: Option<&MediaItemId>,
) -> DomainResult<Vec<ScheduledSlot>> { ) -> DomainResult<Vec<ScheduledSlot>> {
match &block.content { match &block.content {
BlockContent::Manual { items } => { BlockContent::Manual { items } => {
@@ -354,7 +383,8 @@ impl ScheduleEngineService {
} }
BlockContent::Algorithmic { filter, strategy } => { BlockContent::Algorithmic { filter, strategy } => {
self.resolve_algorithmic( self.resolve_algorithmic(
filter, strategy, start, end, history, policy, generation, block.id, filter, strategy, start, end, history, policy, generation,
block.id, last_item_id,
) )
.await .await
} }
@@ -397,6 +427,9 @@ impl ScheduleEngineService {
/// Resolve an algorithmic block: fetch candidates, apply recycle policy, /// Resolve an algorithmic block: fetch candidates, apply recycle policy,
/// run the fill strategy, and build slots. /// run the fill strategy, and build slots.
///
/// `last_item_id` is the ID of the last item scheduled in this block in the
/// previous generation. Used only by `Sequential` for series continuity.
async fn resolve_algorithmic( async fn resolve_algorithmic(
&self, &self,
filter: &MediaFilter, filter: &MediaFilter,
@@ -407,16 +440,20 @@ impl ScheduleEngineService {
policy: &RecyclePolicy, policy: &RecyclePolicy,
generation: u32, generation: u32,
block_id: BlockId, block_id: BlockId,
last_item_id: Option<&MediaItemId>,
) -> DomainResult<Vec<ScheduledSlot>> { ) -> DomainResult<Vec<ScheduledSlot>> {
// `candidates` — all items matching the filter, in provider order.
// Kept separate from `pool` so Sequential can rotate through the full
// ordered list while still honouring cooldowns.
let candidates = self.media_provider.fetch_items(filter).await?; let candidates = self.media_provider.fetch_items(filter).await?;
if candidates.is_empty() { if candidates.is_empty() {
return Ok(vec![]); return Ok(vec![]);
} }
let pool = Self::apply_recycle_policy(candidates, history, policy, generation); let pool = Self::apply_recycle_policy(&candidates, history, policy, generation);
let target_secs = (end - start).num_seconds() as u32; let target_secs = (end - start).num_seconds() as u32;
let selected = Self::fill_block(&pool, target_secs, strategy); let selected = Self::fill_block(&candidates, &pool, target_secs, strategy, last_item_id);
let mut slots = Vec::new(); let mut slots = Vec::new();
let mut cursor = start; let mut cursor = start;
@@ -451,7 +488,7 @@ impl ScheduleEngineService {
/// than `policy.min_available_ratio` of the total, all cooldowns are waived /// than `policy.min_available_ratio` of the total, all cooldowns are waived
/// and the full pool is returned (prevents small libraries from stalling). /// and the full pool is returned (prevents small libraries from stalling).
fn apply_recycle_policy( fn apply_recycle_policy(
candidates: Vec<MediaItem>, candidates: &[MediaItem],
history: &[PlaybackRecord], history: &[PlaybackRecord],
policy: &RecyclePolicy, policy: &RecyclePolicy,
current_generation: u32, current_generation: u32,
@@ -489,7 +526,7 @@ impl ScheduleEngineService {
if available.len() < min_count { if available.len() < min_count {
// Pool too small after applying cooldowns — recycle everything. // Pool too small after applying cooldowns — recycle everything.
candidates candidates.to_vec()
} else { } else {
available available
} }
@@ -500,13 +537,17 @@ impl ScheduleEngineService {
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
fn fill_block<'a>( fn fill_block<'a>(
candidates: &'a [MediaItem],
pool: &'a [MediaItem], pool: &'a [MediaItem],
target_secs: u32, target_secs: u32,
strategy: &FillStrategy, strategy: &FillStrategy,
last_item_id: Option<&MediaItemId>,
) -> Vec<&'a MediaItem> { ) -> Vec<&'a MediaItem> {
match strategy { match strategy {
FillStrategy::BestFit => Self::fill_best_fit(pool, target_secs), FillStrategy::BestFit => Self::fill_best_fit(pool, target_secs),
FillStrategy::Sequential => Self::fill_sequential(pool, target_secs), FillStrategy::Sequential => {
Self::fill_sequential(candidates, pool, target_secs, last_item_id)
}
FillStrategy::Random => { FillStrategy::Random => {
let mut indices: Vec<usize> = (0..pool.len()).collect(); let mut indices: Vec<usize> = (0..pool.len()).collect();
indices.shuffle(&mut rand::thread_rng()); indices.shuffle(&mut rand::thread_rng());
@@ -553,12 +594,55 @@ impl ScheduleEngineService {
selected selected
} }
/// Sequential: iterate the pool in order, picking items that fit within /// Sequential fill with cross-generation series continuity.
/// the remaining budget. Good for series where episode order matters. ///
fn fill_sequential(pool: &[MediaItem], target_secs: u32) -> Vec<&MediaItem> { /// `candidates` — all items matching the filter, in Jellyfin's natural order
/// (typically by season + episode number for TV shows).
/// `pool` — candidates filtered by the recycle policy (eligible to air).
/// `last_item_id` — the last item scheduled in this block in the previous
/// generation or in an earlier occurrence of this block within
/// the current generation. Used to resume the series from the
/// next episode rather than restarting from episode 1.
///
/// Algorithm:
/// 1. Find `last_item_id`'s position in `candidates` and start from the next index.
/// 2. Walk the full `candidates` list in order (wrapping around at the end),
/// but only pick items that are in `pool` (i.e. not on cooldown).
/// 3. Greedily fill the time budget with items in that order.
///
/// This ensures episodes always air in series order, the series wraps correctly
/// when the last episode has been reached, and cooldowns are still respected.
fn fill_sequential<'a>(
candidates: &'a [MediaItem],
pool: &'a [MediaItem],
target_secs: u32,
last_item_id: Option<&MediaItemId>,
) -> Vec<&'a MediaItem> {
if pool.is_empty() {
return vec![];
}
// Set of item IDs currently eligible to air.
let available: HashSet<&MediaItemId> = pool.iter().map(|i| &i.id).collect();
// Find where in the full ordered list to resume.
// Falls back to index 0 if last_item_id is absent or was removed from the library.
let start_idx = last_item_id
.and_then(|id| candidates.iter().position(|c| &c.id == id))
.map(|pos| (pos + 1) % candidates.len())
.unwrap_or(0);
// Walk candidates in order from start_idx, wrapping around once,
// skipping any that are on cooldown (not in `available`).
let ordered: Vec<&MediaItem> = (0..candidates.len())
.map(|i| &candidates[(start_idx + i) % candidates.len()])
.filter(|item| available.contains(&item.id))
.collect();
// Greedily fill the block's time budget in episode order.
let mut remaining = target_secs; let mut remaining = target_secs;
let mut result = Vec::new(); let mut result = Vec::new();
for item in pool { for item in ordered {
if item.duration_secs <= remaining { if item.duration_secs <= remaining {
remaining -= item.duration_secs; remaining -= item.duration_secs;
result.push(item); result.push(item);

Binary file not shown.

After

Width:  |  Height:  |  Size: 48 KiB