feat(export): stream_entries — CSV/JSON streaming via BoxStream<Bytes>

This commit is contained in:
2026-06-12 01:08:11 +02:00
parent ded7517a8a
commit 2fa118570f
3 changed files with 127 additions and 81 deletions

View File

@@ -8,6 +8,9 @@ domain = { workspace = true }
async-trait = { workspace = true } async-trait = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }
chrono = { workspace = true } chrono = { workspace = true }
futures = { workspace = true }
bytes = { workspace = true }
async-stream = { workspace = true }
[dev-dependencies] [dev-dependencies]
uuid = { workspace = true } uuid = { workspace = true }

View File

@@ -1,51 +1,89 @@
use async_trait::async_trait; use bytes::Bytes;
use domain::{ use domain::{
errors::DomainError, errors::DomainError,
models::{DiaryEntry, ExportFormat}, models::{DiaryEntry, ExportFormat},
ports::DiaryExporter, ports::DiaryExporter,
}; };
use futures::stream::BoxStream;
pub struct ExportAdapter; pub struct ExportAdapter;
#[async_trait]
impl DiaryExporter for ExportAdapter { impl DiaryExporter for ExportAdapter {
async fn serialize_entries( fn stream_entries(
&self, &self,
entries: &[DiaryEntry], stream: BoxStream<'static, Result<DiaryEntry, DomainError>>,
format: ExportFormat, format: ExportFormat,
) -> Result<Vec<u8>, DomainError> { ) -> BoxStream<'static, Result<Bytes, DomainError>> {
match format { match format {
ExportFormat::Csv => serialize_csv(entries), ExportFormat::Csv => stream_csv(stream),
ExportFormat::Json => serialize_json(entries), ExportFormat::Json => stream_json(stream),
} }
} }
} }
fn serialize_csv(entries: &[DiaryEntry]) -> Result<Vec<u8>, DomainError> { fn stream_csv(
let mut out = entries: BoxStream<'static, Result<DiaryEntry, DomainError>>,
String::from("title,year,director,rating,comment,watched_at,external_metadata_id\n"); ) -> BoxStream<'static, Result<Bytes, DomainError>> {
for e in entries { use futures::StreamExt;
let title = csv_escape(e.movie().title().value()); let header = futures::stream::once(async {
let year = e.movie().release_year().value(); Ok(Bytes::from_static(
let director = e.movie().director().map(csv_escape).unwrap_or_default(); b"title,year,director,rating,comment,watched_at,external_metadata_id\n",
let rating = e.review().rating().value(); ))
let comment = e });
.review() let rows = entries.map(|r| r.map(|e| Bytes::from(csv_row(&e))));
.comment() Box::pin(header.chain(rows))
.map(|c| csv_escape(c.value())) }
.unwrap_or_default();
let watched_at = e.review().watched_at().format("%Y-%m-%d"); fn stream_json(
let ext_id = e stream: BoxStream<'static, Result<DiaryEntry, DomainError>>,
.movie() ) -> BoxStream<'static, Result<Bytes, DomainError>> {
.external_metadata_id() Box::pin(async_stream::stream! {
.map(|id| id.value().to_string()) futures::pin_mut!(stream);
.unwrap_or_default(); let mut is_first = true;
out.push_str(&format!( while let Some(r) = futures::StreamExt::next(&mut stream).await {
"{},{},{},{},{},{},{}\n", match r {
title, year, director, rating, comment, watched_at, ext_id Err(e) => { yield Err(e); return; }
)); Ok(entry) => {
} let json = serde_json::to_string(&entry_to_json(&entry))
Ok(out.into_bytes()) .map_err(|e| DomainError::InfrastructureError(e.to_string()));
let json = match json {
Ok(s) => s,
Err(e) => { yield Err(e); return; }
};
let prefix = if is_first { "[" } else { "," };
is_first = false;
yield Ok(Bytes::from(format!("{}{}", prefix, json)));
}
}
}
if is_first {
yield Ok(Bytes::from_static(b"[]"));
} else {
yield Ok(Bytes::from_static(b"]"));
}
})
}
fn csv_row(e: &DiaryEntry) -> String {
let title = csv_escape(e.movie().title().value());
let year = e.movie().release_year().value();
let director = e.movie().director().map(csv_escape).unwrap_or_default();
let rating = e.review().rating().value();
let comment = e
.review()
.comment()
.map(|c| csv_escape(c.value()))
.unwrap_or_default();
let watched_at = e.review().watched_at().format("%Y-%m-%d");
let ext_id = e
.movie()
.external_metadata_id()
.map(|id| id.value().to_string())
.unwrap_or_default();
format!(
"{},{},{},{},{},{},{}\n",
title, year, director, rating, comment, watched_at, ext_id
)
} }
fn csv_escape(s: &str) -> String { fn csv_escape(s: &str) -> String {
@@ -56,22 +94,16 @@ fn csv_escape(s: &str) -> String {
} }
} }
fn serialize_json(entries: &[DiaryEntry]) -> Result<Vec<u8>, DomainError> { fn entry_to_json(e: &DiaryEntry) -> serde_json::Value {
let arr: Vec<serde_json::Value> = entries serde_json::json!({
.iter() "title": e.movie().title().value(),
.map(|e| { "year": e.movie().release_year().value(),
serde_json::json!({ "director": e.movie().director(),
"title": e.movie().title().value(), "rating": e.review().rating().value(),
"year": e.movie().release_year().value(), "comment": e.review().comment().map(|c| c.value().to_string()),
"director": e.movie().director(), "watched_at": e.review().watched_at().format("%Y-%m-%d").to_string(),
"rating": e.review().rating().value(), "external_metadata_id": e.movie().external_metadata_id().map(|id| id.value().to_string()),
"comment": e.review().comment().map(|c| c.value()), })
"watched_at": e.review().watched_at().format("%Y-%m-%d").to_string(),
"external_metadata_id": e.movie().external_metadata_id().map(|id| id.value()),
})
})
.collect();
serde_json::to_vec_pretty(&arr).map_err(|e| DomainError::InfrastructureError(e.to_string()))
} }
#[cfg(test)] #[cfg(test)]

View File

@@ -5,6 +5,25 @@ use domain::{
value_objects::{ExternalMetadataId, MovieTitle, Rating, ReleaseYear}, value_objects::{ExternalMetadataId, MovieTitle, Rating, ReleaseYear},
}; };
async fn collect_stream(
stream: futures::stream::BoxStream<'static, Result<bytes::Bytes, domain::errors::DomainError>>,
) -> Vec<u8> {
use futures::StreamExt;
let mut out = Vec::new();
futures::pin_mut!(stream);
while let Some(chunk) = stream.next().await {
out.extend_from_slice(&chunk.unwrap());
}
out
}
fn entry_stream(
entries: Vec<domain::models::DiaryEntry>,
) -> futures::stream::BoxStream<'static, Result<domain::models::DiaryEntry, domain::errors::DomainError>>
{
Box::pin(futures::stream::iter(entries.into_iter().map(Ok)))
}
fn make_entry( fn make_entry(
title: &str, title: &str,
year: u16, year: u16,
@@ -55,10 +74,8 @@ async fn csv_has_header_and_one_row() {
5, 5,
Some("great"), Some("great"),
); );
let bytes = adapter let bytes =
.serialize_entries(&[entry], ExportFormat::Csv) collect_stream(adapter.stream_entries(entry_stream(vec![entry]), ExportFormat::Csv)).await;
.await
.unwrap();
let text = String::from_utf8(bytes).unwrap(); let text = String::from_utf8(bytes).unwrap();
assert!( assert!(
text.starts_with("title,year,director,rating,comment,watched_at,external_metadata_id\n") text.starts_with("title,year,director,rating,comment,watched_at,external_metadata_id\n")
@@ -75,10 +92,8 @@ async fn csv_has_header_and_one_row() {
async fn csv_escapes_commas_in_title() { async fn csv_escapes_commas_in_title() {
let adapter = ExportAdapter; let adapter = ExportAdapter;
let entry = make_entry("Tár, A Film", 2022, None, 4, None); let entry = make_entry("Tár, A Film", 2022, None, 4, None);
let bytes = adapter let bytes =
.serialize_entries(&[entry], ExportFormat::Csv) collect_stream(adapter.stream_entries(entry_stream(vec![entry]), ExportFormat::Csv)).await;
.await
.unwrap();
let text = String::from_utf8(bytes).unwrap(); let text = String::from_utf8(bytes).unwrap();
assert!(text.contains("\"Tár, A Film\"")); assert!(text.contains("\"Tár, A Film\""));
} }
@@ -87,10 +102,8 @@ async fn csv_escapes_commas_in_title() {
async fn json_is_valid_array() { async fn json_is_valid_array() {
let adapter = ExportAdapter; let adapter = ExportAdapter;
let entry = make_entry("Dune", 2021, Some("Denis Villeneuve"), 5, None); let entry = make_entry("Dune", 2021, Some("Denis Villeneuve"), 5, None);
let bytes = adapter let bytes =
.serialize_entries(&[entry], ExportFormat::Json) collect_stream(adapter.stream_entries(entry_stream(vec![entry]), ExportFormat::Json)).await;
.await
.unwrap();
let arr: Vec<serde_json::Value> = serde_json::from_slice(&bytes).unwrap(); let arr: Vec<serde_json::Value> = serde_json::from_slice(&bytes).unwrap();
assert_eq!(arr.len(), 1); assert_eq!(arr.len(), 1);
assert_eq!(arr[0]["title"], "Dune"); assert_eq!(arr[0]["title"], "Dune");
@@ -104,27 +117,18 @@ async fn json_is_valid_array() {
async fn external_metadata_id_included_when_present() { async fn external_metadata_id_included_when_present() {
let adapter = ExportAdapter; let adapter = ExportAdapter;
let entry = make_entry_full("Alien", 1979, None, 5, None, Some("tt0078748")); let entry = make_entry_full("Alien", 1979, None, 5, None, Some("tt0078748"));
let bytes = adapter let bytes =
.serialize_entries(&[entry], ExportFormat::Json) collect_stream(adapter.stream_entries(entry_stream(vec![entry]), ExportFormat::Json)).await;
.await
.unwrap();
let arr: Vec<serde_json::Value> = serde_json::from_slice(&bytes).unwrap(); let arr: Vec<serde_json::Value> = serde_json::from_slice(&bytes).unwrap();
assert_eq!(arr[0]["external_metadata_id"], "tt0078748"); assert_eq!(arr[0]["external_metadata_id"], "tt0078748");
let bytes = adapter let bytes = collect_stream(
.serialize_entries( adapter.stream_entries(
&[make_entry_full( entry_stream(vec![make_entry_full("Alien", 1979, None, 5, None, Some("tt0078748"))]),
"Alien",
1979,
None,
5,
None,
Some("tt0078748"),
)],
ExportFormat::Csv, ExportFormat::Csv,
) ),
.await )
.unwrap(); .await;
let text = String::from_utf8(bytes).unwrap(); let text = String::from_utf8(bytes).unwrap();
assert!(text.contains("tt0078748")); assert!(text.contains("tt0078748"));
} }
@@ -132,13 +136,20 @@ async fn external_metadata_id_included_when_present() {
#[tokio::test] #[tokio::test]
async fn empty_entries_returns_csv_header_only() { async fn empty_entries_returns_csv_header_only() {
let adapter = ExportAdapter; let adapter = ExportAdapter;
let bytes = adapter let bytes =
.serialize_entries(&[], ExportFormat::Csv) collect_stream(adapter.stream_entries(entry_stream(vec![]), ExportFormat::Csv)).await;
.await
.unwrap();
let text = String::from_utf8(bytes).unwrap(); let text = String::from_utf8(bytes).unwrap();
assert_eq!( assert_eq!(
text, text,
"title,year,director,rating,comment,watched_at,external_metadata_id\n" "title,year,director,rating,comment,watched_at,external_metadata_id\n"
); );
} }
#[tokio::test]
async fn empty_json_is_valid_empty_array() {
let adapter = ExportAdapter;
let bytes =
collect_stream(adapter.stream_entries(entry_stream(vec![]), ExportFormat::Json)).await;
let arr: Vec<serde_json::Value> = serde_json::from_slice(&bytes).unwrap();
assert!(arr.is_empty());
}