arch: push wire types out of ClientApp, extract event_service, cleanup dead code
- ClientApp stores domain types, RepaintCommand carries DisplayHint + Vec<(String,Value)> - adapters no longer convert Wire→Domain (eliminated duplication in esp32 + desktop) - event_service in application layer handles LayoutChanged/WebhookDataReceived/ThemeChanged - bootstrap event_handler reduced to 10-line dispatcher - polling_service reuses event_service::apply_and_broadcast (deduplicated broadcast pattern) - AppState.config_service() replaces 11 inline ConfigService::new() calls - delete unused poll_interval_secs parameter chain - delete unused StoragePort/ClientConfig (zero implementations)
This commit is contained in:
123
crates/application/src/event_service.rs
Normal file
123
crates/application/src/event_service.rs
Normal file
@@ -0,0 +1,123 @@
|
||||
use crate::DataProjection;
|
||||
use domain::{
|
||||
BroadcastPort, ConfigRepository, DomainEvent, Layout, Value, WidgetConfig, WidgetState,
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
pub async fn handle_event<C, B>(
|
||||
event: DomainEvent,
|
||||
config: &Arc<C>,
|
||||
broadcaster: &Arc<B>,
|
||||
projection: &Arc<DataProjection>,
|
||||
) where
|
||||
C: ConfigRepository,
|
||||
C::Error: std::fmt::Display,
|
||||
B: BroadcastPort,
|
||||
B::Error: std::fmt::Display,
|
||||
{
|
||||
match event {
|
||||
DomainEvent::LayoutChanged { layout } => {
|
||||
handle_layout_changed(&layout, config, broadcaster, projection).await;
|
||||
}
|
||||
DomainEvent::WebhookDataReceived { source_id, data } => {
|
||||
handle_webhook_data(source_id, &data, config, broadcaster, projection).await;
|
||||
}
|
||||
DomainEvent::ThemeChanged { theme } => {
|
||||
if let Err(e) = broadcaster.push_theme_update(&theme).await {
|
||||
error!(error = %e, "failed to push theme update");
|
||||
}
|
||||
info!("theme changed, pushed update to clients");
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_layout_changed<C, B>(
|
||||
layout: &Layout,
|
||||
config: &Arc<C>,
|
||||
broadcaster: &Arc<B>,
|
||||
projection: &Arc<DataProjection>,
|
||||
) where
|
||||
C: ConfigRepository,
|
||||
C::Error: std::fmt::Display,
|
||||
B: BroadcastPort,
|
||||
B::Error: std::fmt::Display,
|
||||
{
|
||||
let widgets = match config.list_widgets().await {
|
||||
Ok(w) => w,
|
||||
Err(e) => {
|
||||
error!(error = %e, "failed to fetch widgets for screen update");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let mut widget_states = Vec::new();
|
||||
for w in &widgets {
|
||||
if let Some(s) = projection.get_state(w.id).await {
|
||||
widget_states.push((w.id, w.display_hint.clone(), s));
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = broadcaster.push_screen_update(layout, &widget_states).await {
|
||||
error!(error = %e, "failed to push screen update");
|
||||
}
|
||||
|
||||
info!("layout changed, pushed screen update to clients");
|
||||
}
|
||||
|
||||
async fn handle_webhook_data<C, B>(
|
||||
source_id: u16,
|
||||
data: &Value,
|
||||
config: &Arc<C>,
|
||||
broadcaster: &Arc<B>,
|
||||
projection: &Arc<DataProjection>,
|
||||
) where
|
||||
C: ConfigRepository,
|
||||
C::Error: std::fmt::Display,
|
||||
B: BroadcastPort,
|
||||
B::Error: std::fmt::Display,
|
||||
{
|
||||
let widgets = match config.list_widgets().await {
|
||||
Ok(w) => w,
|
||||
Err(e) => {
|
||||
error!(error = %e, "failed to fetch widgets for webhook");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let changed = apply_and_broadcast(source_id, data, &widgets, broadcaster, projection).await;
|
||||
if !changed.is_empty() {
|
||||
info!(source_id, count = changed.len(), "webhook data pushed");
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn apply_and_broadcast<B>(
|
||||
source_id: u16,
|
||||
data: &Value,
|
||||
widgets: &[WidgetConfig],
|
||||
broadcaster: &Arc<B>,
|
||||
projection: &Arc<DataProjection>,
|
||||
) -> Vec<(u16, WidgetState)>
|
||||
where
|
||||
B: BroadcastPort,
|
||||
B::Error: std::fmt::Display,
|
||||
{
|
||||
let changed: Vec<(u16, WidgetState)> =
|
||||
projection.apply_poll_result(source_id, data, widgets).await;
|
||||
|
||||
if !changed.is_empty() {
|
||||
let with_hints: Vec<_> = changed
|
||||
.iter()
|
||||
.filter_map(|(id, state)| {
|
||||
let hint = widgets.iter().find(|w| w.id == *id)?.display_hint.clone();
|
||||
Some((*id, hint, state.clone()))
|
||||
})
|
||||
.collect();
|
||||
if let Err(e) = broadcaster.push_data_update(&with_hints).await {
|
||||
warn!(error = %e, "failed to push update");
|
||||
}
|
||||
}
|
||||
|
||||
changed
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
pub mod auth_service;
|
||||
mod config_service;
|
||||
mod data_projection;
|
||||
pub mod event_service;
|
||||
pub mod polling_service;
|
||||
|
||||
pub use config_service::ConfigService;
|
||||
|
||||
@@ -130,7 +130,20 @@ async fn poll_and_broadcast<C, B, P, F>(
|
||||
return;
|
||||
}
|
||||
};
|
||||
broadcast_changes(source, &result, &widgets, broadcaster, projection, config).await;
|
||||
let changed = crate::event_service::apply_and_broadcast(
|
||||
source.id,
|
||||
&result,
|
||||
&widgets,
|
||||
broadcaster,
|
||||
projection,
|
||||
)
|
||||
.await;
|
||||
if !changed.is_empty() {
|
||||
if let Err(e) = config.save_widget_states(&changed).await {
|
||||
warn!(error = %e, "failed to cache widget states");
|
||||
}
|
||||
info!(source = %source.name, count = changed.len(), "pushed widget updates");
|
||||
}
|
||||
}
|
||||
|
||||
async fn poll_loop<C, B, P, F>(
|
||||
@@ -176,55 +189,25 @@ async fn poll_loop<C, B, P, F>(
|
||||
last_refresh = tokio::time::Instant::now();
|
||||
}
|
||||
|
||||
broadcast_changes(
|
||||
&source,
|
||||
let changed = crate::event_service::apply_and_broadcast(
|
||||
source.id,
|
||||
&result,
|
||||
&widgets,
|
||||
&broadcaster,
|
||||
&projection,
|
||||
&config,
|
||||
)
|
||||
.await;
|
||||
if !changed.is_empty() {
|
||||
if let Err(e) = config.save_widget_states(&changed).await {
|
||||
warn!(error = %e, "failed to cache widget states");
|
||||
}
|
||||
info!(source = %source.name, count = changed.len(), "pushed widget updates");
|
||||
}
|
||||
|
||||
tokio::time::sleep(interval).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn broadcast_changes<C, B>(
|
||||
source: &DataSource,
|
||||
result: &Value,
|
||||
widgets: &[WidgetConfig],
|
||||
broadcaster: &Arc<B>,
|
||||
projection: &Arc<DataProjection>,
|
||||
config: &Arc<C>,
|
||||
) where
|
||||
C: WidgetStateCache,
|
||||
C::Error: std::fmt::Display,
|
||||
B: BroadcastPort,
|
||||
B::Error: std::fmt::Display,
|
||||
{
|
||||
let changed: Vec<(u16, WidgetState)> = projection
|
||||
.apply_poll_result(source.id, result, widgets)
|
||||
.await;
|
||||
|
||||
if !changed.is_empty() {
|
||||
let with_hints: Vec<_> = changed
|
||||
.iter()
|
||||
.filter_map(|(id, state)| {
|
||||
let hint = widgets.iter().find(|w| w.id == *id)?.display_hint.clone();
|
||||
Some((*id, hint, state.clone()))
|
||||
})
|
||||
.collect();
|
||||
if let Err(e) = broadcaster.push_data_update(&with_hints).await {
|
||||
warn!(error = %e, "failed to push update");
|
||||
}
|
||||
if let Err(e) = config.save_widget_states(&changed).await {
|
||||
warn!(error = %e, "failed to cache widget states");
|
||||
}
|
||||
info!(source = %source.name, count = changed.len(), "pushed widget updates");
|
||||
}
|
||||
}
|
||||
|
||||
async fn broadcast_errors<B>(
|
||||
source: &DataSource,
|
||||
widgets: &[WidgetConfig],
|
||||
|
||||
Reference in New Issue
Block a user