465 lines
16 KiB
Rust
465 lines
16 KiB
Rust
//! BroadcastPoller background task.
|
|
//!
|
|
//! Polls each channel that has a webhook_url configured. On each tick (every 1s)
|
|
//! it checks which channels are due for a poll (elapsed >= webhook_poll_interval_secs)
|
|
//! and emits BroadcastTransition or NoSignal events when the current slot changes.
|
|
|
|
use std::collections::HashMap;
|
|
use std::sync::Arc;
|
|
use std::time::{Duration, Instant};
|
|
|
|
use chrono::Utc;
|
|
use tokio::sync::broadcast;
|
|
use tracing::error;
|
|
use uuid::Uuid;
|
|
|
|
use domain::{ChannelRepository, DomainError, DomainEvent, ScheduleEngineService};
|
|
|
|
/// Per-channel poller state.
|
|
#[derive(Debug)]
|
|
pub struct ChannelPollState {
|
|
/// ID of the last slot we saw as current (None = no signal).
|
|
last_slot_id: Option<Uuid>,
|
|
/// Wall-clock instant of the last poll for this channel.
|
|
last_checked: Instant,
|
|
}
|
|
|
|
/// Polls channels with webhook URLs and emits broadcast transition events.
|
|
pub async fn run_broadcast_poller(
|
|
schedule_engine: Arc<ScheduleEngineService>,
|
|
channel_repo: Arc<dyn ChannelRepository>,
|
|
event_tx: broadcast::Sender<DomainEvent>,
|
|
) {
|
|
let mut state: HashMap<Uuid, ChannelPollState> = HashMap::new();
|
|
|
|
loop {
|
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
|
poll_tick(&schedule_engine, &channel_repo, &event_tx, &mut state).await;
|
|
}
|
|
}
|
|
|
|
pub(crate) async fn poll_tick(
|
|
schedule_engine: &Arc<ScheduleEngineService>,
|
|
channel_repo: &Arc<dyn ChannelRepository>,
|
|
event_tx: &broadcast::Sender<DomainEvent>,
|
|
state: &mut HashMap<Uuid, ChannelPollState>,
|
|
) {
|
|
let channels = match channel_repo.find_all().await {
|
|
Ok(c) => c,
|
|
Err(e) => {
|
|
error!("broadcast poller: failed to load channels: {}", e);
|
|
return;
|
|
}
|
|
};
|
|
|
|
// Remove deleted channels from state
|
|
let live_ids: std::collections::HashSet<Uuid> = channels.iter().map(|c| c.id).collect();
|
|
state.retain(|id, _| live_ids.contains(id));
|
|
|
|
let now = Utc::now();
|
|
|
|
for channel in channels {
|
|
// Only poll channels with a configured webhook URL
|
|
if channel.webhook_url.is_none() {
|
|
state.remove(&channel.id);
|
|
continue;
|
|
}
|
|
|
|
let poll_interval = Duration::from_secs(channel.webhook_poll_interval_secs as u64);
|
|
|
|
let entry = state.entry(channel.id).or_insert_with(|| ChannelPollState {
|
|
last_slot_id: None,
|
|
last_checked: Instant::now() - poll_interval, // trigger immediately on first encounter
|
|
});
|
|
|
|
if entry.last_checked.elapsed() < poll_interval {
|
|
continue; // Not yet due for a poll
|
|
}
|
|
|
|
entry.last_checked = Instant::now();
|
|
|
|
// Find the current slot
|
|
let current_slot_id = match schedule_engine.get_active_schedule(channel.id, now).await {
|
|
Ok(Some(schedule)) => schedule
|
|
.slots
|
|
.iter()
|
|
.find(|s| s.start_at <= now && now < s.end_at)
|
|
.map(|s| s.id),
|
|
Ok(None) => None,
|
|
Err(DomainError::NoActiveSchedule(_)) => None,
|
|
Err(DomainError::ChannelNotFound(_)) => {
|
|
state.remove(&channel.id);
|
|
continue;
|
|
}
|
|
Err(e) => {
|
|
error!(
|
|
"broadcast poller: error checking schedule for channel {}: {}",
|
|
channel.id, e
|
|
);
|
|
continue;
|
|
}
|
|
};
|
|
|
|
if current_slot_id == entry.last_slot_id {
|
|
continue;
|
|
}
|
|
|
|
// State changed — emit appropriate event
|
|
match ¤t_slot_id {
|
|
Some(slot_id) => {
|
|
if let Ok(Some(schedule)) =
|
|
schedule_engine.get_active_schedule(channel.id, now).await
|
|
{
|
|
if let Some(slot) = schedule.slots.iter().find(|s| s.id == *slot_id).cloned() {
|
|
let _ = event_tx.send(DomainEvent::BroadcastTransition {
|
|
channel_id: channel.id,
|
|
slot,
|
|
});
|
|
}
|
|
}
|
|
}
|
|
None => {
|
|
let _ = event_tx.send(DomainEvent::NoSignal {
|
|
channel_id: channel.id,
|
|
});
|
|
}
|
|
}
|
|
|
|
entry.last_slot_id = current_slot_id;
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use std::collections::HashMap;
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
use async_trait::async_trait;
|
|
use chrono::{DateTime, Duration, Utc};
|
|
use domain::value_objects::{ChannelId, ContentType, UserId};
|
|
use domain::{
|
|
Channel, ChannelRepository, Collection, DomainResult, GeneratedSchedule, IProviderRegistry,
|
|
MediaFilter, MediaItem, MediaItemId, PlaybackRecord, ProviderCapabilities,
|
|
ScheduleEngineService, ScheduleRepository, SeriesSummary, StreamQuality,
|
|
};
|
|
use tokio::sync::broadcast;
|
|
use uuid::Uuid;
|
|
|
|
// ── Mocks ─────────────────────────────────────────────────────────────────
|
|
|
|
struct MockChannelRepo {
|
|
channels: Vec<Channel>,
|
|
}
|
|
|
|
#[async_trait]
|
|
impl ChannelRepository for MockChannelRepo {
|
|
async fn find_by_id(&self, id: ChannelId) -> DomainResult<Option<Channel>> {
|
|
Ok(self.channels.iter().find(|c| c.id == id).cloned())
|
|
}
|
|
async fn find_by_owner(&self, _owner_id: UserId) -> DomainResult<Vec<Channel>> {
|
|
unimplemented!()
|
|
}
|
|
async fn find_all(&self) -> DomainResult<Vec<Channel>> {
|
|
Ok(self.channels.clone())
|
|
}
|
|
async fn find_auto_schedule_enabled(&self) -> DomainResult<Vec<Channel>> {
|
|
unimplemented!()
|
|
}
|
|
async fn save(&self, _channel: &Channel) -> DomainResult<()> {
|
|
unimplemented!()
|
|
}
|
|
async fn delete(&self, _id: ChannelId) -> DomainResult<()> {
|
|
unimplemented!()
|
|
}
|
|
}
|
|
|
|
struct MockScheduleRepo {
|
|
active: Option<GeneratedSchedule>,
|
|
saved: Arc<Mutex<Vec<GeneratedSchedule>>>,
|
|
}
|
|
|
|
#[async_trait]
|
|
impl ScheduleRepository for MockScheduleRepo {
|
|
async fn find_active(
|
|
&self,
|
|
_channel_id: ChannelId,
|
|
_at: DateTime<Utc>,
|
|
) -> DomainResult<Option<GeneratedSchedule>> {
|
|
Ok(self.active.clone())
|
|
}
|
|
async fn find_latest(
|
|
&self,
|
|
_channel_id: ChannelId,
|
|
) -> DomainResult<Option<GeneratedSchedule>> {
|
|
Ok(self.active.clone())
|
|
}
|
|
async fn save(&self, schedule: &GeneratedSchedule) -> DomainResult<()> {
|
|
self.saved.lock().unwrap().push(schedule.clone());
|
|
Ok(())
|
|
}
|
|
async fn find_playback_history(
|
|
&self,
|
|
_channel_id: ChannelId,
|
|
) -> DomainResult<Vec<PlaybackRecord>> {
|
|
Ok(vec![])
|
|
}
|
|
async fn save_playback_record(&self, _record: &PlaybackRecord) -> DomainResult<()> {
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
struct MockRegistry;
|
|
|
|
#[async_trait]
|
|
impl IProviderRegistry for MockRegistry {
|
|
async fn fetch_items(
|
|
&self,
|
|
_provider_id: &str,
|
|
_filter: &MediaFilter,
|
|
) -> DomainResult<Vec<MediaItem>> {
|
|
Ok(vec![])
|
|
}
|
|
async fn fetch_by_id(&self, _item_id: &MediaItemId) -> DomainResult<Option<MediaItem>> {
|
|
Ok(None)
|
|
}
|
|
async fn get_stream_url(
|
|
&self,
|
|
_item_id: &MediaItemId,
|
|
_quality: &StreamQuality,
|
|
) -> DomainResult<String> {
|
|
unimplemented!()
|
|
}
|
|
fn provider_ids(&self) -> Vec<String> {
|
|
vec![]
|
|
}
|
|
fn primary_id(&self) -> &str {
|
|
""
|
|
}
|
|
fn capabilities(&self, _provider_id: &str) -> Option<ProviderCapabilities> {
|
|
None
|
|
}
|
|
async fn list_collections(&self, _provider_id: &str) -> DomainResult<Vec<Collection>> {
|
|
unimplemented!()
|
|
}
|
|
async fn list_series(
|
|
&self,
|
|
_provider_id: &str,
|
|
_collection_id: Option<&str>,
|
|
) -> DomainResult<Vec<SeriesSummary>> {
|
|
unimplemented!()
|
|
}
|
|
async fn list_genres(
|
|
&self,
|
|
_provider_id: &str,
|
|
_content_type: Option<&ContentType>,
|
|
) -> DomainResult<Vec<String>> {
|
|
unimplemented!()
|
|
}
|
|
}
|
|
|
|
// ── Helpers ───────────────────────────────────────────────────────────────
|
|
|
|
fn make_channel_with_webhook(channel_id: Uuid) -> Channel {
|
|
let mut ch = Channel::new(Uuid::new_v4(), "Test", "UTC");
|
|
ch.id = channel_id;
|
|
ch.webhook_url = Some("http://example.com/hook".to_string());
|
|
ch.webhook_poll_interval_secs = 0; // always due
|
|
ch
|
|
}
|
|
|
|
fn make_slot(channel_id: Uuid, slot_id: Uuid) -> domain::ScheduledSlot {
|
|
use domain::entities::MediaItem;
|
|
let now = Utc::now();
|
|
domain::ScheduledSlot {
|
|
id: slot_id,
|
|
start_at: now - Duration::minutes(1),
|
|
end_at: now + Duration::minutes(29),
|
|
item: MediaItem {
|
|
id: MediaItemId::new("test-item"),
|
|
title: "Test Movie".to_string(),
|
|
content_type: ContentType::Movie,
|
|
duration_secs: 1800,
|
|
description: None,
|
|
genres: vec![],
|
|
year: None,
|
|
tags: vec![],
|
|
series_name: None,
|
|
season_number: None,
|
|
episode_number: None,
|
|
},
|
|
source_block_id: Uuid::new_v4(),
|
|
}
|
|
}
|
|
|
|
fn make_schedule(channel_id: Uuid, slots: Vec<domain::ScheduledSlot>) -> GeneratedSchedule {
|
|
let now = Utc::now();
|
|
GeneratedSchedule {
|
|
id: Uuid::new_v4(),
|
|
channel_id,
|
|
valid_from: now - Duration::hours(1),
|
|
valid_until: now + Duration::hours(47),
|
|
generation: 1,
|
|
slots,
|
|
}
|
|
}
|
|
|
|
fn make_engine(
|
|
channel_repo: Arc<dyn ChannelRepository>,
|
|
schedule_repo: Arc<dyn ScheduleRepository>,
|
|
) -> Arc<ScheduleEngineService> {
|
|
Arc::new(ScheduleEngineService::new(
|
|
Arc::new(MockRegistry),
|
|
channel_repo,
|
|
schedule_repo,
|
|
))
|
|
}
|
|
|
|
// ── Tests ─────────────────────────────────────────────────────────────────
|
|
|
|
#[tokio::test]
|
|
async fn test_broadcast_transition_emitted_on_slot_change() {
|
|
let channel_id = Uuid::new_v4();
|
|
let slot_id = Uuid::new_v4();
|
|
let ch = make_channel_with_webhook(channel_id);
|
|
let slot = make_slot(channel_id, slot_id);
|
|
let schedule = make_schedule(channel_id, vec![slot]);
|
|
|
|
let channel_repo: Arc<dyn ChannelRepository> =
|
|
Arc::new(MockChannelRepo { channels: vec![ch] });
|
|
let schedule_repo: Arc<dyn ScheduleRepository> = Arc::new(MockScheduleRepo {
|
|
active: Some(schedule),
|
|
saved: Arc::new(Mutex::new(vec![])),
|
|
});
|
|
let engine = make_engine(channel_repo.clone(), schedule_repo);
|
|
|
|
let (event_tx, mut event_rx) = broadcast::channel(8);
|
|
let mut state: HashMap<Uuid, ChannelPollState> = HashMap::new();
|
|
|
|
poll_tick(&engine, &channel_repo, &event_tx, &mut state).await;
|
|
|
|
let event = event_rx.try_recv().expect("expected an event");
|
|
match event {
|
|
DomainEvent::BroadcastTransition {
|
|
channel_id: cid,
|
|
slot: s,
|
|
} => {
|
|
assert_eq!(cid, channel_id);
|
|
assert_eq!(s.id, slot_id);
|
|
}
|
|
other => panic!("expected BroadcastTransition, got something else"),
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_no_event_when_slot_unchanged() {
|
|
let channel_id = Uuid::new_v4();
|
|
let slot_id = Uuid::new_v4();
|
|
let ch = make_channel_with_webhook(channel_id);
|
|
let slot = make_slot(channel_id, slot_id);
|
|
let schedule = make_schedule(channel_id, vec![slot]);
|
|
|
|
let channel_repo: Arc<dyn ChannelRepository> =
|
|
Arc::new(MockChannelRepo { channels: vec![ch] });
|
|
let schedule_repo: Arc<dyn ScheduleRepository> = Arc::new(MockScheduleRepo {
|
|
active: Some(schedule),
|
|
saved: Arc::new(Mutex::new(vec![])),
|
|
});
|
|
let engine = make_engine(channel_repo.clone(), schedule_repo);
|
|
|
|
let (event_tx, mut event_rx) = broadcast::channel(8);
|
|
let mut state: HashMap<Uuid, ChannelPollState> = HashMap::new();
|
|
|
|
// First tick — emits BroadcastTransition
|
|
poll_tick(&engine, &channel_repo, &event_tx, &mut state).await;
|
|
let _ = event_rx.try_recv();
|
|
|
|
// Second tick — same slot, no event
|
|
poll_tick(&engine, &channel_repo, &event_tx, &mut state).await;
|
|
assert!(
|
|
event_rx.try_recv().is_err(),
|
|
"no event expected when slot unchanged"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_no_signal_emitted_when_slot_goes_to_none() {
|
|
let channel_id = Uuid::new_v4();
|
|
let slot_id = Uuid::new_v4();
|
|
let ch = make_channel_with_webhook(channel_id);
|
|
let slot = make_slot(channel_id, slot_id);
|
|
let schedule_with_slot = make_schedule(channel_id, vec![slot]);
|
|
|
|
// Repo that starts with a slot then returns empty schedule
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
|
struct SwitchingScheduleRepo {
|
|
first: GeneratedSchedule,
|
|
second: GeneratedSchedule,
|
|
called: AtomicBool,
|
|
}
|
|
#[async_trait]
|
|
impl ScheduleRepository for SwitchingScheduleRepo {
|
|
async fn find_active(
|
|
&self,
|
|
_channel_id: ChannelId,
|
|
_at: DateTime<Utc>,
|
|
) -> DomainResult<Option<GeneratedSchedule>> {
|
|
if self.called.swap(true, Ordering::SeqCst) {
|
|
Ok(Some(self.second.clone()))
|
|
} else {
|
|
Ok(Some(self.first.clone()))
|
|
}
|
|
}
|
|
async fn find_latest(&self, _: ChannelId) -> DomainResult<Option<GeneratedSchedule>> {
|
|
Ok(None)
|
|
}
|
|
async fn save(&self, _: &GeneratedSchedule) -> DomainResult<()> {
|
|
Ok(())
|
|
}
|
|
async fn find_playback_history(
|
|
&self,
|
|
_: ChannelId,
|
|
) -> DomainResult<Vec<PlaybackRecord>> {
|
|
Ok(vec![])
|
|
}
|
|
async fn save_playback_record(&self, _: &PlaybackRecord) -> DomainResult<()> {
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
let now = Utc::now();
|
|
let empty_schedule = GeneratedSchedule {
|
|
id: Uuid::new_v4(),
|
|
channel_id,
|
|
valid_from: now - Duration::hours(1),
|
|
valid_until: now + Duration::hours(47),
|
|
generation: 2,
|
|
slots: vec![], // no current slot
|
|
};
|
|
|
|
let channel_repo: Arc<dyn ChannelRepository> =
|
|
Arc::new(MockChannelRepo { channels: vec![ch] });
|
|
let schedule_repo: Arc<dyn ScheduleRepository> = Arc::new(SwitchingScheduleRepo {
|
|
first: schedule_with_slot,
|
|
second: empty_schedule,
|
|
called: AtomicBool::new(false),
|
|
});
|
|
let engine = make_engine(channel_repo.clone(), schedule_repo);
|
|
|
|
let (event_tx, mut event_rx) = broadcast::channel(8);
|
|
let mut state: HashMap<Uuid, ChannelPollState> = HashMap::new();
|
|
|
|
// First tick — emits BroadcastTransition (slot present)
|
|
poll_tick(&engine, &channel_repo, &event_tx, &mut state).await;
|
|
let _ = event_rx.try_recv();
|
|
|
|
// Second tick — schedule has no current slot, emits NoSignal
|
|
poll_tick(&engine, &channel_repo, &event_tx, &mut state).await;
|
|
let event = event_rx.try_recv().expect("expected NoSignal event");
|
|
match event {
|
|
DomainEvent::NoSignal { channel_id: cid } => assert_eq!(cid, channel_id),
|
|
_ => panic!("expected NoSignal"),
|
|
}
|
|
}
|
|
}
|