per-source polling, initial client state, webhook, preview, client tracking
- per-source poll intervals: spawn task per source with own interval,
manager re-checks sources every 30s for add/remove
- initial screen update on TCP connect: send layout + widget states
- client tracking: ClientRegistry port, GET /api/clients, dashboard list
- webhook adapter: POST /api/webhook/{source_id} feeds data into projection
- widget preview: GET /api/widgets/{id}/preview returns current state
- serve SPA from Axum: ServeDir + index.html fallback via KFRAME_SPA_DIR
- layout builder delete confirmation with AlertDialog
- form validation: required fields disable save button
- guide page at /guide
- fix architecture: ClientDto to api-types, ClientRegistry + WidgetStateReader
ports in domain, DataProjection has internal Mutex, no adapter cross-deps
- ESP32: full screen clear on layout change (stale pixel fix)
This commit is contained in:
@@ -5,6 +5,7 @@ pub struct ServerConfig {
|
||||
pub tcp_addr: String,
|
||||
pub http_addr: String,
|
||||
pub poll_interval_secs: u64,
|
||||
pub spa_dir: Option<String>,
|
||||
}
|
||||
|
||||
impl ServerConfig {
|
||||
@@ -18,6 +19,7 @@ impl ServerConfig {
|
||||
.ok()
|
||||
.and_then(|v| v.parse().ok())
|
||||
.unwrap_or(5),
|
||||
spa_dir: env::var("KFRAME_SPA_DIR").ok(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,14 +3,13 @@ use config_sqlite::SqliteConfigStore;
|
||||
use domain::{BroadcastPort, ConfigRepository, DomainEvent};
|
||||
use std::sync::Arc;
|
||||
use tcp_server::{TcpBroadcaster, TcpEventBus};
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
pub async fn run(
|
||||
event_bus: Arc<TcpEventBus>,
|
||||
config: Arc<SqliteConfigStore>,
|
||||
broadcaster: Arc<TcpBroadcaster>,
|
||||
projection: Arc<Mutex<DataProjection>>,
|
||||
projection: Arc<DataProjection>,
|
||||
) {
|
||||
let mut rx = event_bus.subscribe();
|
||||
|
||||
@@ -25,11 +24,12 @@ pub async fn run(
|
||||
}
|
||||
};
|
||||
|
||||
let proj = projection.lock().await;
|
||||
let widget_states: Vec<_> = widgets
|
||||
.iter()
|
||||
.filter_map(|w| proj.get_state(w.id).map(|s| (w.id, s.clone())))
|
||||
.collect();
|
||||
let mut widget_states = Vec::new();
|
||||
for w in &widgets {
|
||||
if let Some(s) = projection.get_state(w.id).await {
|
||||
widget_states.push((w.id, s));
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = broadcaster
|
||||
.push_screen_update(&layout, &widget_states)
|
||||
|
||||
@@ -7,8 +7,7 @@ use application::DataProjection;
|
||||
use config_sqlite::SqliteConfigStore;
|
||||
use http_api::AppState;
|
||||
use std::sync::Arc;
|
||||
use tcp_server::{TcpBroadcaster, TcpEventBus, run_tcp_server};
|
||||
use tokio::sync::Mutex;
|
||||
use tcp_server::{ClientTracker, TcpBroadcaster, TcpEventBus, run_tcp_server};
|
||||
use tracing::{error, info};
|
||||
|
||||
#[tokio::main]
|
||||
@@ -29,12 +28,16 @@ async fn main() -> Result<()> {
|
||||
|
||||
let event_bus = Arc::new(TcpEventBus::new(64));
|
||||
let broadcaster = Arc::new(TcpBroadcaster::new(64));
|
||||
let projection = Arc::new(Mutex::new(DataProjection::new()));
|
||||
let projection = Arc::new(DataProjection::new());
|
||||
let tracker = Arc::new(ClientTracker::new());
|
||||
|
||||
let tcp_addr = cfg.tcp_addr.clone();
|
||||
let tcp_bc = broadcaster.clone();
|
||||
let tcp_tracker = tracker.clone();
|
||||
let tcp_config = config_store.clone();
|
||||
let tcp_proj = projection.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = run_tcp_server(&tcp_addr, tcp_bc).await {
|
||||
if let Err(e) = run_tcp_server(&tcp_addr, tcp_bc, tcp_tracker, tcp_config, tcp_proj).await {
|
||||
error!(error = %e, "tcp server failed");
|
||||
}
|
||||
});
|
||||
@@ -44,6 +47,10 @@ async fn main() -> Result<()> {
|
||||
let http_state = AppState {
|
||||
config: config_store.clone(),
|
||||
events: event_bus.clone(),
|
||||
widget_states: projection.clone(),
|
||||
broadcaster: broadcaster.clone(),
|
||||
clients: tracker.clone(),
|
||||
spa_dir: cfg.spa_dir,
|
||||
};
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = http_api::serve(&http_addr, http_state).await {
|
||||
|
||||
@@ -7,71 +7,135 @@ use domain::{
|
||||
use http_json::HttpJsonAdapter;
|
||||
use media_adapter::MediaAdapter;
|
||||
use rss_adapter::RssAdapter;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tcp_server::TcpBroadcaster;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
const SOURCE_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
|
||||
|
||||
pub async fn run(
|
||||
config: Arc<SqliteConfigStore>,
|
||||
broadcaster: Arc<TcpBroadcaster>,
|
||||
projection: Arc<Mutex<DataProjection>>,
|
||||
poll_interval_secs: u64,
|
||||
projection: Arc<DataProjection>,
|
||||
_poll_interval_secs: u64,
|
||||
) -> Result<()> {
|
||||
let http_adapter = HttpJsonAdapter::new();
|
||||
let media_adapter = MediaAdapter::new();
|
||||
let rss_adapter = RssAdapter::new();
|
||||
let interval = Duration::from_secs(poll_interval_secs);
|
||||
let http_adapter = Arc::new(HttpJsonAdapter::new());
|
||||
let media_adapter = Arc::new(MediaAdapter::new());
|
||||
let rss_adapter = Arc::new(RssAdapter::new());
|
||||
|
||||
info!(interval_secs = poll_interval_secs, "polling loop started");
|
||||
let mut running: HashMap<u16, JoinHandle<()>> = HashMap::new();
|
||||
|
||||
info!("polling manager started");
|
||||
|
||||
loop {
|
||||
tokio::time::sleep(interval).await;
|
||||
|
||||
let sources = config
|
||||
.list_data_sources()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
let widgets = config
|
||||
.list_widgets()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
let layout = config
|
||||
.get_layout()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
if sources.is_empty() || widgets.is_empty() {
|
||||
debug!("no sources or widgets configured, skipping poll");
|
||||
continue;
|
||||
}
|
||||
let current_ids: Vec<u16> = sources.iter().map(|s| s.id).collect();
|
||||
|
||||
let mut all_changed: Vec<(u16, WidgetState)> = Vec::new();
|
||||
running.retain(|id, handle| {
|
||||
if !current_ids.contains(id) {
|
||||
info!(source_id = id, "stopping poll for removed source");
|
||||
handle.abort();
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
|
||||
for source in &sources {
|
||||
let result =
|
||||
match poll_source(&http_adapter, &media_adapter, &rss_adapter, source).await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
warn!(source = %source.name, error = %e, "poll failed");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if source.source_type == DataSourceType::Webhook {
|
||||
continue;
|
||||
}
|
||||
if running.contains_key(&source.id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut proj = projection.lock().await;
|
||||
let changed = proj.apply_poll_result(source.id, &result, &widgets);
|
||||
all_changed.extend(changed);
|
||||
let source_id = source.id;
|
||||
let source = source.clone();
|
||||
let config = config.clone();
|
||||
let broadcaster = broadcaster.clone();
|
||||
let projection = projection.clone();
|
||||
let http = http_adapter.clone();
|
||||
let media = media_adapter.clone();
|
||||
let rss = rss_adapter.clone();
|
||||
|
||||
info!(
|
||||
source_id = source.id,
|
||||
name = %source.name,
|
||||
interval_secs = source.poll_interval.as_secs(),
|
||||
"starting poll task"
|
||||
);
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
poll_loop(source, config, broadcaster, projection, http, media, rss).await;
|
||||
});
|
||||
|
||||
running.insert(source_id, handle);
|
||||
}
|
||||
|
||||
if !all_changed.is_empty() {
|
||||
if let Some(l) = &layout {
|
||||
broadcaster
|
||||
.push_screen_update(l, &all_changed)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
if running.is_empty() {
|
||||
debug!("no pollable sources, waiting");
|
||||
}
|
||||
|
||||
tokio::time::sleep(SOURCE_REFRESH_INTERVAL).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn poll_loop(
|
||||
source: DataSource,
|
||||
config: Arc<SqliteConfigStore>,
|
||||
broadcaster: Arc<TcpBroadcaster>,
|
||||
projection: Arc<DataProjection>,
|
||||
http_adapter: Arc<HttpJsonAdapter>,
|
||||
media_adapter: Arc<MediaAdapter>,
|
||||
rss_adapter: Arc<RssAdapter>,
|
||||
) {
|
||||
let interval = source.poll_interval;
|
||||
|
||||
loop {
|
||||
tokio::time::sleep(interval).await;
|
||||
|
||||
let result = match poll_source(&http_adapter, &media_adapter, &rss_adapter, &source).await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
warn!(source = %source.name, error = %e, "poll failed");
|
||||
continue;
|
||||
}
|
||||
info!(count = all_changed.len(), "pushed widget updates");
|
||||
};
|
||||
|
||||
let widgets = match config.list_widgets().await {
|
||||
Ok(w) => w,
|
||||
Err(e) => {
|
||||
warn!(error = %e, "failed to fetch widgets");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let layout = match config.get_layout().await {
|
||||
Ok(l) => l,
|
||||
Err(e) => {
|
||||
warn!(error = %e, "failed to fetch layout");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let changed: Vec<(u16, WidgetState)> = projection
|
||||
.apply_poll_result(source.id, &result, &widgets)
|
||||
.await;
|
||||
|
||||
if !changed.is_empty() {
|
||||
if let Some(l) = &layout
|
||||
&& let Err(e) = broadcaster.push_screen_update(l, &changed).await
|
||||
{
|
||||
warn!(error = %e, "failed to push update");
|
||||
}
|
||||
info!(source = %source.name, count = changed.len(), "pushed widget updates");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user