Compare commits

...

7 Commits

18 changed files with 327 additions and 148 deletions

32
Cargo.lock generated
View File

@@ -314,6 +314,7 @@ name = "application"
version = "0.1.0"
dependencies = [
"async-trait",
"bytes",
"chrono",
"domain",
"futures",
@@ -567,6 +568,28 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "async-stream"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
dependencies = [
"async-stream-impl",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-stream-impl"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.117",
]
[[package]]
name = "async-task"
version = "4.7.1"
@@ -1822,9 +1845,12 @@ dependencies = [
name = "export"
version = "0.1.0"
dependencies = [
"async-stream",
"async-trait",
"bytes",
"chrono",
"domain",
"futures",
"serde_json",
"tokio",
"uuid",
@@ -3848,9 +3874,12 @@ name = "postgres"
version = "0.1.0"
dependencies = [
"anyhow",
"async-stream",
"async-trait",
"bytes",
"chrono",
"domain",
"futures",
"serde",
"serde_json",
"sqlx",
@@ -5124,9 +5153,12 @@ name = "sqlite"
version = "0.1.0"
dependencies = [
"anyhow",
"async-stream",
"async-trait",
"bytes",
"chrono",
"domain",
"futures",
"serde",
"serde_json",
"sqlx",

View File

@@ -38,6 +38,7 @@ resolver = "2"
tokio = { version = "1.0", features = ["macros", "net", "rt", "rt-multi-thread", "sync", "time"] }
bytes = "1"
futures = "0.3"
async-stream = "0.3"
dotenvy = "0.15"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

View File

@@ -8,6 +8,9 @@ domain = { workspace = true }
async-trait = { workspace = true }
serde_json = { workspace = true }
chrono = { workspace = true }
futures = { workspace = true }
bytes = { workspace = true }
async-stream = { workspace = true }
[dev-dependencies]
uuid = { workspace = true }

View File

@@ -1,51 +1,89 @@
use async_trait::async_trait;
use bytes::Bytes;
use domain::{
errors::DomainError,
models::{DiaryEntry, ExportFormat},
ports::DiaryExporter,
};
use futures::stream::BoxStream;
pub struct ExportAdapter;
#[async_trait]
impl DiaryExporter for ExportAdapter {
async fn serialize_entries(
fn stream_entries(
&self,
entries: &[DiaryEntry],
stream: BoxStream<'static, Result<DiaryEntry, DomainError>>,
format: ExportFormat,
) -> Result<Vec<u8>, DomainError> {
) -> BoxStream<'static, Result<Bytes, DomainError>> {
match format {
ExportFormat::Csv => serialize_csv(entries),
ExportFormat::Json => serialize_json(entries),
ExportFormat::Csv => stream_csv(stream),
ExportFormat::Json => stream_json(stream),
}
}
}
fn serialize_csv(entries: &[DiaryEntry]) -> Result<Vec<u8>, DomainError> {
let mut out =
String::from("title,year,director,rating,comment,watched_at,external_metadata_id\n");
for e in entries {
let title = csv_escape(e.movie().title().value());
let year = e.movie().release_year().value();
let director = e.movie().director().map(csv_escape).unwrap_or_default();
let rating = e.review().rating().value();
let comment = e
.review()
.comment()
.map(|c| csv_escape(c.value()))
.unwrap_or_default();
let watched_at = e.review().watched_at().format("%Y-%m-%d");
let ext_id = e
.movie()
.external_metadata_id()
.map(|id| id.value().to_string())
.unwrap_or_default();
out.push_str(&format!(
"{},{},{},{},{},{},{}\n",
title, year, director, rating, comment, watched_at, ext_id
));
}
Ok(out.into_bytes())
fn stream_csv(
entries: BoxStream<'static, Result<DiaryEntry, DomainError>>,
) -> BoxStream<'static, Result<Bytes, DomainError>> {
use futures::StreamExt;
let header = futures::stream::once(async {
Ok(Bytes::from_static(
b"title,year,director,rating,comment,watched_at,external_metadata_id\n",
))
});
let rows = entries.map(|r| r.map(|e| Bytes::from(csv_row(&e))));
Box::pin(header.chain(rows))
}
fn stream_json(
stream: BoxStream<'static, Result<DiaryEntry, DomainError>>,
) -> BoxStream<'static, Result<Bytes, DomainError>> {
Box::pin(async_stream::stream! {
futures::pin_mut!(stream);
let mut is_first = true;
while let Some(r) = futures::StreamExt::next(&mut stream).await {
match r {
Err(e) => { yield Err(e); return; }
Ok(entry) => {
let json = serde_json::to_string(&entry_to_json(&entry))
.map_err(|e| DomainError::InfrastructureError(e.to_string()));
let json = match json {
Ok(s) => s,
Err(e) => { yield Err(e); return; }
};
let prefix = if is_first { "[" } else { "," };
is_first = false;
yield Ok(Bytes::from(format!("{}{}", prefix, json)));
}
}
}
if is_first {
yield Ok(Bytes::from_static(b"[]"));
} else {
yield Ok(Bytes::from_static(b"]"));
}
})
}
fn csv_row(e: &DiaryEntry) -> String {
let title = csv_escape(e.movie().title().value());
let year = e.movie().release_year().value();
let director = e.movie().director().map(csv_escape).unwrap_or_default();
let rating = e.review().rating().value();
let comment = e
.review()
.comment()
.map(|c| csv_escape(c.value()))
.unwrap_or_default();
let watched_at = e.review().watched_at().format("%Y-%m-%d");
let ext_id = e
.movie()
.external_metadata_id()
.map(|id| id.value().to_string())
.unwrap_or_default();
format!(
"{},{},{},{},{},{},{}\n",
title, year, director, rating, comment, watched_at, ext_id
)
}
fn csv_escape(s: &str) -> String {
@@ -56,22 +94,16 @@ fn csv_escape(s: &str) -> String {
}
}
fn serialize_json(entries: &[DiaryEntry]) -> Result<Vec<u8>, DomainError> {
let arr: Vec<serde_json::Value> = entries
.iter()
.map(|e| {
serde_json::json!({
"title": e.movie().title().value(),
"year": e.movie().release_year().value(),
"director": e.movie().director(),
"rating": e.review().rating().value(),
"comment": e.review().comment().map(|c| c.value()),
"watched_at": e.review().watched_at().format("%Y-%m-%d").to_string(),
"external_metadata_id": e.movie().external_metadata_id().map(|id| id.value()),
})
})
.collect();
serde_json::to_vec_pretty(&arr).map_err(|e| DomainError::InfrastructureError(e.to_string()))
fn entry_to_json(e: &DiaryEntry) -> serde_json::Value {
serde_json::json!({
"title": e.movie().title().value(),
"year": e.movie().release_year().value(),
"director": e.movie().director(),
"rating": e.review().rating().value(),
"comment": e.review().comment().map(|c| c.value().to_string()),
"watched_at": e.review().watched_at().format("%Y-%m-%d").to_string(),
"external_metadata_id": e.movie().external_metadata_id().map(|id| id.value().to_string()),
})
}
#[cfg(test)]

View File

@@ -5,6 +5,27 @@ use domain::{
value_objects::{ExternalMetadataId, MovieTitle, Rating, ReleaseYear},
};
async fn collect_stream(
stream: futures::stream::BoxStream<'static, Result<bytes::Bytes, domain::errors::DomainError>>,
) -> Vec<u8> {
use futures::StreamExt;
let mut out = Vec::new();
futures::pin_mut!(stream);
while let Some(chunk) = stream.next().await {
out.extend_from_slice(&chunk.unwrap());
}
out
}
fn entry_stream(
entries: Vec<domain::models::DiaryEntry>,
) -> futures::stream::BoxStream<
'static,
Result<domain::models::DiaryEntry, domain::errors::DomainError>,
> {
Box::pin(futures::stream::iter(entries.into_iter().map(Ok)))
}
fn make_entry(
title: &str,
year: u16,
@@ -55,10 +76,8 @@ async fn csv_has_header_and_one_row() {
5,
Some("great"),
);
let bytes = adapter
.serialize_entries(&[entry], ExportFormat::Csv)
.await
.unwrap();
let bytes =
collect_stream(adapter.stream_entries(entry_stream(vec![entry]), ExportFormat::Csv)).await;
let text = String::from_utf8(bytes).unwrap();
assert!(
text.starts_with("title,year,director,rating,comment,watched_at,external_metadata_id\n")
@@ -75,10 +94,8 @@ async fn csv_has_header_and_one_row() {
async fn csv_escapes_commas_in_title() {
let adapter = ExportAdapter;
let entry = make_entry("Tár, A Film", 2022, None, 4, None);
let bytes = adapter
.serialize_entries(&[entry], ExportFormat::Csv)
.await
.unwrap();
let bytes =
collect_stream(adapter.stream_entries(entry_stream(vec![entry]), ExportFormat::Csv)).await;
let text = String::from_utf8(bytes).unwrap();
assert!(text.contains("\"Tár, A Film\""));
}
@@ -87,10 +104,8 @@ async fn csv_escapes_commas_in_title() {
async fn json_is_valid_array() {
let adapter = ExportAdapter;
let entry = make_entry("Dune", 2021, Some("Denis Villeneuve"), 5, None);
let bytes = adapter
.serialize_entries(&[entry], ExportFormat::Json)
.await
.unwrap();
let bytes =
collect_stream(adapter.stream_entries(entry_stream(vec![entry]), ExportFormat::Json)).await;
let arr: Vec<serde_json::Value> = serde_json::from_slice(&bytes).unwrap();
assert_eq!(arr.len(), 1);
assert_eq!(arr[0]["title"], "Dune");
@@ -104,27 +119,23 @@ async fn json_is_valid_array() {
async fn external_metadata_id_included_when_present() {
let adapter = ExportAdapter;
let entry = make_entry_full("Alien", 1979, None, 5, None, Some("tt0078748"));
let bytes = adapter
.serialize_entries(&[entry], ExportFormat::Json)
.await
.unwrap();
let bytes =
collect_stream(adapter.stream_entries(entry_stream(vec![entry]), ExportFormat::Json)).await;
let arr: Vec<serde_json::Value> = serde_json::from_slice(&bytes).unwrap();
assert_eq!(arr[0]["external_metadata_id"], "tt0078748");
let bytes = adapter
.serialize_entries(
&[make_entry_full(
"Alien",
1979,
None,
5,
None,
Some("tt0078748"),
)],
ExportFormat::Csv,
)
.await
.unwrap();
let bytes = collect_stream(adapter.stream_entries(
entry_stream(vec![make_entry_full(
"Alien",
1979,
None,
5,
None,
Some("tt0078748"),
)]),
ExportFormat::Csv,
))
.await;
let text = String::from_utf8(bytes).unwrap();
assert!(text.contains("tt0078748"));
}
@@ -132,13 +143,20 @@ async fn external_metadata_id_included_when_present() {
#[tokio::test]
async fn empty_entries_returns_csv_header_only() {
let adapter = ExportAdapter;
let bytes = adapter
.serialize_entries(&[], ExportFormat::Csv)
.await
.unwrap();
let bytes =
collect_stream(adapter.stream_entries(entry_stream(vec![]), ExportFormat::Csv)).await;
let text = String::from_utf8(bytes).unwrap();
assert_eq!(
text,
"title,year,director,rating,comment,watched_at,external_metadata_id\n"
);
}
#[tokio::test]
async fn empty_json_is_valid_empty_array() {
let adapter = ExportAdapter;
let bytes =
collect_stream(adapter.stream_entries(entry_stream(vec![]), ExportFormat::Json)).await;
let arr: Vec<serde_json::Value> = serde_json::from_slice(&bytes).unwrap();
assert!(arr.is_empty());
}

View File

@@ -20,3 +20,6 @@ async-trait = { workspace = true }
tokio = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
futures = { workspace = true }
bytes = { workspace = true }
async-stream = { workspace = true }

View File

@@ -8,6 +8,7 @@ use domain::{
ports::DiaryRepository,
value_objects::{MovieId, UserId},
};
use futures::stream::BoxStream;
use sqlx::PgPool;
use crate::models::{DiaryRow, FeedRow, MovieRow, MovieStatsRow, ReviewRow};
@@ -427,6 +428,35 @@ impl DiaryRepository for PostgresDiaryRepository {
rows.into_iter().map(DiaryRow::into_domain).collect()
}
fn stream_user_history(
&self,
user_id: UserId,
) -> BoxStream<'static, Result<DiaryEntry, DomainError>> {
let pool = self.pool.clone();
let uid = user_id.value().to_string();
Box::pin(async_stream::stream! {
let mut rows = sqlx::query_as::<_, DiaryRow>(
"SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,
r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment,
to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at,
to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at,
r.remote_actor_url
FROM reviews r
INNER JOIN movies m ON m.id = r.movie_id
WHERE r.user_id = $1
ORDER BY r.watched_at DESC",
)
.bind(&uid)
.fetch(&pool);
while let Some(row) = futures::StreamExt::next(&mut rows).await {
yield match row {
Ok(r) => r.into_domain(),
Err(e) => Err(Self::map_err(e)),
};
}
})
}
async fn get_movie_stats(&self, movie_id: &MovieId) -> Result<MovieStats, DomainError> {
let id_str = movie_id.value().to_string();
sqlx::query_as::<_, MovieStatsRow>(

View File

@@ -20,3 +20,6 @@ chrono = { workspace = true }
tracing = { workspace = true }
async-trait = { workspace = true }
tokio = { workspace = true }
futures = { workspace = true }
bytes = { workspace = true }
async-stream = { workspace = true }

View File

@@ -8,6 +8,7 @@ use domain::{
ports::DiaryRepository,
value_objects::{MovieId, UserId},
};
use futures::stream::BoxStream;
use sqlx::SqlitePool;
use crate::models::{DiaryRow, FeedRow, MovieRow, MovieStatsRow, ReviewRow};
@@ -389,6 +390,32 @@ impl DiaryRepository for SqliteDiaryRepository {
rows.into_iter().map(DiaryRow::into_domain).collect()
}
fn stream_user_history(
&self,
user_id: UserId,
) -> BoxStream<'static, Result<DiaryEntry, DomainError>> {
let pool = self.pool.clone();
let uid = user_id.value().to_string();
Box::pin(async_stream::stream! {
let mut rows = sqlx::query_as::<_, DiaryRow>(
"SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,
r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url
FROM reviews r
INNER JOIN movies m ON m.id = r.movie_id
WHERE r.user_id = ?
ORDER BY r.watched_at DESC",
)
.bind(&uid)
.fetch(&pool);
while let Some(row) = futures::StreamExt::next(&mut rows).await {
yield match row {
Ok(r) => r.into_domain(),
Err(e) => Err(Self::map_err(e)),
};
}
})
}
async fn get_movie_stats(&self, movie_id: &MovieId) -> Result<MovieStats, DomainError> {
let id_str = movie_id.value().to_string();
sqlx::query_as::<_, MovieStatsRow>(

View File

@@ -15,6 +15,7 @@ sha2 = { workspace = true }
rand = { workspace = true }
hex = { workspace = true }
serde_json = { workspace = true }
bytes = { workspace = true }
[features]
xlsx = []

View File

@@ -1,22 +1,21 @@
use std::sync::Arc;
use bytes::Bytes;
use domain::{
errors::DomainError,
ports::{DiaryExporter, DiaryRepository},
value_objects::UserId,
};
use futures::stream::BoxStream;
use crate::diary::queries::ExportQuery;
pub async fn execute(
pub fn execute(
diary: &Arc<dyn DiaryRepository>,
diary_exporter: &Arc<dyn DiaryExporter>,
query: ExportQuery,
) -> Result<Vec<u8>, DomainError> {
let entries = diary
.get_user_history(&UserId::from_uuid(query.user_id))
.await?;
diary_exporter
.serialize_entries(&entries, query.format)
.await
) -> BoxStream<'static, Result<Bytes, DomainError>> {
let user_id = UserId::from_uuid(query.user_id);
let entry_stream = diary.stream_user_history(user_id);
diary_exporter.stream_entries(entry_stream, query.format)
}

View File

@@ -144,6 +144,10 @@ pub trait DiaryRepository: Send + Sync {
) -> Result<Paginated<FeedEntry>, DomainError>;
async fn get_review_history(&self, movie_id: &MovieId) -> Result<ReviewHistory, DomainError>;
async fn get_user_history(&self, user_id: &UserId) -> Result<Vec<DiaryEntry>, DomainError>;
fn stream_user_history(
&self,
user_id: UserId,
) -> futures::stream::BoxStream<'static, Result<DiaryEntry, DomainError>>;
async fn get_movie_stats(&self, movie_id: &MovieId) -> Result<MovieStats, DomainError>;
async fn get_movie_social_feed(
&self,
@@ -253,13 +257,12 @@ pub trait PasswordHasher: Send + Sync {
async fn verify(&self, plain_password: &str, hash: &PasswordHash) -> Result<bool, DomainError>;
}
#[async_trait]
pub trait DiaryExporter: Send + Sync {
async fn serialize_entries(
fn stream_entries(
&self,
entries: &[DiaryEntry],
stream: futures::stream::BoxStream<'static, Result<DiaryEntry, DomainError>>,
format: ExportFormat,
) -> Result<Vec<u8>, DomainError>;
) -> futures::stream::BoxStream<'static, Result<bytes::Bytes, DomainError>>;
}
#[async_trait]

View File

@@ -154,6 +154,13 @@ impl DiaryRepository for FakeDiaryRepository {
Ok(vec![])
}
fn stream_user_history(
&self,
_user_id: UserId,
) -> futures::stream::BoxStream<'static, Result<DiaryEntry, DomainError>> {
Box::pin(futures::stream::empty())
}
async fn get_movie_stats(&self, _movie_id: &MovieId) -> Result<MovieStats, DomainError> {
Ok(MovieStats {
total_count: 0,

View File

@@ -49,6 +49,12 @@ impl DiaryRepository for PanicDiaryRepository {
async fn get_user_history(&self, _: &UserId) -> Result<Vec<DiaryEntry>, DomainError> {
panic!("PanicDiaryRepository called")
}
fn stream_user_history(
&self,
_: UserId,
) -> futures::stream::BoxStream<'static, Result<DiaryEntry, DomainError>> {
panic!("PanicDiaryRepository called")
}
async fn get_movie_stats(&self, _: &MovieId) -> Result<MovieStats, DomainError> {
panic!("PanicDiaryRepository called")
}
@@ -250,13 +256,12 @@ impl PosterFetcherClient for PanicPosterFetcher {
pub struct PanicDiaryExporter;
#[async_trait]
impl DiaryExporter for PanicDiaryExporter {
async fn serialize_entries(
fn stream_entries(
&self,
_: &[DiaryEntry],
_: ExportFormat,
) -> Result<Vec<u8>, DomainError> {
_stream: futures::stream::BoxStream<'static, Result<DiaryEntry, DomainError>>,
_format: ExportFormat,
) -> futures::stream::BoxStream<'static, Result<bytes::Bytes, DomainError>> {
panic!("PanicDiaryExporter called")
}
}

View File

@@ -42,6 +42,7 @@ dotenvy = { workspace = true }
uuid = { workspace = true }
chrono = { workspace = true }
async-trait = { workspace = true }
futures = { workspace = true }
api-types = { workspace = true }
domain = { workspace = true, features = ["test-helpers"] }

View File

@@ -1,9 +1,11 @@
use axum::{
Form, Json,
body::Body,
extract::{Extension, Path, Query, State},
http::StatusCode,
response::{IntoResponse, Redirect},
};
use futures::StreamExt;
use uuid::Uuid;
use application::diary::{
@@ -147,30 +149,29 @@ pub async fn export_diary(
user_id: user.0.value(),
format,
};
match export_diary_uc::execute(
let stream = export_diary_uc::execute(
&state.app_ctx.repos.diary,
&state.app_ctx.services.diary_exporter,
query,
)
.await
{
Ok(bytes) => (
StatusCode::OK,
[
(axum::http::header::CONTENT_TYPE, content_type.to_string()),
(
axum::http::header::CONTENT_DISPOSITION,
format!("attachment; filename=\"{}\"", filename),
),
],
bytes,
)
.into_response(),
Err(e) => {
tracing::error!("export error: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR.into_response()
);
let stream = stream.map(|r| {
if let Err(ref e) = r {
tracing::error!("diary export stream error: {e}");
}
}
r
});
(
StatusCode::OK,
[
(axum::http::header::CONTENT_TYPE, content_type.to_string()),
(
axum::http::header::CONTENT_DISPOSITION,
format!("attachment; filename=\"{}\"", filename),
),
],
Body::from_stream(stream),
)
.into_response()
}
#[utoipa::path(
@@ -314,27 +315,29 @@ pub async fn get_export_html(
user_id: user_id.value(),
format,
};
match export_diary_uc::execute(
let stream = export_diary_uc::execute(
&state.app_ctx.repos.diary,
&state.app_ctx.services.diary_exporter,
query,
);
let stream = stream.map(|r| {
if let Err(ref e) = r {
tracing::error!("diary export stream error: {e}");
}
r
});
(
StatusCode::OK,
[
(axum::http::header::CONTENT_TYPE, content_type.to_string()),
(
axum::http::header::CONTENT_DISPOSITION,
format!("attachment; filename=\"{}\"", filename),
),
],
Body::from_stream(stream),
)
.await
{
Ok(bytes) => (
StatusCode::OK,
[
(axum::http::header::CONTENT_TYPE, content_type.to_string()),
(
axum::http::header::CONTENT_DISPOSITION,
format!("attachment; filename=\"{}\"", filename),
),
],
bytes,
)
.into_response(),
Err(e) => crate::errors::domain_error_response(e),
}
.into_response()
}
pub async fn get_activity_feed_html(

View File

@@ -120,6 +120,12 @@ impl DiaryRepository for Panic {
async fn get_user_history(&self, _: &UserId) -> Result<Vec<DiaryEntry>, DomainError> {
panic!()
}
fn stream_user_history(
&self,
_: UserId,
) -> futures::stream::BoxStream<'static, Result<DiaryEntry, DomainError>> {
panic!()
}
async fn get_movie_stats(
&self,
_: &MovieId,
@@ -379,14 +385,17 @@ impl domain::ports::MovieProfileRepository for Panic {
Ok(vec![])
}
}
#[async_trait::async_trait]
impl domain::ports::DiaryExporter for Panic {
async fn serialize_entries(
fn stream_entries(
&self,
_: &[domain::models::DiaryEntry],
_: domain::models::ExportFormat,
) -> Result<Vec<u8>, domain::errors::DomainError> {
panic!()
_stream: futures::stream::BoxStream<
'static,
Result<domain::models::DiaryEntry, domain::errors::DomainError>,
>,
_format: domain::models::ExportFormat,
) -> futures::stream::BoxStream<'static, Result<bytes::Bytes, domain::errors::DomainError>>
{
panic!("Panic DiaryExporter called")
}
}

View File

@@ -165,14 +165,16 @@ impl domain::ports::UserProfileFieldsRepository for PanicProfileFields {
}
struct PanicExporter;
#[async_trait]
impl domain::ports::DiaryExporter for PanicExporter {
async fn serialize_entries(
fn stream_entries(
&self,
_: &[domain::models::DiaryEntry],
_: domain::models::ExportFormat,
) -> Result<Vec<u8>, DomainError> {
panic!()
_stream: futures::stream::BoxStream<
'static,
Result<domain::models::DiaryEntry, DomainError>,
>,
_format: domain::models::ExportFormat,
) -> futures::stream::BoxStream<'static, Result<bytes::Bytes, DomainError>> {
panic!("PanicExporter::stream_entries")
}
}