//! Domain Services //! //! Services contain the business logic of the application. use std::collections::{HashMap, HashSet}; use std::sync::Arc; use chrono::{DateTime, Duration, TimeZone, Utc}; use chrono_tz::Tz; use rand::seq::SliceRandom; use uuid::Uuid; use crate::entities::{ BlockContent, CurrentBroadcast, GeneratedSchedule, MediaItem, PlaybackRecord, ProgrammingBlock, ScheduledSlot, }; use crate::errors::{DomainError, DomainResult}; use crate::ports::IMediaProvider; use crate::repositories::{ChannelRepository, ScheduleRepository, UserRepository}; use crate::value_objects::{ BlockId, ChannelId, Email, FillStrategy, MediaFilter, MediaItemId, RecyclePolicy, }; // ============================================================================ // UserService // ============================================================================ /// Service for managing users. pub struct UserService { user_repository: Arc, } impl UserService { pub fn new(user_repository: Arc) -> Self { Self { user_repository } } pub async fn find_or_create(&self, subject: &str, email: &str) -> DomainResult { if let Some(user) = self.user_repository.find_by_subject(subject).await? { return Ok(user); } if let Some(mut user) = self.user_repository.find_by_email(email).await? { if user.subject != subject { user.subject = subject.to_string(); self.user_repository.save(&user).await?; } return Ok(user); } let email = Email::try_from(email)?; let user = crate::entities::User::new(subject, email); self.user_repository.save(&user).await?; Ok(user) } pub async fn find_by_id(&self, id: Uuid) -> DomainResult { self.user_repository .find_by_id(id) .await? .ok_or(DomainError::UserNotFound(id)) } pub async fn find_by_email(&self, email: &str) -> DomainResult> { self.user_repository.find_by_email(email).await } pub async fn create_local( &self, email: &str, password_hash: &str, ) -> DomainResult { let email = Email::try_from(email)?; let user = crate::entities::User::new_local(email, password_hash); self.user_repository.save(&user).await?; Ok(user) } } // ============================================================================ // ChannelService // ============================================================================ /// Service for managing channels (CRUD + ownership enforcement). pub struct ChannelService { channel_repo: Arc, } impl ChannelService { pub fn new(channel_repo: Arc) -> Self { Self { channel_repo } } pub async fn create( &self, owner_id: crate::value_objects::UserId, name: &str, timezone: &str, ) -> DomainResult { let channel = crate::entities::Channel::new(owner_id, name, timezone); self.channel_repo.save(&channel).await?; Ok(channel) } pub async fn find_by_id( &self, id: ChannelId, ) -> DomainResult { self.channel_repo .find_by_id(id) .await? .ok_or(DomainError::ChannelNotFound(id)) } pub async fn find_all(&self) -> DomainResult> { self.channel_repo.find_all().await } pub async fn find_by_owner( &self, owner_id: crate::value_objects::UserId, ) -> DomainResult> { self.channel_repo.find_by_owner(owner_id).await } pub async fn update( &self, channel: crate::entities::Channel, ) -> DomainResult { self.channel_repo.save(&channel).await?; Ok(channel) } /// Delete a channel, enforcing that `requester_id` is the owner. pub async fn delete( &self, id: ChannelId, requester_id: crate::value_objects::UserId, ) -> DomainResult<()> { let channel = self.find_by_id(id).await?; if channel.owner_id != requester_id { return Err(DomainError::forbidden("You don't own this channel")); } self.channel_repo.delete(id).await } } // ============================================================================ // ScheduleEngineService // ============================================================================ /// Core scheduling engine. /// /// Generates 48-hour broadcast schedules by walking through a channel's /// `ScheduleConfig` day by day, resolving each `ProgrammingBlock` into concrete /// `ScheduledSlot`s via the `IMediaProvider`, and applying the `RecyclePolicy` /// to avoid replaying recently aired items. pub struct ScheduleEngineService { media_provider: Arc, channel_repo: Arc, schedule_repo: Arc, } impl ScheduleEngineService { pub fn new( media_provider: Arc, channel_repo: Arc, schedule_repo: Arc, ) -> Self { Self { media_provider, channel_repo, schedule_repo, } } // ------------------------------------------------------------------------- // Public API // ------------------------------------------------------------------------- /// Generate and persist a 48-hour schedule for `channel_id` starting at `from`. /// /// The algorithm: /// 1. Walk each calendar day in the 48-hour window. /// 2. For each `ProgrammingBlock`, compute its UTC wall-clock interval for that day. /// 3. Clip the interval to `[from, from + 48h)`. /// 4. Resolve the block content via the media provider, applying the recycle policy. /// 5. For `Sequential` blocks, resume from where the previous generation left off /// (series continuity — see `fill_sequential`). /// 6. Record every played item in the playback history. /// /// Gaps between blocks are left empty — clients render them as a no-signal state. pub async fn generate_schedule( &self, channel_id: ChannelId, from: DateTime, ) -> DomainResult { let channel = self .channel_repo .find_by_id(channel_id) .await? .ok_or(DomainError::ChannelNotFound(channel_id))?; let tz: Tz = channel .timezone .parse() .map_err(|_| DomainError::TimezoneError(channel.timezone.clone()))?; let history = self .schedule_repo .find_playback_history(channel_id) .await?; // Load the most recent schedule for two purposes: // 1. Derive the next generation number. // 2. Know where each Sequential block left off (series continuity). let latest_schedule = self.schedule_repo.find_latest(channel_id).await?; let generation = latest_schedule .as_ref() .map(|s| s.generation + 1) .unwrap_or(1); // Build the initial per-block continuity map from the previous generation's // last slot per block. The map is updated as each block occurrence is resolved // within this generation so that the second day of a 48h schedule continues // from where the first day ended. let mut block_continuity: HashMap = latest_schedule .iter() .flat_map(|s| &s.slots) .fold(HashMap::new(), |mut map, slot| { // keep only the *last* slot per block (slots are sorted ascending) map.insert(slot.source_block_id, slot.item.id.clone()); map }); let valid_from = from; let valid_until = from + Duration::hours(48); let start_date = from.with_timezone(&tz).date_naive(); let end_date = valid_until.with_timezone(&tz).date_naive(); let mut slots: Vec = Vec::new(); let mut current_date = start_date; while current_date <= end_date { for block in &channel.schedule_config.blocks { let naive_start = current_date.and_time(block.start_time); // `earliest()` handles DST gaps — if the local time doesn't exist // (e.g. clocks spring forward) we skip this block occurrence. let block_start_utc = match tz.from_local_datetime(&naive_start).earliest() { Some(dt) => dt.with_timezone(&Utc), None => continue, }; let block_end_utc = block_start_utc + Duration::minutes(block.duration_mins as i64); // Clip to the 48-hour window. let slot_start = block_start_utc.max(valid_from); let slot_end = block_end_utc.min(valid_until); if slot_end <= slot_start { continue; } // For Sequential blocks: resume from the last item aired in this block. let last_item_id = block_continuity.get(&block.id); let mut block_slots = self .resolve_block( block, slot_start, slot_end, &history, &channel.recycle_policy, generation, last_item_id, ) .await?; // Update continuity so the next occurrence of this block (same // generation, next calendar day) continues from here. if let Some(last_slot) = block_slots.last() { block_continuity.insert(block.id, last_slot.item.id.clone()); } slots.append(&mut block_slots); } current_date = current_date.succ_opt().ok_or_else(|| { DomainError::validation("Date overflow during schedule generation") })?; } // Blocks in ScheduleConfig are not required to be sorted; sort resolved slots. slots.sort_by_key(|s| s.start_at); let schedule = GeneratedSchedule { id: Uuid::new_v4(), channel_id, valid_from, valid_until, generation, slots, }; self.schedule_repo.save(&schedule).await?; // Persist playback history so the recycle policy has data for next generation. for slot in &schedule.slots { let record = PlaybackRecord::new(channel_id, slot.item.id.clone(), generation); self.schedule_repo.save_playback_record(&record).await?; } Ok(schedule) } /// Determine what is currently broadcasting on a schedule. /// /// Returns `None` when `now` falls in a gap between blocks — the client /// should display a no-signal / static screen in that case. pub fn get_current_broadcast( schedule: &GeneratedSchedule, now: DateTime, ) -> Option { schedule .slots .iter() .find(|s| s.start_at <= now && now < s.end_at) .map(|slot| CurrentBroadcast { slot: slot.clone(), offset_secs: (now - slot.start_at).num_seconds() as u32, }) } /// Look up the schedule currently active at `at` without generating a new one. pub async fn get_active_schedule( &self, channel_id: ChannelId, at: DateTime, ) -> DomainResult> { self.schedule_repo.find_active(channel_id, at).await } /// Delegate stream URL resolution to the configured media provider. pub async fn get_stream_url(&self, item_id: &MediaItemId) -> DomainResult { self.media_provider.get_stream_url(item_id).await } /// Return all slots that overlap the given time window — the EPG data. pub fn get_epg<'a>( schedule: &'a GeneratedSchedule, from: DateTime, until: DateTime, ) -> Vec<&'a ScheduledSlot> { schedule .slots .iter() .filter(|s| s.start_at < until && s.end_at > from) .collect() } // ------------------------------------------------------------------------- // Block resolution // ------------------------------------------------------------------------- async fn resolve_block( &self, block: &ProgrammingBlock, start: DateTime, end: DateTime, history: &[PlaybackRecord], policy: &RecyclePolicy, generation: u32, last_item_id: Option<&MediaItemId>, ) -> DomainResult> { match &block.content { BlockContent::Manual { items } => { self.resolve_manual(items, start, end, block.id).await } BlockContent::Algorithmic { filter, strategy } => { self.resolve_algorithmic( filter, strategy, start, end, history, policy, generation, block.id, last_item_id, ) .await } } } /// Resolve a manual block by fetching each hand-picked item in order. /// Stops when the block's time budget (`end`) is exhausted. async fn resolve_manual( &self, item_ids: &[MediaItemId], start: DateTime, end: DateTime, block_id: BlockId, ) -> DomainResult> { let mut slots = Vec::new(); let mut cursor = start; for item_id in item_ids { if cursor >= end { break; } if let Some(item) = self.media_provider.fetch_by_id(item_id).await? { let item_end = (cursor + Duration::seconds(item.duration_secs as i64)).min(end); slots.push(ScheduledSlot { id: Uuid::new_v4(), start_at: cursor, end_at: item_end, item, source_block_id: block_id, }); cursor = item_end; } // If item is not found (deleted/unavailable), silently skip it. } Ok(slots) } /// Resolve an algorithmic block: fetch candidates, apply recycle policy, /// run the fill strategy, and build slots. /// /// `last_item_id` is the ID of the last item scheduled in this block in the /// previous generation. Used only by `Sequential` for series continuity. async fn resolve_algorithmic( &self, filter: &MediaFilter, strategy: &FillStrategy, start: DateTime, end: DateTime, history: &[PlaybackRecord], policy: &RecyclePolicy, generation: u32, block_id: BlockId, last_item_id: Option<&MediaItemId>, ) -> DomainResult> { // `candidates` — all items matching the filter, in provider order. // Kept separate from `pool` so Sequential can rotate through the full // ordered list while still honouring cooldowns. let candidates = self.media_provider.fetch_items(filter).await?; if candidates.is_empty() { return Ok(vec![]); } let pool = Self::apply_recycle_policy(&candidates, history, policy, generation); let target_secs = (end - start).num_seconds() as u32; let selected = Self::fill_block(&candidates, &pool, target_secs, strategy, last_item_id); let mut slots = Vec::new(); let mut cursor = start; for item in selected { if cursor >= end { break; } let item_end = (cursor + Duration::seconds(item.duration_secs as i64)).min(end); slots.push(ScheduledSlot { id: Uuid::new_v4(), start_at: cursor, end_at: item_end, item: item.clone(), source_block_id: block_id, }); cursor = item_end; } Ok(slots) } // ------------------------------------------------------------------------- // Recycle policy // ------------------------------------------------------------------------- /// Filter `candidates` according to `policy`, returning the eligible pool. /// /// An item is on cooldown if *either* the day-based or generation-based /// threshold is exceeded. If honouring all cooldowns would leave fewer items /// than `policy.min_available_ratio` of the total, all cooldowns are waived /// and the full pool is returned (prevents small libraries from stalling). fn apply_recycle_policy( candidates: &[MediaItem], history: &[PlaybackRecord], policy: &RecyclePolicy, current_generation: u32, ) -> Vec { let now = Utc::now(); let excluded: HashSet = history .iter() .filter(|record| { let by_days = policy .cooldown_days .map(|days| (now - record.played_at).num_days() < days as i64) .unwrap_or(false); let by_gen = policy .cooldown_generations .map(|gens| { current_generation.saturating_sub(record.generation) < gens }) .unwrap_or(false); by_days || by_gen }) .map(|r| r.item_id.clone()) .collect(); let available: Vec = candidates .iter() .filter(|i| !excluded.contains(&i.id)) .cloned() .collect(); let min_count = (candidates.len() as f32 * policy.min_available_ratio).ceil() as usize; if available.len() < min_count { // Pool too small after applying cooldowns — recycle everything. candidates.to_vec() } else { available } } // ------------------------------------------------------------------------- // Fill strategies // ------------------------------------------------------------------------- fn fill_block<'a>( candidates: &'a [MediaItem], pool: &'a [MediaItem], target_secs: u32, strategy: &FillStrategy, last_item_id: Option<&MediaItemId>, ) -> Vec<&'a MediaItem> { match strategy { FillStrategy::BestFit => Self::fill_best_fit(pool, target_secs), FillStrategy::Sequential => { Self::fill_sequential(candidates, pool, target_secs, last_item_id) } FillStrategy::Random => { let mut indices: Vec = (0..pool.len()).collect(); indices.shuffle(&mut rand::thread_rng()); let mut remaining = target_secs; let mut result = Vec::new(); for i in indices { let item = &pool[i]; if item.duration_secs <= remaining { remaining -= item.duration_secs; result.push(item); } } result } } } /// Greedy bin-packing: at each step pick the longest item that still fits /// in the remaining budget, without repeating items within the same block. fn fill_best_fit(pool: &[MediaItem], target_secs: u32) -> Vec<&MediaItem> { let mut remaining = target_secs; let mut selected: Vec<&MediaItem> = Vec::new(); let mut used: HashSet = HashSet::new(); loop { let best = pool .iter() .enumerate() .filter(|(idx, item)| { !used.contains(idx) && item.duration_secs <= remaining }) .max_by_key(|(_, item)| item.duration_secs); match best { Some((idx, item)) => { remaining -= item.duration_secs; used.insert(idx); selected.push(item); } None => break, } } selected } /// Sequential fill with cross-generation series continuity. /// /// `candidates` — all items matching the filter, in Jellyfin's natural order /// (typically by season + episode number for TV shows). /// `pool` — candidates filtered by the recycle policy (eligible to air). /// `last_item_id` — the last item scheduled in this block in the previous /// generation or in an earlier occurrence of this block within /// the current generation. Used to resume the series from the /// next episode rather than restarting from episode 1. /// /// Algorithm: /// 1. Find `last_item_id`'s position in `candidates` and start from the next index. /// 2. Walk the full `candidates` list in order (wrapping around at the end), /// but only pick items that are in `pool` (i.e. not on cooldown). /// 3. Greedily fill the time budget with items in that order. /// /// This ensures episodes always air in series order, the series wraps correctly /// when the last episode has been reached, and cooldowns are still respected. fn fill_sequential<'a>( candidates: &'a [MediaItem], pool: &'a [MediaItem], target_secs: u32, last_item_id: Option<&MediaItemId>, ) -> Vec<&'a MediaItem> { if pool.is_empty() { return vec![]; } // Set of item IDs currently eligible to air. let available: HashSet<&MediaItemId> = pool.iter().map(|i| &i.id).collect(); // Find where in the full ordered list to resume. // Falls back to index 0 if last_item_id is absent or was removed from the library. let start_idx = last_item_id .and_then(|id| candidates.iter().position(|c| &c.id == id)) .map(|pos| (pos + 1) % candidates.len()) .unwrap_or(0); // Walk candidates in order from start_idx, wrapping around once, // skipping any that are on cooldown (not in `available`). let ordered: Vec<&MediaItem> = (0..candidates.len()) .map(|i| &candidates[(start_idx + i) % candidates.len()]) .filter(|item| available.contains(&item.id)) .collect(); // Greedily fill the block's time budget in episode order. let mut remaining = target_secs; let mut result = Vec::new(); for item in ordered { if item.duration_secs <= remaining { remaining -= item.duration_secs; result.push(item); } } result } }