From 15b75d860cf03ed65e4d36bb2eaadb900e461b85 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 18 Jun 2026 23:12:05 +0200 Subject: [PATCH] rewire bootstrap with SQLite, HTTP API, and real polling bootstrap: SQLite config, HTTP API on :3000, TCP on :2699, poll loops. http-api: added serve() so bootstrap doesn't depend on axum directly. polling: reads data sources from config, polls via http-json adapter, pushes changed widgets to connected clients. configure via API, e.g.: curl -X POST localhost:3000/api/data-sources -H 'Content-Type: application/json' -d '{...}' curl -X PUT localhost:3000/api/layout -H 'Content-Type: application/json' -d '{...}' --- Cargo.lock | 16 +++-- Cargo.toml | 1 + crates/adapters/http-api/Cargo.toml | 1 + crates/adapters/http-api/src/lib.rs | 12 ++++ crates/adapters/rss/Cargo.toml | 2 +- crates/adapters/rss/src/parser.rs | 2 +- crates/bootstrap/Cargo.toml | 6 +- crates/bootstrap/src/main.rs | 104 ++++++++-------------------- crates/bootstrap/src/polling.rs | 76 ++++++++++++++++++++ 9 files changed, 135 insertions(+), 85 deletions(-) create mode 100644 crates/bootstrap/src/polling.rs diff --git a/Cargo.lock b/Cargo.lock index 795bc7e..089135c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,12 @@ version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" +[[package]] +name = "anyhow" +version = "1.0.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" + [[package]] name = "api-types" version = "0.1.0" @@ -144,10 +150,12 @@ dependencies = [ name = "bootstrap" version = "0.1.0" dependencies = [ + "anyhow", "application", - "config-memory", + "config-sqlite", "domain", - "protocol", + "http-api", + "http-json", "tcp-server", "tokio", ] @@ -1353,9 +1361,9 @@ dependencies = [ [[package]] name = "quick-xml" -version = "0.37.5" +version = "0.40.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb" +checksum = "2474bd2e5029e7ccb6abb2ba48cf2383a333851dedf495901544281590c7da7f" dependencies = [ "memchr", "serde", diff --git a/Cargo.toml b/Cargo.toml index 9eef588..80e88d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ tcp-server = { path = "crates/adapters/tcp-server" } tcp-client = { path = "crates/adapters/tcp-client" } display-terminal = { path = "crates/adapters/display-terminal" } config-sqlite = { path = "crates/adapters/config-sqlite" } +http-json = { path = "crates/adapters/http-json" } http-api = { path = "crates/adapters/http-api" } axum = { version = "0.8", features = ["macros"] } tower-http = { version = "0.6", features = ["cors"] } diff --git a/crates/adapters/http-api/Cargo.toml b/crates/adapters/http-api/Cargo.toml index 79e8ebd..f75ad0b 100644 --- a/crates/adapters/http-api/Cargo.toml +++ b/crates/adapters/http-api/Cargo.toml @@ -11,6 +11,7 @@ axum.workspace = true tower-http.workspace = true serde.workspace = true serde_json.workspace = true +tokio.workspace = true [dev-dependencies] tokio.workspace = true diff --git a/crates/adapters/http-api/src/lib.rs b/crates/adapters/http-api/src/lib.rs index 8b5d26f..a118de8 100644 --- a/crates/adapters/http-api/src/lib.rs +++ b/crates/adapters/http-api/src/lib.rs @@ -31,3 +31,15 @@ where .layer(CorsLayer::permissive()) .with_state(state) } + +pub async fn serve(addr: &str, state: AppState) -> Result<(), std::io::Error> +where + C: ConfigRepository + Send + Sync + 'static, + C::Error: std::fmt::Debug + Send, + E: EventPublisher + Send + Sync + 'static, + E::Error: std::fmt::Debug + Send, +{ + let app = router(state); + let listener = tokio::net::TcpListener::bind(addr).await?; + axum::serve(listener, app).await +} diff --git a/crates/adapters/rss/Cargo.toml b/crates/adapters/rss/Cargo.toml index ebf13cc..1c6d504 100644 --- a/crates/adapters/rss/Cargo.toml +++ b/crates/adapters/rss/Cargo.toml @@ -6,7 +6,7 @@ edition = "2024" [dependencies] domain.workspace = true reqwest.workspace = true -quick-xml = { version = "0.37", features = ["serialize"] } +quick-xml = { version = "0.40", features = ["serialize"] } serde.workspace = true thiserror.workspace = true diff --git a/crates/adapters/rss/src/parser.rs b/crates/adapters/rss/src/parser.rs index 5b88a81..a8072db 100644 --- a/crates/adapters/rss/src/parser.rs +++ b/crates/adapters/rss/src/parser.rs @@ -37,7 +37,7 @@ pub fn parse_rss(xml: &str) -> Result { current_tag.clear(); } Ok(Event::Text(e)) => { - let text = e.unescape().unwrap_or_default().to_string(); + let text = String::from_utf8_lossy(e.as_ref()).to_string(); if !current_tag.is_empty() && !text.trim().is_empty() { if let Some(item) = current_item.as_mut() { item.insert(current_tag.clone(), Value::String(text)); diff --git a/crates/bootstrap/Cargo.toml b/crates/bootstrap/Cargo.toml index 76de982..97a5935 100644 --- a/crates/bootstrap/Cargo.toml +++ b/crates/bootstrap/Cargo.toml @@ -5,8 +5,10 @@ edition = "2024" [dependencies] domain.workspace = true -protocol.workspace = true application.workspace = true -config-memory.workspace = true +config-sqlite.workspace = true tcp-server.workspace = true +http-api.workspace = true +http-json.workspace = true tokio.workspace = true +anyhow.workspace = true diff --git a/crates/bootstrap/src/main.rs b/crates/bootstrap/src/main.rs index 28cf7e4..da98c1f 100644 --- a/crates/bootstrap/src/main.rs +++ b/crates/bootstrap/src/main.rs @@ -1,90 +1,40 @@ +mod polling; + use std::sync::Arc; -use std::time::Duration; -use domain::{ - ConfigRepository, BroadcastPort, - WidgetConfig, DisplayHint, KeyMapping, - Layout, LayoutNode, ContainerNode, LayoutChild, Direction, Sizing, - Value, WidgetState, -}; -use application::{ConfigService, DataProjection}; -use config_memory::MemoryConfigStore; +use anyhow::Result; +use application::DataProjection; +use config_sqlite::SqliteConfigStore; use tcp_server::{TcpBroadcaster, TcpEventBus, run_tcp_server}; +use http_api::AppState; +use tokio::sync::Mutex; + +const DB_PATH: &str = "sqlite:kframe.db?mode=rwc"; +const TCP_ADDR: &str = "0.0.0.0:2699"; +const HTTP_ADDR: &str = "0.0.0.0:3000"; #[tokio::main] -async fn main() { - let config_store = Arc::new(MemoryConfigStore::new()); +async fn main() -> Result<()> { + let config_store = Arc::new(SqliteConfigStore::new(DB_PATH).await?); let event_bus = Arc::new(TcpEventBus::new(64)); let broadcaster = Arc::new(TcpBroadcaster::new(64)); + let projection = Arc::new(Mutex::new(DataProjection::new())); - let service = ConfigService::new(config_store.as_ref(), event_bus.as_ref()); - - service.create_widget(WidgetConfig::new( - 1, "weather".into(), DisplayHint::IconValue, 1, - vec![ - KeyMapping { source_path: "$.temperature".into(), target_key: "value".into() }, - KeyMapping { source_path: "$.icon".into(), target_key: "icon".into() }, - ], - )).await.unwrap(); - - service.create_widget(WidgetConfig::new( - 2, "portfolio".into(), DisplayHint::KeyValue, 2, - vec![ - KeyMapping { source_path: "$.amount".into(), target_key: "value".into() }, - ], - )).await.unwrap(); - - let layout = Layout { - root: LayoutNode::Container(ContainerNode { - direction: Direction::Row, - gap: 4, - padding: 2, - children: vec![ - LayoutChild { sizing: Sizing::Flex(1), node: LayoutNode::Leaf(1) }, - LayoutChild { sizing: Sizing::Flex(1), node: LayoutNode::Leaf(2) }, - ], - }), - }; - service.update_layout(layout).await.unwrap(); - - let bc = broadcaster.clone(); + let tcp_bc = broadcaster.clone(); tokio::spawn(async move { - run_tcp_server("0.0.0.0:2699", bc).await.unwrap(); + run_tcp_server(TCP_ADDR, tcp_bc).await.unwrap(); }); + println!("TCP server on {TCP_ADDR}"); - println!("Server running on :2699"); - println!("Sending fake data every 3 seconds..."); + let http_state = AppState { + config: config_store.clone(), + events: event_bus.clone(), + }; + tokio::spawn(async move { + http_api::serve(HTTP_ADDR, http_state).await.unwrap(); + }); + println!("HTTP API on {HTTP_ADDR}"); - let mut projection = DataProjection::new(); - let mut counter = 0u32; + println!("K-Frame server running"); - loop { - tokio::time::sleep(Duration::from_secs(3)).await; - counter += 1; - - let widgets = config_store.list_widgets().await.unwrap(); - let layout = config_store.get_layout().await.unwrap(); - - let weather_data = Value::Object(std::collections::BTreeMap::from([ - ("temperature".into(), Value::String(format!("{}.{}°C", 5 + counter % 10, counter % 10))), - ("icon".into(), Value::String("sunny".into())), - ])); - - let portfolio_data = Value::Object(std::collections::BTreeMap::from([ - ("amount".into(), Value::String(format!("{}.{} PLN", 100 + counter, counter % 100))), - ])); - - let changed_weather = projection.apply_poll_result(1, &weather_data, &widgets); - let changed_portfolio = projection.apply_poll_result(2, &portfolio_data, &widgets); - - let mut all_changed: Vec<(u16, WidgetState)> = Vec::new(); - all_changed.extend(changed_weather); - all_changed.extend(changed_portfolio); - - if !all_changed.is_empty() { - if let Some(l) = &layout { - broadcaster.push_screen_update(l, &all_changed).await.unwrap(); - } - println!("Pushed {} widget updates (tick {counter})", all_changed.len()); - } - } + polling::run(config_store, broadcaster, projection).await } diff --git a/crates/bootstrap/src/polling.rs b/crates/bootstrap/src/polling.rs new file mode 100644 index 0000000..c557d00 --- /dev/null +++ b/crates/bootstrap/src/polling.rs @@ -0,0 +1,76 @@ +use std::sync::Arc; +use std::time::Duration; +use anyhow::Result; +use domain::{ + ConfigRepository, BroadcastPort, DataSourcePort, DataSourceType, + DataSource, Value, WidgetState, +}; +use application::DataProjection; +use http_json::HttpJsonAdapter; +use tcp_server::TcpBroadcaster; +use config_sqlite::SqliteConfigStore; +use tokio::sync::Mutex; + +const POLL_CHECK_INTERVAL: Duration = Duration::from_secs(5); + +pub async fn run( + config: Arc, + broadcaster: Arc, + projection: Arc>, +) -> Result<()> { + let http_adapter = HttpJsonAdapter::new(); + + loop { + tokio::time::sleep(POLL_CHECK_INTERVAL).await; + + let sources = config.list_data_sources().await + .map_err(|e| anyhow::anyhow!("{e}"))?; + let widgets = config.list_widgets().await + .map_err(|e| anyhow::anyhow!("{e}"))?; + let layout = config.get_layout().await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + if sources.is_empty() || widgets.is_empty() { + continue; + } + + let mut all_changed: Vec<(u16, WidgetState)> = Vec::new(); + + for source in &sources { + let result = match poll_source(&http_adapter, source).await { + Ok(v) => v, + Err(e) => { + eprintln!("poll error for '{}': {e}", source.name); + continue; + } + }; + + let mut proj = projection.lock().await; + let changed = proj.apply_poll_result(source.id, &result, &widgets); + all_changed.extend(changed); + } + + if !all_changed.is_empty() { + if let Some(l) = &layout { + broadcaster.push_screen_update(l, &all_changed).await + .map_err(|e| anyhow::anyhow!("{e}"))?; + } + println!("pushed {} widget updates", all_changed.len()); + } + } +} + +async fn poll_source( + http_adapter: &HttpJsonAdapter, + source: &DataSource, +) -> Result { + match source.source_type { + DataSourceType::HttpJson | DataSourceType::Weather => { + http_adapter.poll(source).await + .map_err(|e| anyhow::anyhow!("{e}")) + } + _ => { + Err(anyhow::anyhow!("unsupported source type: {:?}", source.source_type)) + } + } +}