213 lines
7.3 KiB
Rust
213 lines
7.3 KiB
Rust
//! WebhookConsumer background task.
//!
//! Subscribes to the domain-event broadcast channel, looks up each channel's
//! `webhook_url`, and fires HTTP POST requests (fire-and-forget).
|
|
|
|
use chrono::Utc;
|
|
use handlebars::Handlebars;
|
|
use serde_json::{Value, json};
|
|
use std::sync::Arc;
|
|
use tokio::sync::broadcast;
|
|
use tracing::{info, warn};
|
|
use uuid::Uuid;
|
|
|
|
use domain::{ChannelRepository, DomainEvent};
|
|
|
|
/// Consumes domain events and delivers them to per-channel webhook URLs.
|
|
///
|
|
/// Uses fire-and-forget HTTP POST — failures are logged as warnings, never retried.
|
|
pub async fn run_webhook_consumer(
|
|
mut rx: broadcast::Receiver<DomainEvent>,
|
|
channel_repo: Arc<dyn ChannelRepository>,
|
|
client: reqwest::Client,
|
|
) {
|
|
loop {
|
|
match rx.recv().await {
|
|
Ok(event) => {
|
|
let channel_id = event_channel_id(&event);
|
|
let payload = build_payload(&event);
|
|
|
|
match channel_repo.find_by_id(channel_id).await {
|
|
Ok(Some(channel)) => {
|
|
if let Some(url) = channel.webhook_url {
|
|
let client = client.clone();
|
|
let template = channel.webhook_body_template.clone();
|
|
let headers = channel.webhook_headers.clone();
|
|
tokio::spawn(async move {
|
|
post_webhook(&client, &url, payload, template.as_deref(), headers.as_deref()).await;
|
|
});
|
|
}
|
|
// No webhook_url configured — skip silently
|
|
}
|
|
Ok(None) => {
|
|
// Channel deleted — nothing to do
|
|
}
|
|
Err(e) => {
|
|
warn!("webhook consumer: failed to look up channel {}: {}", channel_id, e);
|
|
}
|
|
}
|
|
}
|
|
Err(broadcast::error::RecvError::Lagged(n)) => {
|
|
warn!("webhook consumer lagged, {} events dropped", n);
|
|
// Continue — don't break; catch up from current position
|
|
}
|
|
Err(broadcast::error::RecvError::Closed) => {
|
|
info!("webhook consumer: event bus closed, shutting down");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Extract the channel_id from any event variant.
|
|
fn event_channel_id(event: &DomainEvent) -> Uuid {
|
|
match event {
|
|
DomainEvent::BroadcastTransition { channel_id, .. } => *channel_id,
|
|
DomainEvent::NoSignal { channel_id } => *channel_id,
|
|
DomainEvent::ScheduleGenerated { channel_id, .. } => *channel_id,
|
|
DomainEvent::ChannelCreated { channel } => channel.id,
|
|
DomainEvent::ChannelUpdated { channel } => channel.id,
|
|
DomainEvent::ChannelDeleted { channel_id } => *channel_id,
|
|
}
|
|
}
|
|
|
|
/// Build the JSON payload for an event.
|
|
fn build_payload(event: &DomainEvent) -> Value {
|
|
let now = Utc::now().to_rfc3339();
|
|
match event {
|
|
DomainEvent::BroadcastTransition { channel_id, slot } => {
|
|
let offset_secs = (Utc::now() - slot.start_at).num_seconds().max(0) as u64;
|
|
json!({
|
|
"event": "broadcast_transition",
|
|
"timestamp": now,
|
|
"channel_id": channel_id,
|
|
"data": {
|
|
"slot_id": slot.id,
|
|
"item": {
|
|
"id": slot.item.id.as_ref(),
|
|
"title": slot.item.title,
|
|
"duration_secs": slot.item.duration_secs,
|
|
},
|
|
"start_at": slot.start_at.to_rfc3339(),
|
|
"end_at": slot.end_at.to_rfc3339(),
|
|
"offset_secs": offset_secs,
|
|
}
|
|
})
|
|
}
|
|
DomainEvent::NoSignal { channel_id } => {
|
|
json!({
|
|
"event": "no_signal",
|
|
"timestamp": now,
|
|
"channel_id": channel_id,
|
|
"data": {}
|
|
})
|
|
}
|
|
DomainEvent::ScheduleGenerated { channel_id, schedule } => {
|
|
json!({
|
|
"event": "schedule_generated",
|
|
"timestamp": now,
|
|
"channel_id": channel_id,
|
|
"data": {
|
|
"generation": schedule.generation,
|
|
"valid_from": schedule.valid_from.to_rfc3339(),
|
|
"valid_until": schedule.valid_until.to_rfc3339(),
|
|
"slot_count": schedule.slots.len(),
|
|
}
|
|
})
|
|
}
|
|
DomainEvent::ChannelCreated { channel } => {
|
|
json!({
|
|
"event": "channel_created",
|
|
"timestamp": now,
|
|
"channel_id": channel.id,
|
|
"data": {
|
|
"name": channel.name,
|
|
"description": channel.description,
|
|
}
|
|
})
|
|
}
|
|
DomainEvent::ChannelUpdated { channel } => {
|
|
json!({
|
|
"event": "channel_updated",
|
|
"timestamp": now,
|
|
"channel_id": channel.id,
|
|
"data": {
|
|
"name": channel.name,
|
|
"description": channel.description,
|
|
}
|
|
})
|
|
}
|
|
DomainEvent::ChannelDeleted { channel_id } => {
|
|
json!({
|
|
"event": "channel_deleted",
|
|
"timestamp": now,
|
|
"channel_id": channel_id,
|
|
"data": {}
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Fire-and-forget HTTP POST to a webhook URL.
|
|
///
|
|
/// If `template` is provided it is rendered with `payload` as context via Handlebars.
|
|
/// `headers_json` is a JSON object string of extra HTTP headers (e.g. `{"Authorization":"Bearer x"}`).
|
|
/// Content-Type defaults to `application/json` unless overridden in `headers_json`.
|
|
async fn post_webhook(
|
|
client: &reqwest::Client,
|
|
url: &str,
|
|
payload: Value,
|
|
template: Option<&str>,
|
|
headers_json: Option<&str>,
|
|
) {
|
|
let body = if let Some(tmpl) = template {
|
|
let hbs = Handlebars::new();
|
|
match hbs.render_template(tmpl, &payload) {
|
|
Ok(rendered) => rendered,
|
|
Err(e) => {
|
|
warn!("webhook template render failed for {}: {}", url, e);
|
|
return;
|
|
}
|
|
}
|
|
} else {
|
|
match serde_json::to_string(&payload) {
|
|
Ok(s) => s,
|
|
Err(e) => {
|
|
warn!("webhook payload serialize failed: {}", e);
|
|
return;
|
|
}
|
|
}
|
|
};
|
|
|
|
let mut req = client.post(url).body(body);
|
|
let mut has_content_type = false;
|
|
|
|
if let Some(h) = headers_json
|
|
&& let Ok(map) = serde_json::from_str::<serde_json::Map<String, Value>>(h)
|
|
{
|
|
for (k, v) in &map {
|
|
if k.to_lowercase() == "content-type" {
|
|
has_content_type = true;
|
|
}
|
|
if let Some(v_str) = v.as_str() {
|
|
req = req.header(k.as_str(), v_str);
|
|
}
|
|
}
|
|
}
|
|
|
|
if !has_content_type {
|
|
req = req.header("Content-Type", "application/json");
|
|
}
|
|
|
|
match req.send().await {
|
|
Ok(resp) => {
|
|
if !resp.status().is_success() {
|
|
warn!("webhook POST to {} returned status {}", url, resp.status());
|
|
}
|
|
}
|
|
Err(e) => {
|
|
warn!("webhook POST to {} failed: {}", url, e);
|
|
}
|
|
}
|
|
}
|