From 5d8b8f7a5f14c407fdffac9740f9136d3bf8d608 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Mon, 16 Mar 2026 00:46:45 +0100 Subject: [PATCH] feat(webhooks): add tests for DomainEvent variants and poller state machine --- k-tv-backend/api/src/poller.rs | 304 +++++++++++++++++++++++++++++- k-tv-backend/domain/src/events.rs | 78 ++++++++ 2 files changed, 381 insertions(+), 1 deletion(-) diff --git a/k-tv-backend/api/src/poller.rs b/k-tv-backend/api/src/poller.rs index 4e12c0d..20a5d10 100644 --- a/k-tv-backend/api/src/poller.rs +++ b/k-tv-backend/api/src/poller.rs @@ -38,7 +38,7 @@ pub async fn run_broadcast_poller( } } -async fn poll_tick( +pub(crate) async fn poll_tick( schedule_engine: &Arc, channel_repo: &Arc, event_tx: &broadcast::Sender, @@ -128,3 +128,305 @@ async fn poll_tick( entry.last_slot_id = current_slot_id; } } + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + use std::sync::{Arc, Mutex}; + + use async_trait::async_trait; + use chrono::{DateTime, Duration, Utc}; + use domain::{ + Channel, ChannelRepository, Collection, DomainResult, GeneratedSchedule, IProviderRegistry, + MediaFilter, MediaItem, MediaItemId, PlaybackRecord, ProviderCapabilities, + ScheduleEngineService, ScheduleRepository, SeriesSummary, StreamQuality, StreamingProtocol, + }; + use domain::value_objects::{ChannelId, ContentType, UserId}; + use tokio::sync::broadcast; + use uuid::Uuid; + + // ── Mocks ───────────────────────────────────────────────────────────────── + + struct MockChannelRepo { + channels: Vec, + } + + #[async_trait] + impl ChannelRepository for MockChannelRepo { + async fn find_by_id(&self, id: ChannelId) -> DomainResult> { + Ok(self.channels.iter().find(|c| c.id == id).cloned()) + } + async fn find_by_owner(&self, _owner_id: UserId) -> DomainResult> { + unimplemented!() + } + async fn find_all(&self) -> DomainResult> { + Ok(self.channels.clone()) + } + async fn find_auto_schedule_enabled(&self) -> DomainResult> { + unimplemented!() + } + async fn save(&self, _channel: &Channel) -> DomainResult<()> { + unimplemented!() + } + async fn delete(&self, _id: ChannelId) -> DomainResult<()> { + unimplemented!() + } + } + + struct MockScheduleRepo { + active: Option, + saved: Arc>>, + } + + #[async_trait] + impl ScheduleRepository for MockScheduleRepo { + async fn find_active( + &self, + _channel_id: ChannelId, + _at: DateTime, + ) -> DomainResult> { + Ok(self.active.clone()) + } + async fn find_latest(&self, _channel_id: ChannelId) -> DomainResult> { + Ok(self.active.clone()) + } + async fn save(&self, schedule: &GeneratedSchedule) -> DomainResult<()> { + self.saved.lock().unwrap().push(schedule.clone()); + Ok(()) + } + async fn find_playback_history(&self, _channel_id: ChannelId) -> DomainResult> { + Ok(vec![]) + } + async fn save_playback_record(&self, _record: &PlaybackRecord) -> DomainResult<()> { + Ok(()) + } + } + + struct MockRegistry; + + #[async_trait] + impl IProviderRegistry for MockRegistry { + async fn fetch_items(&self, _provider_id: &str, _filter: &MediaFilter) -> DomainResult> { + Ok(vec![]) + } + async fn fetch_by_id(&self, _item_id: &MediaItemId) -> DomainResult> { + Ok(None) + } + async fn get_stream_url(&self, _item_id: &MediaItemId, _quality: &StreamQuality) -> DomainResult { + unimplemented!() + } + fn provider_ids(&self) -> Vec { + vec![] + } + fn primary_id(&self) -> &str { + "" + } + fn capabilities(&self, _provider_id: &str) -> Option { + None + } + async fn list_collections(&self, _provider_id: &str) -> DomainResult> { + unimplemented!() + } + async fn list_series(&self, _provider_id: &str, _collection_id: Option<&str>) -> DomainResult> { + unimplemented!() + } + async fn list_genres(&self, _provider_id: &str, _content_type: Option<&ContentType>) -> DomainResult> { + unimplemented!() + } + } + + // ── Helpers ─────────────────────────────────────────────────────────────── + + fn make_channel_with_webhook(channel_id: Uuid) -> Channel { + let mut ch = Channel::new(Uuid::new_v4(), "Test", "UTC"); + ch.id = channel_id; + ch.webhook_url = Some("http://example.com/hook".to_string()); + ch.webhook_poll_interval_secs = 0; // always due + ch + } + + fn make_slot(channel_id: Uuid, slot_id: Uuid) -> domain::ScheduledSlot { + use domain::entities::MediaItem; + let now = Utc::now(); + domain::ScheduledSlot { + id: slot_id, + start_at: now - Duration::minutes(1), + end_at: now + Duration::minutes(29), + item: MediaItem { + id: MediaItemId::new("test-item"), + title: "Test Movie".to_string(), + content_type: ContentType::Movie, + duration_secs: 1800, + description: None, + genres: vec![], + year: None, + tags: vec![], + series_name: None, + season_number: None, + episode_number: None, + }, + source_block_id: Uuid::new_v4(), + } + } + + fn make_schedule(channel_id: Uuid, slots: Vec) -> GeneratedSchedule { + let now = Utc::now(); + GeneratedSchedule { + id: Uuid::new_v4(), + channel_id, + valid_from: now - Duration::hours(1), + valid_until: now + Duration::hours(47), + generation: 1, + slots, + } + } + + fn make_engine( + channel_repo: Arc, + schedule_repo: Arc, + ) -> Arc { + Arc::new(ScheduleEngineService::new( + Arc::new(MockRegistry), + channel_repo, + schedule_repo, + )) + } + + // ── Tests ───────────────────────────────────────────────────────────────── + + #[tokio::test] + async fn test_broadcast_transition_emitted_on_slot_change() { + let channel_id = Uuid::new_v4(); + let slot_id = Uuid::new_v4(); + let ch = make_channel_with_webhook(channel_id); + let slot = make_slot(channel_id, slot_id); + let schedule = make_schedule(channel_id, vec![slot]); + + let channel_repo: Arc = + Arc::new(MockChannelRepo { channels: vec![ch] }); + let schedule_repo: Arc = Arc::new(MockScheduleRepo { + active: Some(schedule), + saved: Arc::new(Mutex::new(vec![])), + }); + let engine = make_engine(channel_repo.clone(), schedule_repo); + + let (event_tx, mut event_rx) = broadcast::channel(8); + let mut state: HashMap = HashMap::new(); + + poll_tick(&engine, &channel_repo, &event_tx, &mut state).await; + + let event = event_rx.try_recv().expect("expected an event"); + match event { + DomainEvent::BroadcastTransition { channel_id: cid, slot: s } => { + assert_eq!(cid, channel_id); + assert_eq!(s.id, slot_id); + } + other => panic!("expected BroadcastTransition, got something else"), + } + } + + #[tokio::test] + async fn test_no_event_when_slot_unchanged() { + let channel_id = Uuid::new_v4(); + let slot_id = Uuid::new_v4(); + let ch = make_channel_with_webhook(channel_id); + let slot = make_slot(channel_id, slot_id); + let schedule = make_schedule(channel_id, vec![slot]); + + let channel_repo: Arc = + Arc::new(MockChannelRepo { channels: vec![ch] }); + let schedule_repo: Arc = Arc::new(MockScheduleRepo { + active: Some(schedule), + saved: Arc::new(Mutex::new(vec![])), + }); + let engine = make_engine(channel_repo.clone(), schedule_repo); + + let (event_tx, mut event_rx) = broadcast::channel(8); + let mut state: HashMap = HashMap::new(); + + // First tick — emits BroadcastTransition + poll_tick(&engine, &channel_repo, &event_tx, &mut state).await; + let _ = event_rx.try_recv(); + + // Second tick — same slot, no event + poll_tick(&engine, &channel_repo, &event_tx, &mut state).await; + assert!( + event_rx.try_recv().is_err(), + "no event expected when slot unchanged" + ); + } + + #[tokio::test] + async fn test_no_signal_emitted_when_slot_goes_to_none() { + let channel_id = Uuid::new_v4(); + let slot_id = Uuid::new_v4(); + let ch = make_channel_with_webhook(channel_id); + let slot = make_slot(channel_id, slot_id); + let schedule_with_slot = make_schedule(channel_id, vec![slot]); + + // Repo that starts with a slot then returns empty schedule + use std::sync::atomic::{AtomicBool, Ordering}; + struct SwitchingScheduleRepo { + first: GeneratedSchedule, + second: GeneratedSchedule, + called: AtomicBool, + } + #[async_trait] + impl ScheduleRepository for SwitchingScheduleRepo { + async fn find_active( + &self, + _channel_id: ChannelId, + _at: DateTime, + ) -> DomainResult> { + if self.called.swap(true, Ordering::SeqCst) { + Ok(Some(self.second.clone())) + } else { + Ok(Some(self.first.clone())) + } + } + async fn find_latest(&self, _: ChannelId) -> DomainResult> { + Ok(None) + } + async fn save(&self, _: &GeneratedSchedule) -> DomainResult<()> { Ok(()) } + async fn find_playback_history(&self, _: ChannelId) -> DomainResult> { + Ok(vec![]) + } + async fn save_playback_record(&self, _: &PlaybackRecord) -> DomainResult<()> { Ok(()) } + } + + let now = Utc::now(); + let empty_schedule = GeneratedSchedule { + id: Uuid::new_v4(), + channel_id, + valid_from: now - Duration::hours(1), + valid_until: now + Duration::hours(47), + generation: 2, + slots: vec![], // no current slot + }; + + let channel_repo: Arc = + Arc::new(MockChannelRepo { channels: vec![ch] }); + let schedule_repo: Arc = Arc::new(SwitchingScheduleRepo { + first: schedule_with_slot, + second: empty_schedule, + called: AtomicBool::new(false), + }); + let engine = make_engine(channel_repo.clone(), schedule_repo); + + let (event_tx, mut event_rx) = broadcast::channel(8); + let mut state: HashMap = HashMap::new(); + + // First tick — emits BroadcastTransition (slot present) + poll_tick(&engine, &channel_repo, &event_tx, &mut state).await; + let _ = event_rx.try_recv(); + + // Second tick — schedule has no current slot, emits NoSignal + poll_tick(&engine, &channel_repo, &event_tx, &mut state).await; + let event = event_rx.try_recv().expect("expected NoSignal event"); + match event { + DomainEvent::NoSignal { channel_id: cid } => assert_eq!(cid, channel_id), + _ => panic!("expected NoSignal"), + } + } +} diff --git a/k-tv-backend/domain/src/events.rs b/k-tv-backend/domain/src/events.rs index 611522b..02f1686 100644 --- a/k-tv-backend/domain/src/events.rs +++ b/k-tv-backend/domain/src/events.rs @@ -32,3 +32,81 @@ pub enum DomainEvent { channel_id: Uuid, }, } + +#[cfg(test)] +mod tests { + use super::*; + use uuid::Uuid; + + fn make_slot() -> crate::entities::ScheduledSlot { + use crate::entities::{MediaItem, ScheduledSlot}; + use crate::value_objects::{ContentType, MediaItemId}; + use chrono::Utc; + ScheduledSlot { + id: Uuid::new_v4(), + start_at: Utc::now(), + end_at: Utc::now() + chrono::Duration::minutes(30), + item: MediaItem { + id: MediaItemId::new("test-item".to_string()), + title: "Test Movie".to_string(), + content_type: ContentType::Movie, + duration_secs: 1800, + description: None, + genres: vec![], + year: None, + tags: vec![], + series_name: None, + season_number: None, + episode_number: None, + }, + source_block_id: Uuid::new_v4(), + } + } + + #[test] + fn broadcast_transition_carries_slot() { + let channel_id = Uuid::new_v4(); + let slot = make_slot(); + let event = DomainEvent::BroadcastTransition { channel_id, slot: slot.clone() }; + match event { + DomainEvent::BroadcastTransition { channel_id: cid, slot: s } => { + assert_eq!(cid, channel_id); + assert_eq!(s.item.title, "Test Movie"); + } + _ => panic!("wrong variant"), + } + } + + #[test] + fn no_signal_carries_channel_id() { + let channel_id = Uuid::new_v4(); + let event = DomainEvent::NoSignal { channel_id }; + match event { + DomainEvent::NoSignal { channel_id: cid } => assert_eq!(cid, channel_id), + _ => panic!("wrong variant"), + } + } + + #[test] + fn schedule_generated_carries_metadata() { + use crate::entities::GeneratedSchedule; + use chrono::Utc; + let channel_id = Uuid::new_v4(); + let schedule = GeneratedSchedule { + id: Uuid::new_v4(), + channel_id, + valid_from: Utc::now(), + valid_until: Utc::now() + chrono::Duration::hours(48), + generation: 3, + slots: vec![], + }; + let event = DomainEvent::ScheduleGenerated { channel_id, schedule: schedule.clone() }; + match event { + DomainEvent::ScheduleGenerated { schedule: s, .. } => { + assert_eq!(s.generation, 3); + assert_eq!(s.slots.len(), 0); + } + _ => panic!("wrong variant"), + } + } +}