From 1d7b5324d6ad83a8c1cf1238156c8615f243493e Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Fri, 19 Jun 2026 00:42:31 +0200 Subject: [PATCH] per-source polling, initial client state, webhook, preview, client tracking - per-source poll intervals: spawn task per source with own interval, manager re-checks sources every 30s for add/remove - initial screen update on TCP connect: send layout + widget states - client tracking: ClientRegistry port, GET /api/clients, dashboard list - webhook adapter: POST /api/webhook/{source_id} feeds data into projection - widget preview: GET /api/widgets/{id}/preview returns current state - serve SPA from Axum: ServeDir + index.html fallback via KFRAME_SPA_DIR - layout builder delete confirmation with AlertDialog - form validation: required fields disable save button - guide page at /guide - fix architecture: ClientDto to api-types, ClientRegistry + WidgetStateReader ports in domain, DataProjection has internal Mutex, no adapter cross-deps - ESP32: full screen clear on layout change (stale pixel fix) --- Cargo.lock | 31 ++ Cargo.toml | 2 +- crates/adapters/http-api/Cargo.toml | 1 + crates/adapters/http-api/src/lib.rs | 43 ++- .../adapters/http-api/src/routes/clients.rs | 25 ++ .../http-api/src/routes/data_sources.rs | 22 +- crates/adapters/http-api/src/routes/layout.rs | 10 +- crates/adapters/http-api/src/routes/mod.rs | 51 ++- .../adapters/http-api/src/routes/presets.rs | 22 +- .../adapters/http-api/src/routes/webhook.rs | 81 ++++ .../adapters/http-api/src/routes/widgets.rs | 67 +++- crates/adapters/http-api/tests/api_tests.rs | 14 +- .../adapters/tcp-server/src/client_tracker.rs | 48 +++ crates/adapters/tcp-server/src/lib.rs | 2 + crates/adapters/tcp-server/src/server.rs | 82 +++- crates/api-types/src/client.rs | 17 + crates/api-types/src/lib.rs | 2 + crates/application/Cargo.toml | 1 + crates/application/src/data_projection.rs | 43 ++- .../tests/data_projection_tests.rs | 50 ++- crates/bootstrap/src/config.rs | 2 + crates/bootstrap/src/event_handler.rs | 14 +- crates/bootstrap/src/main.rs | 15 +- crates/bootstrap/src/polling.rs | 146 ++++++-- crates/client-esp32/src/tasks/render.rs | 3 +- crates/domain/src/lib.rs | 5 +- crates/domain/src/ports/client_registry.rs | 11 + crates/domain/src/ports/mod.rs | 4 + .../domain/src/ports/widget_state_reader.rs | 14 + spa/src/api/clients.ts | 11 + spa/src/api/types.ts | 5 + spa/src/api/widgets.ts | 9 + spa/src/components/app-shell.tsx | 2 + spa/src/pages/dashboard.tsx | 49 ++- spa/src/pages/data-sources.tsx | 10 +- spa/src/pages/guide.tsx | 350 ++++++++++++++++++ spa/src/pages/layout-builder.tsx | 48 ++- spa/src/pages/widgets.tsx | 70 +++- spa/src/router.tsx | 8 + 39 files changed, 1232 insertions(+), 158 deletions(-) create mode 100644 crates/adapters/http-api/src/routes/clients.rs create mode 100644 crates/adapters/http-api/src/routes/webhook.rs create mode 100644 crates/adapters/tcp-server/src/client_tracker.rs create mode 100644 crates/api-types/src/client.rs create mode 100644 crates/domain/src/ports/client_registry.rs create mode 100644 crates/domain/src/ports/widget_state_reader.rs create mode 100644 spa/src/api/clients.ts create mode 100644 spa/src/pages/guide.tsx diff --git a/Cargo.lock b/Cargo.lock index e9efe08..419c76d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -782,6 +782,12 @@ dependencies = [ "tokio", ] +[[package]] +name = "http-range-header" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9171a2ea8a68358193d15dd5d70c1c10a2afc3e7e4c5bc92bc9f025cebd7359c" + [[package]] name = "httparse" version = "1.10.1" @@ -1136,6 +1142,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "mio" version = "1.2.1" @@ -2277,10 +2293,19 @@ checksum = "4cfcf7e2740e6fc6d4d688b4ef00650406bb94adf4731e43c096c3a19fe40840" dependencies = [ "bitflags", "bytes", + "futures-core", "futures-util", "http", "http-body", + "http-body-util", + "http-range-header", + "httpdate", + "mime", + "mime_guess", + "percent-encoding", "pin-project-lite", + "tokio", + "tokio-util", "tower", "tower-layer", "tower-service", @@ -2373,6 +2398,12 @@ version = "1.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6f5e870be6c3b371b77fe0ee0bafb859fa4964b4404c27de1d380043c4dda20" +[[package]] +name = "unicase" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142" + [[package]] name = "unicode-bidi" version = "0.3.18" diff --git a/Cargo.toml b/Cargo.toml index 482f611..acf4c50 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,7 @@ http-api = { path = "crates/adapters/http-api" } media-adapter = { path = "crates/adapters/media" } rss-adapter = { path = "crates/adapters/rss" } axum = { version = "0.8", features = ["macros"] } -tower-http = { version = "0.6", features = ["cors"] } +tower-http = { version = "0.6", features = ["cors", "fs"] } api-types = { path = "crates/api-types" } thiserror = "2.0" anyhow = "1.0" diff --git a/crates/adapters/http-api/Cargo.toml b/crates/adapters/http-api/Cargo.toml index f75ad0b..1f38e72 100644 --- a/crates/adapters/http-api/Cargo.toml +++ b/crates/adapters/http-api/Cargo.toml @@ -19,3 +19,4 @@ tower.workspace = true serde_json.workspace = true config-memory.workspace = true tcp-server.workspace = true +application.workspace = true diff --git a/crates/adapters/http-api/src/lib.rs b/crates/adapters/http-api/src/lib.rs index 4564a2e..ab820ba 100644 --- a/crates/adapters/http-api/src/lib.rs +++ b/crates/adapters/http-api/src/lib.rs @@ -1,43 +1,72 @@ mod routes; use axum::Router; -use domain::{ConfigRepository, EventPublisher}; +use domain::{BroadcastPort, ClientRegistry, ConfigRepository, EventPublisher, WidgetStateReader}; use std::sync::Arc; use tower_http::cors::CorsLayer; +use tower_http::services::{ServeDir, ServeFile}; -pub struct AppState { +pub struct AppState { pub config: Arc, pub events: Arc, + pub widget_states: Arc, + pub broadcaster: Arc, + pub clients: Arc, + pub spa_dir: Option, } -impl Clone for AppState { +impl Clone for AppState { fn clone(&self) -> Self { Self { config: self.config.clone(), events: self.events.clone(), + widget_states: self.widget_states.clone(), + broadcaster: self.broadcaster.clone(), + clients: self.clients.clone(), + spa_dir: self.spa_dir.clone(), } } } -pub fn router(state: AppState) -> Router +pub fn router(state: AppState) -> Router where C: ConfigRepository + Send + Sync + 'static, C::Error: std::fmt::Debug + Send, E: EventPublisher + Send + Sync + 'static, E::Error: std::fmt::Debug + Send, + W: WidgetStateReader + Send + Sync + 'static, + B: BroadcastPort + Send + Sync + 'static, + B::Error: std::fmt::Debug + Send, + R: ClientRegistry + Send + Sync + 'static, { - Router::new() + let spa_dir = state.spa_dir.clone(); + + let app = Router::new() .nest("/api", routes::api_routes()) .layer(CorsLayer::permissive()) - .with_state(state) + .with_state(state); + + if let Some(dir) = spa_dir { + let index = format!("{dir}/index.html"); + app.fallback_service(ServeDir::new(&dir).fallback(ServeFile::new(index))) + } else { + app + } } -pub async fn serve(addr: &str, state: AppState) -> Result<(), std::io::Error> +pub async fn serve( + addr: &str, + state: AppState, +) -> Result<(), std::io::Error> where C: ConfigRepository + Send + Sync + 'static, C::Error: std::fmt::Debug + Send, E: EventPublisher + Send + Sync + 'static, E::Error: std::fmt::Debug + Send, + W: WidgetStateReader + Send + Sync + 'static, + B: BroadcastPort + Send + Sync + 'static, + B::Error: std::fmt::Debug + Send, + R: ClientRegistry + Send + Sync + 'static, { let app = router(state); let listener = tokio::net::TcpListener::bind(addr).await?; diff --git a/crates/adapters/http-api/src/routes/clients.rs b/crates/adapters/http-api/src/routes/clients.rs new file mode 100644 index 0000000..02ba253 --- /dev/null +++ b/crates/adapters/http-api/src/routes/clients.rs @@ -0,0 +1,25 @@ +use crate::AppState; +use api_types::ClientDto; +use axum::extract::State; +use axum::response::Json; +use domain::{ClientRegistry, ConfigRepository, EventPublisher}; + +type S = State>; + +pub async fn list_clients(State(state): S) -> Json> +where + C: ConfigRepository, + C::Error: std::fmt::Debug, + E: EventPublisher, + E::Error: std::fmt::Debug, + R: ClientRegistry, +{ + Json( + state + .clients + .list_clients() + .iter() + .map(ClientDto::from) + .collect(), + ) +} diff --git a/crates/adapters/http-api/src/routes/data_sources.rs b/crates/adapters/http-api/src/routes/data_sources.rs index abe6f2a..b3a9cb3 100644 --- a/crates/adapters/http-api/src/routes/data_sources.rs +++ b/crates/adapters/http-api/src/routes/data_sources.rs @@ -8,10 +8,10 @@ use axum::{ }; use domain::{ConfigRepository, EventPublisher}; -type S = State>; +type S = State>; -pub async fn list_data_sources( - State(state): S, +pub async fn list_data_sources( + State(state): S, ) -> Result>, StatusCode> where C: ConfigRepository, @@ -27,8 +27,8 @@ where Ok(Json(sources.iter().map(DataSourceDto::from).collect())) } -pub async fn get_data_source( - State(state): S, +pub async fn get_data_source( + State(state): S, Path(id): Path, ) -> Result, StatusCode> where @@ -48,8 +48,8 @@ where } } -pub async fn create_data_source( - State(state): S, +pub async fn create_data_source( + State(state): S, Json(body): Json, ) -> Result where @@ -68,8 +68,8 @@ where Ok(StatusCode::CREATED) } -pub async fn update_data_source( - State(state): S, +pub async fn update_data_source( + State(state): S, Path(_id): Path, Json(body): Json, ) -> Result @@ -89,8 +89,8 @@ where Ok(StatusCode::OK) } -pub async fn delete_data_source( - State(state): S, +pub async fn delete_data_source( + State(state): S, Path(id): Path, ) -> Result where diff --git a/crates/adapters/http-api/src/routes/layout.rs b/crates/adapters/http-api/src/routes/layout.rs index f4e4c53..cbbd1ff 100644 --- a/crates/adapters/http-api/src/routes/layout.rs +++ b/crates/adapters/http-api/src/routes/layout.rs @@ -4,9 +4,11 @@ use application::ConfigService; use axum::{extract::State, http::StatusCode, response::Json}; use domain::{ConfigRepository, EventPublisher}; -type S = State>; +type S = State>; -pub async fn get_layout(State(state): S) -> Result>, StatusCode> +pub async fn get_layout( + State(state): S, +) -> Result>, StatusCode> where C: ConfigRepository, C::Error: std::fmt::Debug, @@ -21,8 +23,8 @@ where Ok(Json(layout.as_ref().map(LayoutDto::from))) } -pub async fn update_layout( - State(state): S, +pub async fn update_layout( + State(state): S, Json(body): Json, ) -> Result where diff --git a/crates/adapters/http-api/src/routes/mod.rs b/crates/adapters/http-api/src/routes/mod.rs index f47b64f..9154447 100644 --- a/crates/adapters/http-api/src/routes/mod.rs +++ b/crates/adapters/http-api/src/routes/mod.rs @@ -1,53 +1,74 @@ +mod clients; mod data_sources; mod layout; mod presets; +mod webhook; mod widgets; use crate::AppState; use axum::Router; use axum::routing::{get, post}; -use domain::{ConfigRepository, EventPublisher}; +use domain::{BroadcastPort, ClientRegistry, ConfigRepository, EventPublisher, WidgetStateReader}; -pub fn api_routes() -> Router> +pub fn api_routes() -> Router> where C: ConfigRepository + Send + Sync + 'static, C::Error: std::fmt::Debug + Send, E: EventPublisher + Send + Sync + 'static, E::Error: std::fmt::Debug + Send, + W: WidgetStateReader + Send + Sync + 'static, + B: BroadcastPort + Send + Sync + 'static, + B::Error: std::fmt::Debug + Send, + R: ClientRegistry + Send + Sync + 'static, { Router::new() .route( "/widgets", - get(widgets::list_widgets::).post(widgets::create_widget::), + get(widgets::list_widgets::) + .post(widgets::create_widget::), ) .route( "/widgets/{id}", - get(widgets::get_widget::) - .put(widgets::update_widget::) - .delete(widgets::delete_widget::), + get(widgets::get_widget::) + .put(widgets::update_widget::) + .delete(widgets::delete_widget::), + ) + .route( + "/widgets/{id}/preview", + get(widgets::preview_widget::), ) .route( "/data-sources", - get(data_sources::list_data_sources::) - .post(data_sources::create_data_source::), + get(data_sources::list_data_sources::) + .post(data_sources::create_data_source::), ) .route( "/data-sources/{id}", - get(data_sources::get_data_source::) - .put(data_sources::update_data_source::) - .delete(data_sources::delete_data_source::), + get(data_sources::get_data_source::) + .put(data_sources::update_data_source::) + .delete(data_sources::delete_data_source::), ) .route( "/layout", - get(layout::get_layout::).put(layout::update_layout::), + get(layout::get_layout::).put(layout::update_layout::), ) .route( "/presets", - get(presets::list_presets::).post(presets::create_preset::), + get(presets::list_presets::) + .post(presets::create_preset::), ) .route( "/presets/{id}", - get(presets::get_preset::).delete(presets::delete_preset::), + get(presets::get_preset::) + .delete(presets::delete_preset::), + ) + .route( + "/presets/{id}/load", + post(presets::load_preset::), + ) + .route("/clients", get(clients::list_clients::)) + .route( + "/webhook/{source_id}", + post(webhook::receive_webhook::), ) - .route("/presets/{id}/load", post(presets::load_preset::)) } diff --git a/crates/adapters/http-api/src/routes/presets.rs b/crates/adapters/http-api/src/routes/presets.rs index e756b05..787f3e3 100644 --- a/crates/adapters/http-api/src/routes/presets.rs +++ b/crates/adapters/http-api/src/routes/presets.rs @@ -8,9 +8,11 @@ use axum::{ }; use domain::{ConfigRepository, EventPublisher}; -type S = State>; +type S = State>; -pub async fn list_presets(State(state): S) -> Result>, StatusCode> +pub async fn list_presets( + State(state): S, +) -> Result>, StatusCode> where C: ConfigRepository, C::Error: std::fmt::Debug, @@ -25,8 +27,8 @@ where Ok(Json(presets.iter().map(PresetDto::from).collect())) } -pub async fn get_preset( - State(state): S, +pub async fn get_preset( + State(state): S, Path(id): Path, ) -> Result, StatusCode> where @@ -46,8 +48,8 @@ where } } -pub async fn create_preset( - State(state): S, +pub async fn create_preset( + State(state): S, Json(body): Json, ) -> Result where @@ -66,8 +68,8 @@ where Ok(StatusCode::CREATED) } -pub async fn delete_preset( - State(state): S, +pub async fn delete_preset( + State(state): S, Path(id): Path, ) -> Result where @@ -83,8 +85,8 @@ where Ok(StatusCode::NO_CONTENT) } -pub async fn load_preset( - State(state): S, +pub async fn load_preset( + State(state): S, Path(id): Path, ) -> Result where diff --git a/crates/adapters/http-api/src/routes/webhook.rs b/crates/adapters/http-api/src/routes/webhook.rs new file mode 100644 index 0000000..e269c09 --- /dev/null +++ b/crates/adapters/http-api/src/routes/webhook.rs @@ -0,0 +1,81 @@ +use crate::AppState; +use axum::extract::{Path, State}; +use axum::http::StatusCode; +use axum::response::Json; +use domain::{BroadcastPort, ConfigRepository, EventPublisher, WidgetStateReader}; + +type S = State>; + +pub async fn receive_webhook( + State(state): S, + Path(source_id): Path, + Json(body): Json, +) -> Result +where + C: ConfigRepository, + C::Error: std::fmt::Debug, + E: EventPublisher, + E::Error: std::fmt::Debug, + W: WidgetStateReader, + B: BroadcastPort, + B::Error: std::fmt::Debug, +{ + let source = state + .config + .get_data_source(source_id) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))? + .ok_or((StatusCode::NOT_FOUND, "data source not found".into()))?; + + if source.source_type != domain::DataSourceType::Webhook { + return Err(( + StatusCode::BAD_REQUEST, + "data source is not a webhook type".into(), + )); + } + + let raw = json_to_domain_value(body); + let widgets = state + .config + .list_widgets() + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))?; + + let layout = state + .config + .get_layout() + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))?; + + let changed = state + .widget_states + .apply_raw_data(source_id, &raw, &widgets) + .await; + + if !changed.is_empty() + && let Some(l) = &layout + { + let _ = state.broadcaster.push_screen_update(l, &changed).await; + } + + Ok(StatusCode::OK) +} + +fn json_to_domain_value(json: serde_json::Value) -> domain::Value { + match json { + serde_json::Value::Null => domain::Value::Null, + serde_json::Value::Bool(b) => domain::Value::Bool(b), + serde_json::Value::Number(n) => domain::Value::Number(n.as_f64().unwrap_or(0.0)), + serde_json::Value::String(s) => domain::Value::String(s), + serde_json::Value::Array(arr) => { + domain::Value::Array(arr.into_iter().map(json_to_domain_value).collect()) + } + serde_json::Value::Object(obj) => { + let map = obj + .into_iter() + .map(|(k, v)| (k, json_to_domain_value(v))) + .collect(); + domain::Value::Object(map) + } + } +} diff --git a/crates/adapters/http-api/src/routes/widgets.rs b/crates/adapters/http-api/src/routes/widgets.rs index 6b69565..8ff9569 100644 --- a/crates/adapters/http-api/src/routes/widgets.rs +++ b/crates/adapters/http-api/src/routes/widgets.rs @@ -6,11 +6,13 @@ use axum::{ http::StatusCode, response::Json, }; -use domain::{ConfigRepository, EventPublisher}; +use domain::{ConfigRepository, EventPublisher, WidgetStateReader}; -type S = State>; +type S = State>; -pub async fn list_widgets(State(state): S) -> Result>, StatusCode> +pub async fn list_widgets( + State(state): S, +) -> Result>, StatusCode> where C: ConfigRepository, C::Error: std::fmt::Debug, @@ -25,8 +27,8 @@ where Ok(Json(widgets.iter().map(WidgetDto::from).collect())) } -pub async fn get_widget( - State(state): S, +pub async fn get_widget( + State(state): S, Path(id): Path, ) -> Result, StatusCode> where @@ -46,8 +48,8 @@ where } } -pub async fn create_widget( - State(state): S, +pub async fn create_widget( + State(state): S, Json(body): Json, ) -> Result where @@ -66,8 +68,8 @@ where Ok(StatusCode::CREATED) } -pub async fn update_widget( - State(state): S, +pub async fn update_widget( + State(state): S, Path(_id): Path, Json(body): Json, ) -> Result @@ -87,8 +89,8 @@ where Ok(StatusCode::OK) } -pub async fn delete_widget( - State(state): S, +pub async fn delete_widget( + State(state): S, Path(id): Path, ) -> Result where @@ -103,3 +105,46 @@ where .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; Ok(StatusCode::NO_CONTENT) } + +pub async fn preview_widget( + State(state): S, + Path(id): Path, +) -> Result, StatusCode> +where + C: ConfigRepository, + C::Error: std::fmt::Debug, + E: EventPublisher, + E::Error: std::fmt::Debug, + W: WidgetStateReader, +{ + match state.widget_states.get_widget_state(id).await { + Some(ws) => { + let map: serde_json::Map = ws + .data + .iter() + .map(|(k, v)| (k.clone(), domain_value_to_json(v))) + .collect(); + Ok(Json(serde_json::Value::Object(map))) + } + None => Err(StatusCode::NOT_FOUND), + } +} + +fn domain_value_to_json(v: &domain::Value) -> serde_json::Value { + match v { + domain::Value::Null => serde_json::Value::Null, + domain::Value::Bool(b) => serde_json::Value::Bool(*b), + domain::Value::Number(n) => serde_json::json!(n), + domain::Value::String(s) => serde_json::Value::String(s.clone()), + domain::Value::Array(arr) => { + serde_json::Value::Array(arr.iter().map(domain_value_to_json).collect()) + } + domain::Value::Object(obj) => { + let map = obj + .iter() + .map(|(k, v)| (k.clone(), domain_value_to_json(v))) + .collect(); + serde_json::Value::Object(map) + } + } +} diff --git a/crates/adapters/http-api/tests/api_tests.rs b/crates/adapters/http-api/tests/api_tests.rs index bb24740..aab9562 100644 --- a/crates/adapters/http-api/tests/api_tests.rs +++ b/crates/adapters/http-api/tests/api_tests.rs @@ -1,15 +1,21 @@ +use application::DataProjection; use axum::body::Body; use axum::http::{Request, StatusCode}; use config_memory::MemoryConfigStore; use http_api::{AppState, router}; use std::sync::Arc; -use tcp_server::TcpEventBus; +use tcp_server::{ClientTracker, TcpBroadcaster, TcpEventBus}; use tower::ServiceExt; fn test_app() -> axum::Router { - let config = Arc::new(MemoryConfigStore::new()); - let events = Arc::new(TcpEventBus::new(16)); - let state = AppState { config, events }; + let state = AppState { + config: Arc::new(MemoryConfigStore::new()), + events: Arc::new(TcpEventBus::new(16)), + widget_states: Arc::new(DataProjection::new()), + broadcaster: Arc::new(TcpBroadcaster::new(16)), + clients: Arc::new(ClientTracker::new()), + spa_dir: None, + }; router(state) } diff --git a/crates/adapters/tcp-server/src/client_tracker.rs b/crates/adapters/tcp-server/src/client_tracker.rs new file mode 100644 index 0000000..8751f3a --- /dev/null +++ b/crates/adapters/tcp-server/src/client_tracker.rs @@ -0,0 +1,48 @@ +use domain::{ClientRegistry, ConnectedClient}; +use std::net::SocketAddr; +use std::sync::Mutex; +use std::time::SystemTime; + +#[derive(Default)] +pub struct ClientTracker { + clients: Mutex>, +} + +impl ClientTracker { + pub fn new() -> Self { + Self::default() + } + + pub fn add(&self, addr: SocketAddr) { + let info = ConnectedClient { + addr: addr.to_string(), + connected_at: SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(), + }; + self.clients.lock().unwrap().push(info); + } + + pub fn remove(&self, addr: SocketAddr) { + let addr_str = addr.to_string(); + self.clients.lock().unwrap().retain(|c| c.addr != addr_str); + } +} + +impl ClientRegistry for ClientTracker { + fn add_client(&self, addr: &str, connected_at: u64) { + self.clients.lock().unwrap().push(ConnectedClient { + addr: addr.to_string(), + connected_at, + }); + } + + fn remove_client(&self, addr: &str) { + self.clients.lock().unwrap().retain(|c| c.addr != addr); + } + + fn list_clients(&self) -> Vec { + self.clients.lock().unwrap().clone() + } +} diff --git a/crates/adapters/tcp-server/src/lib.rs b/crates/adapters/tcp-server/src/lib.rs index 5bd425e..4ab1c81 100644 --- a/crates/adapters/tcp-server/src/lib.rs +++ b/crates/adapters/tcp-server/src/lib.rs @@ -1,9 +1,11 @@ mod broadcaster; +mod client_tracker; mod error; mod event_bus; mod server; pub use broadcaster::TcpBroadcaster; +pub use client_tracker::ClientTracker; pub use error::TcpServerError; pub use event_bus::TcpEventBus; pub use server::run_tcp_server; diff --git a/crates/adapters/tcp-server/src/server.rs b/crates/adapters/tcp-server/src/server.rs index c30c981..1c41e72 100644 --- a/crates/adapters/tcp-server/src/server.rs +++ b/crates/adapters/tcp-server/src/server.rs @@ -1,15 +1,27 @@ -use crate::broadcaster::TcpBroadcaster; +use crate::client_tracker::ClientTracker; use crate::error::TcpServerError; +use domain::{ConfigRepository, WidgetStateReader}; +use protocol::{ServerMessage, WidgetDescriptor, WireDisplayHint, WireLayoutNode, encode}; use std::sync::Arc; use tokio::io::AsyncWriteExt; use tokio::net::TcpListener; use tokio::sync::broadcast; -use tracing::{info, warn}; +use tracing::{error, info, warn}; -pub async fn run_tcp_server( +use crate::broadcaster::TcpBroadcaster; + +pub async fn run_tcp_server( addr: &str, broadcaster: Arc, -) -> Result<(), TcpServerError> { + tracker: Arc, + config: Arc, + widget_states: Arc, +) -> Result<(), TcpServerError> +where + C: ConfigRepository + Send + Sync + 'static, + C::Error: std::fmt::Debug + Send, + W: WidgetStateReader + Send + Sync + 'static, +{ let listener = TcpListener::bind(addr).await.map_err(TcpServerError::Io)?; info!(addr, "TCP server listening"); @@ -17,9 +29,21 @@ pub async fn run_tcp_server( let (mut socket, peer) = listener.accept().await.map_err(TcpServerError::Io)?; info!(%peer, "client connected"); + tracker.add(peer); + let tracker = tracker.clone(); let mut rx = broadcaster.subscribe(); + let initial_frame = build_initial_frame(&*config, &*widget_states).await; + tokio::spawn(async move { + if let Some(frame) = initial_frame + && socket.write_all(&frame).await.is_err() + { + info!(%peer, "client disconnected during initial send"); + tracker.remove(peer); + return; + } + loop { match rx.recv().await { Ok(frame) => { @@ -34,6 +58,56 @@ pub async fn run_tcp_server( } } } + tracker.remove(peer); }); } } + +async fn build_initial_frame(config: &C, widget_states: &W) -> Option> +where + C: ConfigRepository, + C::Error: std::fmt::Debug, + W: WidgetStateReader, +{ + let layout = match config.get_layout().await { + Ok(Some(l)) => l, + Ok(None) => return None, + Err(e) => { + error!(error = ?e, "failed to fetch layout for initial send"); + return None; + } + }; + + let widgets = match config.list_widgets().await { + Ok(w) => w, + Err(e) => { + error!(error = ?e, "failed to fetch widgets for initial send"); + return None; + } + }; + + let wire_layout: WireLayoutNode = (&layout.root).into(); + let mut wire_widgets = Vec::new(); + for w in &widgets { + if let Some(s) = widget_states.get_widget_state(w.id).await { + wire_widgets.push(WidgetDescriptor { + id: w.id, + display_hint: WireDisplayHint::IconValue, + state: (&s).into(), + }); + } + } + + let msg = ServerMessage::ScreenUpdate { + layout: wire_layout, + widgets: wire_widgets, + }; + + match encode(&msg) { + Ok(frame) => Some(frame), + Err(e) => { + error!(error = %e, "failed to encode initial screen update"); + None + } + } +} diff --git a/crates/api-types/src/client.rs b/crates/api-types/src/client.rs new file mode 100644 index 0000000..9b22ada --- /dev/null +++ b/crates/api-types/src/client.rs @@ -0,0 +1,17 @@ +use domain::ConnectedClient; +use serde::Serialize; + +#[derive(Serialize)] +pub struct ClientDto { + pub addr: String, + pub connected_at: u64, +} + +impl From<&ConnectedClient> for ClientDto { + fn from(c: &ConnectedClient) -> Self { + Self { + addr: c.addr.clone(), + connected_at: c.connected_at, + } + } +} diff --git a/crates/api-types/src/lib.rs b/crates/api-types/src/lib.rs index 7ac4c8d..5a470a5 100644 --- a/crates/api-types/src/lib.rs +++ b/crates/api-types/src/lib.rs @@ -1,8 +1,10 @@ +pub mod client; pub mod data_source; pub mod layout; pub mod preset; pub mod widget; +pub use client::ClientDto; pub use data_source::DataSourceDto; pub use layout::{LayoutChildDto, LayoutDto, LayoutNodeDto, SizingDto}; pub use preset::{CreatePresetDto, PresetDto}; diff --git a/crates/application/Cargo.toml b/crates/application/Cargo.toml index 8164893..3c6ec35 100644 --- a/crates/application/Cargo.toml +++ b/crates/application/Cargo.toml @@ -6,6 +6,7 @@ edition = "2024" [dependencies] domain.workspace = true thiserror.workspace = true +tokio.workspace = true [dev-dependencies] tokio = { workspace = true } diff --git a/crates/application/src/data_projection.rs b/crates/application/src/data_projection.rs index deb526f..5e82ee9 100644 --- a/crates/application/src/data_projection.rs +++ b/crates/application/src/data_projection.rs @@ -1,9 +1,17 @@ -use domain::{DataSourceId, Value, WidgetConfig, WidgetId, WidgetState}; +use domain::{DataSourceId, Value, WidgetConfig, WidgetId, WidgetState, WidgetStateReader}; use std::collections::HashMap; +use tokio::sync::Mutex; -#[derive(Default)] pub struct DataProjection { - current: HashMap, + current: Mutex>, +} + +impl Default for DataProjection { + fn default() -> Self { + Self { + current: Mutex::new(HashMap::new()), + } + } } impl DataProjection { @@ -11,16 +19,17 @@ impl DataProjection { Self::default() } - pub fn get_state(&self, widget_id: WidgetId) -> Option<&WidgetState> { - self.current.get(&widget_id) + pub async fn get_state(&self, widget_id: WidgetId) -> Option { + self.current.lock().await.get(&widget_id).cloned() } - pub fn apply_poll_result( - &mut self, + pub async fn apply_poll_result( + &self, data_source_id: DataSourceId, raw: &Value, widget_configs: &[WidgetConfig], ) -> Vec<(WidgetId, WidgetState)> { + let mut current = self.current.lock().await; let mut changed = Vec::new(); for config in widget_configs { @@ -30,13 +39,12 @@ impl DataProjection { let new_state = config.extract(raw); - let is_changed = self - .current + let is_changed = current .get(&config.id) .is_none_or(|prev| *prev != new_state); if is_changed { - self.current.insert(config.id, new_state.clone()); + current.insert(config.id, new_state.clone()); changed.push((config.id, new_state)); } } @@ -44,3 +52,18 @@ impl DataProjection { changed } } + +impl WidgetStateReader for DataProjection { + async fn get_widget_state(&self, id: WidgetId) -> Option { + self.get_state(id).await + } + + async fn apply_raw_data( + &self, + source_id: u16, + raw: &Value, + widgets: &[WidgetConfig], + ) -> Vec<(WidgetId, WidgetState)> { + self.apply_poll_result(source_id, raw, widgets).await + } +} diff --git a/crates/application/tests/data_projection_tests.rs b/crates/application/tests/data_projection_tests.rs index 4c6d869..9a23df3 100644 --- a/crates/application/tests/data_projection_tests.rs +++ b/crates/application/tests/data_projection_tests.rs @@ -1,5 +1,5 @@ use application::DataProjection; -use domain::{DisplayHint, KeyMapping, Value, WidgetConfig, WidgetId, WidgetState}; +use domain::{DisplayHint, KeyMapping, Value, WidgetConfig}; use std::collections::BTreeMap; fn weather_widget() -> WidgetConfig { @@ -28,12 +28,14 @@ fn weather_response(temp: f64) -> Value { ])) } -#[test] -fn apply_poll_result_detects_new_widget_state() { - let mut projection = DataProjection::new(); +#[tokio::test] +async fn apply_poll_result_detects_new_widget_state() { + let projection = DataProjection::new(); let widgets = vec![weather_widget()]; - let changed = projection.apply_poll_result(10, &weather_response(5.4), &widgets); + let changed = projection + .apply_poll_result(10, &weather_response(5.4), &widgets) + .await; assert_eq!(changed.len(), 1); assert_eq!(changed[0].0, 1); @@ -43,24 +45,32 @@ fn apply_poll_result_detects_new_widget_state() { ); } -#[test] -fn apply_poll_result_returns_empty_when_nothing_changed() { - let mut projection = DataProjection::new(); +#[tokio::test] +async fn apply_poll_result_returns_empty_when_nothing_changed() { + let projection = DataProjection::new(); let widgets = vec![weather_widget()]; - projection.apply_poll_result(10, &weather_response(5.4), &widgets); - let changed = projection.apply_poll_result(10, &weather_response(5.4), &widgets); + projection + .apply_poll_result(10, &weather_response(5.4), &widgets) + .await; + let changed = projection + .apply_poll_result(10, &weather_response(5.4), &widgets) + .await; assert!(changed.is_empty()); } -#[test] -fn apply_poll_result_detects_changed_value() { - let mut projection = DataProjection::new(); +#[tokio::test] +async fn apply_poll_result_detects_changed_value() { + let projection = DataProjection::new(); let widgets = vec![weather_widget()]; - projection.apply_poll_result(10, &weather_response(5.4), &widgets); - let changed = projection.apply_poll_result(10, &weather_response(6.1), &widgets); + projection + .apply_poll_result(10, &weather_response(5.4), &widgets) + .await; + let changed = projection + .apply_poll_result(10, &weather_response(6.1), &widgets) + .await; assert_eq!(changed.len(), 1); assert_eq!( @@ -69,9 +79,9 @@ fn apply_poll_result_detects_changed_value() { ); } -#[test] -fn apply_poll_result_only_updates_widgets_bound_to_source() { - let mut projection = DataProjection::new(); +#[tokio::test] +async fn apply_poll_result_only_updates_widgets_bound_to_source() { + let projection = DataProjection::new(); let widgets = vec![ weather_widget(), WidgetConfig::new( @@ -86,7 +96,9 @@ fn apply_poll_result_only_updates_widgets_bound_to_source() { ), ]; - let changed = projection.apply_poll_result(10, &weather_response(5.4), &widgets); + let changed = projection + .apply_poll_result(10, &weather_response(5.4), &widgets) + .await; assert_eq!(changed.len(), 1); assert_eq!(changed[0].0, 1); diff --git a/crates/bootstrap/src/config.rs b/crates/bootstrap/src/config.rs index c3a0393..9e7645e 100644 --- a/crates/bootstrap/src/config.rs +++ b/crates/bootstrap/src/config.rs @@ -5,6 +5,7 @@ pub struct ServerConfig { pub tcp_addr: String, pub http_addr: String, pub poll_interval_secs: u64, + pub spa_dir: Option, } impl ServerConfig { @@ -18,6 +19,7 @@ impl ServerConfig { .ok() .and_then(|v| v.parse().ok()) .unwrap_or(5), + spa_dir: env::var("KFRAME_SPA_DIR").ok(), } } } diff --git a/crates/bootstrap/src/event_handler.rs b/crates/bootstrap/src/event_handler.rs index 83d7d65..5767cf2 100644 --- a/crates/bootstrap/src/event_handler.rs +++ b/crates/bootstrap/src/event_handler.rs @@ -3,14 +3,13 @@ use config_sqlite::SqliteConfigStore; use domain::{BroadcastPort, ConfigRepository, DomainEvent}; use std::sync::Arc; use tcp_server::{TcpBroadcaster, TcpEventBus}; -use tokio::sync::Mutex; use tracing::{error, info, warn}; pub async fn run( event_bus: Arc, config: Arc, broadcaster: Arc, - projection: Arc>, + projection: Arc, ) { let mut rx = event_bus.subscribe(); @@ -25,11 +24,12 @@ pub async fn run( } }; - let proj = projection.lock().await; - let widget_states: Vec<_> = widgets - .iter() - .filter_map(|w| proj.get_state(w.id).map(|s| (w.id, s.clone()))) - .collect(); + let mut widget_states = Vec::new(); + for w in &widgets { + if let Some(s) = projection.get_state(w.id).await { + widget_states.push((w.id, s)); + } + } if let Err(e) = broadcaster .push_screen_update(&layout, &widget_states) diff --git a/crates/bootstrap/src/main.rs b/crates/bootstrap/src/main.rs index 7787b1e..a08c976 100644 --- a/crates/bootstrap/src/main.rs +++ b/crates/bootstrap/src/main.rs @@ -7,8 +7,7 @@ use application::DataProjection; use config_sqlite::SqliteConfigStore; use http_api::AppState; use std::sync::Arc; -use tcp_server::{TcpBroadcaster, TcpEventBus, run_tcp_server}; -use tokio::sync::Mutex; +use tcp_server::{ClientTracker, TcpBroadcaster, TcpEventBus, run_tcp_server}; use tracing::{error, info}; #[tokio::main] @@ -29,12 +28,16 @@ async fn main() -> Result<()> { let event_bus = Arc::new(TcpEventBus::new(64)); let broadcaster = Arc::new(TcpBroadcaster::new(64)); - let projection = Arc::new(Mutex::new(DataProjection::new())); + let projection = Arc::new(DataProjection::new()); + let tracker = Arc::new(ClientTracker::new()); let tcp_addr = cfg.tcp_addr.clone(); let tcp_bc = broadcaster.clone(); + let tcp_tracker = tracker.clone(); + let tcp_config = config_store.clone(); + let tcp_proj = projection.clone(); tokio::spawn(async move { - if let Err(e) = run_tcp_server(&tcp_addr, tcp_bc).await { + if let Err(e) = run_tcp_server(&tcp_addr, tcp_bc, tcp_tracker, tcp_config, tcp_proj).await { error!(error = %e, "tcp server failed"); } }); @@ -44,6 +47,10 @@ async fn main() -> Result<()> { let http_state = AppState { config: config_store.clone(), events: event_bus.clone(), + widget_states: projection.clone(), + broadcaster: broadcaster.clone(), + clients: tracker.clone(), + spa_dir: cfg.spa_dir, }; tokio::spawn(async move { if let Err(e) = http_api::serve(&http_addr, http_state).await { diff --git a/crates/bootstrap/src/polling.rs b/crates/bootstrap/src/polling.rs index d91a985..c5ce43c 100644 --- a/crates/bootstrap/src/polling.rs +++ b/crates/bootstrap/src/polling.rs @@ -7,71 +7,135 @@ use domain::{ use http_json::HttpJsonAdapter; use media_adapter::MediaAdapter; use rss_adapter::RssAdapter; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tcp_server::TcpBroadcaster; -use tokio::sync::Mutex; +use tokio::task::JoinHandle; use tracing::{debug, info, warn}; +const SOURCE_REFRESH_INTERVAL: Duration = Duration::from_secs(30); + pub async fn run( config: Arc, broadcaster: Arc, - projection: Arc>, - poll_interval_secs: u64, + projection: Arc, + _poll_interval_secs: u64, ) -> Result<()> { - let http_adapter = HttpJsonAdapter::new(); - let media_adapter = MediaAdapter::new(); - let rss_adapter = RssAdapter::new(); - let interval = Duration::from_secs(poll_interval_secs); + let http_adapter = Arc::new(HttpJsonAdapter::new()); + let media_adapter = Arc::new(MediaAdapter::new()); + let rss_adapter = Arc::new(RssAdapter::new()); - info!(interval_secs = poll_interval_secs, "polling loop started"); + let mut running: HashMap> = HashMap::new(); + + info!("polling manager started"); loop { - tokio::time::sleep(interval).await; - let sources = config .list_data_sources() .await .map_err(|e| anyhow::anyhow!("{e}"))?; - let widgets = config - .list_widgets() - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - let layout = config - .get_layout() - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - if sources.is_empty() || widgets.is_empty() { - debug!("no sources or widgets configured, skipping poll"); - continue; - } + let current_ids: Vec = sources.iter().map(|s| s.id).collect(); - let mut all_changed: Vec<(u16, WidgetState)> = Vec::new(); + running.retain(|id, handle| { + if !current_ids.contains(id) { + info!(source_id = id, "stopping poll for removed source"); + handle.abort(); + false + } else { + true + } + }); for source in &sources { - let result = - match poll_source(&http_adapter, &media_adapter, &rss_adapter, source).await { - Ok(v) => v, - Err(e) => { - warn!(source = %source.name, error = %e, "poll failed"); - continue; - } - }; + if source.source_type == DataSourceType::Webhook { + continue; + } + if running.contains_key(&source.id) { + continue; + } - let mut proj = projection.lock().await; - let changed = proj.apply_poll_result(source.id, &result, &widgets); - all_changed.extend(changed); + let source_id = source.id; + let source = source.clone(); + let config = config.clone(); + let broadcaster = broadcaster.clone(); + let projection = projection.clone(); + let http = http_adapter.clone(); + let media = media_adapter.clone(); + let rss = rss_adapter.clone(); + + info!( + source_id = source.id, + name = %source.name, + interval_secs = source.poll_interval.as_secs(), + "starting poll task" + ); + + let handle = tokio::spawn(async move { + poll_loop(source, config, broadcaster, projection, http, media, rss).await; + }); + + running.insert(source_id, handle); } - if !all_changed.is_empty() { - if let Some(l) = &layout { - broadcaster - .push_screen_update(l, &all_changed) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; + if running.is_empty() { + debug!("no pollable sources, waiting"); + } + + tokio::time::sleep(SOURCE_REFRESH_INTERVAL).await; + } +} + +async fn poll_loop( + source: DataSource, + config: Arc, + broadcaster: Arc, + projection: Arc, + http_adapter: Arc, + media_adapter: Arc, + rss_adapter: Arc, +) { + let interval = source.poll_interval; + + loop { + tokio::time::sleep(interval).await; + + let result = match poll_source(&http_adapter, &media_adapter, &rss_adapter, &source).await { + Ok(v) => v, + Err(e) => { + warn!(source = %source.name, error = %e, "poll failed"); + continue; } - info!(count = all_changed.len(), "pushed widget updates"); + }; + + let widgets = match config.list_widgets().await { + Ok(w) => w, + Err(e) => { + warn!(error = %e, "failed to fetch widgets"); + continue; + } + }; + + let layout = match config.get_layout().await { + Ok(l) => l, + Err(e) => { + warn!(error = %e, "failed to fetch layout"); + continue; + } + }; + + let changed: Vec<(u16, WidgetState)> = projection + .apply_poll_result(source.id, &result, &widgets) + .await; + + if !changed.is_empty() { + if let Some(l) = &layout + && let Err(e) = broadcaster.push_screen_update(l, &changed).await + { + warn!(error = %e, "failed to push update"); + } + info!(source = %source.name, count = changed.len(), "pushed widget updates"); } } } diff --git a/crates/client-esp32/src/tasks/render.rs b/crates/client-esp32/src/tasks/render.rs index f4999a4..48b0b1c 100644 --- a/crates/client-esp32/src/tasks/render.rs +++ b/crates/client-esp32/src/tasks/render.rs @@ -21,9 +21,10 @@ pub fn run( loop { match rx.recv_timeout(RENDER_POLL_INTERVAL) { Ok(msg) => { + let is_screen_update = matches!(msg, ServerMessage::ScreenUpdate { .. }); let repaints = app.handle_message(msg); - if !repaints.is_empty() && first_update { + if !repaints.is_empty() && (first_update || is_screen_update) { display.fill_background(SCREEN).unwrap(); first_update = false; } diff --git a/crates/domain/src/lib.rs b/crates/domain/src/lib.rs index 3b2556c..95df3b8 100644 --- a/crates/domain/src/lib.rs +++ b/crates/domain/src/lib.rs @@ -10,7 +10,10 @@ pub use entities::{ LayoutPreset, LayoutPresetId, WidgetConfig, WidgetId, }; pub use events::DomainEvent; -pub use ports::{BroadcastPort, ConfigRepository, DataSourcePort, EventPublisher}; +pub use ports::{ + BroadcastPort, ClientRegistry, ConfigRepository, ConnectedClient, DataSourcePort, + EventPublisher, WidgetStateReader, +}; pub use value_objects::{ ContainerNode, Direction, DisplayHint, KeyMapping, Layout, LayoutChild, LayoutNode, LayoutValidationError, Sizing, Value, WidgetError, WidgetState, diff --git a/crates/domain/src/ports/client_registry.rs b/crates/domain/src/ports/client_registry.rs new file mode 100644 index 0000000..2aa3dad --- /dev/null +++ b/crates/domain/src/ports/client_registry.rs @@ -0,0 +1,11 @@ +#[derive(Clone)] +pub struct ConnectedClient { + pub addr: String, + pub connected_at: u64, +} + +pub trait ClientRegistry { + fn add_client(&self, addr: &str, connected_at: u64); + fn remove_client(&self, addr: &str); + fn list_clients(&self) -> Vec; +} diff --git a/crates/domain/src/ports/mod.rs b/crates/domain/src/ports/mod.rs index ad3e440..a950cd1 100644 --- a/crates/domain/src/ports/mod.rs +++ b/crates/domain/src/ports/mod.rs @@ -1,9 +1,13 @@ mod broadcast; +mod client_registry; mod config_repository; mod data_source_port; mod event; +mod widget_state_reader; pub use broadcast::BroadcastPort; +pub use client_registry::{ClientRegistry, ConnectedClient}; pub use config_repository::ConfigRepository; pub use data_source_port::DataSourcePort; pub use event::EventPublisher; +pub use widget_state_reader::WidgetStateReader; diff --git a/crates/domain/src/ports/widget_state_reader.rs b/crates/domain/src/ports/widget_state_reader.rs new file mode 100644 index 0000000..2f72fc3 --- /dev/null +++ b/crates/domain/src/ports/widget_state_reader.rs @@ -0,0 +1,14 @@ +use crate::entities::WidgetId; +use crate::value_objects::WidgetState; +use std::future::Future; + +pub trait WidgetStateReader { + fn get_widget_state(&self, id: WidgetId) -> impl Future> + Send; + + fn apply_raw_data( + &self, + source_id: u16, + raw: &crate::value_objects::Value, + widgets: &[crate::entities::WidgetConfig], + ) -> impl Future> + Send; +} diff --git a/spa/src/api/clients.ts b/spa/src/api/clients.ts new file mode 100644 index 0000000..4a89ed5 --- /dev/null +++ b/spa/src/api/clients.ts @@ -0,0 +1,11 @@ +import { useQuery } from "@tanstack/react-query" +import { api } from "./client" +import type { ClientInfo } from "./types" + +export function useClients() { + return useQuery({ + queryKey: ["clients"], + queryFn: () => api.get("/clients"), + refetchInterval: 5000, + }) +} diff --git a/spa/src/api/types.ts b/spa/src/api/types.ts index 573aa97..67bc4bd 100644 --- a/spa/src/api/types.ts +++ b/spa/src/api/types.ts @@ -57,3 +57,8 @@ export interface Preset { name: string layout: Layout } + +export interface ClientInfo { + addr: string + connected_at: number +} diff --git a/spa/src/api/widgets.ts b/spa/src/api/widgets.ts index 283afaa..f346c59 100644 --- a/spa/src/api/widgets.ts +++ b/spa/src/api/widgets.ts @@ -40,6 +40,15 @@ export function useUpdateWidget() { }) } +export function useWidgetPreview(id: number, enabled: boolean) { + return useQuery({ + queryKey: ["widget-preview", id], + queryFn: () => api.get>(`/widgets/${id}/preview`), + enabled, + refetchInterval: 5000, + }) +} + export function useDeleteWidget() { const qc = useQueryClient() return useMutation({ diff --git a/spa/src/components/app-shell.tsx b/spa/src/components/app-shell.tsx index 7d52b51..7b97e5e 100644 --- a/spa/src/components/app-shell.tsx +++ b/spa/src/components/app-shell.tsx @@ -18,6 +18,7 @@ import { Box, Layers, Save, + BookOpen, } from "lucide-react" const NAV = [ @@ -26,6 +27,7 @@ const NAV = [ { to: "/widgets", label: "Widgets", icon: Box }, { to: "/layout", label: "Layout", icon: Layers }, { to: "/presets", label: "Presets", icon: Save }, + { to: "/guide", label: "Guide", icon: BookOpen }, ] as const export function AppShell({ children }: { children: React.ReactNode }) { diff --git a/spa/src/pages/dashboard.tsx b/spa/src/pages/dashboard.tsx index 0efb6fe..f746c52 100644 --- a/spa/src/pages/dashboard.tsx +++ b/spa/src/pages/dashboard.tsx @@ -9,11 +9,24 @@ import { useDataSources } from "@/api/data-sources" import { useWidgets } from "@/api/widgets" import { useLayout } from "@/api/layout" import { usePresets } from "@/api/presets" -import { Activity, Box, Layers, Database } from "lucide-react" +import { useClients } from "@/api/clients" +import { Activity, Box, Layers, Database, Monitor } from "lucide-react" +import { Badge } from "@/components/ui/badge" function countNodes(node: { children?: { node: unknown }[] }): number { if (!node.children) return 1 - return 1 + node.children.reduce((sum, c) => sum + countNodes(c.node as typeof node), 0) + return ( + 1 + + node.children.reduce( + (sum, c) => sum + countNodes(c.node as typeof node), + 0, + ) + ) +} + +function formatConnectedAt(ts: number): string { + const d = new Date(ts * 1000) + return d.toLocaleTimeString() } export function DashboardPage() { @@ -21,8 +34,15 @@ export function DashboardPage() { const widgets = useWidgets() const layout = useLayout() const presets = usePresets() + const clients = useClients() const stats = [ + { + label: "Clients", + value: clients.data?.length ?? "—", + icon: Monitor, + desc: "Connected displays", + }, { label: "Data Sources", value: sources.data?.length ?? "—", @@ -56,7 +76,7 @@ export function DashboardPage() {

K-Frame system overview

-
+
{stats.map((s) => ( @@ -70,6 +90,29 @@ export function DashboardPage() { ))}
+ + {clients.data && clients.data.length > 0 && ( + + + Connected Clients + + +
+ {clients.data.map((c) => ( +
+ {c.addr} + + since {formatConnectedAt(c.connected_at)} + +
+ ))} +
+
+
+ )}
) } diff --git a/spa/src/pages/data-sources.tsx b/spa/src/pages/data-sources.tsx index 194964b..9f9d621 100644 --- a/spa/src/pages/data-sources.tsx +++ b/spa/src/pages/data-sources.tsx @@ -192,7 +192,15 @@ export function DataSourcesPage() { - diff --git a/spa/src/pages/guide.tsx b/spa/src/pages/guide.tsx new file mode 100644 index 0000000..00bf5ac --- /dev/null +++ b/spa/src/pages/guide.tsx @@ -0,0 +1,350 @@ +import { + Card, + CardContent, + CardDescription, + CardHeader, + CardTitle, +} from "@/components/ui/card" +import { Badge } from "@/components/ui/badge" +import { Separator } from "@/components/ui/separator" + +export function GuidePage() { + return ( +
+
+

Guide

+

+ How to set up K-Frame from scratch +

+
+ + {/* Overview */} + + + How K-Frame Works + The data pipeline at a glance + + +

+ K-Frame is an IoT dashboard system. The server polls external data + sources (weather APIs, Navidrome, RSS feeds, etc.), extracts values, + and pushes them to connected display clients (ESP32 screens) over + TCP. +

+
+ Data Source → poll → raw JSON → Widget mappings → Widget State → + Layout → ESP32 display +
+

+ You configure everything through this UI. Changes to layout are + pushed to clients immediately. +

+
+
+ + {/* Step 1 */} + + + Step 1: Add a Data Source + + Where to pull data from + + + +

+ Go to Data Sources → Add Source. A data source is + an external feed that the server polls at a regular interval. +

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
FieldDescription
nameHuman-readable label (e.g. "weather", "navidrome")
source_typeDetermines which adapter handles polling (see reference below)
urlBase URL of the API to poll
api_keyOptional API key (masked in the UI)
poll_intervalHow often to fetch data, in seconds
headers + Key-value pairs for authentication or custom config. For + example, the media adapter reads{" "} + username and{" "} + password from + headers +
+
+
+ + {/* Step 2 */} + + + Step 2: Create a Widget + + Extract and name the data you want to display + + + +

+ Go to Widgets → Add Widget. A widget is a display + primitive — it extracts specific values from a data source's raw + response and gives them names the client can render. +

+ +

Display Hints

+
+ icon_value + text_block + key_value +
+

+ Hints tell the client how to render the widget. The client decides + the actual visual treatment. +

+ + + +

Key Mappings

+

+ Mappings define how to extract data from the raw API response. + Each mapping has two fields: +

+ + + + + + + + + + + + + + + + + +
FieldDescription
source_path + JSONPath expression into the raw response (e.g.{" "} + $.title,{" "} + $.main.temp,{" "} + $.weather[0].icon) +
target_key + The name the extracted value gets in the widget's state. + This is what the client sees (e.g.{" "} + value,{" "} + label,{" "} + icon) +
+ +
+ Example: Navidrome "now playing"
+ source_path $.title → target_key value
+ source_path $.artist → target_key label +
+
+
+ + {/* Step 3 */} + + + Step 3: Build a Layout + + Arrange widgets on the display + + + +

+ Go to Layout. The layout is a recursive tree of + containers and widgets. +

+ +

Node Types

+ + + + + + + + + + + + + + + + + + + + + +
TypeDescription
Container (row)Children laid out horizontally
Container (column)Children laid out vertically
Leaf (widget)Renders a specific widget's data
+ +

Sizing

+

Each child in a container has a sizing mode:

+
    +
  • + Flex(weight) — proportional share of remaining + space. Two children with flex 1 and 2 get 1/3 and 2/3 of the + space. +
  • +
  • + Fixed(pixels) — exact pixel width (in rows) or + height (in columns). +
  • +
+ +

Gap & Padding

+

+ Gap adds uniform spacing between children.{" "} + Padding insets the container's content area on all + sides. Both are in pixels. Typically use padding on the root + container to keep content off screen edges. +

+
+
+ + {/* Step 4 */} + + + Step 4: Save & Push + + Changes go live immediately + + + +

+ Click Save Layout to persist the layout and push + it to all connected clients immediately. The ESP32 will re-render + with the new layout without needing a restart. +

+

+ Data updates are pushed automatically whenever a poll detects + changed values — no action needed from you. +

+
+
+ + {/* Presets */} + + + Presets + + Save and restore layout snapshots + + + +

+ Go to Presets to save the current layout as a + named preset. You can load a preset later to restore its layout, + or delete presets you no longer need. +

+

+ Loading a preset replaces the active layout and pushes to clients + immediately. +

+
+
+ + {/* Source Types Reference */} + + + Source Type Reference + + What each adapter expects + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
TypeDirectionNotes
weatherpoll + OpenWeather-compatible. Set URL with API key in query params. + Returns nested JSON — use paths like{" "} + $.main.temp +
mediapoll + Navidrome/Subsonic. Set base URL, add{" "} + username and{" "} + password{" "} + as headers. Returns{" "} + $.playing,{" "} + $.title,{" "} + $.artist,{" "} + $.album,{" "} + $.duration +
rsspoll + Any RSS feed URL. Returns{" "} + $.title,{" "} + $.items (array + of items with title, link, description) +
http_jsonpoll + Generic — polls any URL, returns raw JSON. Use JSONPath in + mappings to extract what you need. +
webhookpush + Receives incoming HTTP POSTs. Poll interval must be 0. + Not yet wired in the polling loop. +
+
+
+
+ ) +} diff --git a/spa/src/pages/layout-builder.tsx b/spa/src/pages/layout-builder.tsx index 2a11f45..eb8bd11 100644 --- a/spa/src/pages/layout-builder.tsx +++ b/spa/src/pages/layout-builder.tsx @@ -16,6 +16,16 @@ import { SelectTrigger, SelectValue, } from "@/components/ui/select" +import { + AlertDialog, + AlertDialogAction, + AlertDialogCancel, + AlertDialogContent, + AlertDialogDescription, + AlertDialogFooter, + AlertDialogHeader, + AlertDialogTitle, +} from "@/components/ui/alert-dialog" import { Input } from "@/components/ui/input" import { Label } from "@/components/ui/label" import { Badge } from "@/components/ui/badge" @@ -110,6 +120,7 @@ export function LayoutBuilderPage() { const [root, setRoot] = useState(null) const [selected, setSelected] = useState(null) const [initialized, setInitialized] = useState(false) + const [pendingDelete, setPendingDelete] = useState(null) if (!initialized && currentLayout?.root) { setRoot(structuredClone(currentLayout.root)) @@ -255,7 +266,7 @@ export function LayoutBuilderPage() { onAddContainer={(path, dir) => addChild(path, makeContainerChild(dir)) } - onRemove={() => removeChild(selected)} + onRemove={() => setPendingDelete(selected)} onUpdateSizing={(sizing) => updateSizing(selected, sizing)} isRoot={selected.length === 0} widgets={widgets} @@ -264,7 +275,7 @@ export function LayoutBuilderPage() { removeChild(selected)} + onRemove={() => setPendingDelete(selected)} onUpdateSizing={(sizing) => updateSizing(selected, sizing)} widgets={widgets} sizing={ @@ -284,6 +295,39 @@ export function LayoutBuilderPage() { + + !o && setPendingDelete(null)} + > + + + + {pendingDelete?.length === 0 + ? "Clear entire layout?" + : "Remove this node?"} + + + {pendingDelete?.length === 0 + ? "This will remove the entire layout tree. You can rebuild it afterward." + : "This will remove the selected node and all its children."} + + + + Cancel + { + if (pendingDelete !== null) { + removeChild(pendingDelete) + setPendingDelete(null) + } + }} + > + {pendingDelete?.length === 0 ? "Clear" : "Remove"} + + + + ) } diff --git a/spa/src/pages/widgets.tsx b/spa/src/pages/widgets.tsx index 0adef9f..b468162 100644 --- a/spa/src/pages/widgets.tsx +++ b/spa/src/pages/widgets.tsx @@ -4,6 +4,7 @@ import { useCreateWidget, useUpdateWidget, useDeleteWidget, + useWidgetPreview, } from "@/api/widgets" import { useDataSources } from "@/api/data-sources" import type { Widget, DisplayHint, KeyMapping } from "@/api/types" @@ -42,7 +43,7 @@ import { AlertDialogTitle, } from "@/components/ui/alert-dialog" import { Badge } from "@/components/ui/badge" -import { Plus, Pencil, Trash2, X } from "lucide-react" +import { Plus, Pencil, Trash2, X, Eye } from "lucide-react" import { toast } from "sonner" const DISPLAY_HINTS: DisplayHint[] = ["icon_value", "text_block", "key_value"] @@ -66,6 +67,7 @@ export function WidgetsPage() { const [editing, setEditing] = useState(null) const [deleting, setDeleting] = useState(null) + const [previewing, setPreviewing] = useState(null) function openNew() { const nextId = @@ -145,6 +147,14 @@ export function WidgetsPage() {
+ - @@ -213,10 +230,59 @@ export function WidgetsPage() { + + {previewing != null && ( + w.id === previewing)?.name ?? ""} + onClose={() => setPreviewing(null)} + /> + )}
) } +function WidgetPreviewDialog({ + widgetId, + widgetName, + onClose, +}: { + widgetId: number + widgetName: string + onClose: () => void +}) { + const { data, isLoading, isError } = useWidgetPreview(widgetId, true) + + return ( + !o && onClose()}> + + + Preview: {widgetName} + +
+ {isLoading && ( +

Loading…

+ )} + {isError && ( +

+ No data yet — widget hasn't been polled +

+ )} + {data && ( +
+              {JSON.stringify(data, null, 2)}
+            
+ )} +
+ + + +
+
+ ) +} function WidgetForm({ value, diff --git a/spa/src/router.tsx b/spa/src/router.tsx index 1895159..986d70a 100644 --- a/spa/src/router.tsx +++ b/spa/src/router.tsx @@ -10,6 +10,7 @@ import { DataSourcesPage } from "@/pages/data-sources" import { WidgetsPage } from "@/pages/widgets" import { LayoutBuilderPage } from "@/pages/layout-builder" import { PresetsPage } from "@/pages/presets" +import { GuidePage } from "@/pages/guide" const rootRoute = createRootRoute({ component: () => ( @@ -49,12 +50,19 @@ const presetsRoute = createRoute({ component: PresetsPage, }) +const guideRoute = createRoute({ + getParentRoute: () => rootRoute, + path: "/guide", + component: GuidePage, +}) + const routeTree = rootRoute.addChildren([ indexRoute, dataSourcesRoute, widgetsRoute, layoutRoute, presetsRoute, + guideRoute, ]) export const router = createRouter({ routeTree })