From 2fa118570f102b3aaddbe787814e5685b96b6829 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Fri, 12 Jun 2026 01:08:11 +0200 Subject: [PATCH] =?UTF-8?q?feat(export):=20stream=5Fentries=20=E2=80=94=20?= =?UTF-8?q?CSV/JSON=20streaming=20via=20BoxStream?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/adapters/export/Cargo.toml | 3 + crates/adapters/export/src/lib.rs | 128 +++++++++++++++--------- crates/adapters/export/src/tests/lib.rs | 77 ++++++++------ 3 files changed, 127 insertions(+), 81 deletions(-) diff --git a/crates/adapters/export/Cargo.toml b/crates/adapters/export/Cargo.toml index 85b3a80..2b10cf4 100644 --- a/crates/adapters/export/Cargo.toml +++ b/crates/adapters/export/Cargo.toml @@ -8,6 +8,9 @@ domain = { workspace = true } async-trait = { workspace = true } serde_json = { workspace = true } chrono = { workspace = true } +futures = { workspace = true } +bytes = { workspace = true } +async-stream = { workspace = true } [dev-dependencies] uuid = { workspace = true } diff --git a/crates/adapters/export/src/lib.rs b/crates/adapters/export/src/lib.rs index f2b22d8..516a1d9 100644 --- a/crates/adapters/export/src/lib.rs +++ b/crates/adapters/export/src/lib.rs @@ -1,51 +1,89 @@ -use async_trait::async_trait; +use bytes::Bytes; use domain::{ errors::DomainError, models::{DiaryEntry, ExportFormat}, ports::DiaryExporter, }; +use futures::stream::BoxStream; pub struct ExportAdapter; -#[async_trait] impl DiaryExporter for ExportAdapter { - async fn serialize_entries( + fn stream_entries( &self, - entries: &[DiaryEntry], + stream: BoxStream<'static, Result>, format: ExportFormat, - ) -> Result, DomainError> { + ) -> BoxStream<'static, Result> { match format { - ExportFormat::Csv => serialize_csv(entries), - ExportFormat::Json => serialize_json(entries), + ExportFormat::Csv => stream_csv(stream), + ExportFormat::Json => stream_json(stream), } } } -fn serialize_csv(entries: &[DiaryEntry]) -> Result, DomainError> { - let mut out = - String::from("title,year,director,rating,comment,watched_at,external_metadata_id\n"); - for e in entries { - let title = csv_escape(e.movie().title().value()); - let year = e.movie().release_year().value(); - let director = e.movie().director().map(csv_escape).unwrap_or_default(); - let rating = e.review().rating().value(); - let comment = e - .review() - .comment() - .map(|c| csv_escape(c.value())) - .unwrap_or_default(); - let watched_at = e.review().watched_at().format("%Y-%m-%d"); - let ext_id = e - .movie() - .external_metadata_id() - .map(|id| id.value().to_string()) - .unwrap_or_default(); - out.push_str(&format!( - "{},{},{},{},{},{},{}\n", - title, year, director, rating, comment, watched_at, ext_id - )); - } - Ok(out.into_bytes()) +fn stream_csv( + entries: BoxStream<'static, Result>, +) -> BoxStream<'static, Result> { + use futures::StreamExt; + let header = futures::stream::once(async { + Ok(Bytes::from_static( + b"title,year,director,rating,comment,watched_at,external_metadata_id\n", + )) + }); + let rows = entries.map(|r| r.map(|e| Bytes::from(csv_row(&e)))); + Box::pin(header.chain(rows)) +} + +fn stream_json( + stream: BoxStream<'static, Result>, +) -> BoxStream<'static, Result> { + Box::pin(async_stream::stream! { + futures::pin_mut!(stream); + let mut is_first = true; + while let Some(r) = futures::StreamExt::next(&mut stream).await { + match r { + Err(e) => { yield Err(e); return; } + Ok(entry) => { + let json = serde_json::to_string(&entry_to_json(&entry)) + .map_err(|e| DomainError::InfrastructureError(e.to_string())); + let json = match json { + Ok(s) => s, + Err(e) => { yield Err(e); return; } + }; + let prefix = if is_first { "[" } else { "," }; + is_first = false; + yield Ok(Bytes::from(format!("{}{}", prefix, json))); + } + } + } + if is_first { + yield Ok(Bytes::from_static(b"[]")); + } else { + yield Ok(Bytes::from_static(b"]")); + } + }) +} + +fn csv_row(e: &DiaryEntry) -> String { + let title = csv_escape(e.movie().title().value()); + let year = e.movie().release_year().value(); + let director = e.movie().director().map(csv_escape).unwrap_or_default(); + let rating = e.review().rating().value(); + let comment = e + .review() + .comment() + .map(|c| csv_escape(c.value())) + .unwrap_or_default(); + let watched_at = e.review().watched_at().format("%Y-%m-%d"); + let ext_id = e + .movie() + .external_metadata_id() + .map(|id| id.value().to_string()) + .unwrap_or_default(); + format!( + "{},{},{},{},{},{},{}\n", + title, year, director, rating, comment, watched_at, ext_id + ) } fn csv_escape(s: &str) -> String { @@ -56,22 +94,16 @@ fn csv_escape(s: &str) -> String { } } -fn serialize_json(entries: &[DiaryEntry]) -> Result, DomainError> { - let arr: Vec = entries - .iter() - .map(|e| { - serde_json::json!({ - "title": e.movie().title().value(), - "year": e.movie().release_year().value(), - "director": e.movie().director(), - "rating": e.review().rating().value(), - "comment": e.review().comment().map(|c| c.value()), - "watched_at": e.review().watched_at().format("%Y-%m-%d").to_string(), - "external_metadata_id": e.movie().external_metadata_id().map(|id| id.value()), - }) - }) - .collect(); - serde_json::to_vec_pretty(&arr).map_err(|e| DomainError::InfrastructureError(e.to_string())) +fn entry_to_json(e: &DiaryEntry) -> serde_json::Value { + serde_json::json!({ + "title": e.movie().title().value(), + "year": e.movie().release_year().value(), + "director": e.movie().director(), + "rating": e.review().rating().value(), + "comment": e.review().comment().map(|c| c.value().to_string()), + "watched_at": e.review().watched_at().format("%Y-%m-%d").to_string(), + "external_metadata_id": e.movie().external_metadata_id().map(|id| id.value().to_string()), + }) } #[cfg(test)] diff --git a/crates/adapters/export/src/tests/lib.rs b/crates/adapters/export/src/tests/lib.rs index 5d1960b..3b936eb 100644 --- a/crates/adapters/export/src/tests/lib.rs +++ b/crates/adapters/export/src/tests/lib.rs @@ -5,6 +5,25 @@ use domain::{ value_objects::{ExternalMetadataId, MovieTitle, Rating, ReleaseYear}, }; +async fn collect_stream( + stream: futures::stream::BoxStream<'static, Result>, +) -> Vec { + use futures::StreamExt; + let mut out = Vec::new(); + futures::pin_mut!(stream); + while let Some(chunk) = stream.next().await { + out.extend_from_slice(&chunk.unwrap()); + } + out +} + +fn entry_stream( + entries: Vec, +) -> futures::stream::BoxStream<'static, Result> +{ + Box::pin(futures::stream::iter(entries.into_iter().map(Ok))) +} + fn make_entry( title: &str, year: u16, @@ -55,10 +74,8 @@ async fn csv_has_header_and_one_row() { 5, Some("great"), ); - let bytes = adapter - .serialize_entries(&[entry], ExportFormat::Csv) - .await - .unwrap(); + let bytes = + collect_stream(adapter.stream_entries(entry_stream(vec![entry]), ExportFormat::Csv)).await; let text = String::from_utf8(bytes).unwrap(); assert!( text.starts_with("title,year,director,rating,comment,watched_at,external_metadata_id\n") @@ -75,10 +92,8 @@ async fn csv_has_header_and_one_row() { async fn csv_escapes_commas_in_title() { let adapter = ExportAdapter; let entry = make_entry("Tár, A Film", 2022, None, 4, None); - let bytes = adapter - .serialize_entries(&[entry], ExportFormat::Csv) - .await - .unwrap(); + let bytes = + collect_stream(adapter.stream_entries(entry_stream(vec![entry]), ExportFormat::Csv)).await; let text = String::from_utf8(bytes).unwrap(); assert!(text.contains("\"Tár, A Film\"")); } @@ -87,10 +102,8 @@ async fn csv_escapes_commas_in_title() { async fn json_is_valid_array() { let adapter = ExportAdapter; let entry = make_entry("Dune", 2021, Some("Denis Villeneuve"), 5, None); - let bytes = adapter - .serialize_entries(&[entry], ExportFormat::Json) - .await - .unwrap(); + let bytes = + collect_stream(adapter.stream_entries(entry_stream(vec![entry]), ExportFormat::Json)).await; let arr: Vec = serde_json::from_slice(&bytes).unwrap(); assert_eq!(arr.len(), 1); assert_eq!(arr[0]["title"], "Dune"); @@ -104,27 +117,18 @@ async fn json_is_valid_array() { async fn external_metadata_id_included_when_present() { let adapter = ExportAdapter; let entry = make_entry_full("Alien", 1979, None, 5, None, Some("tt0078748")); - let bytes = adapter - .serialize_entries(&[entry], ExportFormat::Json) - .await - .unwrap(); + let bytes = + collect_stream(adapter.stream_entries(entry_stream(vec![entry]), ExportFormat::Json)).await; let arr: Vec = serde_json::from_slice(&bytes).unwrap(); assert_eq!(arr[0]["external_metadata_id"], "tt0078748"); - let bytes = adapter - .serialize_entries( - &[make_entry_full( - "Alien", - 1979, - None, - 5, - None, - Some("tt0078748"), - )], + let bytes = collect_stream( + adapter.stream_entries( + entry_stream(vec![make_entry_full("Alien", 1979, None, 5, None, Some("tt0078748"))]), ExportFormat::Csv, - ) - .await - .unwrap(); + ), + ) + .await; let text = String::from_utf8(bytes).unwrap(); assert!(text.contains("tt0078748")); } @@ -132,13 +136,20 @@ async fn external_metadata_id_included_when_present() { #[tokio::test] async fn empty_entries_returns_csv_header_only() { let adapter = ExportAdapter; - let bytes = adapter - .serialize_entries(&[], ExportFormat::Csv) - .await - .unwrap(); + let bytes = + collect_stream(adapter.stream_entries(entry_stream(vec![]), ExportFormat::Csv)).await; let text = String::from_utf8(bytes).unwrap(); assert_eq!( text, "title,year,director,rating,comment,watched_at,external_metadata_id\n" ); } + +#[tokio::test] +async fn empty_json_is_valid_empty_array() { + let adapter = ExportAdapter; + let bytes = + collect_stream(adapter.stream_entries(entry_stream(vec![]), ExportFormat::Json)).await; + let arr: Vec = serde_json::from_slice(&bytes).unwrap(); + assert!(arr.is_empty()); +}