rewire bootstrap with SQLite, HTTP API, and real polling
bootstrap: SQLite config, HTTP API on :3000, TCP on :2699, poll loops.
http-api: added serve() so bootstrap doesn't depend on axum directly.
polling: reads data sources from config, polls via http-json adapter,
pushes changed widgets to connected clients.
configure via API, e.g.:
curl -X POST localhost:3000/api/data-sources -H 'Content-Type: application/json' -d '{...}'
curl -X PUT localhost:3000/api/layout -H 'Content-Type: application/json' -d '{...}'
This commit is contained in:
@@ -11,6 +11,7 @@ axum.workspace = true
|
||||
tower-http.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
tokio.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
tokio.workspace = true
|
||||
|
||||
@@ -31,3 +31,15 @@ where
|
||||
.layer(CorsLayer::permissive())
|
||||
.with_state(state)
|
||||
}
|
||||
|
||||
pub async fn serve<C, E>(addr: &str, state: AppState<C, E>) -> Result<(), std::io::Error>
|
||||
where
|
||||
C: ConfigRepository + Send + Sync + 'static,
|
||||
C::Error: std::fmt::Debug + Send,
|
||||
E: EventPublisher + Send + Sync + 'static,
|
||||
E::Error: std::fmt::Debug + Send,
|
||||
{
|
||||
let app = router(state);
|
||||
let listener = tokio::net::TcpListener::bind(addr).await?;
|
||||
axum::serve(listener, app).await
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ edition = "2024"
|
||||
[dependencies]
|
||||
domain.workspace = true
|
||||
reqwest.workspace = true
|
||||
quick-xml = { version = "0.37", features = ["serialize"] }
|
||||
quick-xml = { version = "0.40", features = ["serialize"] }
|
||||
serde.workspace = true
|
||||
thiserror.workspace = true
|
||||
|
||||
|
||||
@@ -37,7 +37,7 @@ pub fn parse_rss(xml: &str) -> Result<Value, RssError> {
|
||||
current_tag.clear();
|
||||
}
|
||||
Ok(Event::Text(e)) => {
|
||||
let text = e.unescape().unwrap_or_default().to_string();
|
||||
let text = String::from_utf8_lossy(e.as_ref()).to_string();
|
||||
if !current_tag.is_empty() && !text.trim().is_empty() {
|
||||
if let Some(item) = current_item.as_mut() {
|
||||
item.insert(current_tag.clone(), Value::String(text));
|
||||
|
||||
@@ -5,8 +5,10 @@ edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
domain.workspace = true
|
||||
protocol.workspace = true
|
||||
application.workspace = true
|
||||
config-memory.workspace = true
|
||||
config-sqlite.workspace = true
|
||||
tcp-server.workspace = true
|
||||
http-api.workspace = true
|
||||
http-json.workspace = true
|
||||
tokio.workspace = true
|
||||
anyhow.workspace = true
|
||||
|
||||
@@ -1,90 +1,40 @@
|
||||
mod polling;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use domain::{
|
||||
ConfigRepository, BroadcastPort,
|
||||
WidgetConfig, DisplayHint, KeyMapping,
|
||||
Layout, LayoutNode, ContainerNode, LayoutChild, Direction, Sizing,
|
||||
Value, WidgetState,
|
||||
};
|
||||
use application::{ConfigService, DataProjection};
|
||||
use config_memory::MemoryConfigStore;
|
||||
use anyhow::Result;
|
||||
use application::DataProjection;
|
||||
use config_sqlite::SqliteConfigStore;
|
||||
use tcp_server::{TcpBroadcaster, TcpEventBus, run_tcp_server};
|
||||
use http_api::AppState;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
const DB_PATH: &str = "sqlite:kframe.db?mode=rwc";
|
||||
const TCP_ADDR: &str = "0.0.0.0:2699";
|
||||
const HTTP_ADDR: &str = "0.0.0.0:3000";
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let config_store = Arc::new(MemoryConfigStore::new());
|
||||
async fn main() -> Result<()> {
|
||||
let config_store = Arc::new(SqliteConfigStore::new(DB_PATH).await?);
|
||||
let event_bus = Arc::new(TcpEventBus::new(64));
|
||||
let broadcaster = Arc::new(TcpBroadcaster::new(64));
|
||||
let projection = Arc::new(Mutex::new(DataProjection::new()));
|
||||
|
||||
let service = ConfigService::new(config_store.as_ref(), event_bus.as_ref());
|
||||
|
||||
service.create_widget(WidgetConfig::new(
|
||||
1, "weather".into(), DisplayHint::IconValue, 1,
|
||||
vec![
|
||||
KeyMapping { source_path: "$.temperature".into(), target_key: "value".into() },
|
||||
KeyMapping { source_path: "$.icon".into(), target_key: "icon".into() },
|
||||
],
|
||||
)).await.unwrap();
|
||||
|
||||
service.create_widget(WidgetConfig::new(
|
||||
2, "portfolio".into(), DisplayHint::KeyValue, 2,
|
||||
vec![
|
||||
KeyMapping { source_path: "$.amount".into(), target_key: "value".into() },
|
||||
],
|
||||
)).await.unwrap();
|
||||
|
||||
let layout = Layout {
|
||||
root: LayoutNode::Container(ContainerNode {
|
||||
direction: Direction::Row,
|
||||
gap: 4,
|
||||
padding: 2,
|
||||
children: vec![
|
||||
LayoutChild { sizing: Sizing::Flex(1), node: LayoutNode::Leaf(1) },
|
||||
LayoutChild { sizing: Sizing::Flex(1), node: LayoutNode::Leaf(2) },
|
||||
],
|
||||
}),
|
||||
};
|
||||
service.update_layout(layout).await.unwrap();
|
||||
|
||||
let bc = broadcaster.clone();
|
||||
let tcp_bc = broadcaster.clone();
|
||||
tokio::spawn(async move {
|
||||
run_tcp_server("0.0.0.0:2699", bc).await.unwrap();
|
||||
run_tcp_server(TCP_ADDR, tcp_bc).await.unwrap();
|
||||
});
|
||||
println!("TCP server on {TCP_ADDR}");
|
||||
|
||||
println!("Server running on :2699");
|
||||
println!("Sending fake data every 3 seconds...");
|
||||
let http_state = AppState {
|
||||
config: config_store.clone(),
|
||||
events: event_bus.clone(),
|
||||
};
|
||||
tokio::spawn(async move {
|
||||
http_api::serve(HTTP_ADDR, http_state).await.unwrap();
|
||||
});
|
||||
println!("HTTP API on {HTTP_ADDR}");
|
||||
|
||||
let mut projection = DataProjection::new();
|
||||
let mut counter = 0u32;
|
||||
println!("K-Frame server running");
|
||||
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||
counter += 1;
|
||||
|
||||
let widgets = config_store.list_widgets().await.unwrap();
|
||||
let layout = config_store.get_layout().await.unwrap();
|
||||
|
||||
let weather_data = Value::Object(std::collections::BTreeMap::from([
|
||||
("temperature".into(), Value::String(format!("{}.{}°C", 5 + counter % 10, counter % 10))),
|
||||
("icon".into(), Value::String("sunny".into())),
|
||||
]));
|
||||
|
||||
let portfolio_data = Value::Object(std::collections::BTreeMap::from([
|
||||
("amount".into(), Value::String(format!("{}.{} PLN", 100 + counter, counter % 100))),
|
||||
]));
|
||||
|
||||
let changed_weather = projection.apply_poll_result(1, &weather_data, &widgets);
|
||||
let changed_portfolio = projection.apply_poll_result(2, &portfolio_data, &widgets);
|
||||
|
||||
let mut all_changed: Vec<(u16, WidgetState)> = Vec::new();
|
||||
all_changed.extend(changed_weather);
|
||||
all_changed.extend(changed_portfolio);
|
||||
|
||||
if !all_changed.is_empty() {
|
||||
if let Some(l) = &layout {
|
||||
broadcaster.push_screen_update(l, &all_changed).await.unwrap();
|
||||
}
|
||||
println!("Pushed {} widget updates (tick {counter})", all_changed.len());
|
||||
}
|
||||
}
|
||||
polling::run(config_store, broadcaster, projection).await
|
||||
}
|
||||
|
||||
76
crates/bootstrap/src/polling.rs
Normal file
76
crates/bootstrap/src/polling.rs
Normal file
@@ -0,0 +1,76 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use anyhow::Result;
|
||||
use domain::{
|
||||
ConfigRepository, BroadcastPort, DataSourcePort, DataSourceType,
|
||||
DataSource, Value, WidgetState,
|
||||
};
|
||||
use application::DataProjection;
|
||||
use http_json::HttpJsonAdapter;
|
||||
use tcp_server::TcpBroadcaster;
|
||||
use config_sqlite::SqliteConfigStore;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
const POLL_CHECK_INTERVAL: Duration = Duration::from_secs(5);
|
||||
|
||||
pub async fn run(
|
||||
config: Arc<SqliteConfigStore>,
|
||||
broadcaster: Arc<TcpBroadcaster>,
|
||||
projection: Arc<Mutex<DataProjection>>,
|
||||
) -> Result<()> {
|
||||
let http_adapter = HttpJsonAdapter::new();
|
||||
|
||||
loop {
|
||||
tokio::time::sleep(POLL_CHECK_INTERVAL).await;
|
||||
|
||||
let sources = config.list_data_sources().await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
let widgets = config.list_widgets().await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
let layout = config.get_layout().await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
if sources.is_empty() || widgets.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut all_changed: Vec<(u16, WidgetState)> = Vec::new();
|
||||
|
||||
for source in &sources {
|
||||
let result = match poll_source(&http_adapter, source).await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
eprintln!("poll error for '{}': {e}", source.name);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let mut proj = projection.lock().await;
|
||||
let changed = proj.apply_poll_result(source.id, &result, &widgets);
|
||||
all_changed.extend(changed);
|
||||
}
|
||||
|
||||
if !all_changed.is_empty() {
|
||||
if let Some(l) = &layout {
|
||||
broadcaster.push_screen_update(l, &all_changed).await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
}
|
||||
println!("pushed {} widget updates", all_changed.len());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn poll_source(
|
||||
http_adapter: &HttpJsonAdapter,
|
||||
source: &DataSource,
|
||||
) -> Result<Value> {
|
||||
match source.source_type {
|
||||
DataSourceType::HttpJson | DataSourceType::Weather => {
|
||||
http_adapter.poll(source).await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))
|
||||
}
|
||||
_ => {
|
||||
Err(anyhow::anyhow!("unsupported source type: {:?}", source.source_type))
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user