From 425a6898e4a77f5138102eecba883821a852d0b2 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Mon, 16 Mar 2026 00:41:50 +0100 Subject: [PATCH] feat(webhooks): wire event bus into state, routes, and scheduler --- k-tv-backend/api/src/dto.rs | 9 ++++ k-tv-backend/api/src/main.rs | 21 +++++++- k-tv-backend/api/src/routes/channels/crud.rs | 18 +++++++ .../api/src/routes/channels/schedule.rs | 6 ++- k-tv-backend/api/src/scheduler.rs | 50 ++++++++++++------- k-tv-backend/api/src/state.rs | 4 ++ 6 files changed, 88 insertions(+), 20 deletions(-) diff --git a/k-tv-backend/api/src/dto.rs b/k-tv-backend/api/src/dto.rs index 845d541..2654704 100644 --- a/k-tv-backend/api/src/dto.rs +++ b/k-tv-backend/api/src/dto.rs @@ -72,6 +72,8 @@ pub struct CreateChannelRequest { pub access_mode: Option, /// Plain-text password; hashed before storage. pub access_password: Option, + pub webhook_url: Option, + pub webhook_poll_interval_secs: Option, } /// All fields are optional — only provided fields are updated. @@ -91,6 +93,9 @@ pub struct UpdateChannelRequest { pub logo: Option>, pub logo_position: Option, pub logo_opacity: Option, + /// `Some(None)` = clear, `Some(Some(url))` = set, `None` = unchanged. + pub webhook_url: Option>, + pub webhook_poll_interval_secs: Option, } #[derive(Debug, Serialize)] @@ -107,6 +112,8 @@ pub struct ChannelResponse { pub logo: Option, pub logo_position: domain::LogoPosition, pub logo_opacity: f32, + pub webhook_url: Option, + pub webhook_poll_interval_secs: u32, pub created_at: DateTime, pub updated_at: DateTime, } @@ -126,6 +133,8 @@ impl From for ChannelResponse { logo: c.logo, logo_position: c.logo_position, logo_opacity: c.logo_opacity, + webhook_url: c.webhook_url, + webhook_poll_interval_secs: c.webhook_poll_interval_secs, created_at: c.created_at, updated_at: c.updated_at, } diff --git a/k-tv-backend/api/src/main.rs b/k-tv-backend/api/src/main.rs index b71021b..4848fad 100644 --- a/k-tv-backend/api/src/main.rs +++ b/k-tv-backend/api/src/main.rs @@ -21,6 +21,7 @@ use tracing::info; mod config; mod dto; mod error; +mod events; mod extractors; mod poller; mod routes; @@ -152,7 +153,16 @@ async fn main() -> anyhow::Result<()> { let registry = Arc::new(registry); + let (event_tx, event_rx) = tokio::sync::broadcast::channel::(64); + let bg_channel_repo = channel_repo.clone(); + let webhook_channel_repo = channel_repo.clone(); + tokio::spawn(webhook::run_webhook_consumer( + event_rx, + webhook_channel_repo, + reqwest::Client::new(), + )); + let schedule_engine = ScheduleEngineService::new( Arc::clone(®istry) as Arc, channel_repo, @@ -166,6 +176,7 @@ async fn main() -> anyhow::Result<()> { schedule_engine, registry, config.clone(), + event_tx.clone(), ) .await?; @@ -180,8 +191,16 @@ async fn main() -> anyhow::Result<()> { cors_origins: config.cors_allowed_origins.clone(), }; + let bg_channel_repo_poller = bg_channel_repo.clone(); let bg_schedule_engine = Arc::clone(&state.schedule_engine); - tokio::spawn(scheduler::run_auto_scheduler(bg_schedule_engine, bg_channel_repo)); + tokio::spawn(scheduler::run_auto_scheduler(bg_schedule_engine, bg_channel_repo, event_tx.clone())); + + let bg_schedule_engine_poller = Arc::clone(&state.schedule_engine); + tokio::spawn(poller::run_broadcast_poller( + bg_schedule_engine_poller, + bg_channel_repo_poller, + event_tx, + )); let app = Router::new() .nest("/api/v1", routes::api_v1_router()) diff --git a/k-tv-backend/api/src/routes/channels/crud.rs b/k-tv-backend/api/src/routes/channels/crud.rs index ab9f545..f9a13c9 100644 --- a/k-tv-backend/api/src/routes/channels/crud.rs +++ b/k-tv-backend/api/src/routes/channels/crud.rs @@ -5,6 +5,7 @@ use axum::{ response::IntoResponse, }; use chrono::Utc; +use domain; use uuid::Uuid; use crate::{ @@ -47,10 +48,19 @@ pub(super) async fn create_channel( channel.access_password_hash = Some(infra::auth::hash_password(pw)); changed = true; } + if let Some(url) = payload.webhook_url { + channel.webhook_url = Some(url); + changed = true; + } + if let Some(interval) = payload.webhook_poll_interval_secs { + channel.webhook_poll_interval_secs = interval; + changed = true; + } if changed { channel = state.channel_service.update(channel).await?; } + let _ = state.event_tx.send(domain::DomainEvent::ChannelCreated { channel: channel.clone() }); Ok((StatusCode::CREATED, Json(ChannelResponse::from(channel)))) } @@ -110,9 +120,16 @@ pub(super) async fn update_channel( if let Some(opacity) = payload.logo_opacity { channel.logo_opacity = opacity.clamp(0.0, 1.0); } + if let Some(url) = payload.webhook_url { + channel.webhook_url = url; + } + if let Some(interval) = payload.webhook_poll_interval_secs { + channel.webhook_poll_interval_secs = interval; + } channel.updated_at = Utc::now(); let channel = state.channel_service.update(channel).await?; + let _ = state.event_tx.send(domain::DomainEvent::ChannelUpdated { channel: channel.clone() }); Ok(Json(ChannelResponse::from(channel))) } @@ -123,5 +140,6 @@ pub(super) async fn delete_channel( ) -> Result { // ChannelService::delete enforces ownership internally state.channel_service.delete(channel_id, user.id).await?; + let _ = state.event_tx.send(domain::DomainEvent::ChannelDeleted { channel_id }); Ok(StatusCode::NO_CONTENT) } diff --git a/k-tv-backend/api/src/routes/channels/schedule.rs b/k-tv-backend/api/src/routes/channels/schedule.rs index 4b1a567..d915d62 100644 --- a/k-tv-backend/api/src/routes/channels/schedule.rs +++ b/k-tv-backend/api/src/routes/channels/schedule.rs @@ -7,7 +7,7 @@ use axum::{ use chrono::Utc; use uuid::Uuid; -use domain::DomainError; +use domain::{self, DomainError}; use crate::{ dto::ScheduleResponse, @@ -33,6 +33,10 @@ pub(super) async fn generate_schedule( .generate_schedule(channel_id, Utc::now()) .await?; + let _ = state.event_tx.send(domain::DomainEvent::ScheduleGenerated { + channel_id, + schedule: schedule.clone(), + }); Ok((StatusCode::CREATED, Json(ScheduleResponse::from(schedule)))) } diff --git a/k-tv-backend/api/src/scheduler.rs b/k-tv-backend/api/src/scheduler.rs index b25d440..e68d15d 100644 --- a/k-tv-backend/api/src/scheduler.rs +++ b/k-tv-backend/api/src/scheduler.rs @@ -7,21 +7,24 @@ use std::sync::Arc; use std::time::Duration; use chrono::Utc; -use domain::{ChannelRepository, ScheduleEngineService}; +use domain::{ChannelRepository, DomainEvent, ScheduleEngineService}; +use tokio::sync::broadcast; pub async fn run_auto_scheduler( schedule_engine: Arc, channel_repo: Arc, + event_tx: broadcast::Sender, ) { loop { tokio::time::sleep(Duration::from_secs(3600)).await; - tick(&schedule_engine, &channel_repo).await; + tick(&schedule_engine, &channel_repo, &event_tx).await; } } async fn tick( schedule_engine: &Arc, channel_repo: &Arc, + event_tx: &broadcast::Sender, ) { let channels = match channel_repo.find_auto_schedule_enabled().await { Ok(c) => c, @@ -59,18 +62,25 @@ async fn tick( } }; - if let Err(e) = schedule_engine.generate_schedule(channel.id, from).await { - tracing::warn!( - "auto-scheduler: failed to generate schedule for channel {}: {}", - channel.id, - e - ); - } else { - tracing::info!( - "auto-scheduler: generated schedule for channel {} starting at {}", - channel.id, - from - ); + match schedule_engine.generate_schedule(channel.id, from).await { + Ok(schedule) => { + tracing::info!( + "auto-scheduler: generated schedule for channel {} starting at {}", + channel.id, + from + ); + let _ = event_tx.send(DomainEvent::ScheduleGenerated { + channel_id: channel.id, + schedule, + }); + } + Err(e) => { + tracing::warn!( + "auto-scheduler: failed to generate schedule for channel {}: {}", + channel.id, + e + ); + } } } } @@ -221,7 +231,8 @@ mod tests { Arc::new(MockScheduleRepo { latest: None, saved: saved.clone() }); let engine = make_engine(channel_repo.clone(), schedule_repo); - tick(&engine, &channel_repo).await; + let (event_tx, _) = tokio::sync::broadcast::channel(8); + tick(&engine, &channel_repo, &event_tx).await; let saved = saved.lock().unwrap(); assert_eq!(saved.len(), 1); @@ -240,7 +251,8 @@ mod tests { Arc::new(MockScheduleRepo { latest: Some(schedule), saved: saved.clone() }); let engine = make_engine(channel_repo.clone(), schedule_repo); - tick(&engine, &channel_repo).await; + let (event_tx, _) = tokio::sync::broadcast::channel(8); + tick(&engine, &channel_repo, &event_tx).await; assert_eq!(saved.lock().unwrap().len(), 0); } @@ -256,7 +268,8 @@ mod tests { Arc::new(MockScheduleRepo { latest: Some(schedule), saved: saved.clone() }); let engine = make_engine(channel_repo.clone(), schedule_repo); - tick(&engine, &channel_repo).await; + let (event_tx, _) = tokio::sync::broadcast::channel(8); + tick(&engine, &channel_repo, &event_tx).await; let saved = saved.lock().unwrap(); assert_eq!(saved.len(), 1); @@ -274,7 +287,8 @@ mod tests { Arc::new(MockScheduleRepo { latest: Some(schedule), saved: saved.clone() }); let engine = make_engine(channel_repo.clone(), schedule_repo); - tick(&engine, &channel_repo).await; + let (event_tx, _) = tokio::sync::broadcast::channel(8); + tick(&engine, &channel_repo, &event_tx).await; let saved = saved.lock().unwrap(); assert_eq!(saved.len(), 1); diff --git a/k-tv-backend/api/src/state.rs b/k-tv-backend/api/src/state.rs index 8298a62..911d3e6 100644 --- a/k-tv-backend/api/src/state.rs +++ b/k-tv-backend/api/src/state.rs @@ -11,6 +11,7 @@ use infra::auth::oidc::OidcService; use std::sync::Arc; use crate::config::Config; +use crate::events::EventBus; use domain::{ChannelService, ScheduleEngineService, UserService}; #[derive(Clone)] @@ -25,6 +26,7 @@ pub struct AppState { #[cfg(feature = "auth-jwt")] pub jwt_validator: Option>, pub config: Arc, + pub event_tx: EventBus, /// Index for the local-files provider, used by the rescan route. #[cfg(feature = "local-files")] pub local_index: Option>, @@ -43,6 +45,7 @@ impl AppState { schedule_engine: ScheduleEngineService, provider_registry: Arc, config: Config, + event_tx: EventBus, ) -> anyhow::Result { let cookie_key = Key::derive_from(config.cookie_secret.as_bytes()); @@ -114,6 +117,7 @@ impl AppState { #[cfg(feature = "auth-jwt")] jwt_validator, config: Arc::new(config), + event_tx, #[cfg(feature = "local-files")] local_index: None, #[cfg(feature = "local-files")]