feat: add webhook body template and headers support for channels

This commit is contained in:
2026-03-16 01:10:26 +01:00
parent db461db270
commit e76167134b
12 changed files with 366 additions and 23 deletions

155
k-tv-backend/Cargo.lock generated
View File

@@ -78,6 +78,7 @@ dependencies = [
"chrono",
"domain",
"dotenvy",
"handlebars",
"infra",
"k-core",
"rand 0.8.5",
@@ -539,14 +540,38 @@ dependencies = [
"syn",
]
[[package]]
name = "darling"
version = "0.20.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee"
dependencies = [
"darling_core 0.20.11",
"darling_macro 0.20.11",
]
[[package]]
name = "darling"
version = "0.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cdf337090841a411e2a7f3deb9187445851f91b309c0c0a29e05f74a00a48c0"
dependencies = [
"darling_core",
"darling_macro",
"darling_core 0.21.3",
"darling_macro 0.21.3",
]
[[package]]
name = "darling_core"
version = "0.20.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e"
dependencies = [
"fnv",
"ident_case",
"proc-macro2",
"quote",
"strsim",
"syn",
]
[[package]]
@@ -563,13 +588,24 @@ dependencies = [
"syn",
]
[[package]]
name = "darling_macro"
version = "0.20.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead"
dependencies = [
"darling_core 0.20.11",
"quote",
"syn",
]
[[package]]
name = "darling_macro"
version = "0.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81"
dependencies = [
"darling_core",
"darling_core 0.21.3",
"quote",
"syn",
]
@@ -601,6 +637,37 @@ dependencies = [
"serde_core",
]
[[package]]
name = "derive_builder"
version = "0.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "507dfb09ea8b7fa618fcf76e953f4f5e192547945816d5358edffe39f6f94947"
dependencies = [
"derive_builder_macro",
]
[[package]]
name = "derive_builder_core"
version = "0.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8"
dependencies = [
"darling 0.20.11",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "derive_builder_macro"
version = "0.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c"
dependencies = [
"derive_builder_core",
"syn",
]
[[package]]
name = "digest"
version = "0.10.7"
@@ -1030,6 +1097,22 @@ dependencies = [
"tracing",
]
[[package]]
name = "handlebars"
version = "6.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b3f9296c208515b87bd915a2f5d1163d4b3f863ba83337d7713cf478055948e"
dependencies = [
"derive_builder",
"log",
"num-order",
"pest",
"pest_derive",
"serde",
"serde_json",
"thiserror 2.0.17",
]
[[package]]
name = "hashbrown"
version = "0.12.3"
@@ -1756,6 +1839,21 @@ dependencies = [
"num-traits",
]
[[package]]
name = "num-modular"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17bb261bf36fa7d83f4c294f834e91256769097b3cb505d44831e0a179ac647f"
[[package]]
name = "num-order"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "537b596b97c40fcf8056d153049eb22f481c17ebce72a513ec9286e4986d1bb6"
dependencies = [
"num-modular",
]
[[package]]
name = "num-traits"
version = "0.2.19"
@@ -1989,6 +2087,49 @@ version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220"
[[package]]
name = "pest"
version = "2.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0848c601009d37dfa3430c4666e147e49cdcf1b92ecd3e63657d8a5f19da662"
dependencies = [
"memchr",
"ucd-trie",
]
[[package]]
name = "pest_derive"
version = "2.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11f486f1ea21e6c10ed15d5a7c77165d0ee443402f0780849d1768e7d9d6fe77"
dependencies = [
"pest",
"pest_generator",
]
[[package]]
name = "pest_generator"
version = "2.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8040c4647b13b210a963c1ed407c1ff4fdfa01c31d6d2a098218702e6664f94f"
dependencies = [
"pest",
"pest_meta",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pest_meta"
version = "2.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89815c69d36021a140146f26659a81d6c2afa33d216d736dd4be5381a7362220"
dependencies = [
"pest",
"sha2",
]
[[package]]
name = "phf"
version = "0.12.1"
@@ -2816,7 +2957,7 @@ version = "3.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52a8e3ca0ca629121f70ab50f95249e5a6f925cc0f6ffe8256c45b728875706c"
dependencies = [
"darling",
"darling 0.21.3",
"proc-macro2",
"quote",
"syn",
@@ -3572,6 +3713,12 @@ version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb"
[[package]]
name = "ucd-trie"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971"
[[package]]
name = "unicode-bidi"
version = "0.3.18"

View File

@@ -58,6 +58,7 @@ uuid = { version = "1.19.0", features = ["v4", "serde"] }
tracing = "0.1"
reqwest = { version = "0.12", features = ["json"] }
handlebars = "6"
async-trait = "0.1"
dotenvy = "0.15.7"
time = "0.3"

View File

@@ -74,6 +74,8 @@ pub struct CreateChannelRequest {
pub access_password: Option<String>,
pub webhook_url: Option<String>,
pub webhook_poll_interval_secs: Option<u32>,
pub webhook_body_template: Option<String>,
pub webhook_headers: Option<String>,
}
/// All fields are optional — only provided fields are updated.
@@ -96,6 +98,10 @@ pub struct UpdateChannelRequest {
/// `Some(None)` = clear, `Some(Some(url))` = set, `None` = unchanged.
pub webhook_url: Option<Option<String>>,
pub webhook_poll_interval_secs: Option<u32>,
/// `Some(None)` = clear, `Some(Some(tmpl))` = set, `None` = unchanged.
pub webhook_body_template: Option<Option<String>>,
/// `Some(None)` = clear, `Some(Some(json))` = set, `None` = unchanged.
pub webhook_headers: Option<Option<String>>,
}
#[derive(Debug, Serialize)]
@@ -114,6 +120,8 @@ pub struct ChannelResponse {
pub logo_opacity: f32,
pub webhook_url: Option<String>,
pub webhook_poll_interval_secs: u32,
pub webhook_body_template: Option<String>,
pub webhook_headers: Option<String>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
@@ -135,6 +143,8 @@ impl From<domain::Channel> for ChannelResponse {
logo_opacity: c.logo_opacity,
webhook_url: c.webhook_url,
webhook_poll_interval_secs: c.webhook_poll_interval_secs,
webhook_body_template: c.webhook_body_template,
webhook_headers: c.webhook_headers,
created_at: c.created_at,
updated_at: c.updated_at,
}

View File

@@ -56,6 +56,14 @@ pub(super) async fn create_channel(
channel.webhook_poll_interval_secs = interval;
changed = true;
}
if let Some(tmpl) = payload.webhook_body_template {
channel.webhook_body_template = Some(tmpl);
changed = true;
}
if let Some(headers) = payload.webhook_headers {
channel.webhook_headers = Some(headers);
changed = true;
}
if changed {
channel = state.channel_service.update(channel).await?;
}
@@ -126,6 +134,12 @@ pub(super) async fn update_channel(
if let Some(interval) = payload.webhook_poll_interval_secs {
channel.webhook_poll_interval_secs = interval;
}
if let Some(tmpl) = payload.webhook_body_template {
channel.webhook_body_template = tmpl;
}
if let Some(headers) = payload.webhook_headers {
channel.webhook_headers = headers;
}
channel.updated_at = Utc::now();
let channel = state.channel_service.update(channel).await?;

View File

@@ -4,6 +4,7 @@
//! webhook_url, and fires HTTP POST requests (fire-and-forget).
use chrono::Utc;
use handlebars::Handlebars;
use serde_json::{Value, json};
use std::sync::Arc;
use tokio::sync::broadcast;
@@ -30,8 +31,10 @@ pub async fn run_webhook_consumer(
Ok(Some(channel)) => {
if let Some(url) = channel.webhook_url {
let client = client.clone();
let template = channel.webhook_body_template.clone();
let headers = channel.webhook_headers.clone();
tokio::spawn(async move {
post_webhook(&client, &url, payload).await;
post_webhook(&client, &url, payload, template.as_deref(), headers.as_deref()).await;
});
}
// No webhook_url configured — skip silently
@@ -146,15 +149,60 @@ fn build_payload(event: &DomainEvent) -> Value {
}
/// Fire-and-forget HTTP POST to a webhook URL.
async fn post_webhook(client: &reqwest::Client, url: &str, payload: Value) {
match client.post(url).json(&payload).send().await {
///
/// If `template` is provided it is rendered with `payload` as context via Handlebars.
/// `headers_json` is a JSON object string of extra HTTP headers (e.g. `{"Authorization":"Bearer x"}`).
/// Content-Type defaults to `application/json` unless overridden in `headers_json`.
async fn post_webhook(
client: &reqwest::Client,
url: &str,
payload: Value,
template: Option<&str>,
headers_json: Option<&str>,
) {
let body = if let Some(tmpl) = template {
let hbs = Handlebars::new();
match hbs.render_template(tmpl, &payload) {
Ok(rendered) => rendered,
Err(e) => {
warn!("webhook template render failed for {}: {}", url, e);
return;
}
}
} else {
match serde_json::to_string(&payload) {
Ok(s) => s,
Err(e) => {
warn!("webhook payload serialize failed: {}", e);
return;
}
}
};
let mut req = client.post(url).body(body);
let mut has_content_type = false;
if let Some(h) = headers_json {
if let Ok(map) = serde_json::from_str::<serde_json::Map<String, Value>>(h) {
for (k, v) in &map {
if k.to_lowercase() == "content-type" {
has_content_type = true;
}
if let Some(v_str) = v.as_str() {
req = req.header(k.as_str(), v_str);
}
}
}
}
if !has_content_type {
req = req.header("Content-Type", "application/json");
}
match req.send().await {
Ok(resp) => {
if !resp.status().is_success() {
warn!(
"webhook POST to {} returned status {}",
url,
resp.status()
);
warn!("webhook POST to {} returned status {}", url, resp.status());
}
}
Err(e) => {

View File

@@ -90,6 +90,8 @@ pub struct Channel {
pub logo_opacity: f32,
pub webhook_url: Option<String>,
pub webhook_poll_interval_secs: u32,
pub webhook_body_template: Option<String>,
pub webhook_headers: Option<String>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
@@ -117,6 +119,8 @@ impl Channel {
logo_opacity: 1.0,
webhook_url: None,
webhook_poll_interval_secs: 5,
webhook_body_template: None,
webhook_headers: None,
created_at: now,
updated_at: now,
}

View File

@@ -21,6 +21,8 @@ pub(super) struct ChannelRow {
pub logo_opacity: f32,
pub webhook_url: Option<String>,
pub webhook_poll_interval_secs: i64,
pub webhook_body_template: Option<String>,
pub webhook_headers: Option<String>,
pub created_at: String,
pub updated_at: String,
}
@@ -77,6 +79,8 @@ impl TryFrom<ChannelRow> for Channel {
logo_opacity: row.logo_opacity,
webhook_url: row.webhook_url,
webhook_poll_interval_secs: row.webhook_poll_interval_secs as u32,
webhook_body_template: row.webhook_body_template,
webhook_headers: row.webhook_headers,
created_at: parse_dt(&row.created_at)?,
updated_at: parse_dt(&row.updated_at)?,
})
@@ -84,4 +88,4 @@ impl TryFrom<ChannelRow> for Channel {
}
pub(super) const SELECT_COLS: &str =
"id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, access_mode, access_password_hash, logo, logo_position, logo_opacity, webhook_url, webhook_poll_interval_secs, created_at, updated_at";
"id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, access_mode, access_password_hash, logo, logo_position, logo_opacity, webhook_url, webhook_poll_interval_secs, webhook_body_template, webhook_headers, created_at, updated_at";

View File

@@ -71,8 +71,8 @@ impl ChannelRepository for SqliteChannelRepository {
sqlx::query(
r#"
INSERT INTO channels
(id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, access_mode, access_password_hash, logo, logo_position, logo_opacity, webhook_url, webhook_poll_interval_secs, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
(id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, access_mode, access_password_hash, logo, logo_position, logo_opacity, webhook_url, webhook_poll_interval_secs, webhook_body_template, webhook_headers, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
name = excluded.name,
description = excluded.description,
@@ -87,6 +87,8 @@ impl ChannelRepository for SqliteChannelRepository {
logo_opacity = excluded.logo_opacity,
webhook_url = excluded.webhook_url,
webhook_poll_interval_secs = excluded.webhook_poll_interval_secs,
webhook_body_template = excluded.webhook_body_template,
webhook_headers = excluded.webhook_headers,
updated_at = excluded.updated_at
"#,
)
@@ -105,6 +107,8 @@ impl ChannelRepository for SqliteChannelRepository {
.bind(channel.logo_opacity)
.bind(&channel.webhook_url)
.bind(channel.webhook_poll_interval_secs as i64)
.bind(&channel.webhook_body_template)
.bind(&channel.webhook_headers)
.bind(channel.created_at.to_rfc3339())
.bind(channel.updated_at.to_rfc3339())
.execute(&self.pool)

View File

@@ -0,0 +1,2 @@
ALTER TABLE channels ADD COLUMN webhook_body_template TEXT;
ALTER TABLE channels ADD COLUMN webhook_headers TEXT;