add SPA config UI, wire media/rss adapters, event-driven layout push
- React SPA: dashboard, data sources CRUD, widgets CRUD, layout builder, presets. TanStack Router + Query, shadcn/ui, Vite proxy to :3000 - wire media + rss adapters into polling loop, remove xtb source type - media adapter: read username/password from headers, proper subsonic auth - event handler: subscribe to LayoutChanged, push screen update to clients - fix clippy warnings across workspace (Default impls, collapsible ifs, redundant closures, is_none_or, unused imports)
This commit is contained in:
@@ -12,10 +12,8 @@ impl ServerConfig {
|
||||
Self {
|
||||
database_url: env::var("KFRAME_DATABASE_URL")
|
||||
.unwrap_or_else(|_| "sqlite:kframe.db?mode=rwc".into()),
|
||||
tcp_addr: env::var("KFRAME_TCP_ADDR")
|
||||
.unwrap_or_else(|_| "0.0.0.0:2699".into()),
|
||||
http_addr: env::var("KFRAME_HTTP_ADDR")
|
||||
.unwrap_or_else(|_| "0.0.0.0:3000".into()),
|
||||
tcp_addr: env::var("KFRAME_TCP_ADDR").unwrap_or_else(|_| "0.0.0.0:2699".into()),
|
||||
http_addr: env::var("KFRAME_HTTP_ADDR").unwrap_or_else(|_| "0.0.0.0:3000".into()),
|
||||
poll_interval_secs: env::var("KFRAME_POLL_INTERVAL_SECS")
|
||||
.ok()
|
||||
.and_then(|v| v.parse().ok())
|
||||
|
||||
53
crates/bootstrap/src/event_handler.rs
Normal file
53
crates/bootstrap/src/event_handler.rs
Normal file
@@ -0,0 +1,53 @@
|
||||
use application::DataProjection;
|
||||
use config_sqlite::SqliteConfigStore;
|
||||
use domain::{BroadcastPort, ConfigRepository, DomainEvent};
|
||||
use std::sync::Arc;
|
||||
use tcp_server::{TcpBroadcaster, TcpEventBus};
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
pub async fn run(
|
||||
event_bus: Arc<TcpEventBus>,
|
||||
config: Arc<SqliteConfigStore>,
|
||||
broadcaster: Arc<TcpBroadcaster>,
|
||||
projection: Arc<Mutex<DataProjection>>,
|
||||
) {
|
||||
let mut rx = event_bus.subscribe();
|
||||
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(DomainEvent::LayoutChanged { layout }) => {
|
||||
let widgets = match config.list_widgets().await {
|
||||
Ok(w) => w,
|
||||
Err(e) => {
|
||||
error!(error = %e, "failed to fetch widgets for screen update");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let proj = projection.lock().await;
|
||||
let widget_states: Vec<_> = widgets
|
||||
.iter()
|
||||
.filter_map(|w| proj.get_state(w.id).map(|s| (w.id, s.clone())))
|
||||
.collect();
|
||||
|
||||
if let Err(e) = broadcaster
|
||||
.push_screen_update(&layout, &widget_states)
|
||||
.await
|
||||
{
|
||||
error!(error = %e, "failed to push screen update");
|
||||
}
|
||||
|
||||
info!("layout changed, pushed screen update to clients");
|
||||
}
|
||||
Ok(_) => {}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||
warn!(skipped = n, "event handler lagged, missed events");
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
||||
error!("event bus closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,14 +1,15 @@
|
||||
mod config;
|
||||
mod event_handler;
|
||||
mod polling;
|
||||
|
||||
use std::sync::Arc;
|
||||
use anyhow::Result;
|
||||
use application::DataProjection;
|
||||
use config_sqlite::SqliteConfigStore;
|
||||
use tcp_server::{TcpBroadcaster, TcpEventBus, run_tcp_server};
|
||||
use http_api::AppState;
|
||||
use std::sync::Arc;
|
||||
use tcp_server::{TcpBroadcaster, TcpEventBus, run_tcp_server};
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::{info, error};
|
||||
use tracing::{error, info};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
@@ -53,5 +54,19 @@ async fn main() -> Result<()> {
|
||||
|
||||
info!("K-Frame server running");
|
||||
|
||||
polling::run(config_store, broadcaster, projection, cfg.poll_interval_secs).await
|
||||
let ev_bus = event_bus.clone();
|
||||
let ev_config = config_store.clone();
|
||||
let ev_bc = broadcaster.clone();
|
||||
let ev_proj = projection.clone();
|
||||
tokio::spawn(async move {
|
||||
event_handler::run(ev_bus, ev_config, ev_bc, ev_proj).await;
|
||||
});
|
||||
|
||||
polling::run(
|
||||
config_store,
|
||||
broadcaster,
|
||||
projection,
|
||||
cfg.poll_interval_secs,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -1,16 +1,17 @@
|
||||
use anyhow::Result;
|
||||
use application::DataProjection;
|
||||
use config_sqlite::SqliteConfigStore;
|
||||
use domain::{
|
||||
BroadcastPort, ConfigRepository, DataSource, DataSourcePort, DataSourceType, Value, WidgetState,
|
||||
};
|
||||
use http_json::HttpJsonAdapter;
|
||||
use media_adapter::MediaAdapter;
|
||||
use rss_adapter::RssAdapter;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use anyhow::Result;
|
||||
use domain::{
|
||||
ConfigRepository, BroadcastPort, DataSourcePort, DataSourceType,
|
||||
DataSource, Value, WidgetState,
|
||||
};
|
||||
use application::DataProjection;
|
||||
use http_json::HttpJsonAdapter;
|
||||
use tcp_server::TcpBroadcaster;
|
||||
use config_sqlite::SqliteConfigStore;
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::{info, warn, debug};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
pub async fn run(
|
||||
config: Arc<SqliteConfigStore>,
|
||||
@@ -19,6 +20,8 @@ pub async fn run(
|
||||
poll_interval_secs: u64,
|
||||
) -> Result<()> {
|
||||
let http_adapter = HttpJsonAdapter::new();
|
||||
let media_adapter = MediaAdapter::new();
|
||||
let rss_adapter = RssAdapter::new();
|
||||
let interval = Duration::from_secs(poll_interval_secs);
|
||||
|
||||
info!(interval_secs = poll_interval_secs, "polling loop started");
|
||||
@@ -26,11 +29,17 @@ pub async fn run(
|
||||
loop {
|
||||
tokio::time::sleep(interval).await;
|
||||
|
||||
let sources = config.list_data_sources().await
|
||||
let sources = config
|
||||
.list_data_sources()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
let widgets = config.list_widgets().await
|
||||
let widgets = config
|
||||
.list_widgets()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
let layout = config.get_layout().await
|
||||
let layout = config
|
||||
.get_layout()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
if sources.is_empty() || widgets.is_empty() {
|
||||
@@ -41,13 +50,14 @@ pub async fn run(
|
||||
let mut all_changed: Vec<(u16, WidgetState)> = Vec::new();
|
||||
|
||||
for source in &sources {
|
||||
let result = match poll_source(&http_adapter, source).await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
warn!(source = %source.name, error = %e, "poll failed");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let result =
|
||||
match poll_source(&http_adapter, &media_adapter, &rss_adapter, source).await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
warn!(source = %source.name, error = %e, "poll failed");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let mut proj = projection.lock().await;
|
||||
let changed = proj.apply_poll_result(source.id, &result, &widgets);
|
||||
@@ -56,7 +66,9 @@ pub async fn run(
|
||||
|
||||
if !all_changed.is_empty() {
|
||||
if let Some(l) = &layout {
|
||||
broadcaster.push_screen_update(l, &all_changed).await
|
||||
broadcaster
|
||||
.push_screen_update(l, &all_changed)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
}
|
||||
info!(count = all_changed.len(), "pushed widget updates");
|
||||
@@ -66,15 +78,25 @@ pub async fn run(
|
||||
|
||||
async fn poll_source(
|
||||
http_adapter: &HttpJsonAdapter,
|
||||
media_adapter: &MediaAdapter,
|
||||
rss_adapter: &RssAdapter,
|
||||
source: &DataSource,
|
||||
) -> Result<Value> {
|
||||
match source.source_type {
|
||||
DataSourceType::HttpJson | DataSourceType::Weather => {
|
||||
http_adapter.poll(source).await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))
|
||||
}
|
||||
_ => {
|
||||
Err(anyhow::anyhow!("unsupported source type: {:?}", source.source_type))
|
||||
}
|
||||
DataSourceType::HttpJson | DataSourceType::Weather => http_adapter
|
||||
.poll(source)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}")),
|
||||
DataSourceType::Media => media_adapter
|
||||
.poll(source)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}")),
|
||||
DataSourceType::Rss => rss_adapter
|
||||
.poll(source)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}")),
|
||||
DataSourceType::Webhook => Err(anyhow::anyhow!(
|
||||
"webhook sources are push-based, not polled"
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user