diff --git a/k-tv-backend/api/src/main.rs b/k-tv-backend/api/src/main.rs index 99fe37a..b71021b 100644 --- a/k-tv-backend/api/src/main.rs +++ b/k-tv-backend/api/src/main.rs @@ -22,9 +22,11 @@ mod config; mod dto; mod error; mod extractors; +mod poller; mod routes; mod scheduler; mod state; +mod webhook; use crate::config::Config; use crate::state::AppState; diff --git a/k-tv-backend/api/src/poller.rs b/k-tv-backend/api/src/poller.rs new file mode 100644 index 0000000..4e12c0d --- /dev/null +++ b/k-tv-backend/api/src/poller.rs @@ -0,0 +1,130 @@ +//! BroadcastPoller background task. +//! +//! Polls each channel that has a webhook_url configured. On each tick (every 1s) +//! it checks which channels are due for a poll (elapsed >= webhook_poll_interval_secs) +//! and emits BroadcastTransition or NoSignal events when the current slot changes. + +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use chrono::Utc; +use tokio::sync::broadcast; +use tracing::error; +use uuid::Uuid; + +use domain::{ChannelRepository, DomainError, DomainEvent, ScheduleEngineService}; + +/// Per-channel poller state. +#[derive(Debug)] +struct ChannelPollState { + /// ID of the last slot we saw as current (None = no signal). + last_slot_id: Option, + /// Wall-clock instant of the last poll for this channel. + last_checked: Instant, +} + +/// Polls channels with webhook URLs and emits broadcast transition events. +pub async fn run_broadcast_poller( + schedule_engine: Arc, + channel_repo: Arc, + event_tx: broadcast::Sender, +) { + let mut state: HashMap = HashMap::new(); + + loop { + tokio::time::sleep(Duration::from_secs(1)).await; + poll_tick(&schedule_engine, &channel_repo, &event_tx, &mut state).await; + } +} + +async fn poll_tick( + schedule_engine: &Arc, + channel_repo: &Arc, + event_tx: &broadcast::Sender, + state: &mut HashMap, +) { + let channels = match channel_repo.find_all().await { + Ok(c) => c, + Err(e) => { + error!("broadcast poller: failed to load channels: {}", e); + return; + } + }; + + // Remove deleted channels from state + let live_ids: std::collections::HashSet = channels.iter().map(|c| c.id).collect(); + state.retain(|id, _| live_ids.contains(id)); + + let now = Utc::now(); + + for channel in channels { + // Only poll channels with a configured webhook URL + if channel.webhook_url.is_none() { + state.remove(&channel.id); + continue; + } + + let poll_interval = Duration::from_secs(channel.webhook_poll_interval_secs as u64); + + let entry = state.entry(channel.id).or_insert_with(|| ChannelPollState { + last_slot_id: None, + last_checked: Instant::now() - poll_interval, // trigger immediately on first encounter + }); + + if entry.last_checked.elapsed() < poll_interval { + continue; // Not yet due for a poll + } + + entry.last_checked = Instant::now(); + + // Find the current slot + let current_slot_id = match schedule_engine.get_active_schedule(channel.id, now).await { + Ok(Some(schedule)) => { + schedule + .slots + .iter() + .find(|s| s.start_at <= now && now < s.end_at) + .map(|s| s.id) + } + Ok(None) => None, + Err(DomainError::NoActiveSchedule(_)) => None, + Err(DomainError::ChannelNotFound(_)) => { + state.remove(&channel.id); + continue; + } + Err(e) => { + error!( + "broadcast poller: error checking schedule for channel {}: {}", + channel.id, e + ); + continue; + } + }; + + if current_slot_id == entry.last_slot_id { + continue; + } + + // State changed — emit appropriate event + match ¤t_slot_id { + Some(slot_id) => { + if let Ok(Some(schedule)) = schedule_engine.get_active_schedule(channel.id, now).await { + if let Some(slot) = schedule.slots.iter().find(|s| s.id == *slot_id).cloned() { + let _ = event_tx.send(DomainEvent::BroadcastTransition { + channel_id: channel.id, + slot, + }); + } + } + } + None => { + let _ = event_tx.send(DomainEvent::NoSignal { + channel_id: channel.id, + }); + } + } + + entry.last_slot_id = current_slot_id; + } +} diff --git a/k-tv-backend/api/src/webhook.rs b/k-tv-backend/api/src/webhook.rs new file mode 100644 index 0000000..3eb9ef3 --- /dev/null +++ b/k-tv-backend/api/src/webhook.rs @@ -0,0 +1,164 @@ +//! WebhookConsumer background task. +//! +//! Subscribes to the domain-event broadcast channel, looks up each channel's +//! webhook_url, and fires HTTP POST requests (fire-and-forget). + +use chrono::Utc; +use serde_json::{Value, json}; +use std::sync::Arc; +use tokio::sync::broadcast; +use tracing::{info, warn}; +use uuid::Uuid; + +use domain::{ChannelRepository, DomainEvent}; + +/// Consumes domain events and delivers them to per-channel webhook URLs. +/// +/// Uses fire-and-forget HTTP POST — failures are logged as warnings, never retried. +pub async fn run_webhook_consumer( + mut rx: broadcast::Receiver, + channel_repo: Arc, + client: reqwest::Client, +) { + loop { + match rx.recv().await { + Ok(event) => { + let channel_id = event_channel_id(&event); + let payload = build_payload(&event); + + match channel_repo.find_by_id(channel_id).await { + Ok(Some(channel)) => { + if let Some(url) = channel.webhook_url { + let client = client.clone(); + tokio::spawn(async move { + post_webhook(&client, &url, payload).await; + }); + } + // No webhook_url configured — skip silently + } + Ok(None) => { + // Channel deleted — nothing to do + } + Err(e) => { + warn!("webhook consumer: failed to look up channel {}: {}", channel_id, e); + } + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!("webhook consumer lagged, {} events dropped", n); + // Continue — don't break; catch up from current position + } + Err(broadcast::error::RecvError::Closed) => { + info!("webhook consumer: event bus closed, shutting down"); + break; + } + } + } +} + +/// Extract the channel_id from any event variant. +fn event_channel_id(event: &DomainEvent) -> Uuid { + match event { + DomainEvent::BroadcastTransition { channel_id, .. } => *channel_id, + DomainEvent::NoSignal { channel_id } => *channel_id, + DomainEvent::ScheduleGenerated { channel_id, .. } => *channel_id, + DomainEvent::ChannelCreated { channel } => channel.id, + DomainEvent::ChannelUpdated { channel } => channel.id, + DomainEvent::ChannelDeleted { channel_id } => *channel_id, + } +} + +/// Build the JSON payload for an event. +fn build_payload(event: &DomainEvent) -> Value { + let now = Utc::now().to_rfc3339(); + match event { + DomainEvent::BroadcastTransition { channel_id, slot } => { + let offset_secs = (Utc::now() - slot.start_at).num_seconds().max(0) as u64; + json!({ + "event": "broadcast_transition", + "timestamp": now, + "channel_id": channel_id, + "data": { + "slot_id": slot.id, + "item": { + "id": slot.item.id.as_ref(), + "title": slot.item.title, + "duration_secs": slot.item.duration_secs, + }, + "start_at": slot.start_at.to_rfc3339(), + "end_at": slot.end_at.to_rfc3339(), + "offset_secs": offset_secs, + } + }) + } + DomainEvent::NoSignal { channel_id } => { + json!({ + "event": "no_signal", + "timestamp": now, + "channel_id": channel_id, + "data": {} + }) + } + DomainEvent::ScheduleGenerated { channel_id, schedule } => { + json!({ + "event": "schedule_generated", + "timestamp": now, + "channel_id": channel_id, + "data": { + "generation": schedule.generation, + "valid_from": schedule.valid_from.to_rfc3339(), + "valid_until": schedule.valid_until.to_rfc3339(), + "slot_count": schedule.slots.len(), + } + }) + } + DomainEvent::ChannelCreated { channel } => { + json!({ + "event": "channel_created", + "timestamp": now, + "channel_id": channel.id, + "data": { + "name": channel.name, + "description": channel.description, + } + }) + } + DomainEvent::ChannelUpdated { channel } => { + json!({ + "event": "channel_updated", + "timestamp": now, + "channel_id": channel.id, + "data": { + "name": channel.name, + "description": channel.description, + } + }) + } + DomainEvent::ChannelDeleted { channel_id } => { + json!({ + "event": "channel_deleted", + "timestamp": now, + "channel_id": channel_id, + "data": {} + }) + } + } +} + +/// Fire-and-forget HTTP POST to a webhook URL. +async fn post_webhook(client: &reqwest::Client, url: &str, payload: Value) { + match client.post(url).json(&payload).send().await { + Ok(resp) => { + if !resp.status().is_success() { + warn!( + "webhook POST to {} returned status {}", + url, + resp.status() + ); + } + } + Err(e) => { + warn!("webhook POST to {} failed: {}", url, e); + } + } +}