diff --git a/k-tv-backend/Cargo.lock b/k-tv-backend/Cargo.lock index e419507..6297239 100644 --- a/k-tv-backend/Cargo.lock +++ b/k-tv-backend/Cargo.lock @@ -81,6 +81,7 @@ dependencies = [ "infra", "k-core", "rand 0.8.5", + "reqwest", "serde", "serde_json", "serde_qs", diff --git a/k-tv-backend/api/Cargo.toml b/k-tv-backend/api/Cargo.toml index d114c93..82111eb 100644 --- a/k-tv-backend/api/Cargo.toml +++ b/k-tv-backend/api/Cargo.toml @@ -57,6 +57,7 @@ uuid = { version = "1.19.0", features = ["v4", "serde"] } # Logging tracing = "0.1" +reqwest = { version = "0.12", features = ["json"] } async-trait = "0.1" dotenvy = "0.15.7" time = "0.3" diff --git a/k-tv-backend/api/src/events.rs b/k-tv-backend/api/src/events.rs new file mode 100644 index 0000000..2f7114c --- /dev/null +++ b/k-tv-backend/api/src/events.rs @@ -0,0 +1,12 @@ +//! Event bus type alias. +//! +//! The broadcast sender is kept in `AppState` and cloned into each route handler. +//! Receivers are created with `event_tx.subscribe()`. + +use tokio::sync::broadcast; +use domain::DomainEvent; + +/// A sender half of the domain-event broadcast channel. +/// +/// Clone to share across tasks. Use `event_tx.subscribe()` to create receivers. +pub type EventBus = broadcast::Sender; diff --git a/k-tv-backend/domain/src/entities.rs b/k-tv-backend/domain/src/entities.rs index b7998a4..91779c7 100644 --- a/k-tv-backend/domain/src/entities.rs +++ b/k-tv-backend/domain/src/entities.rs @@ -88,6 +88,8 @@ pub struct Channel { pub logo: Option, pub logo_position: LogoPosition, pub logo_opacity: f32, + pub webhook_url: Option, + pub webhook_poll_interval_secs: u32, pub created_at: DateTime, pub updated_at: DateTime, } @@ -113,6 +115,8 @@ impl Channel { logo: None, logo_position: LogoPosition::default(), logo_opacity: 1.0, + webhook_url: None, + webhook_poll_interval_secs: 5, created_at: now, updated_at: now, } diff --git a/k-tv-backend/domain/src/events.rs b/k-tv-backend/domain/src/events.rs new file mode 100644 index 0000000..611522b --- /dev/null +++ b/k-tv-backend/domain/src/events.rs @@ -0,0 +1,34 @@ +//! Domain events emitted when important state transitions occur. +//! +//! These are pure data — no I/O, no tokio deps. The transport +//! (tokio::sync::broadcast) lives in `api`; domain only owns the schema. + +use uuid::Uuid; +use crate::entities::{Channel, GeneratedSchedule, ScheduledSlot}; + +/// Events emitted by the application when important state changes occur. +/// +/// Must be `Clone + Send + 'static` for use as a `broadcast::channel` item. +#[derive(Clone)] +pub enum DomainEvent { + BroadcastTransition { + channel_id: Uuid, + slot: ScheduledSlot, + }, + NoSignal { + channel_id: Uuid, + }, + ScheduleGenerated { + channel_id: Uuid, + schedule: GeneratedSchedule, + }, + ChannelCreated { + channel: Channel, + }, + ChannelUpdated { + channel: Channel, + }, + ChannelDeleted { + channel_id: Uuid, + }, +} diff --git a/k-tv-backend/domain/src/lib.rs b/k-tv-backend/domain/src/lib.rs index e8b7f87..aaefa18 100644 --- a/k-tv-backend/domain/src/lib.rs +++ b/k-tv-backend/domain/src/lib.rs @@ -9,11 +9,13 @@ pub mod iptv; pub mod ports; pub mod repositories; pub mod services; +pub mod events; pub mod value_objects; // Re-export commonly used types pub use entities::*; pub use errors::{DomainError, DomainResult}; +pub use events::DomainEvent; pub use ports::{Collection, IMediaProvider, IProviderRegistry, ProviderCapabilities, SeriesSummary, StreamingProtocol, StreamQuality}; pub use repositories::*; pub use iptv::{generate_m3u, generate_xmltv}; diff --git a/k-tv-backend/infra/src/channel_repository/mapping.rs b/k-tv-backend/infra/src/channel_repository/mapping.rs index c9d02d6..61fce60 100644 --- a/k-tv-backend/infra/src/channel_repository/mapping.rs +++ b/k-tv-backend/infra/src/channel_repository/mapping.rs @@ -19,6 +19,8 @@ pub(super) struct ChannelRow { pub logo: Option, pub logo_position: String, pub logo_opacity: f32, + pub webhook_url: Option, + pub webhook_poll_interval_secs: i64, pub created_at: String, pub updated_at: String, } @@ -73,6 +75,8 @@ impl TryFrom for Channel { logo: row.logo, logo_position, logo_opacity: row.logo_opacity, + webhook_url: row.webhook_url, + webhook_poll_interval_secs: row.webhook_poll_interval_secs as u32, created_at: parse_dt(&row.created_at)?, updated_at: parse_dt(&row.updated_at)?, }) @@ -80,4 +84,4 @@ impl TryFrom for Channel { } pub(super) const SELECT_COLS: &str = - "id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, access_mode, access_password_hash, logo, logo_position, logo_opacity, created_at, updated_at"; + "id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, access_mode, access_password_hash, logo, logo_position, logo_opacity, webhook_url, webhook_poll_interval_secs, created_at, updated_at"; diff --git a/k-tv-backend/infra/src/channel_repository/postgres.rs b/k-tv-backend/infra/src/channel_repository/postgres.rs index 169aff6..5c37e1f 100644 --- a/k-tv-backend/infra/src/channel_repository/postgres.rs +++ b/k-tv-backend/infra/src/channel_repository/postgres.rs @@ -66,18 +66,20 @@ impl ChannelRepository for PostgresChannelRepository { sqlx::query( r#" INSERT INTO channels - (id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, access_mode, access_password_hash, created_at, updated_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) + (id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, access_mode, access_password_hash, webhook_url, webhook_poll_interval_secs, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) ON CONFLICT(id) DO UPDATE SET - name = EXCLUDED.name, - description = EXCLUDED.description, - timezone = EXCLUDED.timezone, - schedule_config = EXCLUDED.schedule_config, - recycle_policy = EXCLUDED.recycle_policy, - auto_schedule = EXCLUDED.auto_schedule, - access_mode = EXCLUDED.access_mode, - access_password_hash = EXCLUDED.access_password_hash, - updated_at = EXCLUDED.updated_at + name = EXCLUDED.name, + description = EXCLUDED.description, + timezone = EXCLUDED.timezone, + schedule_config = EXCLUDED.schedule_config, + recycle_policy = EXCLUDED.recycle_policy, + auto_schedule = EXCLUDED.auto_schedule, + access_mode = EXCLUDED.access_mode, + access_password_hash = EXCLUDED.access_password_hash, + webhook_url = EXCLUDED.webhook_url, + webhook_poll_interval_secs = EXCLUDED.webhook_poll_interval_secs, + updated_at = EXCLUDED.updated_at "#, ) .bind(channel.id.to_string()) @@ -90,6 +92,8 @@ impl ChannelRepository for PostgresChannelRepository { .bind(channel.auto_schedule as i64) .bind(&access_mode) .bind(&channel.access_password_hash) + .bind(&channel.webhook_url) + .bind(channel.webhook_poll_interval_secs as i64) .bind(channel.created_at.to_rfc3339()) .bind(channel.updated_at.to_rfc3339()) .execute(&self.pool) diff --git a/k-tv-backend/infra/src/channel_repository/sqlite.rs b/k-tv-backend/infra/src/channel_repository/sqlite.rs index d30cd3a..0e275e4 100644 --- a/k-tv-backend/infra/src/channel_repository/sqlite.rs +++ b/k-tv-backend/infra/src/channel_repository/sqlite.rs @@ -71,21 +71,23 @@ impl ChannelRepository for SqliteChannelRepository { sqlx::query( r#" INSERT INTO channels - (id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, access_mode, access_password_hash, logo, logo_position, logo_opacity, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + (id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, access_mode, access_password_hash, logo, logo_position, logo_opacity, webhook_url, webhook_poll_interval_secs, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET - name = excluded.name, - description = excluded.description, - timezone = excluded.timezone, - schedule_config = excluded.schedule_config, - recycle_policy = excluded.recycle_policy, - auto_schedule = excluded.auto_schedule, - access_mode = excluded.access_mode, - access_password_hash = excluded.access_password_hash, - logo = excluded.logo, - logo_position = excluded.logo_position, - logo_opacity = excluded.logo_opacity, - updated_at = excluded.updated_at + name = excluded.name, + description = excluded.description, + timezone = excluded.timezone, + schedule_config = excluded.schedule_config, + recycle_policy = excluded.recycle_policy, + auto_schedule = excluded.auto_schedule, + access_mode = excluded.access_mode, + access_password_hash = excluded.access_password_hash, + logo = excluded.logo, + logo_position = excluded.logo_position, + logo_opacity = excluded.logo_opacity, + webhook_url = excluded.webhook_url, + webhook_poll_interval_secs = excluded.webhook_poll_interval_secs, + updated_at = excluded.updated_at "#, ) .bind(channel.id.to_string()) @@ -101,6 +103,8 @@ impl ChannelRepository for SqliteChannelRepository { .bind(&channel.logo) .bind(&logo_position) .bind(channel.logo_opacity) + .bind(&channel.webhook_url) + .bind(channel.webhook_poll_interval_secs as i64) .bind(channel.created_at.to_rfc3339()) .bind(channel.updated_at.to_rfc3339()) .execute(&self.pool) diff --git a/k-tv-backend/migrations_sqlite/20260316000000_add_webhook_fields.sql b/k-tv-backend/migrations_sqlite/20260316000000_add_webhook_fields.sql new file mode 100644 index 0000000..c74c140 --- /dev/null +++ b/k-tv-backend/migrations_sqlite/20260316000000_add_webhook_fields.sql @@ -0,0 +1,2 @@ +ALTER TABLE channels ADD COLUMN webhook_url TEXT; +ALTER TABLE channels ADD COLUMN webhook_poll_interval_secs INTEGER NOT NULL DEFAULT 5;