diff --git a/crates/adapters/config-memory/src/lib.rs b/crates/adapters/config-memory/src/lib.rs index b014d67..eef0218 100644 --- a/crates/adapters/config-memory/src/lib.rs +++ b/crates/adapters/config-memory/src/lib.rs @@ -1,6 +1,6 @@ use domain::{ ConfigRepository, DataSource, DataSourceId, Layout, LayoutPreset, LayoutPresetId, ThemeConfig, - User, WidgetConfig, WidgetId, + User, WidgetConfig, WidgetId, WidgetState, }; use std::collections::HashMap; use std::sync::RwLock; @@ -203,4 +203,15 @@ impl ConfigRepository for MemoryConfigStore { .map_err(|_| MemoryConfigError::LockPoisoned)?; Ok(guard.len() as u32) } + + async fn save_widget_states( + &self, + _states: &[(WidgetId, WidgetState)], + ) -> Result<(), Self::Error> { + Ok(()) + } + + async fn load_widget_states(&self) -> Result, Self::Error> { + Ok(vec![]) + } } diff --git a/crates/adapters/config-sqlite/src/lib.rs b/crates/adapters/config-sqlite/src/lib.rs index bce9b96..29a48aa 100644 --- a/crates/adapters/config-sqlite/src/lib.rs +++ b/crates/adapters/config-sqlite/src/lib.rs @@ -96,6 +96,15 @@ impl SqliteConfigStore { .execute(&self.pool) .await?; + sqlx::query( + "CREATE TABLE IF NOT EXISTS widget_state_cache ( + widget_id INTEGER PRIMARY KEY, + state_json TEXT NOT NULL + )", + ) + .execute(&self.pool) + .await?; + // Add alignment columns to widgets (idempotent) let _ = sqlx::query("ALTER TABLE widgets ADD COLUMN h_align TEXT NOT NULL DEFAULT 'left'") .execute(&self.pool) diff --git a/crates/adapters/config-sqlite/src/repository/mod.rs b/crates/adapters/config-sqlite/src/repository/mod.rs index 5f22b14..5680e40 100644 --- a/crates/adapters/config-sqlite/src/repository/mod.rs +++ b/crates/adapters/config-sqlite/src/repository/mod.rs @@ -3,13 +3,14 @@ mod layout; mod presets; mod theme; mod users; +mod widget_state_cache; mod widgets; use crate::SqliteConfigStore; use crate::error::SqliteConfigError; use domain::{ ConfigRepository, DataSource, DataSourceId, Layout, LayoutPreset, LayoutPresetId, ThemeConfig, - User, WidgetConfig, WidgetId, + User, WidgetConfig, WidgetId, WidgetState, }; impl ConfigRepository for SqliteConfigStore { @@ -90,4 +91,15 @@ impl ConfigRepository for SqliteConfigStore { async fn count_users(&self) -> Result { self.count_users_impl().await } + + async fn save_widget_states( + &self, + states: &[(WidgetId, WidgetState)], + ) -> Result<(), Self::Error> { + self.save_widget_states_impl(states).await + } + + async fn load_widget_states(&self) -> Result, Self::Error> { + self.load_widget_states_impl().await + } } diff --git a/crates/adapters/config-sqlite/src/repository/widget_state_cache.rs b/crates/adapters/config-sqlite/src/repository/widget_state_cache.rs new file mode 100644 index 0000000..58e3809 --- /dev/null +++ b/crates/adapters/config-sqlite/src/repository/widget_state_cache.rs @@ -0,0 +1,99 @@ +use crate::SqliteConfigStore; +use crate::error::SqliteConfigError; +use domain::{Value, WidgetId, WidgetState}; +use sqlx::Row; +use std::collections::BTreeMap; + +impl SqliteConfigStore { + pub(crate) async fn save_widget_states_impl( + &self, + states: &[(WidgetId, WidgetState)], + ) -> Result<(), SqliteConfigError> { + for (id, state) in states { + let json = domain_state_to_json(state) + .map_err(|e| SqliteConfigError::Serialization(e.to_string()))?; + sqlx::query( + "INSERT OR REPLACE INTO widget_state_cache (widget_id, state_json) VALUES (?, ?)", + ) + .bind(*id as i64) + .bind(&json) + .execute(&self.pool) + .await + .map_err(SqliteConfigError::Sql)?; + } + Ok(()) + } + + pub(crate) async fn load_widget_states_impl( + &self, + ) -> Result, SqliteConfigError> { + let rows = sqlx::query("SELECT widget_id, state_json FROM widget_state_cache") + .fetch_all(&self.pool) + .await + .map_err(SqliteConfigError::Sql)?; + + let mut result = Vec::new(); + for row in &rows { + let id: i64 = row.get("widget_id"); + let json_str: String = row.get("state_json"); + let state = json_to_domain_state(&json_str) + .map_err(|e| SqliteConfigError::Serialization(e.to_string()))?; + result.push((id as WidgetId, state)); + } + Ok(result) + } +} + +fn domain_value_to_json(v: &Value) -> serde_json::Value { + match v { + Value::Null => serde_json::Value::Null, + Value::Bool(b) => serde_json::Value::Bool(*b), + Value::Number(n) => serde_json::json!(n), + Value::String(s) => serde_json::Value::String(s.clone()), + Value::Array(arr) => { + serde_json::Value::Array(arr.iter().map(domain_value_to_json).collect()) + } + Value::Object(map) => { + let obj: serde_json::Map = map + .iter() + .map(|(k, v)| (k.clone(), domain_value_to_json(v))) + .collect(); + serde_json::Value::Object(obj) + } + } +} + +fn json_value_to_domain(v: &serde_json::Value) -> Value { + match v { + serde_json::Value::Null => Value::Null, + serde_json::Value::Bool(b) => Value::Bool(*b), + serde_json::Value::Number(n) => Value::Number(n.as_f64().unwrap_or(0.0)), + serde_json::Value::String(s) => Value::String(s.clone()), + serde_json::Value::Array(arr) => { + Value::Array(arr.iter().map(json_value_to_domain).collect()) + } + serde_json::Value::Object(map) => Value::Object( + map.iter() + .map(|(k, v)| (k.clone(), json_value_to_domain(v))) + .collect(), + ), + } +} + +fn domain_state_to_json(state: &WidgetState) -> Result { + let data: serde_json::Map = state + .data + .iter() + .map(|(k, v)| (k.clone(), domain_value_to_json(v))) + .collect(); + serde_json::to_string(&data) +} + +fn json_to_domain_state(json: &str) -> Result { + let map: serde_json::Map = serde_json::from_str(json)?; + let data: BTreeMap = map + .iter() + .map(|(k, v)| (k.clone(), json_value_to_domain(v))) + .collect(); + Ok(WidgetState { data, error: None }) +} diff --git a/crates/application/src/data_projection.rs b/crates/application/src/data_projection.rs index 5e82ee9..01057ed 100644 --- a/crates/application/src/data_projection.rs +++ b/crates/application/src/data_projection.rs @@ -19,6 +19,13 @@ impl DataProjection { Self::default() } + pub async fn seed(&self, states: Vec<(WidgetId, WidgetState)>) { + let mut current = self.current.lock().await; + for (id, state) in states { + current.insert(id, state); + } + } + pub async fn get_state(&self, widget_id: WidgetId) -> Option { self.current.lock().await.get(&widget_id).cloned() } diff --git a/crates/application/tests/support/mod.rs b/crates/application/tests/support/mod.rs index d267f20..b8760ef 100644 --- a/crates/application/tests/support/mod.rs +++ b/crates/application/tests/support/mod.rs @@ -1,6 +1,6 @@ use domain::{ ConfigRepository, DataSource, DataSourceId, DomainEvent, EventPublisher, Layout, LayoutPreset, - LayoutPresetId, ThemeConfig, User, WidgetConfig, WidgetId, + LayoutPresetId, ThemeConfig, User, WidgetConfig, WidgetId, WidgetState, }; use std::collections::HashMap; use std::sync::Mutex; @@ -135,6 +135,17 @@ impl ConfigRepository for InMemoryConfigRepository { async fn count_users(&self) -> Result { Ok(0) } + + async fn save_widget_states( + &self, + _states: &[(WidgetId, WidgetState)], + ) -> Result<(), Self::Error> { + Ok(()) + } + + async fn load_widget_states(&self) -> Result, Self::Error> { + Ok(vec![]) + } } pub struct InMemoryEventPublisher { diff --git a/crates/bootstrap/src/main.rs b/crates/bootstrap/src/main.rs index c4e291d..bc0540b 100644 --- a/crates/bootstrap/src/main.rs +++ b/crates/bootstrap/src/main.rs @@ -5,12 +5,13 @@ mod polling; use anyhow::Result; use application::DataProjection; use config_sqlite::SqliteConfigStore; +use domain::ConfigRepository; use http_api::AppState; use kframe_auth::{Argon2Hasher, AuthConfig, JwtAuthService}; use secret_store::AesSecretStore; use std::sync::Arc; use tcp_server::{ClientTracker, TcpBroadcaster, TcpEventBus, run_tcp_server}; -use tracing::{error, info}; +use tracing::{error, info, warn}; #[tokio::main] async fn main() -> Result<()> { @@ -40,6 +41,15 @@ async fn main() -> Result<()> { let auth = Arc::new(JwtAuthService::new(auth_config)); let hasher = Arc::new(Argon2Hasher); + match config_store.load_widget_states().await { + Ok(states) if !states.is_empty() => { + info!(count = states.len(), "loaded cached widget states"); + projection.seed(states).await; + } + Ok(_) => {} + Err(e) => warn!(error = %e, "failed to load cached widget states"), + } + let tcp_addr = cfg.tcp_addr.clone(); let tcp_bc = broadcaster.clone(); let tcp_tracker = tracker.clone(); diff --git a/crates/bootstrap/src/polling.rs b/crates/bootstrap/src/polling.rs index 9c88551..8c4f3a9 100644 --- a/crates/bootstrap/src/polling.rs +++ b/crates/bootstrap/src/polling.rs @@ -3,7 +3,8 @@ use application::DataProjection; use config_sqlite::SqliteConfigStore; use data_generators::{ClockGenerator, StaticTextGenerator}; use domain::{ - BroadcastPort, ConfigRepository, DataSource, DataSourcePort, DataSourceType, Value, WidgetState, + BroadcastPort, ConfigRepository, DataSource, DataSourcePort, DataSourceType, Value, + WidgetError, WidgetState, }; use http_json::HttpJsonAdapter; use media_adapter::MediaAdapter; @@ -76,6 +77,7 @@ pub async fn run( }; let mut running: HashMap> = HashMap::new(); + let mut static_done: std::collections::HashSet = std::collections::HashSet::new(); info!("polling manager started"); @@ -96,11 +98,23 @@ pub async fn run( true } }); + static_done.retain(|id| current_ids.contains(id)); for source in &sources { if source.source_type == DataSourceType::Webhook { continue; } + + // Static text: poll once inline, never spawn a task + if source.source_type == DataSourceType::StaticText { + if static_done.contains(&source.id) { + continue; + } + poll_once(&adapters, source, &config, &broadcaster, &projection).await; + static_done.insert(source.id); + continue; + } + if running.contains_key(&source.id) { continue; } @@ -126,7 +140,7 @@ pub async fn run( running.insert(source_id, handle); } - if running.is_empty() { + if running.is_empty() && static_done.is_empty() { debug!("no pollable sources, waiting"); } @@ -134,6 +148,30 @@ pub async fn run( } } +async fn poll_once( + adapters: &Adapters, + source: &DataSource, + config: &Arc, + broadcaster: &Arc, + projection: &Arc, +) { + let result = match adapters.poll(source).await { + Ok(v) => v, + Err(e) => { + warn!(source = %source.name, error = %e, "poll failed"); + return; + } + }; + let widgets = match config.list_widgets().await { + Ok(w) => w, + Err(e) => { + warn!(error = %e, "failed to fetch widgets"); + return; + } + }; + broadcast_changes(source, &result, &widgets, broadcaster, projection, config).await; +} + async fn poll_loop( source: DataSource, config: Arc, @@ -142,42 +180,115 @@ async fn poll_loop( adapters: Adapters, ) { let interval = source.poll_interval; + let mut widgets = match config.list_widgets().await { + Ok(w) => w, + Err(e) => { + warn!(error = %e, "failed to fetch initial widget list"); + vec![] + } + }; + let mut last_refresh = tokio::time::Instant::now(); loop { - tokio::time::sleep(interval).await; - let result = match adapters.poll(&source).await { Ok(v) => v, Err(e) => { warn!(source = %source.name, error = %e, "poll failed"); + broadcast_errors(&source, &widgets, &broadcaster, &projection).await; + tokio::time::sleep(interval).await; continue; } }; - let widgets = match config.list_widgets().await { - Ok(w) => w, - Err(e) => { - warn!(error = %e, "failed to fetch widgets"); - continue; + if last_refresh.elapsed() >= SOURCE_REFRESH_INTERVAL { + if let Ok(w) = config.list_widgets().await { + widgets = w; } - }; - - let changed: Vec<(u16, WidgetState)> = projection - .apply_poll_result(source.id, &result, &widgets) - .await; - - if !changed.is_empty() { - let with_hints: Vec<_> = changed - .iter() - .filter_map(|(id, state)| { - let hint = widgets.iter().find(|w| w.id == *id)?.display_hint.clone(); - Some((*id, hint, state.clone())) - }) - .collect(); - if let Err(e) = broadcaster.push_data_update(&with_hints).await { - warn!(error = %e, "failed to push update"); - } - info!(source = %source.name, count = changed.len(), "pushed widget updates"); + last_refresh = tokio::time::Instant::now(); } + + broadcast_changes( + &source, + &result, + &widgets, + &broadcaster, + &projection, + &config, + ) + .await; + + tokio::time::sleep(interval).await; + } +} + +async fn broadcast_changes( + source: &DataSource, + result: &Value, + widgets: &[domain::WidgetConfig], + broadcaster: &Arc, + projection: &Arc, + config: &Arc, +) { + let changed: Vec<(u16, WidgetState)> = projection + .apply_poll_result(source.id, result, widgets) + .await; + + if !changed.is_empty() { + let with_hints: Vec<_> = changed + .iter() + .filter_map(|(id, state)| { + let hint = widgets.iter().find(|w| w.id == *id)?.display_hint.clone(); + Some((*id, hint, state.clone())) + }) + .collect(); + if let Err(e) = broadcaster.push_data_update(&with_hints).await { + warn!(error = %e, "failed to push update"); + } + if let Err(e) = config.save_widget_states(&changed).await { + warn!(error = %e, "failed to cache widget states"); + } + info!(source = %source.name, count = changed.len(), "pushed widget updates"); + } +} + +async fn broadcast_errors( + source: &DataSource, + widgets: &[domain::WidgetConfig], + broadcaster: &Arc, + projection: &Arc, +) { + let affected: Vec<_> = widgets + .iter() + .filter(|w| w.data_source_id == source.id) + .collect(); + + if affected.is_empty() { + return; + } + + let mut error_states = Vec::new(); + for w in &affected { + let mut state = projection + .get_state(w.id) + .await + .unwrap_or_else(|| WidgetState { + data: std::collections::BTreeMap::new(), + error: None, + }); + state.error = Some(WidgetError::SourceUnavailable); + error_states.push((w.id, state)); + } + + projection.seed(error_states.clone()).await; + + let with_hints: Vec<_> = error_states + .iter() + .filter_map(|(id, state)| { + let hint = affected.iter().find(|w| w.id == *id)?.display_hint.clone(); + Some((*id, hint, state.clone())) + }) + .collect(); + if let Err(e) = broadcaster.push_data_update(&with_hints).await { + warn!(error = %e, "failed to push error update"); } } diff --git a/crates/client-desktop/src/main.rs b/crates/client-desktop/src/main.rs index 8e0cf89..c7d2890 100644 --- a/crates/client-desktop/src/main.rs +++ b/crates/client-desktop/src/main.rs @@ -2,7 +2,7 @@ use client_application::ClientApp; use client_domain::NetworkPort; use client_domain::{BoundingBox, DisplayPort, FontMetrics, RenderEngine, ThemeConfig}; use display_terminal::TerminalDisplay; -use domain::DisplayHint; +use domain::{DisplayHint, WidgetError}; use protocol::decode_server_message; use std::sync::mpsc; use std::thread; @@ -85,7 +85,10 @@ fn main() { .map(|kv| (kv.key.clone(), kv.value.clone().into())) .collect(); - let draw_cmds = engine.render_widget(&hint, &data, cmd.bounds, 0); + let error: Option = + cmd.state.error.as_ref().map(|e| e.clone().into()); + let draw_cmds = + engine.render_widget(&hint, &data, cmd.bounds, 0, error.as_ref()); for dc in &draw_cmds { display .draw_text_span(&dc.text, dc.x, dc.y, dc.color, dc.font) diff --git a/crates/client-domain/src/render_engine.rs b/crates/client-domain/src/render_engine.rs index b183262..ccf78fa 100644 --- a/crates/client-domain/src/render_engine.rs +++ b/crates/client-domain/src/render_engine.rs @@ -2,7 +2,7 @@ use crate::{ BoundingBox, Color, FontMetrics, FontSize, ThemeConfig, alignment::align_offset, markup::parse_markup, text_layout::wrap_lines, }; -use domain::{DisplayHint, DisplayHintKind, HAlign, VAlign, Value}; +use domain::{DisplayHint, DisplayHintKind, HAlign, VAlign, Value, WidgetError}; #[derive(Debug, Clone, PartialEq)] pub struct DrawCommand { @@ -92,8 +92,12 @@ impl RenderEngine { data: &[(String, Value)], bounds: BoundingBox, scroll_offset: u16, + error: Option<&WidgetError>, ) -> Vec { - let text = self.format_widget(hint, data); + let mut text = self.format_widget(hint, data); + if error.is_some() { + text = format!("{{accent}}[offline]{{/}}\n{text}"); + } let mut cmds = self.render_text(&text, bounds, hint.h_align, hint.v_align); if scroll_offset > 0 { @@ -110,8 +114,17 @@ impl RenderEngine { cmds } - pub fn content_height(&self, hint: &DisplayHint, data: &[(String, Value)], width: u16) -> u16 { - let text = self.format_widget(hint, data); + pub fn content_height( + &self, + hint: &DisplayHint, + data: &[(String, Value)], + width: u16, + error: Option<&WidgetError>, + ) -> u16 { + let mut text = self.format_widget(hint, data); + if error.is_some() { + text = format!("{{accent}}[offline]{{/}}\n{text}"); + } let plain: String = parse_markup(&text, &self.theme) .iter() .map(|s| s.text.as_str()) diff --git a/crates/client-esp32/src/tasks/render.rs b/crates/client-esp32/src/tasks/render.rs index 3ad005c..f62b4ac 100644 --- a/crates/client-esp32/src/tasks/render.rs +++ b/crates/client-esp32/src/tasks/render.rs @@ -5,7 +5,7 @@ use client_domain::{ BoundingBox, Color, DisplayPort, FontMetrics, RenderEngine, ScrollState, ThemeConfig, }; use client_application::{ClientApp, RepaintCommand}; -use domain::{DisplayHint, Value}; +use domain::{DisplayHint, Value, WidgetError}; use protocol::ServerMessage; use super::RenderEvent; use crate::config::RENDER_POLL_INTERVAL; @@ -21,6 +21,7 @@ const COLOR_DISCONNECTED: Color = Color(200, 0, 0); struct WidgetCache { hint: DisplayHint, data: Vec<(String, Value)>, + error: Option, bounds: BoundingBox, scroll: ScrollState, } @@ -121,13 +122,15 @@ fn update_cache(engine: &RenderEngine, cmd: &RepaintCommand) -> WidgetCache { .iter() .map(|kv| (kv.key.clone(), kv.value.clone().into())) .collect(); + let error: Option = cmd.state.error.as_ref().map(|e| e.clone().into()); - let content_h = engine.content_height(&hint, &data, cmd.bounds.width); + let content_h = engine.content_height(&hint, &data, cmd.bounds.width, error.as_ref()); let scroll = ScrollState::new(cmd.bounds.height, content_h); WidgetCache { hint, data, + error, bounds: cmd.bounds, scroll, } @@ -143,6 +146,7 @@ fn draw_widget( &cache.data, cache.bounds, cache.scroll.offset(), + cache.error.as_ref(), ); for dc in &draw_cmds { diff --git a/crates/domain/src/ports/config_repository.rs b/crates/domain/src/ports/config_repository.rs index 6a53966..dfb3a7a 100644 --- a/crates/domain/src/ports/config_repository.rs +++ b/crates/domain/src/ports/config_repository.rs @@ -1,7 +1,7 @@ use crate::entities::{ DataSource, DataSourceId, LayoutPreset, LayoutPresetId, User, WidgetConfig, WidgetId, }; -use crate::value_objects::{Layout, ThemeConfig}; +use crate::value_objects::{Layout, ThemeConfig, WidgetState}; use std::future::Future; pub trait ConfigRepository { @@ -63,4 +63,12 @@ pub trait ConfigRepository { ) -> impl Future, Self::Error>> + Send; fn save_user(&self, user: &User) -> impl Future> + Send; fn count_users(&self) -> impl Future> + Send; + + fn save_widget_states( + &self, + states: &[(WidgetId, WidgetState)], + ) -> impl Future> + Send; + fn load_widget_states( + &self, + ) -> impl Future, Self::Error>> + Send; }