Files
k-frame/crates/adapters/tcp-server/src/server.rs

114 lines
3.4 KiB
Rust

use crate::client_tracker::ClientTracker;
use crate::error::TcpServerError;
use domain::{ConfigRepository, WidgetStateReader};
use protocol::{ServerMessage, WidgetDescriptor, WireLayoutNode, encode};
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::sync::broadcast;
use tracing::{error, info, warn};
use crate::broadcaster::TcpBroadcaster;
pub async fn run_tcp_server<C, W>(
addr: &str,
broadcaster: Arc<TcpBroadcaster>,
tracker: Arc<ClientTracker>,
config: Arc<C>,
widget_states: Arc<W>,
) -> Result<(), TcpServerError>
where
C: ConfigRepository + Send + Sync + 'static,
C::Error: std::fmt::Debug + Send,
W: WidgetStateReader + Send + Sync + 'static,
{
let listener = TcpListener::bind(addr).await.map_err(TcpServerError::Io)?;
info!(addr, "TCP server listening");
loop {
let (mut socket, peer) = listener.accept().await.map_err(TcpServerError::Io)?;
info!(%peer, "client connected");
tracker.add(peer);
let tracker = tracker.clone();
let mut rx = broadcaster.subscribe();
let initial_frame = build_initial_frame(&*config, &*widget_states).await;
tokio::spawn(async move {
if let Some(frame) = initial_frame
&& socket.write_all(&frame).await.is_err()
{
info!(%peer, "client disconnected during initial send");
tracker.remove(peer);
return;
}
loop {
match rx.recv().await {
Ok(frame) => {
if socket.write_all(&frame).await.is_err() {
info!(%peer, "client disconnected");
break;
}
}
Err(broadcast::error::RecvError::Closed) => break,
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!(%peer, skipped = n, "client lagged");
}
}
}
tracker.remove(peer);
});
}
}
async fn build_initial_frame<C, W>(config: &C, widget_states: &W) -> Option<Vec<u8>>
where
C: ConfigRepository,
C::Error: std::fmt::Debug,
W: WidgetStateReader,
{
let layout = match config.get_layout().await {
Ok(Some(l)) => l,
Ok(None) => return None,
Err(e) => {
error!(error = ?e, "failed to fetch layout for initial send");
return None;
}
};
let widgets = match config.list_widgets().await {
Ok(w) => w,
Err(e) => {
error!(error = ?e, "failed to fetch widgets for initial send");
return None;
}
};
let wire_layout: WireLayoutNode = (&layout.root).into();
let mut wire_widgets = Vec::new();
for w in &widgets {
if let Some(s) = widget_states.get_widget_state(w.id).await {
wire_widgets.push(WidgetDescriptor {
id: w.id,
display_hint: (&w.display_hint).into(),
state: (&s).into(),
});
}
}
let msg = ServerMessage::ScreenUpdate {
layout: wire_layout,
widgets: wire_widgets,
};
match encode(&msg) {
Ok(frame) => Some(frame),
Err(e) => {
error!(error = %e, "failed to encode initial screen update");
None
}
}
}