add http-json, rss, and media data source adapters
http-json: generic HTTP+JSON polling adapter, converts serde_json to domain Value. 4 tests. rss: XML RSS feed parser, extracts items into Value array. 1 test. media: Navidrome/Subsonic getNowPlaying adapter. 2 tests with fake server.
This commit is contained in:
13
crates/adapters/http-json/Cargo.toml
Normal file
13
crates/adapters/http-json/Cargo.toml
Normal file
@@ -0,0 +1,13 @@
|
||||
[package]
|
||||
name = "http-json"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
domain.workspace = true
|
||||
reqwest.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
tokio.workspace = true
|
||||
axum.workspace = true
|
||||
68
crates/adapters/http-json/src/lib.rs
Normal file
68
crates/adapters/http-json/src/lib.rs
Normal file
@@ -0,0 +1,68 @@
|
||||
use domain::{DataSource, DataSourcePort, Value};
|
||||
|
||||
pub struct HttpJsonAdapter {
|
||||
client: reqwest::Client,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum HttpJsonError {
|
||||
Request(reqwest::Error),
|
||||
NoUrl,
|
||||
Parse(String),
|
||||
}
|
||||
|
||||
impl std::fmt::Display for HttpJsonError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
HttpJsonError::Request(e) => write!(f, "request: {e}"),
|
||||
HttpJsonError::NoUrl => write!(f, "no url configured"),
|
||||
HttpJsonError::Parse(e) => write!(f, "parse: {e}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl HttpJsonAdapter {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
client: reqwest::Client::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn json_to_value(json: serde_json::Value) -> Value {
|
||||
match json {
|
||||
serde_json::Value::Null => Value::Null,
|
||||
serde_json::Value::Bool(b) => Value::Bool(b),
|
||||
serde_json::Value::Number(n) => Value::Number(n.as_f64().unwrap_or(0.0)),
|
||||
serde_json::Value::String(s) => Value::String(s),
|
||||
serde_json::Value::Array(arr) => {
|
||||
Value::Array(arr.into_iter().map(json_to_value).collect())
|
||||
}
|
||||
serde_json::Value::Object(map) => {
|
||||
Value::Object(map.into_iter().map(|(k, v)| (k, json_to_value(v))).collect())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DataSourcePort for HttpJsonAdapter {
|
||||
type Error = HttpJsonError;
|
||||
|
||||
async fn poll(&self, source: &DataSource) -> Result<Value, Self::Error> {
|
||||
let url = source.config.url.as_ref().ok_or(HttpJsonError::NoUrl)?;
|
||||
|
||||
let mut req = self.client.get(url);
|
||||
|
||||
for (key, val) in &source.config.headers {
|
||||
req = req.header(key, val);
|
||||
}
|
||||
|
||||
if let Some(api_key) = &source.config.api_key {
|
||||
req = req.header("Authorization", format!("Bearer {api_key}"));
|
||||
}
|
||||
|
||||
let resp = req.send().await.map_err(HttpJsonError::Request)?;
|
||||
let json: serde_json::Value = resp.json().await.map_err(HttpJsonError::Request)?;
|
||||
|
||||
Ok(json_to_value(json))
|
||||
}
|
||||
}
|
||||
102
crates/adapters/http-json/tests/http_json_tests.rs
Normal file
102
crates/adapters/http-json/tests/http_json_tests.rs
Normal file
@@ -0,0 +1,102 @@
|
||||
use std::time::Duration;
|
||||
use axum::{Router, routing::get, response::Json};
|
||||
use domain::{DataSource, DataSourceConfig, DataSourcePort, DataSourceType, Value};
|
||||
use http_json::HttpJsonAdapter;
|
||||
|
||||
async fn start_fake_api() -> String {
|
||||
let app = Router::new()
|
||||
.route("/weather", get(|| async {
|
||||
Json(serde_json::json!({
|
||||
"main": {"temp": 5.4, "humidity": 80},
|
||||
"weather": [{"icon": "cloud_rain"}]
|
||||
}))
|
||||
}))
|
||||
.route("/simple", get(|| async {
|
||||
Json(serde_json::json!({"value": "hello"}))
|
||||
}))
|
||||
.route("/not-json", get(|| async { "plain text" }));
|
||||
|
||||
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
tokio::spawn(async move {
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
});
|
||||
format!("http://{addr}")
|
||||
}
|
||||
|
||||
fn make_source(url: String) -> DataSource {
|
||||
DataSource {
|
||||
id: 1,
|
||||
name: "test".into(),
|
||||
source_type: DataSourceType::HttpJson,
|
||||
poll_interval: Duration::from_secs(60),
|
||||
config: DataSourceConfig {
|
||||
url: Some(url),
|
||||
headers: vec![],
|
||||
api_key: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn polls_url_and_returns_nested_json_as_value() {
|
||||
let base = start_fake_api().await;
|
||||
let adapter = HttpJsonAdapter::new();
|
||||
let source = make_source(format!("{base}/weather"));
|
||||
|
||||
let result = adapter.poll(&source).await.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
result.get_path("$.main.temp"),
|
||||
Some(&Value::Number(5.4))
|
||||
);
|
||||
assert_eq!(
|
||||
result.get_path("$.main.humidity"),
|
||||
Some(&Value::Number(80.0))
|
||||
);
|
||||
assert_eq!(
|
||||
result.get_path("$.weather[0].icon"),
|
||||
Some(&Value::String("cloud_rain".into()))
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn polls_simple_json() {
|
||||
let base = start_fake_api().await;
|
||||
let adapter = HttpJsonAdapter::new();
|
||||
let source = make_source(format!("{base}/simple"));
|
||||
|
||||
let result = adapter.poll(&source).await.unwrap();
|
||||
assert_eq!(
|
||||
result.get_path("$.value"),
|
||||
Some(&Value::String("hello".into()))
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn returns_error_when_no_url() {
|
||||
let adapter = HttpJsonAdapter::new();
|
||||
let source = DataSource {
|
||||
id: 1,
|
||||
name: "bad".into(),
|
||||
source_type: DataSourceType::HttpJson,
|
||||
poll_interval: Duration::from_secs(60),
|
||||
config: DataSourceConfig {
|
||||
url: None,
|
||||
headers: vec![],
|
||||
api_key: None,
|
||||
},
|
||||
};
|
||||
|
||||
let result = adapter.poll(&source).await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn returns_error_on_connection_refused() {
|
||||
let adapter = HttpJsonAdapter::new();
|
||||
let source = make_source("http://127.0.0.1:1".into());
|
||||
|
||||
let result = adapter.poll(&source).await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
13
crates/adapters/media/Cargo.toml
Normal file
13
crates/adapters/media/Cargo.toml
Normal file
@@ -0,0 +1,13 @@
|
||||
[package]
|
||||
name = "media-adapter"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
domain.workspace = true
|
||||
reqwest.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
tokio.workspace = true
|
||||
axum.workspace = true
|
||||
158
crates/adapters/media/src/lib.rs
Normal file
158
crates/adapters/media/src/lib.rs
Normal file
@@ -0,0 +1,158 @@
|
||||
use std::collections::BTreeMap;
|
||||
use domain::{DataSource, DataSourcePort, Value};
|
||||
|
||||
pub struct MediaAdapter {
|
||||
client: reqwest::Client,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum MediaError {
|
||||
Request(reqwest::Error),
|
||||
NoUrl,
|
||||
Parse(String),
|
||||
}
|
||||
|
||||
impl std::fmt::Display for MediaError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
MediaError::Request(e) => write!(f, "request: {e}"),
|
||||
MediaError::NoUrl => write!(f, "no url configured"),
|
||||
MediaError::Parse(e) => write!(f, "parse: {e}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MediaAdapter {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
client: reqwest::Client::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DataSourcePort for MediaAdapter {
|
||||
type Error = MediaError;
|
||||
|
||||
async fn poll(&self, source: &DataSource) -> Result<Value, Self::Error> {
|
||||
let base_url = source.config.url.as_ref().ok_or(MediaError::NoUrl)?;
|
||||
let api_key = source.config.api_key.as_deref().unwrap_or("");
|
||||
|
||||
let url = format!(
|
||||
"{base_url}/rest/getNowPlaying.view?u=kframe&t={api_key}&s=salt&v=1.16.1&c=kframe&f=json"
|
||||
);
|
||||
|
||||
let resp = self.client.get(&url).send().await.map_err(MediaError::Request)?;
|
||||
let json: serde_json::Value = resp.json().await.map_err(MediaError::Request)?;
|
||||
|
||||
let entries = json["subsonic-response"]["nowPlaying"]["entry"]
|
||||
.as_array()
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
|
||||
if entries.is_empty() {
|
||||
let mut result = BTreeMap::new();
|
||||
result.insert("playing".into(), Value::Bool(false));
|
||||
return Ok(Value::Object(result));
|
||||
}
|
||||
|
||||
let entry = &entries[0];
|
||||
let mut result = BTreeMap::new();
|
||||
result.insert("playing".into(), Value::Bool(true));
|
||||
result.insert("title".into(), Value::String(
|
||||
entry["title"].as_str().unwrap_or("Unknown").into()
|
||||
));
|
||||
result.insert("artist".into(), Value::String(
|
||||
entry["artist"].as_str().unwrap_or("Unknown").into()
|
||||
));
|
||||
result.insert("album".into(), Value::String(
|
||||
entry["album"].as_str().unwrap_or("Unknown").into()
|
||||
));
|
||||
|
||||
if let Some(duration) = entry["duration"].as_u64() {
|
||||
result.insert("duration".into(), Value::Number(duration as f64));
|
||||
}
|
||||
|
||||
Ok(Value::Object(result))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::time::Duration;
|
||||
use domain::{DataSourceConfig, DataSourceType};
|
||||
|
||||
fn subsonic_response(playing: bool) -> serde_json::Value {
|
||||
if playing {
|
||||
serde_json::json!({
|
||||
"subsonic-response": {
|
||||
"status": "ok",
|
||||
"nowPlaying": {
|
||||
"entry": [{
|
||||
"title": "Believer",
|
||||
"artist": "Imagine Dragons",
|
||||
"album": "Evolve",
|
||||
"duration": 204
|
||||
}]
|
||||
}
|
||||
}
|
||||
})
|
||||
} else {
|
||||
serde_json::json!({
|
||||
"subsonic-response": {
|
||||
"status": "ok",
|
||||
"nowPlaying": {}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
async fn start_fake_subsonic(playing: bool) -> String {
|
||||
let app = axum::Router::new()
|
||||
.route("/rest/getNowPlaying.view", axum::routing::get(move || async move {
|
||||
axum::response::Json(subsonic_response(playing))
|
||||
}));
|
||||
|
||||
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });
|
||||
format!("http://{addr}")
|
||||
}
|
||||
|
||||
fn make_source(url: String) -> DataSource {
|
||||
DataSource {
|
||||
id: 1,
|
||||
name: "navidrome".into(),
|
||||
source_type: DataSourceType::Media,
|
||||
poll_interval: Duration::from_secs(5),
|
||||
config: DataSourceConfig {
|
||||
url: Some(url),
|
||||
headers: vec![],
|
||||
api_key: Some("testtoken".into()),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn returns_now_playing_info() {
|
||||
let base = start_fake_subsonic(true).await;
|
||||
let adapter = MediaAdapter::new();
|
||||
let source = make_source(base);
|
||||
|
||||
let result = adapter.poll(&source).await.unwrap();
|
||||
|
||||
assert_eq!(result.get_path("$.playing"), Some(&Value::Bool(true)));
|
||||
assert_eq!(result.get_path("$.title"), Some(&Value::String("Believer".into())));
|
||||
assert_eq!(result.get_path("$.artist"), Some(&Value::String("Imagine Dragons".into())));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn returns_not_playing_when_empty() {
|
||||
let base = start_fake_subsonic(false).await;
|
||||
let adapter = MediaAdapter::new();
|
||||
let source = make_source(base);
|
||||
|
||||
let result = adapter.poll(&source).await.unwrap();
|
||||
assert_eq!(result.get_path("$.playing"), Some(&Value::Bool(false)));
|
||||
}
|
||||
}
|
||||
14
crates/adapters/rss/Cargo.toml
Normal file
14
crates/adapters/rss/Cargo.toml
Normal file
@@ -0,0 +1,14 @@
|
||||
[package]
|
||||
name = "rss-adapter"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
domain.workspace = true
|
||||
reqwest.workspace = true
|
||||
quick-xml = { version = "0.37", features = ["serialize"] }
|
||||
serde.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
tokio.workspace = true
|
||||
axum.workspace = true
|
||||
144
crates/adapters/rss/src/lib.rs
Normal file
144
crates/adapters/rss/src/lib.rs
Normal file
@@ -0,0 +1,144 @@
|
||||
use std::collections::BTreeMap;
|
||||
use domain::{DataSource, DataSourcePort, Value};
|
||||
use quick_xml::events::Event;
|
||||
use quick_xml::Reader;
|
||||
|
||||
pub struct RssAdapter {
|
||||
client: reqwest::Client,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum RssError {
|
||||
Request(reqwest::Error),
|
||||
NoUrl,
|
||||
Parse(String),
|
||||
}
|
||||
|
||||
impl std::fmt::Display for RssError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
RssError::Request(e) => write!(f, "request: {e}"),
|
||||
RssError::NoUrl => write!(f, "no url configured"),
|
||||
RssError::Parse(e) => write!(f, "parse: {e}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RssAdapter {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
client: reqwest::Client::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_rss(xml: &str) -> Result<Value, RssError> {
|
||||
let mut reader = Reader::from_str(xml);
|
||||
let mut items: Vec<Value> = Vec::new();
|
||||
let mut current_item: Option<BTreeMap<String, Value>> = None;
|
||||
let mut current_tag = String::new();
|
||||
let mut in_channel = false;
|
||||
let mut channel_title = String::new();
|
||||
let mut channel_link = String::new();
|
||||
|
||||
loop {
|
||||
match reader.read_event() {
|
||||
Ok(Event::Start(e)) => {
|
||||
let tag = String::from_utf8_lossy(e.name().as_ref()).to_string();
|
||||
match tag.as_str() {
|
||||
"channel" => in_channel = true,
|
||||
"item" => { current_item = Some(BTreeMap::new()); }
|
||||
_ => current_tag = tag,
|
||||
}
|
||||
}
|
||||
Ok(Event::End(e)) => {
|
||||
let tag = String::from_utf8_lossy(e.name().as_ref()).to_string();
|
||||
if tag == "item" {
|
||||
if let Some(item) = current_item.take() {
|
||||
items.push(Value::Object(item));
|
||||
}
|
||||
}
|
||||
current_tag.clear();
|
||||
}
|
||||
Ok(Event::Text(e)) => {
|
||||
let text = e.unescape().unwrap_or_default().to_string();
|
||||
if !current_tag.is_empty() && !text.trim().is_empty() {
|
||||
if let Some(item) = current_item.as_mut() {
|
||||
item.insert(current_tag.clone(), Value::String(text));
|
||||
} else if in_channel {
|
||||
match current_tag.as_str() {
|
||||
"title" => channel_title = text,
|
||||
"link" => channel_link = text,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Event::CData(e)) => {
|
||||
let text = String::from_utf8_lossy(&e).to_string();
|
||||
if !current_tag.is_empty() {
|
||||
if let Some(item) = current_item.as_mut() {
|
||||
item.insert(current_tag.clone(), Value::String(text));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Event::Eof) => break,
|
||||
Err(e) => return Err(RssError::Parse(format!("{e}"))),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
let mut result = BTreeMap::new();
|
||||
result.insert("title".into(), Value::String(channel_title));
|
||||
result.insert("link".into(), Value::String(channel_link));
|
||||
result.insert("count".into(), Value::Number(items.len() as f64));
|
||||
result.insert("items".into(), Value::Array(items));
|
||||
|
||||
Ok(Value::Object(result))
|
||||
}
|
||||
|
||||
impl DataSourcePort for RssAdapter {
|
||||
type Error = RssError;
|
||||
|
||||
async fn poll(&self, source: &DataSource) -> Result<Value, Self::Error> {
|
||||
let url = source.config.url.as_ref().ok_or(RssError::NoUrl)?;
|
||||
|
||||
let resp = self.client.get(url).send().await.map_err(RssError::Request)?;
|
||||
let xml = resp.text().await.map_err(RssError::Request)?;
|
||||
|
||||
parse_rss(&xml)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
const SAMPLE_RSS: &str = r#"<?xml version="1.0" encoding="UTF-8"?>
|
||||
<rss version="2.0">
|
||||
<channel>
|
||||
<title>Test Feed</title>
|
||||
<link>https://example.com</link>
|
||||
<item>
|
||||
<title>First Article</title>
|
||||
<description>Description of first article</description>
|
||||
<link>https://example.com/1</link>
|
||||
</item>
|
||||
<item>
|
||||
<title>Second Article</title>
|
||||
<description>Description of second</description>
|
||||
<link>https://example.com/2</link>
|
||||
</item>
|
||||
</channel>
|
||||
</rss>"#;
|
||||
|
||||
#[test]
|
||||
fn parses_rss_into_value() {
|
||||
let result = parse_rss(SAMPLE_RSS).unwrap();
|
||||
|
||||
assert_eq!(result.get_path("$.title"), Some(&Value::String("Test Feed".into())));
|
||||
assert_eq!(result.get_path("$.items[0].title"), Some(&Value::String("First Article".into())));
|
||||
assert_eq!(result.get_path("$.items[1].title"), Some(&Value::String("Second Article".into())));
|
||||
assert_eq!(result.get_path("$.items[0].description"), Some(&Value::String("Description of first article".into())));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user