//! Domain Services //! //! Services contain the business logic of the application. use std::collections::HashSet; use std::sync::Arc; use chrono::{DateTime, Duration, TimeZone, Utc}; use chrono_tz::Tz; use rand::seq::SliceRandom; use uuid::Uuid; use crate::entities::{ BlockContent, CurrentBroadcast, GeneratedSchedule, MediaItem, PlaybackRecord, ProgrammingBlock, ScheduledSlot, }; use crate::errors::{DomainError, DomainResult}; use crate::ports::IMediaProvider; use crate::repositories::{ChannelRepository, ScheduleRepository, UserRepository}; use crate::value_objects::{ BlockId, ChannelId, Email, FillStrategy, MediaFilter, MediaItemId, RecyclePolicy, }; // ============================================================================ // UserService // ============================================================================ /// Service for managing users. pub struct UserService { user_repository: Arc, } impl UserService { pub fn new(user_repository: Arc) -> Self { Self { user_repository } } pub async fn find_or_create(&self, subject: &str, email: &str) -> DomainResult { if let Some(user) = self.user_repository.find_by_subject(subject).await? { return Ok(user); } if let Some(mut user) = self.user_repository.find_by_email(email).await? { if user.subject != subject { user.subject = subject.to_string(); self.user_repository.save(&user).await?; } return Ok(user); } let email = Email::try_from(email)?; let user = crate::entities::User::new(subject, email); self.user_repository.save(&user).await?; Ok(user) } pub async fn find_by_id(&self, id: Uuid) -> DomainResult { self.user_repository .find_by_id(id) .await? .ok_or(DomainError::UserNotFound(id)) } pub async fn find_by_email(&self, email: &str) -> DomainResult> { self.user_repository.find_by_email(email).await } pub async fn create_local( &self, email: &str, password_hash: &str, ) -> DomainResult { let email = Email::try_from(email)?; let user = crate::entities::User::new_local(email, password_hash); self.user_repository.save(&user).await?; Ok(user) } } // ============================================================================ // ChannelService // ============================================================================ /// Service for managing channels (CRUD + ownership enforcement). pub struct ChannelService { channel_repo: Arc, } impl ChannelService { pub fn new(channel_repo: Arc) -> Self { Self { channel_repo } } pub async fn create( &self, owner_id: crate::value_objects::UserId, name: &str, timezone: &str, ) -> DomainResult { let channel = crate::entities::Channel::new(owner_id, name, timezone); self.channel_repo.save(&channel).await?; Ok(channel) } pub async fn find_by_id( &self, id: ChannelId, ) -> DomainResult { self.channel_repo .find_by_id(id) .await? .ok_or(DomainError::ChannelNotFound(id)) } pub async fn find_all(&self) -> DomainResult> { self.channel_repo.find_all().await } pub async fn find_by_owner( &self, owner_id: crate::value_objects::UserId, ) -> DomainResult> { self.channel_repo.find_by_owner(owner_id).await } pub async fn update( &self, channel: crate::entities::Channel, ) -> DomainResult { self.channel_repo.save(&channel).await?; Ok(channel) } /// Delete a channel, enforcing that `requester_id` is the owner. pub async fn delete( &self, id: ChannelId, requester_id: crate::value_objects::UserId, ) -> DomainResult<()> { let channel = self.find_by_id(id).await?; if channel.owner_id != requester_id { return Err(DomainError::forbidden("You don't own this channel")); } self.channel_repo.delete(id).await } } // ============================================================================ // ScheduleEngineService // ============================================================================ /// Core scheduling engine. /// /// Generates 48-hour broadcast schedules by walking through a channel's /// `ScheduleConfig` day by day, resolving each `ProgrammingBlock` into concrete /// `ScheduledSlot`s via the `IMediaProvider`, and applying the `RecyclePolicy` /// to avoid replaying recently aired items. pub struct ScheduleEngineService { media_provider: Arc, channel_repo: Arc, schedule_repo: Arc, } impl ScheduleEngineService { pub fn new( media_provider: Arc, channel_repo: Arc, schedule_repo: Arc, ) -> Self { Self { media_provider, channel_repo, schedule_repo, } } // ------------------------------------------------------------------------- // Public API // ------------------------------------------------------------------------- /// Generate and persist a 48-hour schedule for `channel_id` starting at `from`. /// /// The algorithm: /// 1. Walk each calendar day in the 48-hour window. /// 2. For each `ProgrammingBlock`, compute its UTC wall-clock interval for that day. /// 3. Clip the interval to `[from, from + 48h)`. /// 4. Resolve the block content via the media provider, applying the recycle policy. /// 5. Record every played item in the playback history. /// /// Gaps between blocks are left empty — clients render them as a no-signal state. pub async fn generate_schedule( &self, channel_id: ChannelId, from: DateTime, ) -> DomainResult { let channel = self .channel_repo .find_by_id(channel_id) .await? .ok_or(DomainError::ChannelNotFound(channel_id))?; let tz: Tz = channel .timezone .parse() .map_err(|_| DomainError::TimezoneError(channel.timezone.clone()))?; let history = self .schedule_repo .find_playback_history(channel_id) .await?; let generation = self .schedule_repo .find_latest(channel_id) .await? .map(|s| s.generation + 1) .unwrap_or(1); let valid_from = from; let valid_until = from + Duration::hours(48); let start_date = from.with_timezone(&tz).date_naive(); let end_date = valid_until.with_timezone(&tz).date_naive(); let mut slots: Vec = Vec::new(); let mut current_date = start_date; while current_date <= end_date { for block in &channel.schedule_config.blocks { let naive_start = current_date.and_time(block.start_time); // `earliest()` handles DST gaps — if the local time doesn't exist // (e.g. clocks spring forward) we skip this block occurrence. let block_start_utc = match tz.from_local_datetime(&naive_start).earliest() { Some(dt) => dt.with_timezone(&Utc), None => continue, }; let block_end_utc = block_start_utc + Duration::minutes(block.duration_mins as i64); // Clip to the 48-hour window. let slot_start = block_start_utc.max(valid_from); let slot_end = block_end_utc.min(valid_until); if slot_end <= slot_start { continue; } let mut block_slots = self .resolve_block( block, slot_start, slot_end, &history, &channel.recycle_policy, generation, ) .await?; slots.append(&mut block_slots); } current_date = current_date.succ_opt().ok_or_else(|| { DomainError::validation("Date overflow during schedule generation") })?; } // Blocks in ScheduleConfig are not required to be sorted; sort resolved slots. slots.sort_by_key(|s| s.start_at); let schedule = GeneratedSchedule { id: Uuid::new_v4(), channel_id, valid_from, valid_until, generation, slots, }; self.schedule_repo.save(&schedule).await?; // Persist playback history so the recycle policy has data for next generation. for slot in &schedule.slots { let record = PlaybackRecord::new(channel_id, slot.item.id.clone(), generation); self.schedule_repo.save_playback_record(&record).await?; } Ok(schedule) } /// Determine what is currently broadcasting on a schedule. /// /// Returns `None` when `now` falls in a gap between blocks — the client /// should display a no-signal / static screen in that case. pub fn get_current_broadcast( schedule: &GeneratedSchedule, now: DateTime, ) -> Option { schedule .slots .iter() .find(|s| s.start_at <= now && now < s.end_at) .map(|slot| CurrentBroadcast { slot: slot.clone(), offset_secs: (now - slot.start_at).num_seconds() as u32, }) } /// Look up the schedule currently active at `at` without generating a new one. pub async fn get_active_schedule( &self, channel_id: ChannelId, at: DateTime, ) -> DomainResult> { self.schedule_repo.find_active(channel_id, at).await } /// Delegate stream URL resolution to the configured media provider. pub async fn get_stream_url(&self, item_id: &MediaItemId) -> DomainResult { self.media_provider.get_stream_url(item_id).await } /// Return all slots that overlap the given time window — the EPG data. pub fn get_epg<'a>( schedule: &'a GeneratedSchedule, from: DateTime, until: DateTime, ) -> Vec<&'a ScheduledSlot> { schedule .slots .iter() .filter(|s| s.start_at < until && s.end_at > from) .collect() } // ------------------------------------------------------------------------- // Block resolution // ------------------------------------------------------------------------- async fn resolve_block( &self, block: &ProgrammingBlock, start: DateTime, end: DateTime, history: &[PlaybackRecord], policy: &RecyclePolicy, generation: u32, ) -> DomainResult> { match &block.content { BlockContent::Manual { items } => { self.resolve_manual(items, start, end, block.id).await } BlockContent::Algorithmic { filter, strategy } => { self.resolve_algorithmic( filter, strategy, start, end, history, policy, generation, block.id, ) .await } } } /// Resolve a manual block by fetching each hand-picked item in order. /// Stops when the block's time budget (`end`) is exhausted. async fn resolve_manual( &self, item_ids: &[MediaItemId], start: DateTime, end: DateTime, block_id: BlockId, ) -> DomainResult> { let mut slots = Vec::new(); let mut cursor = start; for item_id in item_ids { if cursor >= end { break; } if let Some(item) = self.media_provider.fetch_by_id(item_id).await? { let item_end = (cursor + Duration::seconds(item.duration_secs as i64)).min(end); slots.push(ScheduledSlot { id: Uuid::new_v4(), start_at: cursor, end_at: item_end, item, source_block_id: block_id, }); cursor = item_end; } // If item is not found (deleted/unavailable), silently skip it. } Ok(slots) } /// Resolve an algorithmic block: fetch candidates, apply recycle policy, /// run the fill strategy, and build slots. async fn resolve_algorithmic( &self, filter: &MediaFilter, strategy: &FillStrategy, start: DateTime, end: DateTime, history: &[PlaybackRecord], policy: &RecyclePolicy, generation: u32, block_id: BlockId, ) -> DomainResult> { let candidates = self.media_provider.fetch_items(filter).await?; if candidates.is_empty() { return Ok(vec![]); } let pool = Self::apply_recycle_policy(candidates, history, policy, generation); let target_secs = (end - start).num_seconds() as u32; let selected = Self::fill_block(&pool, target_secs, strategy); let mut slots = Vec::new(); let mut cursor = start; for item in selected { if cursor >= end { break; } let item_end = (cursor + Duration::seconds(item.duration_secs as i64)).min(end); slots.push(ScheduledSlot { id: Uuid::new_v4(), start_at: cursor, end_at: item_end, item: item.clone(), source_block_id: block_id, }); cursor = item_end; } Ok(slots) } // ------------------------------------------------------------------------- // Recycle policy // ------------------------------------------------------------------------- /// Filter `candidates` according to `policy`, returning the eligible pool. /// /// An item is on cooldown if *either* the day-based or generation-based /// threshold is exceeded. If honouring all cooldowns would leave fewer items /// than `policy.min_available_ratio` of the total, all cooldowns are waived /// and the full pool is returned (prevents small libraries from stalling). fn apply_recycle_policy( candidates: Vec, history: &[PlaybackRecord], policy: &RecyclePolicy, current_generation: u32, ) -> Vec { let now = Utc::now(); let excluded: HashSet = history .iter() .filter(|record| { let by_days = policy .cooldown_days .map(|days| (now - record.played_at).num_days() < days as i64) .unwrap_or(false); let by_gen = policy .cooldown_generations .map(|gens| { current_generation.saturating_sub(record.generation) < gens }) .unwrap_or(false); by_days || by_gen }) .map(|r| r.item_id.clone()) .collect(); let available: Vec = candidates .iter() .filter(|i| !excluded.contains(&i.id)) .cloned() .collect(); let min_count = (candidates.len() as f32 * policy.min_available_ratio).ceil() as usize; if available.len() < min_count { // Pool too small after applying cooldowns — recycle everything. candidates } else { available } } // ------------------------------------------------------------------------- // Fill strategies // ------------------------------------------------------------------------- fn fill_block<'a>( pool: &'a [MediaItem], target_secs: u32, strategy: &FillStrategy, ) -> Vec<&'a MediaItem> { match strategy { FillStrategy::BestFit => Self::fill_best_fit(pool, target_secs), FillStrategy::Sequential => Self::fill_sequential(pool, target_secs), FillStrategy::Random => { let mut indices: Vec = (0..pool.len()).collect(); indices.shuffle(&mut rand::thread_rng()); let mut remaining = target_secs; let mut result = Vec::new(); for i in indices { let item = &pool[i]; if item.duration_secs <= remaining { remaining -= item.duration_secs; result.push(item); } } result } } } /// Greedy bin-packing: at each step pick the longest item that still fits /// in the remaining budget, without repeating items within the same block. fn fill_best_fit(pool: &[MediaItem], target_secs: u32) -> Vec<&MediaItem> { let mut remaining = target_secs; let mut selected: Vec<&MediaItem> = Vec::new(); let mut used: HashSet = HashSet::new(); loop { let best = pool .iter() .enumerate() .filter(|(idx, item)| { !used.contains(idx) && item.duration_secs <= remaining }) .max_by_key(|(_, item)| item.duration_secs); match best { Some((idx, item)) => { remaining -= item.duration_secs; used.insert(idx); selected.push(item); } None => break, } } selected } /// Sequential: iterate the pool in order, picking items that fit within /// the remaining budget. Good for series where episode order matters. fn fill_sequential(pool: &[MediaItem], target_secs: u32) -> Vec<&MediaItem> { let mut remaining = target_secs; let mut result = Vec::new(); for item in pool { if item.duration_secs <= remaining { remaining -= item.duration_secs; result.push(item); } } result } }