refactor adapters into modular file structure
config-sqlite: split into repository/ (per entity) + serialization/ (per type) + error.rs http-api: split into dto/ (per resource) + routes/ (per resource) tcp-server: split into broadcaster, event_bus, server, error rss: split parser from adapter, external tests media: split error, external tests
This commit is contained in:
@@ -1,150 +1,9 @@
|
||||
use std::sync::Arc;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use domain::{
|
||||
BroadcastPort, EventPublisher, DomainEvent,
|
||||
Layout, WidgetId, WidgetState,
|
||||
};
|
||||
use protocol::{
|
||||
ServerMessage, WidgetDescriptor, WireDisplayHint, WireLayoutNode,
|
||||
encode,
|
||||
};
|
||||
mod error;
|
||||
mod broadcaster;
|
||||
mod event_bus;
|
||||
mod server;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum TcpServerError {
|
||||
Io(std::io::Error),
|
||||
Encode(postcard::Error),
|
||||
}
|
||||
|
||||
impl std::fmt::Display for TcpServerError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
TcpServerError::Io(e) => write!(f, "io: {e}"),
|
||||
TcpServerError::Encode(e) => write!(f, "encode: {e}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TcpBroadcaster {
|
||||
tx: broadcast::Sender<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl TcpBroadcaster {
|
||||
pub fn new(capacity: usize) -> Self {
|
||||
let (tx, _) = broadcast::channel(capacity);
|
||||
Self { tx }
|
||||
}
|
||||
|
||||
pub fn subscribe(&self) -> broadcast::Receiver<Vec<u8>> {
|
||||
self.tx.subscribe()
|
||||
}
|
||||
|
||||
fn send_frame(&self, frame: Vec<u8>) -> Result<(), TcpServerError> {
|
||||
let _ = self.tx.send(frame);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl BroadcastPort for TcpBroadcaster {
|
||||
type Error = TcpServerError;
|
||||
|
||||
async fn push_screen_update(
|
||||
&self,
|
||||
layout: &Layout,
|
||||
widgets: &[(WidgetId, WidgetState)],
|
||||
) -> Result<(), Self::Error> {
|
||||
let wire_layout: WireLayoutNode = (&layout.root).into();
|
||||
let wire_widgets: Vec<WidgetDescriptor> = widgets.iter().map(|(id, state)| {
|
||||
WidgetDescriptor {
|
||||
id: *id,
|
||||
display_hint: WireDisplayHint::IconValue,
|
||||
state: state.into(),
|
||||
}
|
||||
}).collect();
|
||||
|
||||
let msg = ServerMessage::ScreenUpdate {
|
||||
layout: wire_layout,
|
||||
widgets: wire_widgets,
|
||||
};
|
||||
|
||||
let frame = encode(&msg).map_err(TcpServerError::Encode)?;
|
||||
self.send_frame(frame)
|
||||
}
|
||||
|
||||
async fn push_data_update(
|
||||
&self,
|
||||
updates: &[(WidgetId, WidgetState)],
|
||||
) -> Result<(), Self::Error> {
|
||||
let wire_widgets: Vec<WidgetDescriptor> = updates.iter().map(|(id, state)| {
|
||||
WidgetDescriptor {
|
||||
id: *id,
|
||||
display_hint: WireDisplayHint::IconValue,
|
||||
state: state.into(),
|
||||
}
|
||||
}).collect();
|
||||
|
||||
let msg = ServerMessage::DataUpdate {
|
||||
widgets: wire_widgets,
|
||||
};
|
||||
|
||||
let frame = encode(&msg).map_err(TcpServerError::Encode)?;
|
||||
self.send_frame(frame)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TcpEventBus {
|
||||
tx: broadcast::Sender<DomainEvent>,
|
||||
}
|
||||
|
||||
impl TcpEventBus {
|
||||
pub fn new(capacity: usize) -> Self {
|
||||
let (tx, _) = broadcast::channel(capacity);
|
||||
Self { tx }
|
||||
}
|
||||
|
||||
pub fn subscribe(&self) -> broadcast::Receiver<DomainEvent> {
|
||||
self.tx.subscribe()
|
||||
}
|
||||
}
|
||||
|
||||
impl EventPublisher for TcpEventBus {
|
||||
type Error = TcpServerError;
|
||||
|
||||
async fn publish(&self, event: DomainEvent) -> Result<(), Self::Error> {
|
||||
let _ = self.tx.send(event);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run_tcp_server(
|
||||
addr: &str,
|
||||
broadcaster: Arc<TcpBroadcaster>,
|
||||
) -> Result<(), TcpServerError> {
|
||||
let listener = TcpListener::bind(addr).await.map_err(TcpServerError::Io)?;
|
||||
println!("TCP server listening on {addr}");
|
||||
|
||||
loop {
|
||||
let (mut socket, peer) = listener.accept().await.map_err(TcpServerError::Io)?;
|
||||
println!("Client connected: {peer}");
|
||||
|
||||
let mut rx = broadcaster.subscribe();
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(frame) => {
|
||||
if socket.write_all(&frame).await.is_err() {
|
||||
println!("Client disconnected: {peer}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => break,
|
||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||
println!("Client {peer} lagged by {n} messages");
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
pub use error::TcpServerError;
|
||||
pub use broadcaster::TcpBroadcaster;
|
||||
pub use event_bus::TcpEventBus;
|
||||
pub use server::run_tcp_server;
|
||||
|
||||
Reference in New Issue
Block a user