feat(webhooks): wire event bus into state, routes, and scheduler

This commit is contained in:
2026-03-16 00:41:50 +01:00
parent 5c978389b2
commit 425a6898e4
6 changed files with 88 additions and 20 deletions

View File

@@ -72,6 +72,8 @@ pub struct CreateChannelRequest {
pub access_mode: Option<domain::AccessMode>, pub access_mode: Option<domain::AccessMode>,
/// Plain-text password; hashed before storage. /// Plain-text password; hashed before storage.
pub access_password: Option<String>, pub access_password: Option<String>,
pub webhook_url: Option<String>,
pub webhook_poll_interval_secs: Option<u32>,
} }
/// All fields are optional — only provided fields are updated. /// All fields are optional — only provided fields are updated.
@@ -91,6 +93,9 @@ pub struct UpdateChannelRequest {
pub logo: Option<Option<String>>, pub logo: Option<Option<String>>,
pub logo_position: Option<domain::LogoPosition>, pub logo_position: Option<domain::LogoPosition>,
pub logo_opacity: Option<f32>, pub logo_opacity: Option<f32>,
/// `Some(None)` = clear, `Some(Some(url))` = set, `None` = unchanged.
pub webhook_url: Option<Option<String>>,
pub webhook_poll_interval_secs: Option<u32>,
} }
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
@@ -107,6 +112,8 @@ pub struct ChannelResponse {
pub logo: Option<String>, pub logo: Option<String>,
pub logo_position: domain::LogoPosition, pub logo_position: domain::LogoPosition,
pub logo_opacity: f32, pub logo_opacity: f32,
pub webhook_url: Option<String>,
pub webhook_poll_interval_secs: u32,
pub created_at: DateTime<Utc>, pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>, pub updated_at: DateTime<Utc>,
} }
@@ -126,6 +133,8 @@ impl From<domain::Channel> for ChannelResponse {
logo: c.logo, logo: c.logo,
logo_position: c.logo_position, logo_position: c.logo_position,
logo_opacity: c.logo_opacity, logo_opacity: c.logo_opacity,
webhook_url: c.webhook_url,
webhook_poll_interval_secs: c.webhook_poll_interval_secs,
created_at: c.created_at, created_at: c.created_at,
updated_at: c.updated_at, updated_at: c.updated_at,
} }

View File

@@ -21,6 +21,7 @@ use tracing::info;
mod config; mod config;
mod dto; mod dto;
mod error; mod error;
mod events;
mod extractors; mod extractors;
mod poller; mod poller;
mod routes; mod routes;
@@ -152,7 +153,16 @@ async fn main() -> anyhow::Result<()> {
let registry = Arc::new(registry); let registry = Arc::new(registry);
let (event_tx, event_rx) = tokio::sync::broadcast::channel::<domain::DomainEvent>(64);
let bg_channel_repo = channel_repo.clone(); let bg_channel_repo = channel_repo.clone();
let webhook_channel_repo = channel_repo.clone();
tokio::spawn(webhook::run_webhook_consumer(
event_rx,
webhook_channel_repo,
reqwest::Client::new(),
));
let schedule_engine = ScheduleEngineService::new( let schedule_engine = ScheduleEngineService::new(
Arc::clone(&registry) as Arc<dyn IProviderRegistry>, Arc::clone(&registry) as Arc<dyn IProviderRegistry>,
channel_repo, channel_repo,
@@ -166,6 +176,7 @@ async fn main() -> anyhow::Result<()> {
schedule_engine, schedule_engine,
registry, registry,
config.clone(), config.clone(),
event_tx.clone(),
) )
.await?; .await?;
@@ -180,8 +191,16 @@ async fn main() -> anyhow::Result<()> {
cors_origins: config.cors_allowed_origins.clone(), cors_origins: config.cors_allowed_origins.clone(),
}; };
let bg_channel_repo_poller = bg_channel_repo.clone();
let bg_schedule_engine = Arc::clone(&state.schedule_engine); let bg_schedule_engine = Arc::clone(&state.schedule_engine);
tokio::spawn(scheduler::run_auto_scheduler(bg_schedule_engine, bg_channel_repo)); tokio::spawn(scheduler::run_auto_scheduler(bg_schedule_engine, bg_channel_repo, event_tx.clone()));
let bg_schedule_engine_poller = Arc::clone(&state.schedule_engine);
tokio::spawn(poller::run_broadcast_poller(
bg_schedule_engine_poller,
bg_channel_repo_poller,
event_tx,
));
let app = Router::new() let app = Router::new()
.nest("/api/v1", routes::api_v1_router()) .nest("/api/v1", routes::api_v1_router())

View File

@@ -5,6 +5,7 @@ use axum::{
response::IntoResponse, response::IntoResponse,
}; };
use chrono::Utc; use chrono::Utc;
use domain;
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
@@ -47,10 +48,19 @@ pub(super) async fn create_channel(
channel.access_password_hash = Some(infra::auth::hash_password(pw)); channel.access_password_hash = Some(infra::auth::hash_password(pw));
changed = true; changed = true;
} }
if let Some(url) = payload.webhook_url {
channel.webhook_url = Some(url);
changed = true;
}
if let Some(interval) = payload.webhook_poll_interval_secs {
channel.webhook_poll_interval_secs = interval;
changed = true;
}
if changed { if changed {
channel = state.channel_service.update(channel).await?; channel = state.channel_service.update(channel).await?;
} }
let _ = state.event_tx.send(domain::DomainEvent::ChannelCreated { channel: channel.clone() });
Ok((StatusCode::CREATED, Json(ChannelResponse::from(channel)))) Ok((StatusCode::CREATED, Json(ChannelResponse::from(channel))))
} }
@@ -110,9 +120,16 @@ pub(super) async fn update_channel(
if let Some(opacity) = payload.logo_opacity { if let Some(opacity) = payload.logo_opacity {
channel.logo_opacity = opacity.clamp(0.0, 1.0); channel.logo_opacity = opacity.clamp(0.0, 1.0);
} }
if let Some(url) = payload.webhook_url {
channel.webhook_url = url;
}
if let Some(interval) = payload.webhook_poll_interval_secs {
channel.webhook_poll_interval_secs = interval;
}
channel.updated_at = Utc::now(); channel.updated_at = Utc::now();
let channel = state.channel_service.update(channel).await?; let channel = state.channel_service.update(channel).await?;
let _ = state.event_tx.send(domain::DomainEvent::ChannelUpdated { channel: channel.clone() });
Ok(Json(ChannelResponse::from(channel))) Ok(Json(ChannelResponse::from(channel)))
} }
@@ -123,5 +140,6 @@ pub(super) async fn delete_channel(
) -> Result<impl IntoResponse, ApiError> { ) -> Result<impl IntoResponse, ApiError> {
// ChannelService::delete enforces ownership internally // ChannelService::delete enforces ownership internally
state.channel_service.delete(channel_id, user.id).await?; state.channel_service.delete(channel_id, user.id).await?;
let _ = state.event_tx.send(domain::DomainEvent::ChannelDeleted { channel_id });
Ok(StatusCode::NO_CONTENT) Ok(StatusCode::NO_CONTENT)
} }

View File

@@ -7,7 +7,7 @@ use axum::{
use chrono::Utc; use chrono::Utc;
use uuid::Uuid; use uuid::Uuid;
use domain::DomainError; use domain::{self, DomainError};
use crate::{ use crate::{
dto::ScheduleResponse, dto::ScheduleResponse,
@@ -33,6 +33,10 @@ pub(super) async fn generate_schedule(
.generate_schedule(channel_id, Utc::now()) .generate_schedule(channel_id, Utc::now())
.await?; .await?;
let _ = state.event_tx.send(domain::DomainEvent::ScheduleGenerated {
channel_id,
schedule: schedule.clone(),
});
Ok((StatusCode::CREATED, Json(ScheduleResponse::from(schedule)))) Ok((StatusCode::CREATED, Json(ScheduleResponse::from(schedule))))
} }

View File

@@ -7,21 +7,24 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use chrono::Utc; use chrono::Utc;
use domain::{ChannelRepository, ScheduleEngineService}; use domain::{ChannelRepository, DomainEvent, ScheduleEngineService};
use tokio::sync::broadcast;
pub async fn run_auto_scheduler( pub async fn run_auto_scheduler(
schedule_engine: Arc<ScheduleEngineService>, schedule_engine: Arc<ScheduleEngineService>,
channel_repo: Arc<dyn ChannelRepository>, channel_repo: Arc<dyn ChannelRepository>,
event_tx: broadcast::Sender<DomainEvent>,
) { ) {
loop { loop {
tokio::time::sleep(Duration::from_secs(3600)).await; tokio::time::sleep(Duration::from_secs(3600)).await;
tick(&schedule_engine, &channel_repo).await; tick(&schedule_engine, &channel_repo, &event_tx).await;
} }
} }
async fn tick( async fn tick(
schedule_engine: &Arc<ScheduleEngineService>, schedule_engine: &Arc<ScheduleEngineService>,
channel_repo: &Arc<dyn ChannelRepository>, channel_repo: &Arc<dyn ChannelRepository>,
event_tx: &broadcast::Sender<DomainEvent>,
) { ) {
let channels = match channel_repo.find_auto_schedule_enabled().await { let channels = match channel_repo.find_auto_schedule_enabled().await {
Ok(c) => c, Ok(c) => c,
@@ -59,18 +62,25 @@ async fn tick(
} }
}; };
if let Err(e) = schedule_engine.generate_schedule(channel.id, from).await { match schedule_engine.generate_schedule(channel.id, from).await {
tracing::warn!( Ok(schedule) => {
"auto-scheduler: failed to generate schedule for channel {}: {}", tracing::info!(
channel.id, "auto-scheduler: generated schedule for channel {} starting at {}",
e channel.id,
); from
} else { );
tracing::info!( let _ = event_tx.send(DomainEvent::ScheduleGenerated {
"auto-scheduler: generated schedule for channel {} starting at {}", channel_id: channel.id,
channel.id, schedule,
from });
); }
Err(e) => {
tracing::warn!(
"auto-scheduler: failed to generate schedule for channel {}: {}",
channel.id,
e
);
}
} }
} }
} }
@@ -221,7 +231,8 @@ mod tests {
Arc::new(MockScheduleRepo { latest: None, saved: saved.clone() }); Arc::new(MockScheduleRepo { latest: None, saved: saved.clone() });
let engine = make_engine(channel_repo.clone(), schedule_repo); let engine = make_engine(channel_repo.clone(), schedule_repo);
tick(&engine, &channel_repo).await; let (event_tx, _) = tokio::sync::broadcast::channel(8);
tick(&engine, &channel_repo, &event_tx).await;
let saved = saved.lock().unwrap(); let saved = saved.lock().unwrap();
assert_eq!(saved.len(), 1); assert_eq!(saved.len(), 1);
@@ -240,7 +251,8 @@ mod tests {
Arc::new(MockScheduleRepo { latest: Some(schedule), saved: saved.clone() }); Arc::new(MockScheduleRepo { latest: Some(schedule), saved: saved.clone() });
let engine = make_engine(channel_repo.clone(), schedule_repo); let engine = make_engine(channel_repo.clone(), schedule_repo);
tick(&engine, &channel_repo).await; let (event_tx, _) = tokio::sync::broadcast::channel(8);
tick(&engine, &channel_repo, &event_tx).await;
assert_eq!(saved.lock().unwrap().len(), 0); assert_eq!(saved.lock().unwrap().len(), 0);
} }
@@ -256,7 +268,8 @@ mod tests {
Arc::new(MockScheduleRepo { latest: Some(schedule), saved: saved.clone() }); Arc::new(MockScheduleRepo { latest: Some(schedule), saved: saved.clone() });
let engine = make_engine(channel_repo.clone(), schedule_repo); let engine = make_engine(channel_repo.clone(), schedule_repo);
tick(&engine, &channel_repo).await; let (event_tx, _) = tokio::sync::broadcast::channel(8);
tick(&engine, &channel_repo, &event_tx).await;
let saved = saved.lock().unwrap(); let saved = saved.lock().unwrap();
assert_eq!(saved.len(), 1); assert_eq!(saved.len(), 1);
@@ -274,7 +287,8 @@ mod tests {
Arc::new(MockScheduleRepo { latest: Some(schedule), saved: saved.clone() }); Arc::new(MockScheduleRepo { latest: Some(schedule), saved: saved.clone() });
let engine = make_engine(channel_repo.clone(), schedule_repo); let engine = make_engine(channel_repo.clone(), schedule_repo);
tick(&engine, &channel_repo).await; let (event_tx, _) = tokio::sync::broadcast::channel(8);
tick(&engine, &channel_repo, &event_tx).await;
let saved = saved.lock().unwrap(); let saved = saved.lock().unwrap();
assert_eq!(saved.len(), 1); assert_eq!(saved.len(), 1);

View File

@@ -11,6 +11,7 @@ use infra::auth::oidc::OidcService;
use std::sync::Arc; use std::sync::Arc;
use crate::config::Config; use crate::config::Config;
use crate::events::EventBus;
use domain::{ChannelService, ScheduleEngineService, UserService}; use domain::{ChannelService, ScheduleEngineService, UserService};
#[derive(Clone)] #[derive(Clone)]
@@ -25,6 +26,7 @@ pub struct AppState {
#[cfg(feature = "auth-jwt")] #[cfg(feature = "auth-jwt")]
pub jwt_validator: Option<Arc<JwtValidator>>, pub jwt_validator: Option<Arc<JwtValidator>>,
pub config: Arc<Config>, pub config: Arc<Config>,
pub event_tx: EventBus,
/// Index for the local-files provider, used by the rescan route. /// Index for the local-files provider, used by the rescan route.
#[cfg(feature = "local-files")] #[cfg(feature = "local-files")]
pub local_index: Option<Arc<infra::LocalIndex>>, pub local_index: Option<Arc<infra::LocalIndex>>,
@@ -43,6 +45,7 @@ impl AppState {
schedule_engine: ScheduleEngineService, schedule_engine: ScheduleEngineService,
provider_registry: Arc<infra::ProviderRegistry>, provider_registry: Arc<infra::ProviderRegistry>,
config: Config, config: Config,
event_tx: EventBus,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
let cookie_key = Key::derive_from(config.cookie_secret.as_bytes()); let cookie_key = Key::derive_from(config.cookie_secret.as_bytes());
@@ -114,6 +117,7 @@ impl AppState {
#[cfg(feature = "auth-jwt")] #[cfg(feature = "auth-jwt")]
jwt_validator, jwt_validator,
config: Arc::new(config), config: Arc::new(config),
event_tx,
#[cfg(feature = "local-files")] #[cfg(feature = "local-files")]
local_index: None, local_index: None,
#[cfg(feature = "local-files")] #[cfg(feature = "local-files")]