webhooks (#1)
Reviewed-on: #1
This commit was merged in pull request #1.
This commit is contained in:
432
k-tv-backend/api/src/poller.rs
Normal file
432
k-tv-backend/api/src/poller.rs
Normal file
@@ -0,0 +1,432 @@
|
||||
//! BroadcastPoller background task.
|
||||
//!
|
||||
//! Polls each channel that has a webhook_url configured. On each tick (every 1s)
|
||||
//! it checks which channels are due for a poll (elapsed >= webhook_poll_interval_secs)
|
||||
//! and emits BroadcastTransition or NoSignal events when the current slot changes.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use chrono::Utc;
|
||||
use tokio::sync::broadcast;
|
||||
use tracing::error;
|
||||
use uuid::Uuid;
|
||||
|
||||
use domain::{ChannelRepository, DomainError, DomainEvent, ScheduleEngineService};
|
||||
|
||||
/// Per-channel poller state.
|
||||
#[derive(Debug)]
|
||||
struct ChannelPollState {
|
||||
/// ID of the last slot we saw as current (None = no signal).
|
||||
last_slot_id: Option<Uuid>,
|
||||
/// Wall-clock instant of the last poll for this channel.
|
||||
last_checked: Instant,
|
||||
}
|
||||
|
||||
/// Polls channels with webhook URLs and emits broadcast transition events.
|
||||
pub async fn run_broadcast_poller(
|
||||
schedule_engine: Arc<ScheduleEngineService>,
|
||||
channel_repo: Arc<dyn ChannelRepository>,
|
||||
event_tx: broadcast::Sender<DomainEvent>,
|
||||
) {
|
||||
let mut state: HashMap<Uuid, ChannelPollState> = HashMap::new();
|
||||
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
poll_tick(&schedule_engine, &channel_repo, &event_tx, &mut state).await;
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn poll_tick(
|
||||
schedule_engine: &Arc<ScheduleEngineService>,
|
||||
channel_repo: &Arc<dyn ChannelRepository>,
|
||||
event_tx: &broadcast::Sender<DomainEvent>,
|
||||
state: &mut HashMap<Uuid, ChannelPollState>,
|
||||
) {
|
||||
let channels = match channel_repo.find_all().await {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
error!("broadcast poller: failed to load channels: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Remove deleted channels from state
|
||||
let live_ids: std::collections::HashSet<Uuid> = channels.iter().map(|c| c.id).collect();
|
||||
state.retain(|id, _| live_ids.contains(id));
|
||||
|
||||
let now = Utc::now();
|
||||
|
||||
for channel in channels {
|
||||
// Only poll channels with a configured webhook URL
|
||||
if channel.webhook_url.is_none() {
|
||||
state.remove(&channel.id);
|
||||
continue;
|
||||
}
|
||||
|
||||
let poll_interval = Duration::from_secs(channel.webhook_poll_interval_secs as u64);
|
||||
|
||||
let entry = state.entry(channel.id).or_insert_with(|| ChannelPollState {
|
||||
last_slot_id: None,
|
||||
last_checked: Instant::now() - poll_interval, // trigger immediately on first encounter
|
||||
});
|
||||
|
||||
if entry.last_checked.elapsed() < poll_interval {
|
||||
continue; // Not yet due for a poll
|
||||
}
|
||||
|
||||
entry.last_checked = Instant::now();
|
||||
|
||||
// Find the current slot
|
||||
let current_slot_id = match schedule_engine.get_active_schedule(channel.id, now).await {
|
||||
Ok(Some(schedule)) => {
|
||||
schedule
|
||||
.slots
|
||||
.iter()
|
||||
.find(|s| s.start_at <= now && now < s.end_at)
|
||||
.map(|s| s.id)
|
||||
}
|
||||
Ok(None) => None,
|
||||
Err(DomainError::NoActiveSchedule(_)) => None,
|
||||
Err(DomainError::ChannelNotFound(_)) => {
|
||||
state.remove(&channel.id);
|
||||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"broadcast poller: error checking schedule for channel {}: {}",
|
||||
channel.id, e
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if current_slot_id == entry.last_slot_id {
|
||||
continue;
|
||||
}
|
||||
|
||||
// State changed — emit appropriate event
|
||||
match ¤t_slot_id {
|
||||
Some(slot_id) => {
|
||||
if let Ok(Some(schedule)) = schedule_engine.get_active_schedule(channel.id, now).await {
|
||||
if let Some(slot) = schedule.slots.iter().find(|s| s.id == *slot_id).cloned() {
|
||||
let _ = event_tx.send(DomainEvent::BroadcastTransition {
|
||||
channel_id: channel.id,
|
||||
slot,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
let _ = event_tx.send(DomainEvent::NoSignal {
|
||||
channel_id: channel.id,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
entry.last_slot_id = current_slot_id;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Duration, Utc};
|
||||
use domain::{
|
||||
Channel, ChannelRepository, Collection, DomainResult, GeneratedSchedule, IProviderRegistry,
|
||||
MediaFilter, MediaItem, MediaItemId, PlaybackRecord, ProviderCapabilities,
|
||||
ScheduleEngineService, ScheduleRepository, SeriesSummary, StreamQuality, StreamingProtocol,
|
||||
};
|
||||
use domain::value_objects::{ChannelId, ContentType, UserId};
|
||||
use tokio::sync::broadcast;
|
||||
use uuid::Uuid;
|
||||
|
||||
// ── Mocks ─────────────────────────────────────────────────────────────────
|
||||
|
||||
struct MockChannelRepo {
|
||||
channels: Vec<Channel>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ChannelRepository for MockChannelRepo {
|
||||
async fn find_by_id(&self, id: ChannelId) -> DomainResult<Option<Channel>> {
|
||||
Ok(self.channels.iter().find(|c| c.id == id).cloned())
|
||||
}
|
||||
async fn find_by_owner(&self, _owner_id: UserId) -> DomainResult<Vec<Channel>> {
|
||||
unimplemented!()
|
||||
}
|
||||
async fn find_all(&self) -> DomainResult<Vec<Channel>> {
|
||||
Ok(self.channels.clone())
|
||||
}
|
||||
async fn find_auto_schedule_enabled(&self) -> DomainResult<Vec<Channel>> {
|
||||
unimplemented!()
|
||||
}
|
||||
async fn save(&self, _channel: &Channel) -> DomainResult<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
async fn delete(&self, _id: ChannelId) -> DomainResult<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
struct MockScheduleRepo {
|
||||
active: Option<GeneratedSchedule>,
|
||||
saved: Arc<Mutex<Vec<GeneratedSchedule>>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ScheduleRepository for MockScheduleRepo {
|
||||
async fn find_active(
|
||||
&self,
|
||||
_channel_id: ChannelId,
|
||||
_at: DateTime<Utc>,
|
||||
) -> DomainResult<Option<GeneratedSchedule>> {
|
||||
Ok(self.active.clone())
|
||||
}
|
||||
async fn find_latest(&self, _channel_id: ChannelId) -> DomainResult<Option<GeneratedSchedule>> {
|
||||
Ok(self.active.clone())
|
||||
}
|
||||
async fn save(&self, schedule: &GeneratedSchedule) -> DomainResult<()> {
|
||||
self.saved.lock().unwrap().push(schedule.clone());
|
||||
Ok(())
|
||||
}
|
||||
async fn find_playback_history(&self, _channel_id: ChannelId) -> DomainResult<Vec<PlaybackRecord>> {
|
||||
Ok(vec![])
|
||||
}
|
||||
async fn save_playback_record(&self, _record: &PlaybackRecord) -> DomainResult<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct MockRegistry;
|
||||
|
||||
#[async_trait]
|
||||
impl IProviderRegistry for MockRegistry {
|
||||
async fn fetch_items(&self, _provider_id: &str, _filter: &MediaFilter) -> DomainResult<Vec<MediaItem>> {
|
||||
Ok(vec![])
|
||||
}
|
||||
async fn fetch_by_id(&self, _item_id: &MediaItemId) -> DomainResult<Option<MediaItem>> {
|
||||
Ok(None)
|
||||
}
|
||||
async fn get_stream_url(&self, _item_id: &MediaItemId, _quality: &StreamQuality) -> DomainResult<String> {
|
||||
unimplemented!()
|
||||
}
|
||||
fn provider_ids(&self) -> Vec<String> {
|
||||
vec![]
|
||||
}
|
||||
fn primary_id(&self) -> &str {
|
||||
""
|
||||
}
|
||||
fn capabilities(&self, _provider_id: &str) -> Option<ProviderCapabilities> {
|
||||
None
|
||||
}
|
||||
async fn list_collections(&self, _provider_id: &str) -> DomainResult<Vec<Collection>> {
|
||||
unimplemented!()
|
||||
}
|
||||
async fn list_series(&self, _provider_id: &str, _collection_id: Option<&str>) -> DomainResult<Vec<SeriesSummary>> {
|
||||
unimplemented!()
|
||||
}
|
||||
async fn list_genres(&self, _provider_id: &str, _content_type: Option<&ContentType>) -> DomainResult<Vec<String>> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
// ── Helpers ───────────────────────────────────────────────────────────────
|
||||
|
||||
fn make_channel_with_webhook(channel_id: Uuid) -> Channel {
|
||||
let mut ch = Channel::new(Uuid::new_v4(), "Test", "UTC");
|
||||
ch.id = channel_id;
|
||||
ch.webhook_url = Some("http://example.com/hook".to_string());
|
||||
ch.webhook_poll_interval_secs = 0; // always due
|
||||
ch
|
||||
}
|
||||
|
||||
fn make_slot(channel_id: Uuid, slot_id: Uuid) -> domain::ScheduledSlot {
|
||||
use domain::entities::MediaItem;
|
||||
let now = Utc::now();
|
||||
domain::ScheduledSlot {
|
||||
id: slot_id,
|
||||
start_at: now - Duration::minutes(1),
|
||||
end_at: now + Duration::minutes(29),
|
||||
item: MediaItem {
|
||||
id: MediaItemId::new("test-item"),
|
||||
title: "Test Movie".to_string(),
|
||||
content_type: ContentType::Movie,
|
||||
duration_secs: 1800,
|
||||
description: None,
|
||||
genres: vec![],
|
||||
year: None,
|
||||
tags: vec![],
|
||||
series_name: None,
|
||||
season_number: None,
|
||||
episode_number: None,
|
||||
},
|
||||
source_block_id: Uuid::new_v4(),
|
||||
}
|
||||
}
|
||||
|
||||
fn make_schedule(channel_id: Uuid, slots: Vec<domain::ScheduledSlot>) -> GeneratedSchedule {
|
||||
let now = Utc::now();
|
||||
GeneratedSchedule {
|
||||
id: Uuid::new_v4(),
|
||||
channel_id,
|
||||
valid_from: now - Duration::hours(1),
|
||||
valid_until: now + Duration::hours(47),
|
||||
generation: 1,
|
||||
slots,
|
||||
}
|
||||
}
|
||||
|
||||
fn make_engine(
|
||||
channel_repo: Arc<dyn ChannelRepository>,
|
||||
schedule_repo: Arc<dyn ScheduleRepository>,
|
||||
) -> Arc<ScheduleEngineService> {
|
||||
Arc::new(ScheduleEngineService::new(
|
||||
Arc::new(MockRegistry),
|
||||
channel_repo,
|
||||
schedule_repo,
|
||||
))
|
||||
}
|
||||
|
||||
// ── Tests ─────────────────────────────────────────────────────────────────
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_broadcast_transition_emitted_on_slot_change() {
|
||||
let channel_id = Uuid::new_v4();
|
||||
let slot_id = Uuid::new_v4();
|
||||
let ch = make_channel_with_webhook(channel_id);
|
||||
let slot = make_slot(channel_id, slot_id);
|
||||
let schedule = make_schedule(channel_id, vec![slot]);
|
||||
|
||||
let channel_repo: Arc<dyn ChannelRepository> =
|
||||
Arc::new(MockChannelRepo { channels: vec![ch] });
|
||||
let schedule_repo: Arc<dyn ScheduleRepository> = Arc::new(MockScheduleRepo {
|
||||
active: Some(schedule),
|
||||
saved: Arc::new(Mutex::new(vec![])),
|
||||
});
|
||||
let engine = make_engine(channel_repo.clone(), schedule_repo);
|
||||
|
||||
let (event_tx, mut event_rx) = broadcast::channel(8);
|
||||
let mut state: HashMap<Uuid, ChannelPollState> = HashMap::new();
|
||||
|
||||
poll_tick(&engine, &channel_repo, &event_tx, &mut state).await;
|
||||
|
||||
let event = event_rx.try_recv().expect("expected an event");
|
||||
match event {
|
||||
DomainEvent::BroadcastTransition { channel_id: cid, slot: s } => {
|
||||
assert_eq!(cid, channel_id);
|
||||
assert_eq!(s.id, slot_id);
|
||||
}
|
||||
other => panic!("expected BroadcastTransition, got something else"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_no_event_when_slot_unchanged() {
|
||||
let channel_id = Uuid::new_v4();
|
||||
let slot_id = Uuid::new_v4();
|
||||
let ch = make_channel_with_webhook(channel_id);
|
||||
let slot = make_slot(channel_id, slot_id);
|
||||
let schedule = make_schedule(channel_id, vec![slot]);
|
||||
|
||||
let channel_repo: Arc<dyn ChannelRepository> =
|
||||
Arc::new(MockChannelRepo { channels: vec![ch] });
|
||||
let schedule_repo: Arc<dyn ScheduleRepository> = Arc::new(MockScheduleRepo {
|
||||
active: Some(schedule),
|
||||
saved: Arc::new(Mutex::new(vec![])),
|
||||
});
|
||||
let engine = make_engine(channel_repo.clone(), schedule_repo);
|
||||
|
||||
let (event_tx, mut event_rx) = broadcast::channel(8);
|
||||
let mut state: HashMap<Uuid, ChannelPollState> = HashMap::new();
|
||||
|
||||
// First tick — emits BroadcastTransition
|
||||
poll_tick(&engine, &channel_repo, &event_tx, &mut state).await;
|
||||
let _ = event_rx.try_recv();
|
||||
|
||||
// Second tick — same slot, no event
|
||||
poll_tick(&engine, &channel_repo, &event_tx, &mut state).await;
|
||||
assert!(
|
||||
event_rx.try_recv().is_err(),
|
||||
"no event expected when slot unchanged"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_no_signal_emitted_when_slot_goes_to_none() {
|
||||
let channel_id = Uuid::new_v4();
|
||||
let slot_id = Uuid::new_v4();
|
||||
let ch = make_channel_with_webhook(channel_id);
|
||||
let slot = make_slot(channel_id, slot_id);
|
||||
let schedule_with_slot = make_schedule(channel_id, vec![slot]);
|
||||
|
||||
// Repo that starts with a slot then returns empty schedule
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
struct SwitchingScheduleRepo {
|
||||
first: GeneratedSchedule,
|
||||
second: GeneratedSchedule,
|
||||
called: AtomicBool,
|
||||
}
|
||||
#[async_trait]
|
||||
impl ScheduleRepository for SwitchingScheduleRepo {
|
||||
async fn find_active(
|
||||
&self,
|
||||
_channel_id: ChannelId,
|
||||
_at: DateTime<Utc>,
|
||||
) -> DomainResult<Option<GeneratedSchedule>> {
|
||||
if self.called.swap(true, Ordering::SeqCst) {
|
||||
Ok(Some(self.second.clone()))
|
||||
} else {
|
||||
Ok(Some(self.first.clone()))
|
||||
}
|
||||
}
|
||||
async fn find_latest(&self, _: ChannelId) -> DomainResult<Option<GeneratedSchedule>> {
|
||||
Ok(None)
|
||||
}
|
||||
async fn save(&self, _: &GeneratedSchedule) -> DomainResult<()> { Ok(()) }
|
||||
async fn find_playback_history(&self, _: ChannelId) -> DomainResult<Vec<PlaybackRecord>> {
|
||||
Ok(vec![])
|
||||
}
|
||||
async fn save_playback_record(&self, _: &PlaybackRecord) -> DomainResult<()> { Ok(()) }
|
||||
}
|
||||
|
||||
let now = Utc::now();
|
||||
let empty_schedule = GeneratedSchedule {
|
||||
id: Uuid::new_v4(),
|
||||
channel_id,
|
||||
valid_from: now - Duration::hours(1),
|
||||
valid_until: now + Duration::hours(47),
|
||||
generation: 2,
|
||||
slots: vec![], // no current slot
|
||||
};
|
||||
|
||||
let channel_repo: Arc<dyn ChannelRepository> =
|
||||
Arc::new(MockChannelRepo { channels: vec![ch] });
|
||||
let schedule_repo: Arc<dyn ScheduleRepository> = Arc::new(SwitchingScheduleRepo {
|
||||
first: schedule_with_slot,
|
||||
second: empty_schedule,
|
||||
called: AtomicBool::new(false),
|
||||
});
|
||||
let engine = make_engine(channel_repo.clone(), schedule_repo);
|
||||
|
||||
let (event_tx, mut event_rx) = broadcast::channel(8);
|
||||
let mut state: HashMap<Uuid, ChannelPollState> = HashMap::new();
|
||||
|
||||
// First tick — emits BroadcastTransition (slot present)
|
||||
poll_tick(&engine, &channel_repo, &event_tx, &mut state).await;
|
||||
let _ = event_rx.try_recv();
|
||||
|
||||
// Second tick — schedule has no current slot, emits NoSignal
|
||||
poll_tick(&engine, &channel_repo, &event_tx, &mut state).await;
|
||||
let event = event_rx.try_recv().expect("expected NoSignal event");
|
||||
match event {
|
||||
DomainEvent::NoSignal { channel_id: cid } => assert_eq!(cid, channel_id),
|
||||
_ => panic!("expected NoSignal"),
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user