//! BroadcastPoller background task. //! //! Polls each channel that has a webhook_url configured. On each tick (every 1s) //! it checks which channels are due for a poll (elapsed >= webhook_poll_interval_secs) //! and emits BroadcastTransition or NoSignal events when the current slot changes. use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; use chrono::Utc; use tokio::sync::broadcast; use tracing::error; use uuid::Uuid; use domain::{ChannelRepository, DomainError, DomainEvent, ScheduleEngineService}; /// Per-channel poller state. #[derive(Debug)] struct ChannelPollState { /// ID of the last slot we saw as current (None = no signal). last_slot_id: Option, /// Wall-clock instant of the last poll for this channel. last_checked: Instant, } /// Polls channels with webhook URLs and emits broadcast transition events. pub async fn run_broadcast_poller( schedule_engine: Arc, channel_repo: Arc, event_tx: broadcast::Sender, ) { let mut state: HashMap = HashMap::new(); loop { tokio::time::sleep(Duration::from_secs(1)).await; poll_tick(&schedule_engine, &channel_repo, &event_tx, &mut state).await; } } pub(crate) async fn poll_tick( schedule_engine: &Arc, channel_repo: &Arc, event_tx: &broadcast::Sender, state: &mut HashMap, ) { let channels = match channel_repo.find_all().await { Ok(c) => c, Err(e) => { error!("broadcast poller: failed to load channels: {}", e); return; } }; // Remove deleted channels from state let live_ids: std::collections::HashSet = channels.iter().map(|c| c.id).collect(); state.retain(|id, _| live_ids.contains(id)); let now = Utc::now(); for channel in channels { // Only poll channels with a configured webhook URL if channel.webhook_url.is_none() { state.remove(&channel.id); continue; } let poll_interval = Duration::from_secs(channel.webhook_poll_interval_secs as u64); let entry = state.entry(channel.id).or_insert_with(|| ChannelPollState { last_slot_id: None, last_checked: Instant::now() - poll_interval, // trigger immediately on first encounter }); if entry.last_checked.elapsed() < poll_interval { continue; // Not yet due for a poll } entry.last_checked = Instant::now(); // Find the current slot let current_slot_id = match schedule_engine.get_active_schedule(channel.id, now).await { Ok(Some(schedule)) => { schedule .slots .iter() .find(|s| s.start_at <= now && now < s.end_at) .map(|s| s.id) } Ok(None) => None, Err(DomainError::NoActiveSchedule(_)) => None, Err(DomainError::ChannelNotFound(_)) => { state.remove(&channel.id); continue; } Err(e) => { error!( "broadcast poller: error checking schedule for channel {}: {}", channel.id, e ); continue; } }; if current_slot_id == entry.last_slot_id { continue; } // State changed — emit appropriate event match ¤t_slot_id { Some(slot_id) => { if let Ok(Some(schedule)) = schedule_engine.get_active_schedule(channel.id, now).await { if let Some(slot) = schedule.slots.iter().find(|s| s.id == *slot_id).cloned() { let _ = event_tx.send(DomainEvent::BroadcastTransition { channel_id: channel.id, slot, }); } } } None => { let _ = event_tx.send(DomainEvent::NoSignal { channel_id: channel.id, }); } } entry.last_slot_id = current_slot_id; } } #[cfg(test)] mod tests { use super::*; use std::collections::HashMap; use std::sync::{Arc, Mutex}; use async_trait::async_trait; use chrono::{DateTime, Duration, Utc}; use domain::{ Channel, ChannelRepository, Collection, DomainResult, GeneratedSchedule, IProviderRegistry, MediaFilter, MediaItem, MediaItemId, PlaybackRecord, ProviderCapabilities, ScheduleEngineService, ScheduleRepository, SeriesSummary, StreamQuality, StreamingProtocol, }; use domain::value_objects::{ChannelId, ContentType, UserId}; use tokio::sync::broadcast; use uuid::Uuid; // ── Mocks ───────────────────────────────────────────────────────────────── struct MockChannelRepo { channels: Vec, } #[async_trait] impl ChannelRepository for MockChannelRepo { async fn find_by_id(&self, id: ChannelId) -> DomainResult> { Ok(self.channels.iter().find(|c| c.id == id).cloned()) } async fn find_by_owner(&self, _owner_id: UserId) -> DomainResult> { unimplemented!() } async fn find_all(&self) -> DomainResult> { Ok(self.channels.clone()) } async fn find_auto_schedule_enabled(&self) -> DomainResult> { unimplemented!() } async fn save(&self, _channel: &Channel) -> DomainResult<()> { unimplemented!() } async fn delete(&self, _id: ChannelId) -> DomainResult<()> { unimplemented!() } } struct MockScheduleRepo { active: Option, saved: Arc>>, } #[async_trait] impl ScheduleRepository for MockScheduleRepo { async fn find_active( &self, _channel_id: ChannelId, _at: DateTime, ) -> DomainResult> { Ok(self.active.clone()) } async fn find_latest(&self, _channel_id: ChannelId) -> DomainResult> { Ok(self.active.clone()) } async fn save(&self, schedule: &GeneratedSchedule) -> DomainResult<()> { self.saved.lock().unwrap().push(schedule.clone()); Ok(()) } async fn find_playback_history(&self, _channel_id: ChannelId) -> DomainResult> { Ok(vec![]) } async fn save_playback_record(&self, _record: &PlaybackRecord) -> DomainResult<()> { Ok(()) } } struct MockRegistry; #[async_trait] impl IProviderRegistry for MockRegistry { async fn fetch_items(&self, _provider_id: &str, _filter: &MediaFilter) -> DomainResult> { Ok(vec![]) } async fn fetch_by_id(&self, _item_id: &MediaItemId) -> DomainResult> { Ok(None) } async fn get_stream_url(&self, _item_id: &MediaItemId, _quality: &StreamQuality) -> DomainResult { unimplemented!() } fn provider_ids(&self) -> Vec { vec![] } fn primary_id(&self) -> &str { "" } fn capabilities(&self, _provider_id: &str) -> Option { None } async fn list_collections(&self, _provider_id: &str) -> DomainResult> { unimplemented!() } async fn list_series(&self, _provider_id: &str, _collection_id: Option<&str>) -> DomainResult> { unimplemented!() } async fn list_genres(&self, _provider_id: &str, _content_type: Option<&ContentType>) -> DomainResult> { unimplemented!() } } // ── Helpers ─────────────────────────────────────────────────────────────── fn make_channel_with_webhook(channel_id: Uuid) -> Channel { let mut ch = Channel::new(Uuid::new_v4(), "Test", "UTC"); ch.id = channel_id; ch.webhook_url = Some("http://example.com/hook".to_string()); ch.webhook_poll_interval_secs = 0; // always due ch } fn make_slot(channel_id: Uuid, slot_id: Uuid) -> domain::ScheduledSlot { use domain::entities::MediaItem; let now = Utc::now(); domain::ScheduledSlot { id: slot_id, start_at: now - Duration::minutes(1), end_at: now + Duration::minutes(29), item: MediaItem { id: MediaItemId::new("test-item"), title: "Test Movie".to_string(), content_type: ContentType::Movie, duration_secs: 1800, description: None, genres: vec![], year: None, tags: vec![], series_name: None, season_number: None, episode_number: None, }, source_block_id: Uuid::new_v4(), } } fn make_schedule(channel_id: Uuid, slots: Vec) -> GeneratedSchedule { let now = Utc::now(); GeneratedSchedule { id: Uuid::new_v4(), channel_id, valid_from: now - Duration::hours(1), valid_until: now + Duration::hours(47), generation: 1, slots, } } fn make_engine( channel_repo: Arc, schedule_repo: Arc, ) -> Arc { Arc::new(ScheduleEngineService::new( Arc::new(MockRegistry), channel_repo, schedule_repo, )) } // ── Tests ───────────────────────────────────────────────────────────────── #[tokio::test] async fn test_broadcast_transition_emitted_on_slot_change() { let channel_id = Uuid::new_v4(); let slot_id = Uuid::new_v4(); let ch = make_channel_with_webhook(channel_id); let slot = make_slot(channel_id, slot_id); let schedule = make_schedule(channel_id, vec![slot]); let channel_repo: Arc = Arc::new(MockChannelRepo { channels: vec![ch] }); let schedule_repo: Arc = Arc::new(MockScheduleRepo { active: Some(schedule), saved: Arc::new(Mutex::new(vec![])), }); let engine = make_engine(channel_repo.clone(), schedule_repo); let (event_tx, mut event_rx) = broadcast::channel(8); let mut state: HashMap = HashMap::new(); poll_tick(&engine, &channel_repo, &event_tx, &mut state).await; let event = event_rx.try_recv().expect("expected an event"); match event { DomainEvent::BroadcastTransition { channel_id: cid, slot: s } => { assert_eq!(cid, channel_id); assert_eq!(s.id, slot_id); } other => panic!("expected BroadcastTransition, got something else"), } } #[tokio::test] async fn test_no_event_when_slot_unchanged() { let channel_id = Uuid::new_v4(); let slot_id = Uuid::new_v4(); let ch = make_channel_with_webhook(channel_id); let slot = make_slot(channel_id, slot_id); let schedule = make_schedule(channel_id, vec![slot]); let channel_repo: Arc = Arc::new(MockChannelRepo { channels: vec![ch] }); let schedule_repo: Arc = Arc::new(MockScheduleRepo { active: Some(schedule), saved: Arc::new(Mutex::new(vec![])), }); let engine = make_engine(channel_repo.clone(), schedule_repo); let (event_tx, mut event_rx) = broadcast::channel(8); let mut state: HashMap = HashMap::new(); // First tick — emits BroadcastTransition poll_tick(&engine, &channel_repo, &event_tx, &mut state).await; let _ = event_rx.try_recv(); // Second tick — same slot, no event poll_tick(&engine, &channel_repo, &event_tx, &mut state).await; assert!( event_rx.try_recv().is_err(), "no event expected when slot unchanged" ); } #[tokio::test] async fn test_no_signal_emitted_when_slot_goes_to_none() { let channel_id = Uuid::new_v4(); let slot_id = Uuid::new_v4(); let ch = make_channel_with_webhook(channel_id); let slot = make_slot(channel_id, slot_id); let schedule_with_slot = make_schedule(channel_id, vec![slot]); // Repo that starts with a slot then returns empty schedule use std::sync::atomic::{AtomicBool, Ordering}; struct SwitchingScheduleRepo { first: GeneratedSchedule, second: GeneratedSchedule, called: AtomicBool, } #[async_trait] impl ScheduleRepository for SwitchingScheduleRepo { async fn find_active( &self, _channel_id: ChannelId, _at: DateTime, ) -> DomainResult> { if self.called.swap(true, Ordering::SeqCst) { Ok(Some(self.second.clone())) } else { Ok(Some(self.first.clone())) } } async fn find_latest(&self, _: ChannelId) -> DomainResult> { Ok(None) } async fn save(&self, _: &GeneratedSchedule) -> DomainResult<()> { Ok(()) } async fn find_playback_history(&self, _: ChannelId) -> DomainResult> { Ok(vec![]) } async fn save_playback_record(&self, _: &PlaybackRecord) -> DomainResult<()> { Ok(()) } } let now = Utc::now(); let empty_schedule = GeneratedSchedule { id: Uuid::new_v4(), channel_id, valid_from: now - Duration::hours(1), valid_until: now + Duration::hours(47), generation: 2, slots: vec![], // no current slot }; let channel_repo: Arc = Arc::new(MockChannelRepo { channels: vec![ch] }); let schedule_repo: Arc = Arc::new(SwitchingScheduleRepo { first: schedule_with_slot, second: empty_schedule, called: AtomicBool::new(false), }); let engine = make_engine(channel_repo.clone(), schedule_repo); let (event_tx, mut event_rx) = broadcast::channel(8); let mut state: HashMap = HashMap::new(); // First tick — emits BroadcastTransition (slot present) poll_tick(&engine, &channel_repo, &event_tx, &mut state).await; let _ = event_rx.try_recv(); // Second tick — schedule has no current slot, emits NoSignal poll_tick(&engine, &channel_repo, &event_tx, &mut state).await; let event = event_rx.try_recv().expect("expected NoSignal event"); match event { DomainEvent::NoSignal { channel_id: cid } => assert_eq!(cid, channel_id), _ => panic!("expected NoSignal"), } } }