arch: split ConfigRepository, extract polling, consolidate conversions, decouple protocol
- Value↔JSON: From impls on domain Value behind `json` feature, delete 4 duplicate converters - ConfigRepository split into ConfigRepository (12), UserRepository (3), WidgetStateCache (2) - polling orchestration moved from bootstrap to application::polling_service - WidgetRenderer in client-domain owns scroll/cache, both clients use it - network loop consolidated into client-application::run_connection_loop - protocol crate drops domain dep, Wire↔Domain conversions move to adapters
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
use domain::{AuthPort, ConfigRepository, PasswordHashPort, User};
|
||||
use domain::{AuthPort, PasswordHashPort, User, UserRepository};
|
||||
|
||||
pub enum AuthError<E> {
|
||||
InvalidCredentials,
|
||||
@@ -26,7 +26,7 @@ pub async fn login<C, A, H>(
|
||||
password: &str,
|
||||
) -> Result<String, AuthError<C::Error>>
|
||||
where
|
||||
C: ConfigRepository,
|
||||
C: UserRepository,
|
||||
A: AuthPort,
|
||||
H: PasswordHashPort,
|
||||
{
|
||||
@@ -55,7 +55,7 @@ pub async fn register<C, H>(
|
||||
password: &str,
|
||||
) -> Result<(), AuthError<C::Error>>
|
||||
where
|
||||
C: ConfigRepository,
|
||||
C: UserRepository,
|
||||
H: PasswordHashPort,
|
||||
{
|
||||
let count = config.count_users().await.map_err(AuthError::Repository)?;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
pub mod auth_service;
|
||||
mod config_service;
|
||||
mod data_projection;
|
||||
pub mod polling_service;
|
||||
|
||||
pub use config_service::ConfigService;
|
||||
pub use data_projection::DataProjection;
|
||||
|
||||
271
crates/application/src/polling_service.rs
Normal file
271
crates/application/src/polling_service.rs
Normal file
@@ -0,0 +1,271 @@
|
||||
use crate::DataProjection;
|
||||
use domain::{
|
||||
BroadcastPort, ConfigRepository, DataSource, Value, WidgetConfig, WidgetError, WidgetState,
|
||||
WidgetStateCache,
|
||||
};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::future::Future;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
const SOURCE_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
|
||||
|
||||
pub async fn run<C, B, P, F>(
|
||||
config: Arc<C>,
|
||||
broadcaster: Arc<B>,
|
||||
projection: Arc<DataProjection>,
|
||||
poller: Arc<P>,
|
||||
) where
|
||||
C: ConfigRepository + WidgetStateCache + Send + Sync + 'static,
|
||||
<C as ConfigRepository>::Error: std::fmt::Display + Send,
|
||||
<C as WidgetStateCache>::Error: std::fmt::Display + Send,
|
||||
B: BroadcastPort + Send + Sync + 'static,
|
||||
B::Error: std::fmt::Display + Send,
|
||||
P: Fn(&DataSource) -> F + Send + Sync + 'static,
|
||||
F: Future<Output = Result<Value, anyhow::Error>> + Send,
|
||||
{
|
||||
let mut running: HashMap<u16, JoinHandle<()>> = HashMap::new();
|
||||
let mut static_done: HashSet<u16> = HashSet::new();
|
||||
|
||||
info!("polling manager started");
|
||||
|
||||
loop {
|
||||
let sources = match config.list_data_sources().await {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
warn!(error = %e, "failed to list data sources");
|
||||
tokio::time::sleep(SOURCE_REFRESH_INTERVAL).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let current_ids: Vec<u16> = sources.iter().map(|s| s.id).collect();
|
||||
|
||||
running.retain(|id, handle| {
|
||||
if !current_ids.contains(id) {
|
||||
info!(source_id = id, "stopping poll for removed source");
|
||||
handle.abort();
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
static_done.retain(|id| current_ids.contains(id));
|
||||
|
||||
for source in &sources {
|
||||
if source.source_type == domain::DataSourceType::Webhook {
|
||||
continue;
|
||||
}
|
||||
|
||||
if source.source_type == domain::DataSourceType::StaticText {
|
||||
if static_done.contains(&source.id) {
|
||||
continue;
|
||||
}
|
||||
poll_and_broadcast(&*poller, source, &config, &broadcaster, &projection).await;
|
||||
static_done.insert(source.id);
|
||||
continue;
|
||||
}
|
||||
|
||||
if running.contains_key(&source.id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let source_id = source.id;
|
||||
let source = source.clone();
|
||||
let config = config.clone();
|
||||
let broadcaster = broadcaster.clone();
|
||||
let projection = projection.clone();
|
||||
let poller = poller.clone();
|
||||
|
||||
info!(
|
||||
source_id = source.id,
|
||||
name = %source.name,
|
||||
interval_secs = source.poll_interval.as_secs(),
|
||||
"starting poll task"
|
||||
);
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
poll_loop(source, config, broadcaster, projection, poller).await;
|
||||
});
|
||||
|
||||
running.insert(source_id, handle);
|
||||
}
|
||||
|
||||
if running.is_empty() && static_done.is_empty() {
|
||||
debug!("no pollable sources, waiting");
|
||||
}
|
||||
|
||||
tokio::time::sleep(SOURCE_REFRESH_INTERVAL).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn poll_and_broadcast<C, B, P, F>(
|
||||
poller: &P,
|
||||
source: &DataSource,
|
||||
config: &Arc<C>,
|
||||
broadcaster: &Arc<B>,
|
||||
projection: &Arc<DataProjection>,
|
||||
) where
|
||||
C: ConfigRepository + WidgetStateCache,
|
||||
<C as ConfigRepository>::Error: std::fmt::Display,
|
||||
<C as WidgetStateCache>::Error: std::fmt::Display,
|
||||
B: BroadcastPort,
|
||||
B::Error: std::fmt::Display,
|
||||
P: Fn(&DataSource) -> F,
|
||||
F: Future<Output = Result<Value, anyhow::Error>>,
|
||||
{
|
||||
let result = match poller(source).await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
warn!(source = %source.name, error = %e, "poll failed");
|
||||
return;
|
||||
}
|
||||
};
|
||||
let widgets = match config.list_widgets().await {
|
||||
Ok(w) => w,
|
||||
Err(e) => {
|
||||
warn!(error = %e, "failed to fetch widgets");
|
||||
return;
|
||||
}
|
||||
};
|
||||
broadcast_changes(source, &result, &widgets, broadcaster, projection, config).await;
|
||||
}
|
||||
|
||||
async fn poll_loop<C, B, P, F>(
|
||||
source: DataSource,
|
||||
config: Arc<C>,
|
||||
broadcaster: Arc<B>,
|
||||
projection: Arc<DataProjection>,
|
||||
poller: Arc<P>,
|
||||
) where
|
||||
C: ConfigRepository + WidgetStateCache,
|
||||
<C as ConfigRepository>::Error: std::fmt::Display,
|
||||
<C as WidgetStateCache>::Error: std::fmt::Display,
|
||||
B: BroadcastPort,
|
||||
B::Error: std::fmt::Display,
|
||||
P: Fn(&DataSource) -> F,
|
||||
F: Future<Output = Result<Value, anyhow::Error>>,
|
||||
{
|
||||
let interval = source.poll_interval;
|
||||
let mut widgets = match config.list_widgets().await {
|
||||
Ok(w) => w,
|
||||
Err(e) => {
|
||||
warn!(error = %e, "failed to fetch initial widget list");
|
||||
vec![]
|
||||
}
|
||||
};
|
||||
let mut last_refresh = tokio::time::Instant::now();
|
||||
|
||||
loop {
|
||||
let result = match poller(&source).await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
warn!(source = %source.name, error = %e, "poll failed");
|
||||
broadcast_errors(&source, &widgets, &broadcaster, &projection).await;
|
||||
tokio::time::sleep(interval).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if last_refresh.elapsed() >= SOURCE_REFRESH_INTERVAL {
|
||||
if let Ok(w) = config.list_widgets().await {
|
||||
widgets = w;
|
||||
}
|
||||
last_refresh = tokio::time::Instant::now();
|
||||
}
|
||||
|
||||
broadcast_changes(
|
||||
&source,
|
||||
&result,
|
||||
&widgets,
|
||||
&broadcaster,
|
||||
&projection,
|
||||
&config,
|
||||
)
|
||||
.await;
|
||||
|
||||
tokio::time::sleep(interval).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn broadcast_changes<C, B>(
|
||||
source: &DataSource,
|
||||
result: &Value,
|
||||
widgets: &[WidgetConfig],
|
||||
broadcaster: &Arc<B>,
|
||||
projection: &Arc<DataProjection>,
|
||||
config: &Arc<C>,
|
||||
) where
|
||||
C: WidgetStateCache,
|
||||
C::Error: std::fmt::Display,
|
||||
B: BroadcastPort,
|
||||
B::Error: std::fmt::Display,
|
||||
{
|
||||
let changed: Vec<(u16, WidgetState)> = projection
|
||||
.apply_poll_result(source.id, result, widgets)
|
||||
.await;
|
||||
|
||||
if !changed.is_empty() {
|
||||
let with_hints: Vec<_> = changed
|
||||
.iter()
|
||||
.filter_map(|(id, state)| {
|
||||
let hint = widgets.iter().find(|w| w.id == *id)?.display_hint.clone();
|
||||
Some((*id, hint, state.clone()))
|
||||
})
|
||||
.collect();
|
||||
if let Err(e) = broadcaster.push_data_update(&with_hints).await {
|
||||
warn!(error = %e, "failed to push update");
|
||||
}
|
||||
if let Err(e) = config.save_widget_states(&changed).await {
|
||||
warn!(error = %e, "failed to cache widget states");
|
||||
}
|
||||
info!(source = %source.name, count = changed.len(), "pushed widget updates");
|
||||
}
|
||||
}
|
||||
|
||||
async fn broadcast_errors<B>(
|
||||
source: &DataSource,
|
||||
widgets: &[WidgetConfig],
|
||||
broadcaster: &Arc<B>,
|
||||
projection: &Arc<DataProjection>,
|
||||
) where
|
||||
B: BroadcastPort,
|
||||
B::Error: std::fmt::Display,
|
||||
{
|
||||
let affected: Vec<_> = widgets
|
||||
.iter()
|
||||
.filter(|w| w.data_source_id == source.id)
|
||||
.collect();
|
||||
|
||||
if affected.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut error_states = Vec::new();
|
||||
for w in &affected {
|
||||
let mut state = projection
|
||||
.get_state(w.id)
|
||||
.await
|
||||
.unwrap_or_else(|| WidgetState {
|
||||
data: std::collections::BTreeMap::new(),
|
||||
error: None,
|
||||
});
|
||||
state.error = Some(WidgetError::SourceUnavailable);
|
||||
error_states.push((w.id, state));
|
||||
}
|
||||
|
||||
projection.seed(error_states.clone()).await;
|
||||
|
||||
let with_hints: Vec<_> = error_states
|
||||
.iter()
|
||||
.filter_map(|(id, state)| {
|
||||
let hint = affected.iter().find(|w| w.id == *id)?.display_hint.clone();
|
||||
Some((*id, hint, state.clone()))
|
||||
})
|
||||
.collect();
|
||||
if let Err(e) = broadcaster.push_data_update(&with_hints).await {
|
||||
warn!(error = %e, "failed to push error update");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user