feat(webhooks): add WebhookConsumer and BroadcastPoller tasks

This commit is contained in:
2026-03-16 00:37:57 +01:00
parent 1d0e640946
commit 5c978389b2
3 changed files with 296 additions and 0 deletions

View File

@@ -22,9 +22,11 @@ mod config;
mod dto; mod dto;
mod error; mod error;
mod extractors; mod extractors;
mod poller;
mod routes; mod routes;
mod scheduler; mod scheduler;
mod state; mod state;
mod webhook;
use crate::config::Config; use crate::config::Config;
use crate::state::AppState; use crate::state::AppState;

View File

@@ -0,0 +1,130 @@
//! BroadcastPoller background task.
//!
//! Polls each channel that has a webhook_url configured. On each tick (every 1s)
//! it checks which channels are due for a poll (elapsed >= webhook_poll_interval_secs)
//! and emits BroadcastTransition or NoSignal events when the current slot changes.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use chrono::Utc;
use tokio::sync::broadcast;
use tracing::error;
use uuid::Uuid;
use domain::{ChannelRepository, DomainError, DomainEvent, ScheduleEngineService};
/// Per-channel poller state.
#[derive(Debug)]
struct ChannelPollState {
/// ID of the last slot we saw as current (None = no signal).
last_slot_id: Option<Uuid>,
/// Wall-clock instant of the last poll for this channel.
last_checked: Instant,
}
/// Polls channels with webhook URLs and emits broadcast transition events.
pub async fn run_broadcast_poller(
schedule_engine: Arc<ScheduleEngineService>,
channel_repo: Arc<dyn ChannelRepository>,
event_tx: broadcast::Sender<DomainEvent>,
) {
let mut state: HashMap<Uuid, ChannelPollState> = HashMap::new();
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
poll_tick(&schedule_engine, &channel_repo, &event_tx, &mut state).await;
}
}
async fn poll_tick(
schedule_engine: &Arc<ScheduleEngineService>,
channel_repo: &Arc<dyn ChannelRepository>,
event_tx: &broadcast::Sender<DomainEvent>,
state: &mut HashMap<Uuid, ChannelPollState>,
) {
let channels = match channel_repo.find_all().await {
Ok(c) => c,
Err(e) => {
error!("broadcast poller: failed to load channels: {}", e);
return;
}
};
// Remove deleted channels from state
let live_ids: std::collections::HashSet<Uuid> = channels.iter().map(|c| c.id).collect();
state.retain(|id, _| live_ids.contains(id));
let now = Utc::now();
for channel in channels {
// Only poll channels with a configured webhook URL
if channel.webhook_url.is_none() {
state.remove(&channel.id);
continue;
}
let poll_interval = Duration::from_secs(channel.webhook_poll_interval_secs as u64);
let entry = state.entry(channel.id).or_insert_with(|| ChannelPollState {
last_slot_id: None,
last_checked: Instant::now() - poll_interval, // trigger immediately on first encounter
});
if entry.last_checked.elapsed() < poll_interval {
continue; // Not yet due for a poll
}
entry.last_checked = Instant::now();
// Find the current slot
let current_slot_id = match schedule_engine.get_active_schedule(channel.id, now).await {
Ok(Some(schedule)) => {
schedule
.slots
.iter()
.find(|s| s.start_at <= now && now < s.end_at)
.map(|s| s.id)
}
Ok(None) => None,
Err(DomainError::NoActiveSchedule(_)) => None,
Err(DomainError::ChannelNotFound(_)) => {
state.remove(&channel.id);
continue;
}
Err(e) => {
error!(
"broadcast poller: error checking schedule for channel {}: {}",
channel.id, e
);
continue;
}
};
if current_slot_id == entry.last_slot_id {
continue;
}
// State changed — emit appropriate event
match &current_slot_id {
Some(slot_id) => {
if let Ok(Some(schedule)) = schedule_engine.get_active_schedule(channel.id, now).await {
if let Some(slot) = schedule.slots.iter().find(|s| s.id == *slot_id).cloned() {
let _ = event_tx.send(DomainEvent::BroadcastTransition {
channel_id: channel.id,
slot,
});
}
}
}
None => {
let _ = event_tx.send(DomainEvent::NoSignal {
channel_id: channel.id,
});
}
}
entry.last_slot_id = current_slot_id;
}
}

View File

@@ -0,0 +1,164 @@
//! WebhookConsumer background task.
//!
//! Subscribes to the domain-event broadcast channel, looks up each channel's
//! webhook_url, and fires HTTP POST requests (fire-and-forget).
use chrono::Utc;
use serde_json::{Value, json};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{info, warn};
use uuid::Uuid;
use domain::{ChannelRepository, DomainEvent};
/// Consumes domain events and delivers them to per-channel webhook URLs.
///
/// Uses fire-and-forget HTTP POST — failures are logged as warnings, never retried.
pub async fn run_webhook_consumer(
mut rx: broadcast::Receiver<DomainEvent>,
channel_repo: Arc<dyn ChannelRepository>,
client: reqwest::Client,
) {
loop {
match rx.recv().await {
Ok(event) => {
let channel_id = event_channel_id(&event);
let payload = build_payload(&event);
match channel_repo.find_by_id(channel_id).await {
Ok(Some(channel)) => {
if let Some(url) = channel.webhook_url {
let client = client.clone();
tokio::spawn(async move {
post_webhook(&client, &url, payload).await;
});
}
// No webhook_url configured — skip silently
}
Ok(None) => {
// Channel deleted — nothing to do
}
Err(e) => {
warn!("webhook consumer: failed to look up channel {}: {}", channel_id, e);
}
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("webhook consumer lagged, {} events dropped", n);
// Continue — don't break; catch up from current position
}
Err(broadcast::error::RecvError::Closed) => {
info!("webhook consumer: event bus closed, shutting down");
break;
}
}
}
}
/// Extract the channel_id from any event variant.
fn event_channel_id(event: &DomainEvent) -> Uuid {
match event {
DomainEvent::BroadcastTransition { channel_id, .. } => *channel_id,
DomainEvent::NoSignal { channel_id } => *channel_id,
DomainEvent::ScheduleGenerated { channel_id, .. } => *channel_id,
DomainEvent::ChannelCreated { channel } => channel.id,
DomainEvent::ChannelUpdated { channel } => channel.id,
DomainEvent::ChannelDeleted { channel_id } => *channel_id,
}
}
/// Build the JSON payload for an event.
fn build_payload(event: &DomainEvent) -> Value {
let now = Utc::now().to_rfc3339();
match event {
DomainEvent::BroadcastTransition { channel_id, slot } => {
let offset_secs = (Utc::now() - slot.start_at).num_seconds().max(0) as u64;
json!({
"event": "broadcast_transition",
"timestamp": now,
"channel_id": channel_id,
"data": {
"slot_id": slot.id,
"item": {
"id": slot.item.id.as_ref(),
"title": slot.item.title,
"duration_secs": slot.item.duration_secs,
},
"start_at": slot.start_at.to_rfc3339(),
"end_at": slot.end_at.to_rfc3339(),
"offset_secs": offset_secs,
}
})
}
DomainEvent::NoSignal { channel_id } => {
json!({
"event": "no_signal",
"timestamp": now,
"channel_id": channel_id,
"data": {}
})
}
DomainEvent::ScheduleGenerated { channel_id, schedule } => {
json!({
"event": "schedule_generated",
"timestamp": now,
"channel_id": channel_id,
"data": {
"generation": schedule.generation,
"valid_from": schedule.valid_from.to_rfc3339(),
"valid_until": schedule.valid_until.to_rfc3339(),
"slot_count": schedule.slots.len(),
}
})
}
DomainEvent::ChannelCreated { channel } => {
json!({
"event": "channel_created",
"timestamp": now,
"channel_id": channel.id,
"data": {
"name": channel.name,
"description": channel.description,
}
})
}
DomainEvent::ChannelUpdated { channel } => {
json!({
"event": "channel_updated",
"timestamp": now,
"channel_id": channel.id,
"data": {
"name": channel.name,
"description": channel.description,
}
})
}
DomainEvent::ChannelDeleted { channel_id } => {
json!({
"event": "channel_deleted",
"timestamp": now,
"channel_id": channel_id,
"data": {}
})
}
}
}
/// Fire-and-forget HTTP POST to a webhook URL.
async fn post_webhook(client: &reqwest::Client, url: &str, payload: Value) {
match client.post(url).json(&payload).send().await {
Ok(resp) => {
if !resp.status().is_success() {
warn!(
"webhook POST to {} returned status {}",
url,
resp.status()
);
}
}
Err(e) => {
warn!("webhook POST to {} failed: {}", url, e);
}
}
}