webhook through event system, extract data-generators adapter

webhook route now emits WebhookDataReceived event instead of directly
mutating DataProjection and broadcasting. event_handler applies data
and pushes incremental DataUpdate.

clock/static_text generators extracted to data-generators crate behind
DataSourcePort. chrono removed from bootstrap. polling adapters grouped
into Adapters struct.
This commit is contained in:
2026-06-19 12:33:42 +02:00
parent 437056cfc4
commit 455d5da901
9 changed files with 216 additions and 116 deletions

13
Cargo.lock generated
View File

@@ -226,9 +226,8 @@ version = "0.1.0"
dependencies = [
"anyhow",
"application",
"chrono",
"chrono-tz",
"config-sqlite",
"data-generators",
"domain",
"dotenvy",
"http-api",
@@ -467,6 +466,16 @@ dependencies = [
"cipher",
]
[[package]]
name = "data-generators"
version = "0.1.0"
dependencies = [
"chrono",
"chrono-tz",
"domain",
"thiserror",
]
[[package]]
name = "der"
version = "0.7.10"

View File

@@ -16,6 +16,7 @@ members = [
"crates/adapters/media",
"crates/adapters/auth",
"crates/adapters/secret-store",
"crates/adapters/data-generators",
"crates/api-types",
"crates/bootstrap",
"crates/client-desktop",
@@ -57,5 +58,4 @@ postcard = { version = "1.1", default-features = false, features = ["alloc"] }
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "net", "sync", "time", "io-util"] }
tower = "0.5"
reqwest = { version = "0.12", features = ["json"] }
chrono = "0.4"
chrono-tz = "0.10"
data-generators = { path = "crates/adapters/data-generators" }

View File

@@ -0,0 +1,10 @@
[package]
name = "data-generators"
version = "0.1.0"
edition = "2024"
[dependencies]
domain.workspace = true
chrono = "0.4"
chrono-tz = "0.10"
thiserror.workspace = true

View File

@@ -0,0 +1,59 @@
use chrono::Utc;
use chrono_tz::Tz;
use domain::{DataSource, DataSourceConfig, DataSourcePort, Value};
use std::collections::BTreeMap;
#[derive(Default)]
pub struct ClockGenerator;
impl ClockGenerator {
pub fn new() -> Self {
Self
}
}
#[derive(Debug, thiserror::Error)]
pub enum GeneratorError {
#[error("wrong config type for generator")]
WrongConfig,
}
impl DataSourcePort for ClockGenerator {
type Error = GeneratorError;
async fn poll(&self, source: &DataSource) -> Result<Value, Self::Error> {
let (fmt, tz_name) = match &source.config {
DataSourceConfig::Clock { format, timezone } => (format.as_str(), timezone.as_str()),
_ => ("%H:%M:%S", "UTC"),
};
let tz: Tz = tz_name.parse().unwrap_or(chrono_tz::UTC);
let now = Utc::now().with_timezone(&tz);
let formatted = now.format(fmt).to_string();
let mut map = BTreeMap::new();
map.insert("time".into(), Value::String(formatted));
Ok(Value::Object(map))
}
}
#[derive(Default)]
pub struct StaticTextGenerator;
impl StaticTextGenerator {
pub fn new() -> Self {
Self
}
}
impl DataSourcePort for StaticTextGenerator {
type Error = GeneratorError;
async fn poll(&self, source: &DataSource) -> Result<Value, Self::Error> {
let text = match &source.config {
DataSourceConfig::StaticText { text } => text.clone(),
_ => String::new(),
};
let mut map = BTreeMap::new();
map.insert("text".into(), Value::String(text));
Ok(Value::Object(map))
}
}

View File

@@ -2,7 +2,7 @@ use crate::AppState;
use axum::extract::{Path, State};
use axum::http::StatusCode;
use axum::response::Json;
use domain::{BroadcastPort, ConfigRepository, EventPublisher, WidgetStateReader};
use domain::{ConfigRepository, DomainEvent, EventPublisher};
type S<C, E, W, B, R, A, H> = State<AppState<C, E, W, B, R, A, H>>;
@@ -16,9 +16,6 @@ where
C::Error: std::fmt::Debug,
E: EventPublisher,
E::Error: std::fmt::Debug,
W: WidgetStateReader,
B: BroadcastPort,
B::Error: std::fmt::Debug,
{
let source = state
.config
@@ -34,37 +31,14 @@ where
));
}
let raw = json_to_domain_value(body);
let widgets = state
.config
.list_widgets()
let data = json_to_domain_value(body);
state
.events
.publish(DomainEvent::WebhookDataReceived { source_id, data })
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))?;
let layout = state
.config
.get_layout()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))?;
let changed = state
.widget_states
.apply_raw_data(source_id, &raw, &widgets)
.await;
if !changed.is_empty()
&& let Some(l) = &layout
{
let with_hints: Vec<_> = changed
.iter()
.filter_map(|(id, s)| {
let hint = widgets.iter().find(|w| w.id == *id)?.display_hint.clone();
Some((*id, hint, s.clone()))
})
.collect();
let _ = state.broadcaster.push_screen_update(l, &with_hints).await;
}
Ok(StatusCode::OK)
}

View File

@@ -19,5 +19,4 @@ anyhow.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
dotenvy.workspace = true
chrono.workspace = true
chrono-tz.workspace = true
data-generators.workspace = true

View File

@@ -40,6 +40,37 @@ pub async fn run(
info!("layout changed, pushed screen update to clients");
}
Ok(DomainEvent::WebhookDataReceived { source_id, data }) => {
let widgets = match config.list_widgets().await {
Ok(w) => w,
Err(e) => {
error!(error = %e, "failed to fetch widgets for webhook");
continue;
}
};
let changed = projection
.apply_poll_result(source_id, &data, &widgets)
.await;
if !changed.is_empty() {
let with_hints: Vec<_> = changed
.iter()
.filter_map(|(id, state)| {
let hint = widgets.iter().find(|w| w.id == *id)?.display_hint.clone();
Some((*id, hint, state.clone()))
})
.collect();
if let Err(e) = broadcaster.push_data_update(&with_hints).await {
error!(error = %e, "failed to push webhook data update");
}
info!(
source_id,
count = changed.len(),
"webhook data received, pushed update"
);
}
}
Ok(DomainEvent::ThemeChanged { theme }) => {
if let Err(e) = broadcaster.push_theme_update(&theme).await {
error!(error = %e, "failed to push theme update");

View File

@@ -1,16 +1,14 @@
use anyhow::Result;
use application::DataProjection;
use chrono::Utc;
use chrono_tz::Tz;
use config_sqlite::SqliteConfigStore;
use data_generators::{ClockGenerator, StaticTextGenerator};
use domain::{
BroadcastPort, ConfigRepository, DataSource, DataSourceConfig, DataSourcePort, DataSourceType,
Value, WidgetState,
BroadcastPort, ConfigRepository, DataSource, DataSourcePort, DataSourceType, Value, WidgetState,
};
use http_json::HttpJsonAdapter;
use media_adapter::MediaAdapter;
use rss_adapter::RssAdapter;
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tcp_server::TcpBroadcaster;
@@ -19,15 +17,63 @@ use tracing::{debug, info, warn};
const SOURCE_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
#[derive(Clone)]
struct Adapters {
http: Arc<HttpJsonAdapter>,
media: Arc<MediaAdapter>,
rss: Arc<RssAdapter>,
clock: Arc<ClockGenerator>,
static_text: Arc<StaticTextGenerator>,
}
impl Adapters {
async fn poll(&self, source: &DataSource) -> Result<Value> {
match source.source_type {
DataSourceType::HttpJson | DataSourceType::Weather => self
.http
.poll(source)
.await
.map_err(|e| anyhow::anyhow!("{e}")),
DataSourceType::Media => self
.media
.poll(source)
.await
.map_err(|e| anyhow::anyhow!("{e}")),
DataSourceType::Rss => self
.rss
.poll(source)
.await
.map_err(|e| anyhow::anyhow!("{e}")),
DataSourceType::Clock => self
.clock
.poll(source)
.await
.map_err(|e| anyhow::anyhow!("{e}")),
DataSourceType::StaticText => self
.static_text
.poll(source)
.await
.map_err(|e| anyhow::anyhow!("{e}")),
DataSourceType::Webhook => Err(anyhow::anyhow!(
"webhook sources are push-based, not polled"
)),
}
}
}
pub async fn run(
config: Arc<SqliteConfigStore>,
broadcaster: Arc<TcpBroadcaster>,
projection: Arc<DataProjection>,
_poll_interval_secs: u64,
) -> Result<()> {
let http_adapter = Arc::new(HttpJsonAdapter::new());
let media_adapter = Arc::new(MediaAdapter::new());
let rss_adapter = Arc::new(RssAdapter::new());
let adapters = Adapters {
http: Arc::new(HttpJsonAdapter::new()),
media: Arc::new(MediaAdapter::new()),
rss: Arc::new(RssAdapter::new()),
clock: Arc::new(ClockGenerator::new()),
static_text: Arc::new(StaticTextGenerator::new()),
};
let mut running: HashMap<u16, JoinHandle<()>> = HashMap::new();
@@ -64,9 +110,7 @@ pub async fn run(
let config = config.clone();
let broadcaster = broadcaster.clone();
let projection = projection.clone();
let http = http_adapter.clone();
let media = media_adapter.clone();
let rss = rss_adapter.clone();
let adapters = adapters.clone();
info!(
source_id = source.id,
@@ -76,7 +120,7 @@ pub async fn run(
);
let handle = tokio::spawn(async move {
poll_loop(source, config, broadcaster, projection, http, media, rss).await;
poll_loop(source, config, broadcaster, projection, adapters).await;
});
running.insert(source_id, handle);
@@ -95,16 +139,14 @@ async fn poll_loop(
config: Arc<SqliteConfigStore>,
broadcaster: Arc<TcpBroadcaster>,
projection: Arc<DataProjection>,
http_adapter: Arc<HttpJsonAdapter>,
media_adapter: Arc<MediaAdapter>,
rss_adapter: Arc<RssAdapter>,
adapters: Adapters,
) {
let interval = source.poll_interval;
loop {
tokio::time::sleep(interval).await;
let result = match poll_source(&http_adapter, &media_adapter, &rss_adapter, &source).await {
let result = match adapters.poll(&source).await {
Ok(v) => v,
Err(e) => {
warn!(source = %source.name, error = %e, "poll failed");
@@ -139,53 +181,3 @@ async fn poll_loop(
}
}
}
async fn poll_source(
http_adapter: &HttpJsonAdapter,
media_adapter: &MediaAdapter,
rss_adapter: &RssAdapter,
source: &DataSource,
) -> Result<Value> {
match source.source_type {
DataSourceType::HttpJson | DataSourceType::Weather => http_adapter
.poll(source)
.await
.map_err(|e| anyhow::anyhow!("{e}")),
DataSourceType::Media => media_adapter
.poll(source)
.await
.map_err(|e| anyhow::anyhow!("{e}")),
DataSourceType::Rss => rss_adapter
.poll(source)
.await
.map_err(|e| anyhow::anyhow!("{e}")),
DataSourceType::Clock => Ok(generate_clock(&source.config)),
DataSourceType::StaticText => Ok(generate_static_text(&source.config)),
DataSourceType::Webhook => Err(anyhow::anyhow!(
"webhook sources are push-based, not polled"
)),
}
}
fn generate_clock(config: &DataSourceConfig) -> Value {
let (fmt, tz_name) = match config {
DataSourceConfig::Clock { format, timezone } => (format.as_str(), timezone.as_str()),
_ => ("%H:%M:%S", "UTC"),
};
let tz: Tz = tz_name.parse().unwrap_or(chrono_tz::UTC);
let now = Utc::now().with_timezone(&tz);
let formatted = now.format(fmt).to_string();
let mut map = BTreeMap::new();
map.insert("time".into(), Value::String(formatted));
Value::Object(map)
}
fn generate_static_text(config: &DataSourceConfig) -> Value {
let text = match config {
DataSourceConfig::StaticText { text } => text.clone(),
_ => String::new(),
};
let mut map = BTreeMap::new();
map.insert("text".into(), Value::String(text));
Value::Object(map)
}

View File

@@ -1,17 +1,43 @@
use crate::entities::{DataSourceId, LayoutPresetId, WidgetId};
use crate::value_objects::{Layout, ThemeConfig};
use crate::value_objects::{Layout, ThemeConfig, Value};
#[derive(Debug, Clone)]
pub enum DomainEvent {
WidgetCreated { id: WidgetId },
WidgetUpdated { id: WidgetId },
WidgetDeleted { id: WidgetId },
DataSourceAdded { id: DataSourceId },
DataSourceUpdated { id: DataSourceId },
DataSourceRemoved { id: DataSourceId },
LayoutChanged { layout: Layout },
ThemeChanged { theme: ThemeConfig },
LayoutPresetSaved { id: LayoutPresetId },
LayoutPresetLoaded { id: LayoutPresetId },
LayoutPresetDeleted { id: LayoutPresetId },
WidgetCreated {
id: WidgetId,
},
WidgetUpdated {
id: WidgetId,
},
WidgetDeleted {
id: WidgetId,
},
DataSourceAdded {
id: DataSourceId,
},
DataSourceUpdated {
id: DataSourceId,
},
DataSourceRemoved {
id: DataSourceId,
},
LayoutChanged {
layout: Layout,
},
ThemeChanged {
theme: ThemeConfig,
},
LayoutPresetSaved {
id: LayoutPresetId,
},
LayoutPresetLoaded {
id: LayoutPresetId,
},
LayoutPresetDeleted {
id: LayoutPresetId,
},
WebhookDataReceived {
source_id: DataSourceId,
data: Value,
},
}