diff --git a/Cargo.lock b/Cargo.lock index 0bed358..dbe2585 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -226,9 +226,8 @@ version = "0.1.0" dependencies = [ "anyhow", "application", - "chrono", - "chrono-tz", "config-sqlite", + "data-generators", "domain", "dotenvy", "http-api", @@ -467,6 +466,16 @@ dependencies = [ "cipher", ] +[[package]] +name = "data-generators" +version = "0.1.0" +dependencies = [ + "chrono", + "chrono-tz", + "domain", + "thiserror", +] + [[package]] name = "der" version = "0.7.10" diff --git a/Cargo.toml b/Cargo.toml index c086181..eb2b7c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ members = [ "crates/adapters/media", "crates/adapters/auth", "crates/adapters/secret-store", + "crates/adapters/data-generators", "crates/api-types", "crates/bootstrap", "crates/client-desktop", @@ -57,5 +58,4 @@ postcard = { version = "1.1", default-features = false, features = ["alloc"] } tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "net", "sync", "time", "io-util"] } tower = "0.5" reqwest = { version = "0.12", features = ["json"] } -chrono = "0.4" -chrono-tz = "0.10" +data-generators = { path = "crates/adapters/data-generators" } diff --git a/crates/adapters/data-generators/Cargo.toml b/crates/adapters/data-generators/Cargo.toml new file mode 100644 index 0000000..21c57b1 --- /dev/null +++ b/crates/adapters/data-generators/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "data-generators" +version = "0.1.0" +edition = "2024" + +[dependencies] +domain.workspace = true +chrono = "0.4" +chrono-tz = "0.10" +thiserror.workspace = true diff --git a/crates/adapters/data-generators/src/lib.rs b/crates/adapters/data-generators/src/lib.rs new file mode 100644 index 0000000..ea83936 --- /dev/null +++ b/crates/adapters/data-generators/src/lib.rs @@ -0,0 +1,59 @@ +use chrono::Utc; +use chrono_tz::Tz; +use domain::{DataSource, DataSourceConfig, DataSourcePort, Value}; +use std::collections::BTreeMap; + +#[derive(Default)] +pub struct ClockGenerator; + +impl ClockGenerator { + pub fn new() -> Self { + Self + } +} + +#[derive(Debug, thiserror::Error)] +pub enum GeneratorError { + #[error("wrong config type for generator")] + WrongConfig, +} + +impl DataSourcePort for ClockGenerator { + type Error = GeneratorError; + + async fn poll(&self, source: &DataSource) -> Result { + let (fmt, tz_name) = match &source.config { + DataSourceConfig::Clock { format, timezone } => (format.as_str(), timezone.as_str()), + _ => ("%H:%M:%S", "UTC"), + }; + let tz: Tz = tz_name.parse().unwrap_or(chrono_tz::UTC); + let now = Utc::now().with_timezone(&tz); + let formatted = now.format(fmt).to_string(); + let mut map = BTreeMap::new(); + map.insert("time".into(), Value::String(formatted)); + Ok(Value::Object(map)) + } +} + +#[derive(Default)] +pub struct StaticTextGenerator; + +impl StaticTextGenerator { + pub fn new() -> Self { + Self + } +} + +impl DataSourcePort for StaticTextGenerator { + type Error = GeneratorError; + + async fn poll(&self, source: &DataSource) -> Result { + let text = match &source.config { + DataSourceConfig::StaticText { text } => text.clone(), + _ => String::new(), + }; + let mut map = BTreeMap::new(); + map.insert("text".into(), Value::String(text)); + Ok(Value::Object(map)) + } +} diff --git a/crates/adapters/http-api/src/routes/webhook.rs b/crates/adapters/http-api/src/routes/webhook.rs index faaaa3d..b4e5bac 100644 --- a/crates/adapters/http-api/src/routes/webhook.rs +++ b/crates/adapters/http-api/src/routes/webhook.rs @@ -2,7 +2,7 @@ use crate::AppState; use axum::extract::{Path, State}; use axum::http::StatusCode; use axum::response::Json; -use domain::{BroadcastPort, ConfigRepository, EventPublisher, WidgetStateReader}; +use domain::{ConfigRepository, DomainEvent, EventPublisher}; type S = State>; @@ -16,9 +16,6 @@ where C::Error: std::fmt::Debug, E: EventPublisher, E::Error: std::fmt::Debug, - W: WidgetStateReader, - B: BroadcastPort, - B::Error: std::fmt::Debug, { let source = state .config @@ -34,37 +31,14 @@ where )); } - let raw = json_to_domain_value(body); - let widgets = state - .config - .list_widgets() + let data = json_to_domain_value(body); + + state + .events + .publish(DomainEvent::WebhookDataReceived { source_id, data }) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))?; - let layout = state - .config - .get_layout() - .await - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))?; - - let changed = state - .widget_states - .apply_raw_data(source_id, &raw, &widgets) - .await; - - if !changed.is_empty() - && let Some(l) = &layout - { - let with_hints: Vec<_> = changed - .iter() - .filter_map(|(id, s)| { - let hint = widgets.iter().find(|w| w.id == *id)?.display_hint.clone(); - Some((*id, hint, s.clone())) - }) - .collect(); - let _ = state.broadcaster.push_screen_update(l, &with_hints).await; - } - Ok(StatusCode::OK) } diff --git a/crates/bootstrap/Cargo.toml b/crates/bootstrap/Cargo.toml index ef55dbc..81fc490 100644 --- a/crates/bootstrap/Cargo.toml +++ b/crates/bootstrap/Cargo.toml @@ -19,5 +19,4 @@ anyhow.workspace = true tracing.workspace = true tracing-subscriber.workspace = true dotenvy.workspace = true -chrono.workspace = true -chrono-tz.workspace = true +data-generators.workspace = true diff --git a/crates/bootstrap/src/event_handler.rs b/crates/bootstrap/src/event_handler.rs index 4b2f8a5..ab164bf 100644 --- a/crates/bootstrap/src/event_handler.rs +++ b/crates/bootstrap/src/event_handler.rs @@ -40,6 +40,37 @@ pub async fn run( info!("layout changed, pushed screen update to clients"); } + Ok(DomainEvent::WebhookDataReceived { source_id, data }) => { + let widgets = match config.list_widgets().await { + Ok(w) => w, + Err(e) => { + error!(error = %e, "failed to fetch widgets for webhook"); + continue; + } + }; + + let changed = projection + .apply_poll_result(source_id, &data, &widgets) + .await; + + if !changed.is_empty() { + let with_hints: Vec<_> = changed + .iter() + .filter_map(|(id, state)| { + let hint = widgets.iter().find(|w| w.id == *id)?.display_hint.clone(); + Some((*id, hint, state.clone())) + }) + .collect(); + if let Err(e) = broadcaster.push_data_update(&with_hints).await { + error!(error = %e, "failed to push webhook data update"); + } + info!( + source_id, + count = changed.len(), + "webhook data received, pushed update" + ); + } + } Ok(DomainEvent::ThemeChanged { theme }) => { if let Err(e) = broadcaster.push_theme_update(&theme).await { error!(error = %e, "failed to push theme update"); diff --git a/crates/bootstrap/src/polling.rs b/crates/bootstrap/src/polling.rs index 97dc819..9c88551 100644 --- a/crates/bootstrap/src/polling.rs +++ b/crates/bootstrap/src/polling.rs @@ -1,16 +1,14 @@ use anyhow::Result; use application::DataProjection; -use chrono::Utc; -use chrono_tz::Tz; use config_sqlite::SqliteConfigStore; +use data_generators::{ClockGenerator, StaticTextGenerator}; use domain::{ - BroadcastPort, ConfigRepository, DataSource, DataSourceConfig, DataSourcePort, DataSourceType, - Value, WidgetState, + BroadcastPort, ConfigRepository, DataSource, DataSourcePort, DataSourceType, Value, WidgetState, }; use http_json::HttpJsonAdapter; use media_adapter::MediaAdapter; use rss_adapter::RssAdapter; -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tcp_server::TcpBroadcaster; @@ -19,15 +17,63 @@ use tracing::{debug, info, warn}; const SOURCE_REFRESH_INTERVAL: Duration = Duration::from_secs(30); +#[derive(Clone)] +struct Adapters { + http: Arc, + media: Arc, + rss: Arc, + clock: Arc, + static_text: Arc, +} + +impl Adapters { + async fn poll(&self, source: &DataSource) -> Result { + match source.source_type { + DataSourceType::HttpJson | DataSourceType::Weather => self + .http + .poll(source) + .await + .map_err(|e| anyhow::anyhow!("{e}")), + DataSourceType::Media => self + .media + .poll(source) + .await + .map_err(|e| anyhow::anyhow!("{e}")), + DataSourceType::Rss => self + .rss + .poll(source) + .await + .map_err(|e| anyhow::anyhow!("{e}")), + DataSourceType::Clock => self + .clock + .poll(source) + .await + .map_err(|e| anyhow::anyhow!("{e}")), + DataSourceType::StaticText => self + .static_text + .poll(source) + .await + .map_err(|e| anyhow::anyhow!("{e}")), + DataSourceType::Webhook => Err(anyhow::anyhow!( + "webhook sources are push-based, not polled" + )), + } + } +} + pub async fn run( config: Arc, broadcaster: Arc, projection: Arc, _poll_interval_secs: u64, ) -> Result<()> { - let http_adapter = Arc::new(HttpJsonAdapter::new()); - let media_adapter = Arc::new(MediaAdapter::new()); - let rss_adapter = Arc::new(RssAdapter::new()); + let adapters = Adapters { + http: Arc::new(HttpJsonAdapter::new()), + media: Arc::new(MediaAdapter::new()), + rss: Arc::new(RssAdapter::new()), + clock: Arc::new(ClockGenerator::new()), + static_text: Arc::new(StaticTextGenerator::new()), + }; let mut running: HashMap> = HashMap::new(); @@ -64,9 +110,7 @@ pub async fn run( let config = config.clone(); let broadcaster = broadcaster.clone(); let projection = projection.clone(); - let http = http_adapter.clone(); - let media = media_adapter.clone(); - let rss = rss_adapter.clone(); + let adapters = adapters.clone(); info!( source_id = source.id, @@ -76,7 +120,7 @@ pub async fn run( ); let handle = tokio::spawn(async move { - poll_loop(source, config, broadcaster, projection, http, media, rss).await; + poll_loop(source, config, broadcaster, projection, adapters).await; }); running.insert(source_id, handle); @@ -95,16 +139,14 @@ async fn poll_loop( config: Arc, broadcaster: Arc, projection: Arc, - http_adapter: Arc, - media_adapter: Arc, - rss_adapter: Arc, + adapters: Adapters, ) { let interval = source.poll_interval; loop { tokio::time::sleep(interval).await; - let result = match poll_source(&http_adapter, &media_adapter, &rss_adapter, &source).await { + let result = match adapters.poll(&source).await { Ok(v) => v, Err(e) => { warn!(source = %source.name, error = %e, "poll failed"); @@ -139,53 +181,3 @@ async fn poll_loop( } } } - -async fn poll_source( - http_adapter: &HttpJsonAdapter, - media_adapter: &MediaAdapter, - rss_adapter: &RssAdapter, - source: &DataSource, -) -> Result { - match source.source_type { - DataSourceType::HttpJson | DataSourceType::Weather => http_adapter - .poll(source) - .await - .map_err(|e| anyhow::anyhow!("{e}")), - DataSourceType::Media => media_adapter - .poll(source) - .await - .map_err(|e| anyhow::anyhow!("{e}")), - DataSourceType::Rss => rss_adapter - .poll(source) - .await - .map_err(|e| anyhow::anyhow!("{e}")), - DataSourceType::Clock => Ok(generate_clock(&source.config)), - DataSourceType::StaticText => Ok(generate_static_text(&source.config)), - DataSourceType::Webhook => Err(anyhow::anyhow!( - "webhook sources are push-based, not polled" - )), - } -} - -fn generate_clock(config: &DataSourceConfig) -> Value { - let (fmt, tz_name) = match config { - DataSourceConfig::Clock { format, timezone } => (format.as_str(), timezone.as_str()), - _ => ("%H:%M:%S", "UTC"), - }; - let tz: Tz = tz_name.parse().unwrap_or(chrono_tz::UTC); - let now = Utc::now().with_timezone(&tz); - let formatted = now.format(fmt).to_string(); - let mut map = BTreeMap::new(); - map.insert("time".into(), Value::String(formatted)); - Value::Object(map) -} - -fn generate_static_text(config: &DataSourceConfig) -> Value { - let text = match config { - DataSourceConfig::StaticText { text } => text.clone(), - _ => String::new(), - }; - let mut map = BTreeMap::new(); - map.insert("text".into(), Value::String(text)); - Value::Object(map) -} diff --git a/crates/domain/src/events/mod.rs b/crates/domain/src/events/mod.rs index b37aef9..e9410c6 100644 --- a/crates/domain/src/events/mod.rs +++ b/crates/domain/src/events/mod.rs @@ -1,17 +1,43 @@ use crate::entities::{DataSourceId, LayoutPresetId, WidgetId}; -use crate::value_objects::{Layout, ThemeConfig}; +use crate::value_objects::{Layout, ThemeConfig, Value}; #[derive(Debug, Clone)] pub enum DomainEvent { - WidgetCreated { id: WidgetId }, - WidgetUpdated { id: WidgetId }, - WidgetDeleted { id: WidgetId }, - DataSourceAdded { id: DataSourceId }, - DataSourceUpdated { id: DataSourceId }, - DataSourceRemoved { id: DataSourceId }, - LayoutChanged { layout: Layout }, - ThemeChanged { theme: ThemeConfig }, - LayoutPresetSaved { id: LayoutPresetId }, - LayoutPresetLoaded { id: LayoutPresetId }, - LayoutPresetDeleted { id: LayoutPresetId }, + WidgetCreated { + id: WidgetId, + }, + WidgetUpdated { + id: WidgetId, + }, + WidgetDeleted { + id: WidgetId, + }, + DataSourceAdded { + id: DataSourceId, + }, + DataSourceUpdated { + id: DataSourceId, + }, + DataSourceRemoved { + id: DataSourceId, + }, + LayoutChanged { + layout: Layout, + }, + ThemeChanged { + theme: ThemeConfig, + }, + LayoutPresetSaved { + id: LayoutPresetId, + }, + LayoutPresetLoaded { + id: LayoutPresetId, + }, + LayoutPresetDeleted { + id: LayoutPresetId, + }, + WebhookDataReceived { + source_id: DataSourceId, + data: Value, + }, }