Compare commits

..

6 Commits

23 changed files with 981 additions and 65 deletions

1
.gitignore vendored
View File

@@ -1 +1,2 @@
transcode/ transcode/
.worktrees/

View File

@@ -81,6 +81,7 @@ dependencies = [
"infra", "infra",
"k-core", "k-core",
"rand 0.8.5", "rand 0.8.5",
"reqwest",
"serde", "serde",
"serde_json", "serde_json",
"serde_qs", "serde_qs",

View File

@@ -57,6 +57,7 @@ uuid = { version = "1.19.0", features = ["v4", "serde"] }
# Logging # Logging
tracing = "0.1" tracing = "0.1"
reqwest = { version = "0.12", features = ["json"] }
async-trait = "0.1" async-trait = "0.1"
dotenvy = "0.15.7" dotenvy = "0.15.7"
time = "0.3" time = "0.3"

View File

@@ -72,6 +72,8 @@ pub struct CreateChannelRequest {
pub access_mode: Option<domain::AccessMode>, pub access_mode: Option<domain::AccessMode>,
/// Plain-text password; hashed before storage. /// Plain-text password; hashed before storage.
pub access_password: Option<String>, pub access_password: Option<String>,
pub webhook_url: Option<String>,
pub webhook_poll_interval_secs: Option<u32>,
} }
/// All fields are optional — only provided fields are updated. /// All fields are optional — only provided fields are updated.
@@ -91,6 +93,9 @@ pub struct UpdateChannelRequest {
pub logo: Option<Option<String>>, pub logo: Option<Option<String>>,
pub logo_position: Option<domain::LogoPosition>, pub logo_position: Option<domain::LogoPosition>,
pub logo_opacity: Option<f32>, pub logo_opacity: Option<f32>,
/// `Some(None)` = clear, `Some(Some(url))` = set, `None` = unchanged.
pub webhook_url: Option<Option<String>>,
pub webhook_poll_interval_secs: Option<u32>,
} }
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
@@ -107,6 +112,8 @@ pub struct ChannelResponse {
pub logo: Option<String>, pub logo: Option<String>,
pub logo_position: domain::LogoPosition, pub logo_position: domain::LogoPosition,
pub logo_opacity: f32, pub logo_opacity: f32,
pub webhook_url: Option<String>,
pub webhook_poll_interval_secs: u32,
pub created_at: DateTime<Utc>, pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>, pub updated_at: DateTime<Utc>,
} }
@@ -126,6 +133,8 @@ impl From<domain::Channel> for ChannelResponse {
logo: c.logo, logo: c.logo,
logo_position: c.logo_position, logo_position: c.logo_position,
logo_opacity: c.logo_opacity, logo_opacity: c.logo_opacity,
webhook_url: c.webhook_url,
webhook_poll_interval_secs: c.webhook_poll_interval_secs,
created_at: c.created_at, created_at: c.created_at,
updated_at: c.updated_at, updated_at: c.updated_at,
} }

View File

@@ -0,0 +1,12 @@
//! Event bus type alias.
//!
//! The broadcast sender is kept in `AppState` and cloned into each route handler.
//! Receivers are created with `event_tx.subscribe()`.
use tokio::sync::broadcast;

use domain::DomainEvent;

/// A sender half of the domain-event broadcast channel.
///
/// Clone to share across tasks. Use `event_tx.subscribe()` to create receivers.
/// Callers in this crate ignore `send` errors (`let _ = event_tx.send(..)`),
/// so publishing with no live receivers is a silent no-op by design.
pub type EventBus = broadcast::Sender<DomainEvent>;

View File

@@ -21,10 +21,13 @@ use tracing::info;
mod config; mod config;
mod dto; mod dto;
mod error; mod error;
mod events;
mod extractors; mod extractors;
mod poller;
mod routes; mod routes;
mod scheduler; mod scheduler;
mod state; mod state;
mod webhook;
use crate::config::Config; use crate::config::Config;
use crate::state::AppState; use crate::state::AppState;
@@ -150,7 +153,16 @@ async fn main() -> anyhow::Result<()> {
let registry = Arc::new(registry); let registry = Arc::new(registry);
let (event_tx, event_rx) = tokio::sync::broadcast::channel::<domain::DomainEvent>(64);
let bg_channel_repo = channel_repo.clone(); let bg_channel_repo = channel_repo.clone();
let webhook_channel_repo = channel_repo.clone();
tokio::spawn(webhook::run_webhook_consumer(
event_rx,
webhook_channel_repo,
reqwest::Client::new(),
));
let schedule_engine = ScheduleEngineService::new( let schedule_engine = ScheduleEngineService::new(
Arc::clone(&registry) as Arc<dyn IProviderRegistry>, Arc::clone(&registry) as Arc<dyn IProviderRegistry>,
channel_repo, channel_repo,
@@ -164,6 +176,7 @@ async fn main() -> anyhow::Result<()> {
schedule_engine, schedule_engine,
registry, registry,
config.clone(), config.clone(),
event_tx.clone(),
) )
.await?; .await?;
@@ -178,8 +191,16 @@ async fn main() -> anyhow::Result<()> {
cors_origins: config.cors_allowed_origins.clone(), cors_origins: config.cors_allowed_origins.clone(),
}; };
let bg_channel_repo_poller = bg_channel_repo.clone();
let bg_schedule_engine = Arc::clone(&state.schedule_engine); let bg_schedule_engine = Arc::clone(&state.schedule_engine);
tokio::spawn(scheduler::run_auto_scheduler(bg_schedule_engine, bg_channel_repo)); tokio::spawn(scheduler::run_auto_scheduler(bg_schedule_engine, bg_channel_repo, event_tx.clone()));
let bg_schedule_engine_poller = Arc::clone(&state.schedule_engine);
tokio::spawn(poller::run_broadcast_poller(
bg_schedule_engine_poller,
bg_channel_repo_poller,
event_tx,
));
let app = Router::new() let app = Router::new()
.nest("/api/v1", routes::api_v1_router()) .nest("/api/v1", routes::api_v1_router())

View File

@@ -0,0 +1,432 @@
//! BroadcastPoller background task.
//!
//! Polls each channel that has a webhook_url configured. On each tick (every 1s)
//! it checks which channels are due for a poll (elapsed >= webhook_poll_interval_secs)
//! and emits BroadcastTransition or NoSignal events when the current slot changes.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use chrono::Utc;
use tokio::sync::broadcast;
use tracing::error;
use uuid::Uuid;
use domain::{ChannelRepository, DomainError, DomainEvent, ScheduleEngineService};
/// Per-channel poller state.
///
/// One entry per channel with a configured `webhook_url`; entries are removed
/// by `poll_tick` when the channel disappears or loses its webhook URL.
#[derive(Debug)]
struct ChannelPollState {
    /// ID of the last slot we saw as current (None = no signal).
    last_slot_id: Option<Uuid>,
    /// Wall-clock instant of the last poll for this channel.
    last_checked: Instant,
}
/// Polls channels with webhook URLs and emits broadcast transition events.
///
/// Runs forever: once per second control is handed to `poll_tick`, which
/// decides per channel whether a poll is due (based on
/// `webhook_poll_interval_secs`) and publishes events on `event_tx`.
pub async fn run_broadcast_poller(
    schedule_engine: Arc<ScheduleEngineService>,
    channel_repo: Arc<dyn ChannelRepository>,
    event_tx: broadcast::Sender<DomainEvent>,
) {
    // Per-channel bookkeeping survives across ticks.
    let mut poll_states: HashMap<Uuid, ChannelPollState> = HashMap::new();

    loop {
        tokio::time::sleep(Duration::from_secs(1)).await;
        poll_tick(&schedule_engine, &channel_repo, &event_tx, &mut poll_states).await;
    }
}
/// One poller tick: check every webhook-enabled channel that is due and emit
/// `BroadcastTransition` / `NoSignal` events when its current slot changed.
///
/// Fix over the previous version: the active schedule is fetched ONCE per
/// channel and the matching slot is kept from that same snapshot. The old code
/// re-fetched the schedule a second time to build the event; if the schedule
/// changed between the two fetches the slot lookup could miss, in which case
/// no event was emitted even though `last_slot_id` was still updated — the
/// transition was silently lost.
pub(crate) async fn poll_tick(
    schedule_engine: &Arc<ScheduleEngineService>,
    channel_repo: &Arc<dyn ChannelRepository>,
    event_tx: &broadcast::Sender<DomainEvent>,
    state: &mut HashMap<Uuid, ChannelPollState>,
) {
    let channels = match channel_repo.find_all().await {
        Ok(c) => c,
        Err(e) => {
            error!("broadcast poller: failed to load channels: {}", e);
            return;
        }
    };

    // Remove deleted channels from state
    let live_ids: std::collections::HashSet<Uuid> = channels.iter().map(|c| c.id).collect();
    state.retain(|id, _| live_ids.contains(id));

    let now = Utc::now();

    for channel in channels {
        // Only poll channels with a configured webhook URL
        if channel.webhook_url.is_none() {
            state.remove(&channel.id);
            continue;
        }

        let poll_interval = Duration::from_secs(channel.webhook_poll_interval_secs as u64);
        let entry = state.entry(channel.id).or_insert_with(|| ChannelPollState {
            last_slot_id: None,
            // Backdate so the first encounter polls immediately. `checked_sub`
            // avoids the panic `Instant - Duration` can raise when the interval
            // is larger than the process uptime; the fallback merely delays the
            // first poll by one interval.
            last_checked: Instant::now()
                .checked_sub(poll_interval)
                .unwrap_or_else(Instant::now),
        });

        if entry.last_checked.elapsed() < poll_interval {
            continue; // Not yet due for a poll
        }
        entry.last_checked = Instant::now();

        // Fetch the active schedule once and keep the slot that is current
        // right now, so the emitted event comes from the same snapshot used
        // to detect the transition.
        let current_slot = match schedule_engine.get_active_schedule(channel.id, now).await {
            Ok(Some(schedule)) => schedule
                .slots
                .iter()
                .find(|s| s.start_at <= now && now < s.end_at)
                .cloned(),
            Ok(None) => None,
            Err(DomainError::NoActiveSchedule(_)) => None,
            Err(DomainError::ChannelNotFound(_)) => {
                state.remove(&channel.id);
                continue;
            }
            Err(e) => {
                error!(
                    "broadcast poller: error checking schedule for channel {}: {}",
                    channel.id, e
                );
                continue;
            }
        };

        let current_slot_id = current_slot.as_ref().map(|s| s.id);
        if current_slot_id == entry.last_slot_id {
            continue; // No transition since last poll.
        }

        // State changed — emit appropriate event
        match current_slot {
            Some(slot) => {
                let _ = event_tx.send(DomainEvent::BroadcastTransition {
                    channel_id: channel.id,
                    slot,
                });
            }
            None => {
                let _ = event_tx.send(DomainEvent::NoSignal {
                    channel_id: channel.id,
                });
            }
        }
        entry.last_slot_id = current_slot_id;
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    use async_trait::async_trait;
    use chrono::{DateTime, Duration, Utc};
    use domain::{
        Channel, ChannelRepository, Collection, DomainResult, GeneratedSchedule, IProviderRegistry,
        MediaFilter, MediaItem, MediaItemId, PlaybackRecord, ProviderCapabilities,
        ScheduleEngineService, ScheduleRepository, SeriesSummary, StreamQuality, StreamingProtocol,
    };
    use domain::value_objects::{ChannelId, ContentType, UserId};
    use tokio::sync::broadcast;
    use uuid::Uuid;

    // ── Mocks ─────────────────────────────────────────────────────────────────

    /// In-memory channel repository serving a fixed channel list.
    struct MockChannelRepo {
        channels: Vec<Channel>,
    }

    #[async_trait]
    impl ChannelRepository for MockChannelRepo {
        async fn find_by_id(&self, id: ChannelId) -> DomainResult<Option<Channel>> {
            Ok(self.channels.iter().find(|c| c.id == id).cloned())
        }
        async fn find_by_owner(&self, _owner_id: UserId) -> DomainResult<Vec<Channel>> {
            unimplemented!()
        }
        async fn find_all(&self) -> DomainResult<Vec<Channel>> {
            Ok(self.channels.clone())
        }
        async fn find_auto_schedule_enabled(&self) -> DomainResult<Vec<Channel>> {
            unimplemented!()
        }
        async fn save(&self, _channel: &Channel) -> DomainResult<()> {
            unimplemented!()
        }
        async fn delete(&self, _id: ChannelId) -> DomainResult<()> {
            unimplemented!()
        }
    }

    /// Schedule repository that always returns the same active schedule and
    /// records saved schedules (required by the trait, unused by these tests).
    struct MockScheduleRepo {
        active: Option<GeneratedSchedule>,
        saved: Arc<Mutex<Vec<GeneratedSchedule>>>,
    }

    #[async_trait]
    impl ScheduleRepository for MockScheduleRepo {
        async fn find_active(
            &self,
            _channel_id: ChannelId,
            _at: DateTime<Utc>,
        ) -> DomainResult<Option<GeneratedSchedule>> {
            Ok(self.active.clone())
        }
        async fn find_latest(&self, _channel_id: ChannelId) -> DomainResult<Option<GeneratedSchedule>> {
            Ok(self.active.clone())
        }
        async fn save(&self, schedule: &GeneratedSchedule) -> DomainResult<()> {
            self.saved.lock().unwrap().push(schedule.clone());
            Ok(())
        }
        async fn find_playback_history(&self, _channel_id: ChannelId) -> DomainResult<Vec<PlaybackRecord>> {
            Ok(vec![])
        }
        async fn save_playback_record(&self, _record: &PlaybackRecord) -> DomainResult<()> {
            Ok(())
        }
    }

    /// Provider registry stub — the poller never streams, so most methods are unreachable.
    struct MockRegistry;

    #[async_trait]
    impl IProviderRegistry for MockRegistry {
        async fn fetch_items(&self, _provider_id: &str, _filter: &MediaFilter) -> DomainResult<Vec<MediaItem>> {
            Ok(vec![])
        }
        async fn fetch_by_id(&self, _item_id: &MediaItemId) -> DomainResult<Option<MediaItem>> {
            Ok(None)
        }
        async fn get_stream_url(&self, _item_id: &MediaItemId, _quality: &StreamQuality) -> DomainResult<String> {
            unimplemented!()
        }
        fn provider_ids(&self) -> Vec<String> {
            vec![]
        }
        fn primary_id(&self) -> &str {
            ""
        }
        fn capabilities(&self, _provider_id: &str) -> Option<ProviderCapabilities> {
            None
        }
        async fn list_collections(&self, _provider_id: &str) -> DomainResult<Vec<Collection>> {
            unimplemented!()
        }
        async fn list_series(&self, _provider_id: &str, _collection_id: Option<&str>) -> DomainResult<Vec<SeriesSummary>> {
            unimplemented!()
        }
        async fn list_genres(&self, _provider_id: &str, _content_type: Option<&ContentType>) -> DomainResult<Vec<String>> {
            unimplemented!()
        }
    }

    // ── Helpers ───────────────────────────────────────────────────────────────

    /// A channel with a webhook URL and a zero poll interval (always due).
    fn make_channel_with_webhook(channel_id: Uuid) -> Channel {
        let mut ch = Channel::new(Uuid::new_v4(), "Test", "UTC");
        ch.id = channel_id;
        ch.webhook_url = Some("http://example.com/hook".to_string());
        ch.webhook_poll_interval_secs = 0; // always due
        ch
    }

    /// A 30-minute slot that is currently on air (started one minute ago).
    /// `ScheduledSlot` carries no channel id; the parameter is kept
    /// (underscored) only so call sites read naturally.
    fn make_slot(_channel_id: Uuid, slot_id: Uuid) -> domain::ScheduledSlot {
        let now = Utc::now();
        domain::ScheduledSlot {
            id: slot_id,
            start_at: now - Duration::minutes(1),
            end_at: now + Duration::minutes(29),
            item: MediaItem {
                id: MediaItemId::new("test-item"),
                title: "Test Movie".to_string(),
                content_type: ContentType::Movie,
                duration_secs: 1800,
                description: None,
                genres: vec![],
                year: None,
                tags: vec![],
                series_name: None,
                season_number: None,
                episode_number: None,
            },
            source_block_id: Uuid::new_v4(),
        }
    }

    /// A schedule valid from one hour ago to 47 hours from now.
    fn make_schedule(channel_id: Uuid, slots: Vec<domain::ScheduledSlot>) -> GeneratedSchedule {
        let now = Utc::now();
        GeneratedSchedule {
            id: Uuid::new_v4(),
            channel_id,
            valid_from: now - Duration::hours(1),
            valid_until: now + Duration::hours(47),
            generation: 1,
            slots,
        }
    }

    fn make_engine(
        channel_repo: Arc<dyn ChannelRepository>,
        schedule_repo: Arc<dyn ScheduleRepository>,
    ) -> Arc<ScheduleEngineService> {
        Arc::new(ScheduleEngineService::new(
            Arc::new(MockRegistry),
            channel_repo,
            schedule_repo,
        ))
    }

    // ── Tests ─────────────────────────────────────────────────────────────────

    #[tokio::test]
    async fn test_broadcast_transition_emitted_on_slot_change() {
        let channel_id = Uuid::new_v4();
        let slot_id = Uuid::new_v4();
        let ch = make_channel_with_webhook(channel_id);
        let slot = make_slot(channel_id, slot_id);
        let schedule = make_schedule(channel_id, vec![slot]);

        let channel_repo: Arc<dyn ChannelRepository> =
            Arc::new(MockChannelRepo { channels: vec![ch] });
        let schedule_repo: Arc<dyn ScheduleRepository> = Arc::new(MockScheduleRepo {
            active: Some(schedule),
            saved: Arc::new(Mutex::new(vec![])),
        });
        let engine = make_engine(channel_repo.clone(), schedule_repo);
        let (event_tx, mut event_rx) = broadcast::channel(8);
        let mut state: HashMap<Uuid, ChannelPollState> = HashMap::new();

        poll_tick(&engine, &channel_repo, &event_tx, &mut state).await;

        let event = event_rx.try_recv().expect("expected an event");
        match event {
            DomainEvent::BroadcastTransition { channel_id: cid, slot: s } => {
                assert_eq!(cid, channel_id);
                assert_eq!(s.id, slot_id);
            }
            // `_` rather than a named binding: DomainEvent has no Debug impl
            // to print, and the unused binding tripped a compiler warning.
            _ => panic!("expected BroadcastTransition, got something else"),
        }
    }

    #[tokio::test]
    async fn test_no_event_when_slot_unchanged() {
        let channel_id = Uuid::new_v4();
        let slot_id = Uuid::new_v4();
        let ch = make_channel_with_webhook(channel_id);
        let slot = make_slot(channel_id, slot_id);
        let schedule = make_schedule(channel_id, vec![slot]);

        let channel_repo: Arc<dyn ChannelRepository> =
            Arc::new(MockChannelRepo { channels: vec![ch] });
        let schedule_repo: Arc<dyn ScheduleRepository> = Arc::new(MockScheduleRepo {
            active: Some(schedule),
            saved: Arc::new(Mutex::new(vec![])),
        });
        let engine = make_engine(channel_repo.clone(), schedule_repo);
        let (event_tx, mut event_rx) = broadcast::channel(8);
        let mut state: HashMap<Uuid, ChannelPollState> = HashMap::new();

        // First tick — emits BroadcastTransition
        poll_tick(&engine, &channel_repo, &event_tx, &mut state).await;
        let _ = event_rx.try_recv();

        // Second tick — same slot, no event
        poll_tick(&engine, &channel_repo, &event_tx, &mut state).await;
        assert!(
            event_rx.try_recv().is_err(),
            "no event expected when slot unchanged"
        );
    }

    #[tokio::test]
    async fn test_no_signal_emitted_when_slot_goes_to_none() {
        let channel_id = Uuid::new_v4();
        let slot_id = Uuid::new_v4();
        let ch = make_channel_with_webhook(channel_id);
        let slot = make_slot(channel_id, slot_id);
        let schedule_with_slot = make_schedule(channel_id, vec![slot]);

        // Repo that starts with a slot then returns empty schedule
        use std::sync::atomic::{AtomicBool, Ordering};
        struct SwitchingScheduleRepo {
            first: GeneratedSchedule,
            second: GeneratedSchedule,
            called: AtomicBool,
        }
        #[async_trait]
        impl ScheduleRepository for SwitchingScheduleRepo {
            async fn find_active(
                &self,
                _channel_id: ChannelId,
                _at: DateTime<Utc>,
            ) -> DomainResult<Option<GeneratedSchedule>> {
                if self.called.swap(true, Ordering::SeqCst) {
                    Ok(Some(self.second.clone()))
                } else {
                    Ok(Some(self.first.clone()))
                }
            }
            async fn find_latest(&self, _: ChannelId) -> DomainResult<Option<GeneratedSchedule>> {
                Ok(None)
            }
            async fn save(&self, _: &GeneratedSchedule) -> DomainResult<()> { Ok(()) }
            async fn find_playback_history(&self, _: ChannelId) -> DomainResult<Vec<PlaybackRecord>> {
                Ok(vec![])
            }
            async fn save_playback_record(&self, _: &PlaybackRecord) -> DomainResult<()> { Ok(()) }
        }

        let now = Utc::now();
        let empty_schedule = GeneratedSchedule {
            id: Uuid::new_v4(),
            channel_id,
            valid_from: now - Duration::hours(1),
            valid_until: now + Duration::hours(47),
            generation: 2,
            slots: vec![], // no current slot
        };

        let channel_repo: Arc<dyn ChannelRepository> =
            Arc::new(MockChannelRepo { channels: vec![ch] });
        let schedule_repo: Arc<dyn ScheduleRepository> = Arc::new(SwitchingScheduleRepo {
            first: schedule_with_slot,
            second: empty_schedule,
            called: AtomicBool::new(false),
        });
        let engine = make_engine(channel_repo.clone(), schedule_repo);
        let (event_tx, mut event_rx) = broadcast::channel(8);
        let mut state: HashMap<Uuid, ChannelPollState> = HashMap::new();

        // First tick — emits BroadcastTransition (slot present)
        poll_tick(&engine, &channel_repo, &event_tx, &mut state).await;
        let _ = event_rx.try_recv();

        // Second tick — schedule has no current slot, emits NoSignal
        poll_tick(&engine, &channel_repo, &event_tx, &mut state).await;
        let event = event_rx.try_recv().expect("expected NoSignal event");
        match event {
            DomainEvent::NoSignal { channel_id: cid } => assert_eq!(cid, channel_id),
            _ => panic!("expected NoSignal"),
        }
    }
}

View File

@@ -5,6 +5,7 @@ use axum::{
response::IntoResponse, response::IntoResponse,
}; };
use chrono::Utc; use chrono::Utc;
use domain;
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
@@ -47,10 +48,19 @@ pub(super) async fn create_channel(
channel.access_password_hash = Some(infra::auth::hash_password(pw)); channel.access_password_hash = Some(infra::auth::hash_password(pw));
changed = true; changed = true;
} }
if let Some(url) = payload.webhook_url {
channel.webhook_url = Some(url);
changed = true;
}
if let Some(interval) = payload.webhook_poll_interval_secs {
channel.webhook_poll_interval_secs = interval;
changed = true;
}
if changed { if changed {
channel = state.channel_service.update(channel).await?; channel = state.channel_service.update(channel).await?;
} }
let _ = state.event_tx.send(domain::DomainEvent::ChannelCreated { channel: channel.clone() });
Ok((StatusCode::CREATED, Json(ChannelResponse::from(channel)))) Ok((StatusCode::CREATED, Json(ChannelResponse::from(channel))))
} }
@@ -110,9 +120,16 @@ pub(super) async fn update_channel(
if let Some(opacity) = payload.logo_opacity { if let Some(opacity) = payload.logo_opacity {
channel.logo_opacity = opacity.clamp(0.0, 1.0); channel.logo_opacity = opacity.clamp(0.0, 1.0);
} }
if let Some(url) = payload.webhook_url {
channel.webhook_url = url;
}
if let Some(interval) = payload.webhook_poll_interval_secs {
channel.webhook_poll_interval_secs = interval;
}
channel.updated_at = Utc::now(); channel.updated_at = Utc::now();
let channel = state.channel_service.update(channel).await?; let channel = state.channel_service.update(channel).await?;
let _ = state.event_tx.send(domain::DomainEvent::ChannelUpdated { channel: channel.clone() });
Ok(Json(ChannelResponse::from(channel))) Ok(Json(ChannelResponse::from(channel)))
} }
@@ -123,5 +140,6 @@ pub(super) async fn delete_channel(
) -> Result<impl IntoResponse, ApiError> { ) -> Result<impl IntoResponse, ApiError> {
// ChannelService::delete enforces ownership internally // ChannelService::delete enforces ownership internally
state.channel_service.delete(channel_id, user.id).await?; state.channel_service.delete(channel_id, user.id).await?;
let _ = state.event_tx.send(domain::DomainEvent::ChannelDeleted { channel_id });
Ok(StatusCode::NO_CONTENT) Ok(StatusCode::NO_CONTENT)
} }

View File

@@ -7,7 +7,7 @@ use axum::{
use chrono::Utc; use chrono::Utc;
use uuid::Uuid; use uuid::Uuid;
use domain::DomainError; use domain::{self, DomainError};
use crate::{ use crate::{
dto::ScheduleResponse, dto::ScheduleResponse,
@@ -33,6 +33,10 @@ pub(super) async fn generate_schedule(
.generate_schedule(channel_id, Utc::now()) .generate_schedule(channel_id, Utc::now())
.await?; .await?;
let _ = state.event_tx.send(domain::DomainEvent::ScheduleGenerated {
channel_id,
schedule: schedule.clone(),
});
Ok((StatusCode::CREATED, Json(ScheduleResponse::from(schedule)))) Ok((StatusCode::CREATED, Json(ScheduleResponse::from(schedule))))
} }

View File

@@ -7,21 +7,24 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use chrono::Utc; use chrono::Utc;
use domain::{ChannelRepository, ScheduleEngineService}; use domain::{ChannelRepository, DomainEvent, ScheduleEngineService};
use tokio::sync::broadcast;
pub async fn run_auto_scheduler( pub async fn run_auto_scheduler(
schedule_engine: Arc<ScheduleEngineService>, schedule_engine: Arc<ScheduleEngineService>,
channel_repo: Arc<dyn ChannelRepository>, channel_repo: Arc<dyn ChannelRepository>,
event_tx: broadcast::Sender<DomainEvent>,
) { ) {
loop { loop {
tokio::time::sleep(Duration::from_secs(3600)).await; tokio::time::sleep(Duration::from_secs(3600)).await;
tick(&schedule_engine, &channel_repo).await; tick(&schedule_engine, &channel_repo, &event_tx).await;
} }
} }
async fn tick( async fn tick(
schedule_engine: &Arc<ScheduleEngineService>, schedule_engine: &Arc<ScheduleEngineService>,
channel_repo: &Arc<dyn ChannelRepository>, channel_repo: &Arc<dyn ChannelRepository>,
event_tx: &broadcast::Sender<DomainEvent>,
) { ) {
let channels = match channel_repo.find_auto_schedule_enabled().await { let channels = match channel_repo.find_auto_schedule_enabled().await {
Ok(c) => c, Ok(c) => c,
@@ -59,18 +62,25 @@ async fn tick(
} }
}; };
if let Err(e) = schedule_engine.generate_schedule(channel.id, from).await { match schedule_engine.generate_schedule(channel.id, from).await {
tracing::warn!( Ok(schedule) => {
"auto-scheduler: failed to generate schedule for channel {}: {}",
channel.id,
e
);
} else {
tracing::info!( tracing::info!(
"auto-scheduler: generated schedule for channel {} starting at {}", "auto-scheduler: generated schedule for channel {} starting at {}",
channel.id, channel.id,
from from
); );
let _ = event_tx.send(DomainEvent::ScheduleGenerated {
channel_id: channel.id,
schedule,
});
}
Err(e) => {
tracing::warn!(
"auto-scheduler: failed to generate schedule for channel {}: {}",
channel.id,
e
);
}
} }
} }
} }
@@ -221,7 +231,8 @@ mod tests {
Arc::new(MockScheduleRepo { latest: None, saved: saved.clone() }); Arc::new(MockScheduleRepo { latest: None, saved: saved.clone() });
let engine = make_engine(channel_repo.clone(), schedule_repo); let engine = make_engine(channel_repo.clone(), schedule_repo);
tick(&engine, &channel_repo).await; let (event_tx, _) = tokio::sync::broadcast::channel(8);
tick(&engine, &channel_repo, &event_tx).await;
let saved = saved.lock().unwrap(); let saved = saved.lock().unwrap();
assert_eq!(saved.len(), 1); assert_eq!(saved.len(), 1);
@@ -240,7 +251,8 @@ mod tests {
Arc::new(MockScheduleRepo { latest: Some(schedule), saved: saved.clone() }); Arc::new(MockScheduleRepo { latest: Some(schedule), saved: saved.clone() });
let engine = make_engine(channel_repo.clone(), schedule_repo); let engine = make_engine(channel_repo.clone(), schedule_repo);
tick(&engine, &channel_repo).await; let (event_tx, _) = tokio::sync::broadcast::channel(8);
tick(&engine, &channel_repo, &event_tx).await;
assert_eq!(saved.lock().unwrap().len(), 0); assert_eq!(saved.lock().unwrap().len(), 0);
} }
@@ -256,7 +268,8 @@ mod tests {
Arc::new(MockScheduleRepo { latest: Some(schedule), saved: saved.clone() }); Arc::new(MockScheduleRepo { latest: Some(schedule), saved: saved.clone() });
let engine = make_engine(channel_repo.clone(), schedule_repo); let engine = make_engine(channel_repo.clone(), schedule_repo);
tick(&engine, &channel_repo).await; let (event_tx, _) = tokio::sync::broadcast::channel(8);
tick(&engine, &channel_repo, &event_tx).await;
let saved = saved.lock().unwrap(); let saved = saved.lock().unwrap();
assert_eq!(saved.len(), 1); assert_eq!(saved.len(), 1);
@@ -274,7 +287,8 @@ mod tests {
Arc::new(MockScheduleRepo { latest: Some(schedule), saved: saved.clone() }); Arc::new(MockScheduleRepo { latest: Some(schedule), saved: saved.clone() });
let engine = make_engine(channel_repo.clone(), schedule_repo); let engine = make_engine(channel_repo.clone(), schedule_repo);
tick(&engine, &channel_repo).await; let (event_tx, _) = tokio::sync::broadcast::channel(8);
tick(&engine, &channel_repo, &event_tx).await;
let saved = saved.lock().unwrap(); let saved = saved.lock().unwrap();
assert_eq!(saved.len(), 1); assert_eq!(saved.len(), 1);

View File

@@ -11,6 +11,7 @@ use infra::auth::oidc::OidcService;
use std::sync::Arc; use std::sync::Arc;
use crate::config::Config; use crate::config::Config;
use crate::events::EventBus;
use domain::{ChannelService, ScheduleEngineService, UserService}; use domain::{ChannelService, ScheduleEngineService, UserService};
#[derive(Clone)] #[derive(Clone)]
@@ -25,6 +26,7 @@ pub struct AppState {
#[cfg(feature = "auth-jwt")] #[cfg(feature = "auth-jwt")]
pub jwt_validator: Option<Arc<JwtValidator>>, pub jwt_validator: Option<Arc<JwtValidator>>,
pub config: Arc<Config>, pub config: Arc<Config>,
pub event_tx: EventBus,
/// Index for the local-files provider, used by the rescan route. /// Index for the local-files provider, used by the rescan route.
#[cfg(feature = "local-files")] #[cfg(feature = "local-files")]
pub local_index: Option<Arc<infra::LocalIndex>>, pub local_index: Option<Arc<infra::LocalIndex>>,
@@ -43,6 +45,7 @@ impl AppState {
schedule_engine: ScheduleEngineService, schedule_engine: ScheduleEngineService,
provider_registry: Arc<infra::ProviderRegistry>, provider_registry: Arc<infra::ProviderRegistry>,
config: Config, config: Config,
event_tx: EventBus,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
let cookie_key = Key::derive_from(config.cookie_secret.as_bytes()); let cookie_key = Key::derive_from(config.cookie_secret.as_bytes());
@@ -114,6 +117,7 @@ impl AppState {
#[cfg(feature = "auth-jwt")] #[cfg(feature = "auth-jwt")]
jwt_validator, jwt_validator,
config: Arc::new(config), config: Arc::new(config),
event_tx,
#[cfg(feature = "local-files")] #[cfg(feature = "local-files")]
local_index: None, local_index: None,
#[cfg(feature = "local-files")] #[cfg(feature = "local-files")]

View File

@@ -0,0 +1,164 @@
//! WebhookConsumer background task.
//!
//! Subscribes to the domain-event broadcast channel, looks up each channel's
//! webhook_url, and fires HTTP POST requests (fire-and-forget).
use chrono::Utc;
use serde_json::{Value, json};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{info, warn};
use uuid::Uuid;
use domain::{ChannelRepository, DomainEvent};
/// Consumes domain events and delivers them to per-channel webhook URLs.
///
/// Uses fire-and-forget HTTP POST — failures are logged as warnings, never retried.
pub async fn run_webhook_consumer(
    mut rx: broadcast::Receiver<DomainEvent>,
    channel_repo: Arc<dyn ChannelRepository>,
    client: reqwest::Client,
) {
    loop {
        // Pull the next event; bus errors are handled up front so the
        // delivery path below stays flat.
        let event = match rx.recv().await {
            Ok(event) => event,
            Err(broadcast::error::RecvError::Lagged(n)) => {
                warn!("webhook consumer lagged, {} events dropped", n);
                // Continue — don't break; catch up from current position
                continue;
            }
            Err(broadcast::error::RecvError::Closed) => {
                info!("webhook consumer: event bus closed, shutting down");
                return;
            }
        };

        let channel_id = event_channel_id(&event);
        let payload = build_payload(&event);

        // Resolve the channel; a deleted channel is skipped silently.
        let channel = match channel_repo.find_by_id(channel_id).await {
            Ok(Some(channel)) => channel,
            Ok(None) => continue,
            Err(e) => {
                warn!("webhook consumer: failed to look up channel {}: {}", channel_id, e);
                continue;
            }
        };

        // No webhook_url configured — skip silently.
        if let Some(url) = channel.webhook_url {
            // Deliver on a separate task so a slow endpoint never blocks
            // event consumption.
            let http = client.clone();
            tokio::spawn(async move {
                post_webhook(&http, &url, payload).await;
            });
        }
    }
}
/// Extract the channel_id from any event variant.
///
/// Variants either carry the id directly or embed it in a full `Channel`.
fn event_channel_id(event: &DomainEvent) -> Uuid {
    match event {
        // Variants that carry the id as a field.
        DomainEvent::BroadcastTransition { channel_id, .. }
        | DomainEvent::NoSignal { channel_id }
        | DomainEvent::ScheduleGenerated { channel_id, .. }
        | DomainEvent::ChannelDeleted { channel_id } => *channel_id,
        // Variants that carry the whole channel.
        DomainEvent::ChannelCreated { channel }
        | DomainEvent::ChannelUpdated { channel } => channel.id,
    }
}
/// Build the JSON payload for an event.
///
/// Every payload shares one envelope: `event` (snake_case event name),
/// `timestamp` (RFC 3339, taken when the payload is built — not when the
/// event occurred), `channel_id`, and an event-specific `data` object.
fn build_payload(event: &DomainEvent) -> Value {
    let now = Utc::now().to_rfc3339();
    match event {
        DomainEvent::BroadcastTransition { channel_id, slot } => {
            // Seconds elapsed since the slot started; clamped to 0 in case
            // this runs marginally before start_at.
            let offset_secs = (Utc::now() - slot.start_at).num_seconds().max(0) as u64;
            json!({
                "event": "broadcast_transition",
                "timestamp": now,
                "channel_id": channel_id,
                "data": {
                    "slot_id": slot.id,
                    "item": {
                        "id": slot.item.id.as_ref(),
                        "title": slot.item.title,
                        "duration_secs": slot.item.duration_secs,
                    },
                    "start_at": slot.start_at.to_rfc3339(),
                    "end_at": slot.end_at.to_rfc3339(),
                    "offset_secs": offset_secs,
                }
            })
        }
        DomainEvent::NoSignal { channel_id } => {
            json!({
                "event": "no_signal",
                "timestamp": now,
                "channel_id": channel_id,
                "data": {}
            })
        }
        DomainEvent::ScheduleGenerated { channel_id, schedule } => {
            // Only schedule metadata is sent — not the slot list itself.
            json!({
                "event": "schedule_generated",
                "timestamp": now,
                "channel_id": channel_id,
                "data": {
                    "generation": schedule.generation,
                    "valid_from": schedule.valid_from.to_rfc3339(),
                    "valid_until": schedule.valid_until.to_rfc3339(),
                    "slot_count": schedule.slots.len(),
                }
            })
        }
        DomainEvent::ChannelCreated { channel } => {
            json!({
                "event": "channel_created",
                "timestamp": now,
                "channel_id": channel.id,
                "data": {
                    "name": channel.name,
                    "description": channel.description,
                }
            })
        }
        DomainEvent::ChannelUpdated { channel } => {
            json!({
                "event": "channel_updated",
                "timestamp": now,
                "channel_id": channel.id,
                "data": {
                    "name": channel.name,
                    "description": channel.description,
                }
            })
        }
        DomainEvent::ChannelDeleted { channel_id } => {
            // The channel row is already gone; only the id can be reported.
            json!({
                "event": "channel_deleted",
                "timestamp": now,
                "channel_id": channel_id,
                "data": {}
            })
        }
    }
}
/// Fire-and-forget HTTP POST to a webhook URL.
///
/// Transport errors and non-2xx responses are logged as warnings; the
/// request is never retried and the response body is ignored.
async fn post_webhook(client: &reqwest::Client, url: &str, payload: Value) {
    let outcome = client.post(url).json(&payload).send().await;
    match outcome {
        Err(e) => warn!("webhook POST to {} failed: {}", url, e),
        Ok(resp) => {
            let status = resp.status();
            if !status.is_success() {
                warn!(
                    "webhook POST to {} returned status {}",
                    url,
                    status
                );
            }
        }
    }
}

View File

@@ -88,6 +88,8 @@ pub struct Channel {
pub logo: Option<String>, pub logo: Option<String>,
pub logo_position: LogoPosition, pub logo_position: LogoPosition,
pub logo_opacity: f32, pub logo_opacity: f32,
pub webhook_url: Option<String>,
pub webhook_poll_interval_secs: u32,
pub created_at: DateTime<Utc>, pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>, pub updated_at: DateTime<Utc>,
} }
@@ -113,6 +115,8 @@ impl Channel {
logo: None, logo: None,
logo_position: LogoPosition::default(), logo_position: LogoPosition::default(),
logo_opacity: 1.0, logo_opacity: 1.0,
webhook_url: None,
webhook_poll_interval_secs: 5,
created_at: now, created_at: now,
updated_at: now, updated_at: now,
} }

View File

@@ -0,0 +1,112 @@
//! Domain events emitted when important state transitions occur.
//!
//! These are pure data — no I/O, no tokio deps. The transport
//! (tokio::sync::broadcast) lives in `api`; domain only owns the schema.
use uuid::Uuid;
use crate::entities::{Channel, GeneratedSchedule, ScheduledSlot};
/// Events emitted by the application when important state changes occur.
///
/// Must be `Clone + Send + 'static` for use as a `broadcast::channel` item.
#[derive(Clone)]
pub enum DomainEvent {
    /// The slot currently on air for a channel changed.
    BroadcastTransition {
        channel_id: Uuid,
        /// The slot that just became current.
        slot: ScheduledSlot,
    },
    /// The channel no longer has a current slot.
    NoSignal {
        channel_id: Uuid,
    },
    /// A new schedule was generated for a channel.
    ScheduleGenerated {
        channel_id: Uuid,
        schedule: GeneratedSchedule,
    },
    /// A channel was created.
    ChannelCreated {
        channel: Channel,
    },
    /// A channel was updated.
    ChannelUpdated {
        channel: Channel,
    },
    /// A channel was deleted; only the id survives deletion.
    ChannelDeleted {
        channel_id: Uuid,
    },
}
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    /// Build a 30-minute slot starting now, wrapping a minimal movie item.
    fn make_slot() -> crate::entities::ScheduledSlot {
        use crate::entities::{MediaItem, ScheduledSlot};
        use crate::value_objects::{ContentType, MediaItemId};
        use chrono::Utc;
        ScheduledSlot {
            id: Uuid::new_v4(),
            start_at: Utc::now(),
            end_at: Utc::now() + chrono::Duration::minutes(30),
            item: MediaItem {
                id: MediaItemId::new("test-item".to_string()),
                title: "Test Movie".to_string(),
                content_type: ContentType::Movie,
                duration_secs: 1800,
                description: None,
                genres: vec![],
                year: None,
                tags: vec![],
                series_name: None,
                season_number: None,
                episode_number: None,
            },
            source_block_id: Uuid::new_v4(),
        }
    }

    #[test]
    fn broadcast_transition_carries_slot() {
        let channel_id = Uuid::new_v4();
        let slot = make_slot();
        // Move the slot into the event directly — the binding is not used
        // afterwards, so the previous `.clone()` was a redundant allocation.
        let event = DomainEvent::BroadcastTransition { channel_id, slot };
        match event {
            DomainEvent::BroadcastTransition { channel_id: cid, slot: s } => {
                assert_eq!(cid, channel_id);
                assert_eq!(s.item.title, "Test Movie");
            }
            _ => panic!("wrong variant"),
        }
    }

    #[test]
    fn no_signal_carries_channel_id() {
        let channel_id = Uuid::new_v4();
        let event = DomainEvent::NoSignal { channel_id };
        match event {
            DomainEvent::NoSignal { channel_id: cid } => assert_eq!(cid, channel_id),
            _ => panic!("wrong variant"),
        }
    }

    #[test]
    fn schedule_generated_carries_metadata() {
        use crate::entities::GeneratedSchedule;
        use chrono::Utc;
        let channel_id = Uuid::new_v4();
        let schedule = GeneratedSchedule {
            id: Uuid::new_v4(),
            channel_id,
            valid_from: Utc::now(),
            valid_until: Utc::now() + chrono::Duration::hours(48),
            generation: 3,
            slots: vec![],
        };
        // Move the schedule into the event — it is not used afterwards, so
        // the previous `.clone()` was redundant.
        let event = DomainEvent::ScheduleGenerated { channel_id, schedule };
        match event {
            DomainEvent::ScheduleGenerated { schedule: s, .. } => {
                assert_eq!(s.generation, 3);
                assert_eq!(s.slots.len(), 0);
            }
            _ => panic!("wrong variant"),
        }
    }
}

View File

@@ -9,11 +9,13 @@ pub mod iptv;
pub mod ports; pub mod ports;
pub mod repositories; pub mod repositories;
pub mod services; pub mod services;
pub mod events;
pub mod value_objects; pub mod value_objects;
// Re-export commonly used types // Re-export commonly used types
pub use entities::*; pub use entities::*;
pub use errors::{DomainError, DomainResult}; pub use errors::{DomainError, DomainResult};
pub use events::DomainEvent;
pub use ports::{Collection, IMediaProvider, IProviderRegistry, ProviderCapabilities, SeriesSummary, StreamingProtocol, StreamQuality}; pub use ports::{Collection, IMediaProvider, IProviderRegistry, ProviderCapabilities, SeriesSummary, StreamingProtocol, StreamQuality};
pub use repositories::*; pub use repositories::*;
pub use iptv::{generate_m3u, generate_xmltv}; pub use iptv::{generate_m3u, generate_xmltv};

View File

@@ -19,6 +19,8 @@ pub(super) struct ChannelRow {
pub logo: Option<String>, pub logo: Option<String>,
pub logo_position: String, pub logo_position: String,
pub logo_opacity: f32, pub logo_opacity: f32,
pub webhook_url: Option<String>,
pub webhook_poll_interval_secs: i64,
pub created_at: String, pub created_at: String,
pub updated_at: String, pub updated_at: String,
} }
@@ -73,6 +75,8 @@ impl TryFrom<ChannelRow> for Channel {
logo: row.logo, logo: row.logo,
logo_position, logo_position,
logo_opacity: row.logo_opacity, logo_opacity: row.logo_opacity,
webhook_url: row.webhook_url,
webhook_poll_interval_secs: row.webhook_poll_interval_secs as u32,
created_at: parse_dt(&row.created_at)?, created_at: parse_dt(&row.created_at)?,
updated_at: parse_dt(&row.updated_at)?, updated_at: parse_dt(&row.updated_at)?,
}) })
@@ -80,4 +84,4 @@ impl TryFrom<ChannelRow> for Channel {
} }
pub(super) const SELECT_COLS: &str = pub(super) const SELECT_COLS: &str =
"id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, access_mode, access_password_hash, logo, logo_position, logo_opacity, created_at, updated_at"; "id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, access_mode, access_password_hash, logo, logo_position, logo_opacity, webhook_url, webhook_poll_interval_secs, created_at, updated_at";

View File

@@ -66,8 +66,8 @@ impl ChannelRepository for PostgresChannelRepository {
sqlx::query( sqlx::query(
r#" r#"
INSERT INTO channels INSERT INTO channels
(id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, access_mode, access_password_hash, created_at, updated_at) (id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, access_mode, access_password_hash, webhook_url, webhook_poll_interval_secs, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
ON CONFLICT(id) DO UPDATE SET ON CONFLICT(id) DO UPDATE SET
name = EXCLUDED.name, name = EXCLUDED.name,
description = EXCLUDED.description, description = EXCLUDED.description,
@@ -77,6 +77,8 @@ impl ChannelRepository for PostgresChannelRepository {
auto_schedule = EXCLUDED.auto_schedule, auto_schedule = EXCLUDED.auto_schedule,
access_mode = EXCLUDED.access_mode, access_mode = EXCLUDED.access_mode,
access_password_hash = EXCLUDED.access_password_hash, access_password_hash = EXCLUDED.access_password_hash,
webhook_url = EXCLUDED.webhook_url,
webhook_poll_interval_secs = EXCLUDED.webhook_poll_interval_secs,
updated_at = EXCLUDED.updated_at updated_at = EXCLUDED.updated_at
"#, "#,
) )
@@ -90,6 +92,8 @@ impl ChannelRepository for PostgresChannelRepository {
.bind(channel.auto_schedule as i64) .bind(channel.auto_schedule as i64)
.bind(&access_mode) .bind(&access_mode)
.bind(&channel.access_password_hash) .bind(&channel.access_password_hash)
.bind(&channel.webhook_url)
.bind(channel.webhook_poll_interval_secs as i64)
.bind(channel.created_at.to_rfc3339()) .bind(channel.created_at.to_rfc3339())
.bind(channel.updated_at.to_rfc3339()) .bind(channel.updated_at.to_rfc3339())
.execute(&self.pool) .execute(&self.pool)

View File

@@ -71,8 +71,8 @@ impl ChannelRepository for SqliteChannelRepository {
sqlx::query( sqlx::query(
r#" r#"
INSERT INTO channels INSERT INTO channels
(id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, access_mode, access_password_hash, logo, logo_position, logo_opacity, created_at, updated_at) (id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, access_mode, access_password_hash, logo, logo_position, logo_opacity, webhook_url, webhook_poll_interval_secs, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET ON CONFLICT(id) DO UPDATE SET
name = excluded.name, name = excluded.name,
description = excluded.description, description = excluded.description,
@@ -85,6 +85,8 @@ impl ChannelRepository for SqliteChannelRepository {
logo = excluded.logo, logo = excluded.logo,
logo_position = excluded.logo_position, logo_position = excluded.logo_position,
logo_opacity = excluded.logo_opacity, logo_opacity = excluded.logo_opacity,
webhook_url = excluded.webhook_url,
webhook_poll_interval_secs = excluded.webhook_poll_interval_secs,
updated_at = excluded.updated_at updated_at = excluded.updated_at
"#, "#,
) )
@@ -101,6 +103,8 @@ impl ChannelRepository for SqliteChannelRepository {
.bind(&channel.logo) .bind(&channel.logo)
.bind(&logo_position) .bind(&logo_position)
.bind(channel.logo_opacity) .bind(channel.logo_opacity)
.bind(&channel.webhook_url)
.bind(channel.webhook_poll_interval_secs as i64)
.bind(channel.created_at.to_rfc3339()) .bind(channel.created_at.to_rfc3339())
.bind(channel.updated_at.to_rfc3339()) .bind(channel.updated_at.to_rfc3339())
.execute(&self.pool) .execute(&self.pool)

View File

@@ -0,0 +1,2 @@
-- Per-channel webhook configuration: optional delivery URL (NULL = webhooks disabled).
ALTER TABLE channels ADD COLUMN webhook_url TEXT;
-- Poll cadence in seconds; default 5 mirrors the domain-level default in Channel::new.
ALTER TABLE channels ADD COLUMN webhook_poll_interval_secs INTEGER NOT NULL DEFAULT 5;

View File

@@ -756,6 +756,8 @@ interface EditChannelSheetProps {
logo?: string | null; logo?: string | null;
logo_position?: LogoPosition; logo_position?: LogoPosition;
logo_opacity?: number; logo_opacity?: number;
webhook_url?: string | null;
webhook_poll_interval_secs?: number;
}, },
) => void; ) => void;
isPending: boolean; isPending: boolean;
@@ -787,6 +789,8 @@ export function EditChannelSheet({
const [logo, setLogo] = useState<string | null>(null); const [logo, setLogo] = useState<string | null>(null);
const [logoPosition, setLogoPosition] = useState<LogoPosition>("top_right"); const [logoPosition, setLogoPosition] = useState<LogoPosition>("top_right");
const [logoOpacity, setLogoOpacity] = useState(100); const [logoOpacity, setLogoOpacity] = useState(100);
const [webhookUrl, setWebhookUrl] = useState("");
const [webhookPollInterval, setWebhookPollInterval] = useState<number | "">(5);
const [selectedBlockId, setSelectedBlockId] = useState<string | null>(null); const [selectedBlockId, setSelectedBlockId] = useState<string | null>(null);
const [fieldErrors, setFieldErrors] = useState<FieldErrors>({}); const [fieldErrors, setFieldErrors] = useState<FieldErrors>({});
const fileInputRef = useRef<HTMLInputElement>(null); const fileInputRef = useRef<HTMLInputElement>(null);
@@ -804,6 +808,8 @@ export function EditChannelSheet({
setLogo(channel.logo ?? null); setLogo(channel.logo ?? null);
setLogoPosition(channel.logo_position ?? "top_right"); setLogoPosition(channel.logo_position ?? "top_right");
setLogoOpacity(Math.round((channel.logo_opacity ?? 1) * 100)); setLogoOpacity(Math.round((channel.logo_opacity ?? 1) * 100));
setWebhookUrl(channel.webhook_url ?? "");
setWebhookPollInterval(channel.webhook_poll_interval_secs ?? 5);
setSelectedBlockId(null); setSelectedBlockId(null);
setFieldErrors({}); setFieldErrors({});
} }
@@ -836,6 +842,10 @@ export function EditChannelSheet({
logo: logo, logo: logo,
logo_position: logoPosition, logo_position: logoPosition,
logo_opacity: logoOpacity / 100, logo_opacity: logoOpacity / 100,
webhook_url: webhookUrl || null,
...(webhookUrl
? { webhook_poll_interval_secs: webhookPollInterval === "" ? 5 : webhookPollInterval }
: {}),
}); });
}; };
@@ -1085,6 +1095,28 @@ export function EditChannelSheet({
onChange={setRecyclePolicy} onChange={setRecyclePolicy}
/> />
</section> </section>
{/* Webhook */}
<section className="space-y-3">
<h3 className="text-xs font-semibold uppercase tracking-wider text-zinc-500">Webhook</h3>
<Field label="Webhook URL" hint="POST events to this URL on broadcast changes">
<TextInput
value={webhookUrl}
onChange={setWebhookUrl}
placeholder="https://example.com/webhook"
/>
</Field>
{webhookUrl && (
<Field label="Poll interval (seconds)" hint="How often to check for broadcast changes">
<NumberInput
value={webhookPollInterval}
onChange={setWebhookPollInterval}
min={1}
placeholder="5"
/>
</Field>
)}
</section>
</div> </div>
{/* Footer */} {/* Footer */}

View File

@@ -147,6 +147,8 @@ export interface ChannelResponse {
logo?: string | null; logo?: string | null;
logo_position: LogoPosition; logo_position: LogoPosition;
logo_opacity: number; logo_opacity: number;
webhook_url?: string | null;
webhook_poll_interval_secs?: number;
created_at: string; created_at: string;
updated_at: string; updated_at: string;
} }
@@ -157,6 +159,8 @@ export interface CreateChannelRequest {
description?: string; description?: string;
access_mode?: AccessMode; access_mode?: AccessMode;
access_password?: string; access_password?: string;
webhook_url?: string;
webhook_poll_interval_secs?: number;
} }
export interface UpdateChannelRequest { export interface UpdateChannelRequest {
@@ -173,6 +177,9 @@ export interface UpdateChannelRequest {
logo?: string | null; logo?: string | null;
logo_position?: LogoPosition; logo_position?: LogoPosition;
logo_opacity?: number; logo_opacity?: number;
/** null = clear webhook */
webhook_url?: string | null;
webhook_poll_interval_secs?: number;
} }
// Media & Schedule // Media & Schedule

View File

@@ -16,6 +16,7 @@
"cmdk": "^1.1.1", "cmdk": "^1.1.1",
"date-fns": "^4.1.0", "date-fns": "^4.1.0",
"embla-carousel-react": "^8.6.0", "embla-carousel-react": "^8.6.0",
"hls.js": "^1.6.15",
"input-otp": "^1.4.2", "input-otp": "^1.4.2",
"lucide-react": "^0.577.0", "lucide-react": "^0.577.0",
"next": "16.1.6", "next": "16.1.6",
@@ -30,17 +31,20 @@
"sonner": "^2.0.7", "sonner": "^2.0.7",
"tailwind-merge": "^3.5.0", "tailwind-merge": "^3.5.0",
"tw-animate-css": "^1.4.0", "tw-animate-css": "^1.4.0",
"vaul": "^1.1.2" "vaul": "^1.1.2",
"zod": "^4.3.6"
}, },
"devDependencies": { "devDependencies": {
"@tailwindcss/postcss": "^4", "@tailwindcss/postcss": "^4",
"@types/chromecast-caf-sender": "^1.0.11",
"@types/hls.js": "^1.0.0",
"@types/node": "^20", "@types/node": "^20",
"@types/react": "^19", "@types/react": "^19",
"@types/react-dom": "^19", "@types/react-dom": "^19",
"eslint": "^9", "eslint": "^9",
"eslint-config-next": "16.1.6", "eslint-config-next": "16.1.6",
"tailwindcss": "^4", "tailwindcss": "^4",
"typescript": "^5" "typescript": "5.9.3"
} }
}, },
"node_modules/@alloc/quick-lru": { "node_modules/@alloc/quick-lru": {
@@ -1493,13 +1497,6 @@
"version": "1.0.0", "version": "1.0.0",
"license": "MIT" "license": "MIT"
}, },
"node_modules/@modelcontextprotocol/sdk/node_modules/zod": {
"version": "4.3.6",
"license": "MIT",
"funding": {
"url": "https://github.com/sponsors/colinhacks"
}
},
"node_modules/@mswjs/interceptors": { "node_modules/@mswjs/interceptors": {
"version": "0.41.3", "version": "0.41.3",
"license": "MIT", "license": "MIT",
@@ -3551,6 +3548,27 @@
"tslib": "^2.4.0" "tslib": "^2.4.0"
} }
}, },
"node_modules/@types/chrome": {
"version": "0.1.37",
"resolved": "https://registry.npmjs.org/@types/chrome/-/chrome-0.1.37.tgz",
"integrity": "sha512-IJE4ceuDO7lrEuua7Pow47zwNcI8E6qqkowRP7aFPaZ0lrjxh6y836OPqqkIZeTX64FTogbw+4RNH0+QrweCTQ==",
"dev": true,
"license": "MIT",
"dependencies": {
"@types/filesystem": "*",
"@types/har-format": "*"
}
},
"node_modules/@types/chromecast-caf-sender": {
"version": "1.0.11",
"resolved": "https://registry.npmjs.org/@types/chromecast-caf-sender/-/chromecast-caf-sender-1.0.11.tgz",
"integrity": "sha512-Pv3xvNYtxD/cTM/tKfuZRlLasvpxAm+CFni0GJd6Cp8XgiZS9g9tMZkR1uymsi5fIFv057SZKKAWVFFgy7fJtw==",
"dev": true,
"license": "MIT",
"dependencies": {
"@types/chrome": "*"
}
},
"node_modules/@types/d3-array": { "node_modules/@types/d3-array": {
"version": "3.2.2", "version": "3.2.2",
"license": "MIT" "license": "MIT"
@@ -3601,6 +3619,41 @@
"dev": true, "dev": true,
"license": "MIT" "license": "MIT"
}, },
"node_modules/@types/filesystem": {
"version": "0.0.36",
"resolved": "https://registry.npmjs.org/@types/filesystem/-/filesystem-0.0.36.tgz",
"integrity": "sha512-vPDXOZuannb9FZdxgHnqSwAG/jvdGM8Wq+6N4D/d80z+D4HWH+bItqsZaVRQykAn6WEVeEkLm2oQigyHtgb0RA==",
"dev": true,
"license": "MIT",
"dependencies": {
"@types/filewriter": "*"
}
},
"node_modules/@types/filewriter": {
"version": "0.0.33",
"resolved": "https://registry.npmjs.org/@types/filewriter/-/filewriter-0.0.33.tgz",
"integrity": "sha512-xFU8ZXTw4gd358lb2jw25nxY9QAgqn2+bKKjKOYfNCzN4DKCFetK7sPtrlpg66Ywe3vWY9FNxprZawAh9wfJ3g==",
"dev": true,
"license": "MIT"
},
"node_modules/@types/har-format": {
"version": "1.2.16",
"resolved": "https://registry.npmjs.org/@types/har-format/-/har-format-1.2.16.tgz",
"integrity": "sha512-fluxdy7ryD3MV6h8pTfTYpy/xQzCFC7m89nOH9y94cNqJ1mDIDPut7MnRHI3F6qRmh/cT2fUjG1MLdCNb4hE9A==",
"dev": true,
"license": "MIT"
},
"node_modules/@types/hls.js": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/@types/hls.js/-/hls.js-1.0.0.tgz",
"integrity": "sha512-EGY2QJefX+Z9XH4PAxI7RFoNqBlQEk16UpYR3kbr82CIgMX5SlMe0PjFdFV0JytRhyVPQCiwSyONuI6S1KdSag==",
"deprecated": "This is a stub types definition. hls.js provides its own type definitions, so you do not need this installed.",
"dev": true,
"license": "MIT",
"dependencies": {
"hls.js": "*"
}
},
"node_modules/@types/json-schema": { "node_modules/@types/json-schema": {
"version": "7.0.15", "version": "7.0.15",
"dev": true, "dev": true,
@@ -5862,14 +5915,6 @@
"eslint": "^3.0.0 || ^4.0.0 || ^5.0.0 || ^6.0.0 || ^7.0.0 || ^8.0.0-0 || ^9.0.0" "eslint": "^3.0.0 || ^4.0.0 || ^5.0.0 || ^6.0.0 || ^7.0.0 || ^8.0.0-0 || ^9.0.0"
} }
}, },
"node_modules/eslint-plugin-react-hooks/node_modules/zod": {
"version": "4.3.6",
"dev": true,
"license": "MIT",
"funding": {
"url": "https://github.com/sponsors/colinhacks"
}
},
"node_modules/eslint-plugin-react/node_modules/resolve": { "node_modules/eslint-plugin-react/node_modules/resolve": {
"version": "2.0.0-next.6", "version": "2.0.0-next.6",
"dev": true, "dev": true,
@@ -6654,6 +6699,12 @@
"hermes-estree": "0.25.1" "hermes-estree": "0.25.1"
} }
}, },
"node_modules/hls.js": {
"version": "1.6.15",
"resolved": "https://registry.npmjs.org/hls.js/-/hls.js-1.6.15.tgz",
"integrity": "sha512-E3a5VwgXimGHwpRGV+WxRTKeSp2DW5DI5MWv34ulL3t5UNmyJWCQ1KmLEHbYzcfThfXG8amBL+fCYPneGHC4VA==",
"license": "Apache-2.0"
},
"node_modules/hono": { "node_modules/hono": {
"version": "4.12.7", "version": "4.12.7",
"license": "MIT", "license": "MIT",
@@ -9321,6 +9372,15 @@
"shadcn": "dist/index.js" "shadcn": "dist/index.js"
} }
}, },
"node_modules/shadcn/node_modules/zod": {
"version": "3.25.76",
"resolved": "https://registry.npmjs.org/zod/-/zod-3.25.76.tgz",
"integrity": "sha512-gzUt/qt81nXsFGKIFcC3YnfEAx5NkunCfnDlvuBSSFS02bcXu4Lmea0AFIUwbLWxWPx3d9p8S5QoaujKcNQxcQ==",
"license": "MIT",
"funding": {
"url": "https://github.com/sponsors/colinhacks"
}
},
"node_modules/sharp": { "node_modules/sharp": {
"version": "0.34.5", "version": "0.34.5",
"hasInstallScript": true, "hasInstallScript": true,
@@ -10005,6 +10065,8 @@
}, },
"node_modules/typescript": { "node_modules/typescript": {
"version": "5.9.3", "version": "5.9.3",
"resolved": "https://registry.npmjs.org/typescript/-/typescript-5.9.3.tgz",
"integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==",
"devOptional": true, "devOptional": true,
"license": "Apache-2.0", "license": "Apache-2.0",
"bin": { "bin": {
@@ -10525,7 +10587,9 @@
} }
}, },
"node_modules/zod": { "node_modules/zod": {
"version": "3.25.76", "version": "4.3.6",
"resolved": "https://registry.npmjs.org/zod/-/zod-4.3.6.tgz",
"integrity": "sha512-rftlrkhHZOcjDwkGlnUtZZkvaPHCsDATp4pGpuOOMDaTdDDXF91wuVDJoWoPsKX/3YPQ5fHuF3STjcYyKr+Qhg==",
"license": "MIT", "license": "MIT",
"funding": { "funding": {
"url": "https://github.com/sponsors/colinhacks" "url": "https://github.com/sponsors/colinhacks"

View File

@@ -45,7 +45,7 @@
"eslint": "^9", "eslint": "^9",
"eslint-config-next": "16.1.6", "eslint-config-next": "16.1.6",
"tailwindcss": "^4", "tailwindcss": "^4",
"typescript": "^5" "typescript": "5.9.3"
}, },
"ignoreScripts": [ "ignoreScripts": [
"sharp", "sharp",