feat(webhooks): add foundation layer — migration, domain events, EventBus

This commit is contained in:
2026-03-16 00:35:25 +01:00
parent 6307850b82
commit 1d0e640946
10 changed files with 94 additions and 26 deletions

View File

@@ -81,6 +81,7 @@ dependencies = [
"infra",
"k-core",
"rand 0.8.5",
"reqwest",
"serde",
"serde_json",
"serde_qs",

View File

@@ -57,6 +57,7 @@ uuid = { version = "1.19.0", features = ["v4", "serde"] }
# Logging
tracing = "0.1"
reqwest = { version = "0.12", features = ["json"] }
async-trait = "0.1"
dotenvy = "0.15.7"
time = "0.3"

View File

@@ -0,0 +1,12 @@
//! Event bus type alias.
//!
//! The broadcast sender is kept in `AppState` and cloned into each route handler.
//! Receivers are created with `event_tx.subscribe()`.
use tokio::sync::broadcast;
use domain::DomainEvent;
/// A sender half of the domain-event broadcast channel.
///
/// Clone to share across tasks. Use `event_tx.subscribe()` to create receivers.
pub type EventBus = broadcast::Sender<DomainEvent>;

View File

@@ -88,6 +88,8 @@ pub struct Channel {
pub logo: Option<String>,
pub logo_position: LogoPosition,
pub logo_opacity: f32,
pub webhook_url: Option<String>,
pub webhook_poll_interval_secs: u32,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
@@ -113,6 +115,8 @@ impl Channel {
logo: None,
logo_position: LogoPosition::default(),
logo_opacity: 1.0,
webhook_url: None,
webhook_poll_interval_secs: 5,
created_at: now,
updated_at: now,
}

View File

@@ -0,0 +1,34 @@
//! Domain events emitted when important state transitions occur.
//!
//! These are pure data — no I/O, no tokio deps. The transport
//! (tokio::sync::broadcast) lives in `api`; domain only owns the schema.
use uuid::Uuid;
use crate::entities::{Channel, GeneratedSchedule, ScheduledSlot};
/// Events emitted by the application when important state changes occur.
///
/// Must be `Clone + Send + 'static` for use as a `broadcast::channel` item.
#[derive(Clone)]
pub enum DomainEvent {
BroadcastTransition {
channel_id: Uuid,
slot: ScheduledSlot,
},
NoSignal {
channel_id: Uuid,
},
ScheduleGenerated {
channel_id: Uuid,
schedule: GeneratedSchedule,
},
ChannelCreated {
channel: Channel,
},
ChannelUpdated {
channel: Channel,
},
ChannelDeleted {
channel_id: Uuid,
},
}

View File

@@ -9,11 +9,13 @@ pub mod iptv;
pub mod ports;
pub mod repositories;
pub mod services;
pub mod events;
pub mod value_objects;
// Re-export commonly used types
pub use entities::*;
pub use errors::{DomainError, DomainResult};
pub use events::DomainEvent;
pub use ports::{Collection, IMediaProvider, IProviderRegistry, ProviderCapabilities, SeriesSummary, StreamingProtocol, StreamQuality};
pub use repositories::*;
pub use iptv::{generate_m3u, generate_xmltv};

View File

@@ -19,6 +19,8 @@ pub(super) struct ChannelRow {
pub logo: Option<String>,
pub logo_position: String,
pub logo_opacity: f32,
pub webhook_url: Option<String>,
pub webhook_poll_interval_secs: i64,
pub created_at: String,
pub updated_at: String,
}
@@ -73,6 +75,8 @@ impl TryFrom<ChannelRow> for Channel {
logo: row.logo,
logo_position,
logo_opacity: row.logo_opacity,
webhook_url: row.webhook_url,
webhook_poll_interval_secs: row.webhook_poll_interval_secs as u32,
created_at: parse_dt(&row.created_at)?,
updated_at: parse_dt(&row.updated_at)?,
})
@@ -80,4 +84,4 @@ impl TryFrom<ChannelRow> for Channel {
}
pub(super) const SELECT_COLS: &str =
"id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, access_mode, access_password_hash, logo, logo_position, logo_opacity, created_at, updated_at";
"id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, access_mode, access_password_hash, logo, logo_position, logo_opacity, webhook_url, webhook_poll_interval_secs, created_at, updated_at";

View File

@@ -66,8 +66,8 @@ impl ChannelRepository for PostgresChannelRepository {
sqlx::query(
r#"
INSERT INTO channels
(id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, access_mode, access_password_hash, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
(id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, access_mode, access_password_hash, webhook_url, webhook_poll_interval_secs, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
ON CONFLICT(id) DO UPDATE SET
name = EXCLUDED.name,
description = EXCLUDED.description,
@@ -77,6 +77,8 @@ impl ChannelRepository for PostgresChannelRepository {
auto_schedule = EXCLUDED.auto_schedule,
access_mode = EXCLUDED.access_mode,
access_password_hash = EXCLUDED.access_password_hash,
webhook_url = EXCLUDED.webhook_url,
webhook_poll_interval_secs = EXCLUDED.webhook_poll_interval_secs,
updated_at = EXCLUDED.updated_at
"#,
)
@@ -90,6 +92,8 @@ impl ChannelRepository for PostgresChannelRepository {
.bind(channel.auto_schedule as i64)
.bind(&access_mode)
.bind(&channel.access_password_hash)
.bind(&channel.webhook_url)
.bind(channel.webhook_poll_interval_secs as i64)
.bind(channel.created_at.to_rfc3339())
.bind(channel.updated_at.to_rfc3339())
.execute(&self.pool)

View File

@@ -71,8 +71,8 @@ impl ChannelRepository for SqliteChannelRepository {
sqlx::query(
r#"
INSERT INTO channels
(id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, access_mode, access_password_hash, logo, logo_position, logo_opacity, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
(id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, access_mode, access_password_hash, logo, logo_position, logo_opacity, webhook_url, webhook_poll_interval_secs, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
name = excluded.name,
description = excluded.description,
@@ -85,6 +85,8 @@ impl ChannelRepository for SqliteChannelRepository {
logo = excluded.logo,
logo_position = excluded.logo_position,
logo_opacity = excluded.logo_opacity,
webhook_url = excluded.webhook_url,
webhook_poll_interval_secs = excluded.webhook_poll_interval_secs,
updated_at = excluded.updated_at
"#,
)
@@ -101,6 +103,8 @@ impl ChannelRepository for SqliteChannelRepository {
.bind(&channel.logo)
.bind(&logo_position)
.bind(channel.logo_opacity)
.bind(&channel.webhook_url)
.bind(channel.webhook_poll_interval_secs as i64)
.bind(channel.created_at.to_rfc3339())
.bind(channel.updated_at.to_rfc3339())
.execute(&self.pool)

View File

@@ -0,0 +1,2 @@
ALTER TABLE channels ADD COLUMN webhook_url TEXT;
ALTER TABLE channels ADD COLUMN webhook_poll_interval_secs INTEGER NOT NULL DEFAULT 5;