state recovery, polling optimizations, error rendering

widget states cached to SQLite, loaded on startup to seed DataProjection
so server restart preserves last-known data for reconnecting clients.

polling: first poll runs immediately, widget list cached per-task with
30s refresh, static text polled once inline instead of looping.

poll failures propagate WidgetError::SourceUnavailable to clients.
render engine prepends [offline] prefix in accent color, stale data
preserved below.
This commit is contained in:
2026-06-19 12:56:12 +02:00
parent 8b1dac9669
commit 13497dd53c
12 changed files with 338 additions and 40 deletions

View File

@@ -5,12 +5,13 @@ mod polling;
use anyhow::Result;
use application::DataProjection;
use config_sqlite::SqliteConfigStore;
use domain::ConfigRepository;
use http_api::AppState;
use kframe_auth::{Argon2Hasher, AuthConfig, JwtAuthService};
use secret_store::AesSecretStore;
use std::sync::Arc;
use tcp_server::{ClientTracker, TcpBroadcaster, TcpEventBus, run_tcp_server};
use tracing::{error, info};
use tracing::{error, info, warn};
#[tokio::main]
async fn main() -> Result<()> {
@@ -40,6 +41,15 @@ async fn main() -> Result<()> {
let auth = Arc::new(JwtAuthService::new(auth_config));
let hasher = Arc::new(Argon2Hasher);
match config_store.load_widget_states().await {
Ok(states) if !states.is_empty() => {
info!(count = states.len(), "loaded cached widget states");
projection.seed(states).await;
}
Ok(_) => {}
Err(e) => warn!(error = %e, "failed to load cached widget states"),
}
let tcp_addr = cfg.tcp_addr.clone();
let tcp_bc = broadcaster.clone();
let tcp_tracker = tracker.clone();

View File

@@ -3,7 +3,8 @@ use application::DataProjection;
use config_sqlite::SqliteConfigStore;
use data_generators::{ClockGenerator, StaticTextGenerator};
use domain::{
BroadcastPort, ConfigRepository, DataSource, DataSourcePort, DataSourceType, Value, WidgetState,
BroadcastPort, ConfigRepository, DataSource, DataSourcePort, DataSourceType, Value,
WidgetError, WidgetState,
};
use http_json::HttpJsonAdapter;
use media_adapter::MediaAdapter;
@@ -76,6 +77,7 @@ pub async fn run(
};
let mut running: HashMap<u16, JoinHandle<()>> = HashMap::new();
let mut static_done: std::collections::HashSet<u16> = std::collections::HashSet::new();
info!("polling manager started");
@@ -96,11 +98,23 @@ pub async fn run(
true
}
});
static_done.retain(|id| current_ids.contains(id));
for source in &sources {
if source.source_type == DataSourceType::Webhook {
continue;
}
// Static text: poll once inline, never spawn a task
if source.source_type == DataSourceType::StaticText {
if static_done.contains(&source.id) {
continue;
}
poll_once(&adapters, source, &config, &broadcaster, &projection).await;
static_done.insert(source.id);
continue;
}
if running.contains_key(&source.id) {
continue;
}
@@ -126,7 +140,7 @@ pub async fn run(
running.insert(source_id, handle);
}
if running.is_empty() {
if running.is_empty() && static_done.is_empty() {
debug!("no pollable sources, waiting");
}
@@ -134,6 +148,30 @@ pub async fn run(
}
}
async fn poll_once(
adapters: &Adapters,
source: &DataSource,
config: &Arc<SqliteConfigStore>,
broadcaster: &Arc<TcpBroadcaster>,
projection: &Arc<DataProjection>,
) {
let result = match adapters.poll(source).await {
Ok(v) => v,
Err(e) => {
warn!(source = %source.name, error = %e, "poll failed");
return;
}
};
let widgets = match config.list_widgets().await {
Ok(w) => w,
Err(e) => {
warn!(error = %e, "failed to fetch widgets");
return;
}
};
broadcast_changes(source, &result, &widgets, broadcaster, projection, config).await;
}
async fn poll_loop(
source: DataSource,
config: Arc<SqliteConfigStore>,
@@ -142,42 +180,115 @@ async fn poll_loop(
adapters: Adapters,
) {
let interval = source.poll_interval;
let mut widgets = match config.list_widgets().await {
Ok(w) => w,
Err(e) => {
warn!(error = %e, "failed to fetch initial widget list");
vec![]
}
};
let mut last_refresh = tokio::time::Instant::now();
loop {
tokio::time::sleep(interval).await;
let result = match adapters.poll(&source).await {
Ok(v) => v,
Err(e) => {
warn!(source = %source.name, error = %e, "poll failed");
broadcast_errors(&source, &widgets, &broadcaster, &projection).await;
tokio::time::sleep(interval).await;
continue;
}
};
let widgets = match config.list_widgets().await {
Ok(w) => w,
Err(e) => {
warn!(error = %e, "failed to fetch widgets");
continue;
if last_refresh.elapsed() >= SOURCE_REFRESH_INTERVAL {
if let Ok(w) = config.list_widgets().await {
widgets = w;
}
};
let changed: Vec<(u16, WidgetState)> = projection
.apply_poll_result(source.id, &result, &widgets)
.await;
if !changed.is_empty() {
let with_hints: Vec<_> = changed
.iter()
.filter_map(|(id, state)| {
let hint = widgets.iter().find(|w| w.id == *id)?.display_hint.clone();
Some((*id, hint, state.clone()))
})
.collect();
if let Err(e) = broadcaster.push_data_update(&with_hints).await {
warn!(error = %e, "failed to push update");
}
info!(source = %source.name, count = changed.len(), "pushed widget updates");
last_refresh = tokio::time::Instant::now();
}
broadcast_changes(
&source,
&result,
&widgets,
&broadcaster,
&projection,
&config,
)
.await;
tokio::time::sleep(interval).await;
}
}
async fn broadcast_changes(
source: &DataSource,
result: &Value,
widgets: &[domain::WidgetConfig],
broadcaster: &Arc<TcpBroadcaster>,
projection: &Arc<DataProjection>,
config: &Arc<SqliteConfigStore>,
) {
let changed: Vec<(u16, WidgetState)> = projection
.apply_poll_result(source.id, result, widgets)
.await;
if !changed.is_empty() {
let with_hints: Vec<_> = changed
.iter()
.filter_map(|(id, state)| {
let hint = widgets.iter().find(|w| w.id == *id)?.display_hint.clone();
Some((*id, hint, state.clone()))
})
.collect();
if let Err(e) = broadcaster.push_data_update(&with_hints).await {
warn!(error = %e, "failed to push update");
}
if let Err(e) = config.save_widget_states(&changed).await {
warn!(error = %e, "failed to cache widget states");
}
info!(source = %source.name, count = changed.len(), "pushed widget updates");
}
}
async fn broadcast_errors(
source: &DataSource,
widgets: &[domain::WidgetConfig],
broadcaster: &Arc<TcpBroadcaster>,
projection: &Arc<DataProjection>,
) {
let affected: Vec<_> = widgets
.iter()
.filter(|w| w.data_source_id == source.id)
.collect();
if affected.is_empty() {
return;
}
let mut error_states = Vec::new();
for w in &affected {
let mut state = projection
.get_state(w.id)
.await
.unwrap_or_else(|| WidgetState {
data: std::collections::BTreeMap::new(),
error: None,
});
state.error = Some(WidgetError::SourceUnavailable);
error_states.push((w.id, state));
}
projection.seed(error_states.clone()).await;
let with_hints: Vec<_> = error_states
.iter()
.filter_map(|(id, state)| {
let hint = affected.iter().find(|w| w.id == *id)?.display_hint.clone();
Some((*id, hint, state.clone()))
})
.collect();
if let Err(e) = broadcaster.push_data_update(&with_hints).await {
warn!(error = %e, "failed to push error update");
}
}