Compare commits

...

7 Commits

18 changed files with 327 additions and 148 deletions

32
Cargo.lock generated
View File

@@ -314,6 +314,7 @@ name = "application"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"bytes",
"chrono", "chrono",
"domain", "domain",
"futures", "futures",
@@ -567,6 +568,28 @@ dependencies = [
"windows-sys 0.61.2", "windows-sys 0.61.2",
] ]
[[package]]
name = "async-stream"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
dependencies = [
"async-stream-impl",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-stream-impl"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.117",
]
[[package]] [[package]]
name = "async-task" name = "async-task"
version = "4.7.1" version = "4.7.1"
@@ -1822,9 +1845,12 @@ dependencies = [
name = "export" name = "export"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"async-stream",
"async-trait", "async-trait",
"bytes",
"chrono", "chrono",
"domain", "domain",
"futures",
"serde_json", "serde_json",
"tokio", "tokio",
"uuid", "uuid",
@@ -3848,9 +3874,12 @@ name = "postgres"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream",
"async-trait", "async-trait",
"bytes",
"chrono", "chrono",
"domain", "domain",
"futures",
"serde", "serde",
"serde_json", "serde_json",
"sqlx", "sqlx",
@@ -5124,9 +5153,12 @@ name = "sqlite"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream",
"async-trait", "async-trait",
"bytes",
"chrono", "chrono",
"domain", "domain",
"futures",
"serde", "serde",
"serde_json", "serde_json",
"sqlx", "sqlx",

View File

@@ -38,6 +38,7 @@ resolver = "2"
tokio = { version = "1.0", features = ["macros", "net", "rt", "rt-multi-thread", "sync", "time"] } tokio = { version = "1.0", features = ["macros", "net", "rt", "rt-multi-thread", "sync", "time"] }
bytes = "1" bytes = "1"
futures = "0.3" futures = "0.3"
async-stream = "0.3"
dotenvy = "0.15" dotenvy = "0.15"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"

View File

@@ -8,6 +8,9 @@ domain = { workspace = true }
async-trait = { workspace = true } async-trait = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }
chrono = { workspace = true } chrono = { workspace = true }
futures = { workspace = true }
bytes = { workspace = true }
async-stream = { workspace = true }
[dev-dependencies] [dev-dependencies]
uuid = { workspace = true } uuid = { workspace = true }

View File

@@ -1,51 +1,89 @@
use async_trait::async_trait; use bytes::Bytes;
use domain::{ use domain::{
errors::DomainError, errors::DomainError,
models::{DiaryEntry, ExportFormat}, models::{DiaryEntry, ExportFormat},
ports::DiaryExporter, ports::DiaryExporter,
}; };
use futures::stream::BoxStream;
pub struct ExportAdapter; pub struct ExportAdapter;
#[async_trait]
impl DiaryExporter for ExportAdapter { impl DiaryExporter for ExportAdapter {
async fn serialize_entries( fn stream_entries(
&self, &self,
entries: &[DiaryEntry], stream: BoxStream<'static, Result<DiaryEntry, DomainError>>,
format: ExportFormat, format: ExportFormat,
) -> Result<Vec<u8>, DomainError> { ) -> BoxStream<'static, Result<Bytes, DomainError>> {
match format { match format {
ExportFormat::Csv => serialize_csv(entries), ExportFormat::Csv => stream_csv(stream),
ExportFormat::Json => serialize_json(entries), ExportFormat::Json => stream_json(stream),
} }
} }
} }
fn serialize_csv(entries: &[DiaryEntry]) -> Result<Vec<u8>, DomainError> { fn stream_csv(
let mut out = entries: BoxStream<'static, Result<DiaryEntry, DomainError>>,
String::from("title,year,director,rating,comment,watched_at,external_metadata_id\n"); ) -> BoxStream<'static, Result<Bytes, DomainError>> {
for e in entries { use futures::StreamExt;
let title = csv_escape(e.movie().title().value()); let header = futures::stream::once(async {
let year = e.movie().release_year().value(); Ok(Bytes::from_static(
let director = e.movie().director().map(csv_escape).unwrap_or_default(); b"title,year,director,rating,comment,watched_at,external_metadata_id\n",
let rating = e.review().rating().value(); ))
let comment = e });
.review() let rows = entries.map(|r| r.map(|e| Bytes::from(csv_row(&e))));
.comment() Box::pin(header.chain(rows))
.map(|c| csv_escape(c.value())) }
.unwrap_or_default();
let watched_at = e.review().watched_at().format("%Y-%m-%d"); fn stream_json(
let ext_id = e stream: BoxStream<'static, Result<DiaryEntry, DomainError>>,
.movie() ) -> BoxStream<'static, Result<Bytes, DomainError>> {
.external_metadata_id() Box::pin(async_stream::stream! {
.map(|id| id.value().to_string()) futures::pin_mut!(stream);
.unwrap_or_default(); let mut is_first = true;
out.push_str(&format!( while let Some(r) = futures::StreamExt::next(&mut stream).await {
"{},{},{},{},{},{},{}\n", match r {
title, year, director, rating, comment, watched_at, ext_id Err(e) => { yield Err(e); return; }
)); Ok(entry) => {
} let json = serde_json::to_string(&entry_to_json(&entry))
Ok(out.into_bytes()) .map_err(|e| DomainError::InfrastructureError(e.to_string()));
let json = match json {
Ok(s) => s,
Err(e) => { yield Err(e); return; }
};
let prefix = if is_first { "[" } else { "," };
is_first = false;
yield Ok(Bytes::from(format!("{}{}", prefix, json)));
}
}
}
if is_first {
yield Ok(Bytes::from_static(b"[]"));
} else {
yield Ok(Bytes::from_static(b"]"));
}
})
}
fn csv_row(e: &DiaryEntry) -> String {
let title = csv_escape(e.movie().title().value());
let year = e.movie().release_year().value();
let director = e.movie().director().map(csv_escape).unwrap_or_default();
let rating = e.review().rating().value();
let comment = e
.review()
.comment()
.map(|c| csv_escape(c.value()))
.unwrap_or_default();
let watched_at = e.review().watched_at().format("%Y-%m-%d");
let ext_id = e
.movie()
.external_metadata_id()
.map(|id| id.value().to_string())
.unwrap_or_default();
format!(
"{},{},{},{},{},{},{}\n",
title, year, director, rating, comment, watched_at, ext_id
)
} }
fn csv_escape(s: &str) -> String { fn csv_escape(s: &str) -> String {
@@ -56,22 +94,16 @@ fn csv_escape(s: &str) -> String {
} }
} }
fn serialize_json(entries: &[DiaryEntry]) -> Result<Vec<u8>, DomainError> { fn entry_to_json(e: &DiaryEntry) -> serde_json::Value {
let arr: Vec<serde_json::Value> = entries serde_json::json!({
.iter() "title": e.movie().title().value(),
.map(|e| { "year": e.movie().release_year().value(),
serde_json::json!({ "director": e.movie().director(),
"title": e.movie().title().value(), "rating": e.review().rating().value(),
"year": e.movie().release_year().value(), "comment": e.review().comment().map(|c| c.value().to_string()),
"director": e.movie().director(), "watched_at": e.review().watched_at().format("%Y-%m-%d").to_string(),
"rating": e.review().rating().value(), "external_metadata_id": e.movie().external_metadata_id().map(|id| id.value().to_string()),
"comment": e.review().comment().map(|c| c.value()), })
"watched_at": e.review().watched_at().format("%Y-%m-%d").to_string(),
"external_metadata_id": e.movie().external_metadata_id().map(|id| id.value()),
})
})
.collect();
serde_json::to_vec_pretty(&arr).map_err(|e| DomainError::InfrastructureError(e.to_string()))
} }
#[cfg(test)] #[cfg(test)]

View File

@@ -5,6 +5,27 @@ use domain::{
value_objects::{ExternalMetadataId, MovieTitle, Rating, ReleaseYear}, value_objects::{ExternalMetadataId, MovieTitle, Rating, ReleaseYear},
}; };
async fn collect_stream(
stream: futures::stream::BoxStream<'static, Result<bytes::Bytes, domain::errors::DomainError>>,
) -> Vec<u8> {
use futures::StreamExt;
let mut out = Vec::new();
futures::pin_mut!(stream);
while let Some(chunk) = stream.next().await {
out.extend_from_slice(&chunk.unwrap());
}
out
}
fn entry_stream(
entries: Vec<domain::models::DiaryEntry>,
) -> futures::stream::BoxStream<
'static,
Result<domain::models::DiaryEntry, domain::errors::DomainError>,
> {
Box::pin(futures::stream::iter(entries.into_iter().map(Ok)))
}
fn make_entry( fn make_entry(
title: &str, title: &str,
year: u16, year: u16,
@@ -55,10 +76,8 @@ async fn csv_has_header_and_one_row() {
5, 5,
Some("great"), Some("great"),
); );
let bytes = adapter let bytes =
.serialize_entries(&[entry], ExportFormat::Csv) collect_stream(adapter.stream_entries(entry_stream(vec![entry]), ExportFormat::Csv)).await;
.await
.unwrap();
let text = String::from_utf8(bytes).unwrap(); let text = String::from_utf8(bytes).unwrap();
assert!( assert!(
text.starts_with("title,year,director,rating,comment,watched_at,external_metadata_id\n") text.starts_with("title,year,director,rating,comment,watched_at,external_metadata_id\n")
@@ -75,10 +94,8 @@ async fn csv_has_header_and_one_row() {
async fn csv_escapes_commas_in_title() { async fn csv_escapes_commas_in_title() {
let adapter = ExportAdapter; let adapter = ExportAdapter;
let entry = make_entry("Tár, A Film", 2022, None, 4, None); let entry = make_entry("Tár, A Film", 2022, None, 4, None);
let bytes = adapter let bytes =
.serialize_entries(&[entry], ExportFormat::Csv) collect_stream(adapter.stream_entries(entry_stream(vec![entry]), ExportFormat::Csv)).await;
.await
.unwrap();
let text = String::from_utf8(bytes).unwrap(); let text = String::from_utf8(bytes).unwrap();
assert!(text.contains("\"Tár, A Film\"")); assert!(text.contains("\"Tár, A Film\""));
} }
@@ -87,10 +104,8 @@ async fn csv_escapes_commas_in_title() {
async fn json_is_valid_array() { async fn json_is_valid_array() {
let adapter = ExportAdapter; let adapter = ExportAdapter;
let entry = make_entry("Dune", 2021, Some("Denis Villeneuve"), 5, None); let entry = make_entry("Dune", 2021, Some("Denis Villeneuve"), 5, None);
let bytes = adapter let bytes =
.serialize_entries(&[entry], ExportFormat::Json) collect_stream(adapter.stream_entries(entry_stream(vec![entry]), ExportFormat::Json)).await;
.await
.unwrap();
let arr: Vec<serde_json::Value> = serde_json::from_slice(&bytes).unwrap(); let arr: Vec<serde_json::Value> = serde_json::from_slice(&bytes).unwrap();
assert_eq!(arr.len(), 1); assert_eq!(arr.len(), 1);
assert_eq!(arr[0]["title"], "Dune"); assert_eq!(arr[0]["title"], "Dune");
@@ -104,27 +119,23 @@ async fn json_is_valid_array() {
async fn external_metadata_id_included_when_present() { async fn external_metadata_id_included_when_present() {
let adapter = ExportAdapter; let adapter = ExportAdapter;
let entry = make_entry_full("Alien", 1979, None, 5, None, Some("tt0078748")); let entry = make_entry_full("Alien", 1979, None, 5, None, Some("tt0078748"));
let bytes = adapter let bytes =
.serialize_entries(&[entry], ExportFormat::Json) collect_stream(adapter.stream_entries(entry_stream(vec![entry]), ExportFormat::Json)).await;
.await
.unwrap();
let arr: Vec<serde_json::Value> = serde_json::from_slice(&bytes).unwrap(); let arr: Vec<serde_json::Value> = serde_json::from_slice(&bytes).unwrap();
assert_eq!(arr[0]["external_metadata_id"], "tt0078748"); assert_eq!(arr[0]["external_metadata_id"], "tt0078748");
let bytes = adapter let bytes = collect_stream(adapter.stream_entries(
.serialize_entries( entry_stream(vec![make_entry_full(
&[make_entry_full( "Alien",
"Alien", 1979,
1979, None,
None, 5,
5, None,
None, Some("tt0078748"),
Some("tt0078748"), )]),
)], ExportFormat::Csv,
ExportFormat::Csv, ))
) .await;
.await
.unwrap();
let text = String::from_utf8(bytes).unwrap(); let text = String::from_utf8(bytes).unwrap();
assert!(text.contains("tt0078748")); assert!(text.contains("tt0078748"));
} }
@@ -132,13 +143,20 @@ async fn external_metadata_id_included_when_present() {
#[tokio::test] #[tokio::test]
async fn empty_entries_returns_csv_header_only() { async fn empty_entries_returns_csv_header_only() {
let adapter = ExportAdapter; let adapter = ExportAdapter;
let bytes = adapter let bytes =
.serialize_entries(&[], ExportFormat::Csv) collect_stream(adapter.stream_entries(entry_stream(vec![]), ExportFormat::Csv)).await;
.await
.unwrap();
let text = String::from_utf8(bytes).unwrap(); let text = String::from_utf8(bytes).unwrap();
assert_eq!( assert_eq!(
text, text,
"title,year,director,rating,comment,watched_at,external_metadata_id\n" "title,year,director,rating,comment,watched_at,external_metadata_id\n"
); );
} }
#[tokio::test]
async fn empty_json_is_valid_empty_array() {
let adapter = ExportAdapter;
let bytes =
collect_stream(adapter.stream_entries(entry_stream(vec![]), ExportFormat::Json)).await;
let arr: Vec<serde_json::Value> = serde_json::from_slice(&bytes).unwrap();
assert!(arr.is_empty());
}

View File

@@ -20,3 +20,6 @@ async-trait = { workspace = true }
tokio = { workspace = true } tokio = { workspace = true }
serde = { workspace = true, features = ["derive"] } serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true } serde_json = { workspace = true }
futures = { workspace = true }
bytes = { workspace = true }
async-stream = { workspace = true }

View File

@@ -8,6 +8,7 @@ use domain::{
ports::DiaryRepository, ports::DiaryRepository,
value_objects::{MovieId, UserId}, value_objects::{MovieId, UserId},
}; };
use futures::stream::BoxStream;
use sqlx::PgPool; use sqlx::PgPool;
use crate::models::{DiaryRow, FeedRow, MovieRow, MovieStatsRow, ReviewRow}; use crate::models::{DiaryRow, FeedRow, MovieRow, MovieStatsRow, ReviewRow};
@@ -427,6 +428,35 @@ impl DiaryRepository for PostgresDiaryRepository {
rows.into_iter().map(DiaryRow::into_domain).collect() rows.into_iter().map(DiaryRow::into_domain).collect()
} }
fn stream_user_history(
&self,
user_id: UserId,
) -> BoxStream<'static, Result<DiaryEntry, DomainError>> {
let pool = self.pool.clone();
let uid = user_id.value().to_string();
Box::pin(async_stream::stream! {
let mut rows = sqlx::query_as::<_, DiaryRow>(
"SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,
r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment,
to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at,
to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at,
r.remote_actor_url
FROM reviews r
INNER JOIN movies m ON m.id = r.movie_id
WHERE r.user_id = $1
ORDER BY r.watched_at DESC",
)
.bind(&uid)
.fetch(&pool);
while let Some(row) = futures::StreamExt::next(&mut rows).await {
yield match row {
Ok(r) => r.into_domain(),
Err(e) => Err(Self::map_err(e)),
};
}
})
}
async fn get_movie_stats(&self, movie_id: &MovieId) -> Result<MovieStats, DomainError> { async fn get_movie_stats(&self, movie_id: &MovieId) -> Result<MovieStats, DomainError> {
let id_str = movie_id.value().to_string(); let id_str = movie_id.value().to_string();
sqlx::query_as::<_, MovieStatsRow>( sqlx::query_as::<_, MovieStatsRow>(

View File

@@ -20,3 +20,6 @@ chrono = { workspace = true }
tracing = { workspace = true } tracing = { workspace = true }
async-trait = { workspace = true } async-trait = { workspace = true }
tokio = { workspace = true } tokio = { workspace = true }
futures = { workspace = true }
bytes = { workspace = true }
async-stream = { workspace = true }

View File

@@ -8,6 +8,7 @@ use domain::{
ports::DiaryRepository, ports::DiaryRepository,
value_objects::{MovieId, UserId}, value_objects::{MovieId, UserId},
}; };
use futures::stream::BoxStream;
use sqlx::SqlitePool; use sqlx::SqlitePool;
use crate::models::{DiaryRow, FeedRow, MovieRow, MovieStatsRow, ReviewRow}; use crate::models::{DiaryRow, FeedRow, MovieRow, MovieStatsRow, ReviewRow};
@@ -389,6 +390,32 @@ impl DiaryRepository for SqliteDiaryRepository {
rows.into_iter().map(DiaryRow::into_domain).collect() rows.into_iter().map(DiaryRow::into_domain).collect()
} }
fn stream_user_history(
&self,
user_id: UserId,
) -> BoxStream<'static, Result<DiaryEntry, DomainError>> {
let pool = self.pool.clone();
let uid = user_id.value().to_string();
Box::pin(async_stream::stream! {
let mut rows = sqlx::query_as::<_, DiaryRow>(
"SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,
r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url
FROM reviews r
INNER JOIN movies m ON m.id = r.movie_id
WHERE r.user_id = ?
ORDER BY r.watched_at DESC",
)
.bind(&uid)
.fetch(&pool);
while let Some(row) = futures::StreamExt::next(&mut rows).await {
yield match row {
Ok(r) => r.into_domain(),
Err(e) => Err(Self::map_err(e)),
};
}
})
}
async fn get_movie_stats(&self, movie_id: &MovieId) -> Result<MovieStats, DomainError> { async fn get_movie_stats(&self, movie_id: &MovieId) -> Result<MovieStats, DomainError> {
let id_str = movie_id.value().to_string(); let id_str = movie_id.value().to_string();
sqlx::query_as::<_, MovieStatsRow>( sqlx::query_as::<_, MovieStatsRow>(

View File

@@ -15,6 +15,7 @@ sha2 = { workspace = true }
rand = { workspace = true } rand = { workspace = true }
hex = { workspace = true } hex = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }
bytes = { workspace = true }
[features] [features]
xlsx = [] xlsx = []

View File

@@ -1,22 +1,21 @@
use std::sync::Arc; use std::sync::Arc;
use bytes::Bytes;
use domain::{ use domain::{
errors::DomainError, errors::DomainError,
ports::{DiaryExporter, DiaryRepository}, ports::{DiaryExporter, DiaryRepository},
value_objects::UserId, value_objects::UserId,
}; };
use futures::stream::BoxStream;
use crate::diary::queries::ExportQuery; use crate::diary::queries::ExportQuery;
pub async fn execute( pub fn execute(
diary: &Arc<dyn DiaryRepository>, diary: &Arc<dyn DiaryRepository>,
diary_exporter: &Arc<dyn DiaryExporter>, diary_exporter: &Arc<dyn DiaryExporter>,
query: ExportQuery, query: ExportQuery,
) -> Result<Vec<u8>, DomainError> { ) -> BoxStream<'static, Result<Bytes, DomainError>> {
let entries = diary let user_id = UserId::from_uuid(query.user_id);
.get_user_history(&UserId::from_uuid(query.user_id)) let entry_stream = diary.stream_user_history(user_id);
.await?; diary_exporter.stream_entries(entry_stream, query.format)
diary_exporter
.serialize_entries(&entries, query.format)
.await
} }

View File

@@ -144,6 +144,10 @@ pub trait DiaryRepository: Send + Sync {
) -> Result<Paginated<FeedEntry>, DomainError>; ) -> Result<Paginated<FeedEntry>, DomainError>;
async fn get_review_history(&self, movie_id: &MovieId) -> Result<ReviewHistory, DomainError>; async fn get_review_history(&self, movie_id: &MovieId) -> Result<ReviewHistory, DomainError>;
async fn get_user_history(&self, user_id: &UserId) -> Result<Vec<DiaryEntry>, DomainError>; async fn get_user_history(&self, user_id: &UserId) -> Result<Vec<DiaryEntry>, DomainError>;
fn stream_user_history(
&self,
user_id: UserId,
) -> futures::stream::BoxStream<'static, Result<DiaryEntry, DomainError>>;
async fn get_movie_stats(&self, movie_id: &MovieId) -> Result<MovieStats, DomainError>; async fn get_movie_stats(&self, movie_id: &MovieId) -> Result<MovieStats, DomainError>;
async fn get_movie_social_feed( async fn get_movie_social_feed(
&self, &self,
@@ -253,13 +257,12 @@ pub trait PasswordHasher: Send + Sync {
async fn verify(&self, plain_password: &str, hash: &PasswordHash) -> Result<bool, DomainError>; async fn verify(&self, plain_password: &str, hash: &PasswordHash) -> Result<bool, DomainError>;
} }
#[async_trait]
pub trait DiaryExporter: Send + Sync { pub trait DiaryExporter: Send + Sync {
async fn serialize_entries( fn stream_entries(
&self, &self,
entries: &[DiaryEntry], stream: futures::stream::BoxStream<'static, Result<DiaryEntry, DomainError>>,
format: ExportFormat, format: ExportFormat,
) -> Result<Vec<u8>, DomainError>; ) -> futures::stream::BoxStream<'static, Result<bytes::Bytes, DomainError>>;
} }
#[async_trait] #[async_trait]

View File

@@ -154,6 +154,13 @@ impl DiaryRepository for FakeDiaryRepository {
Ok(vec![]) Ok(vec![])
} }
fn stream_user_history(
&self,
_user_id: UserId,
) -> futures::stream::BoxStream<'static, Result<DiaryEntry, DomainError>> {
Box::pin(futures::stream::empty())
}
async fn get_movie_stats(&self, _movie_id: &MovieId) -> Result<MovieStats, DomainError> { async fn get_movie_stats(&self, _movie_id: &MovieId) -> Result<MovieStats, DomainError> {
Ok(MovieStats { Ok(MovieStats {
total_count: 0, total_count: 0,

View File

@@ -49,6 +49,12 @@ impl DiaryRepository for PanicDiaryRepository {
async fn get_user_history(&self, _: &UserId) -> Result<Vec<DiaryEntry>, DomainError> { async fn get_user_history(&self, _: &UserId) -> Result<Vec<DiaryEntry>, DomainError> {
panic!("PanicDiaryRepository called") panic!("PanicDiaryRepository called")
} }
fn stream_user_history(
&self,
_: UserId,
) -> futures::stream::BoxStream<'static, Result<DiaryEntry, DomainError>> {
panic!("PanicDiaryRepository called")
}
async fn get_movie_stats(&self, _: &MovieId) -> Result<MovieStats, DomainError> { async fn get_movie_stats(&self, _: &MovieId) -> Result<MovieStats, DomainError> {
panic!("PanicDiaryRepository called") panic!("PanicDiaryRepository called")
} }
@@ -250,13 +256,12 @@ impl PosterFetcherClient for PanicPosterFetcher {
pub struct PanicDiaryExporter; pub struct PanicDiaryExporter;
#[async_trait]
impl DiaryExporter for PanicDiaryExporter { impl DiaryExporter for PanicDiaryExporter {
async fn serialize_entries( fn stream_entries(
&self, &self,
_: &[DiaryEntry], _stream: futures::stream::BoxStream<'static, Result<DiaryEntry, DomainError>>,
_: ExportFormat, _format: ExportFormat,
) -> Result<Vec<u8>, DomainError> { ) -> futures::stream::BoxStream<'static, Result<bytes::Bytes, DomainError>> {
panic!("PanicDiaryExporter called") panic!("PanicDiaryExporter called")
} }
} }

View File

@@ -42,6 +42,7 @@ dotenvy = { workspace = true }
uuid = { workspace = true } uuid = { workspace = true }
chrono = { workspace = true } chrono = { workspace = true }
async-trait = { workspace = true } async-trait = { workspace = true }
futures = { workspace = true }
api-types = { workspace = true } api-types = { workspace = true }
domain = { workspace = true, features = ["test-helpers"] } domain = { workspace = true, features = ["test-helpers"] }

View File

@@ -1,9 +1,11 @@
use axum::{ use axum::{
Form, Json, Form, Json,
body::Body,
extract::{Extension, Path, Query, State}, extract::{Extension, Path, Query, State},
http::StatusCode, http::StatusCode,
response::{IntoResponse, Redirect}, response::{IntoResponse, Redirect},
}; };
use futures::StreamExt;
use uuid::Uuid; use uuid::Uuid;
use application::diary::{ use application::diary::{
@@ -147,30 +149,29 @@ pub async fn export_diary(
user_id: user.0.value(), user_id: user.0.value(),
format, format,
}; };
match export_diary_uc::execute( let stream = export_diary_uc::execute(
&state.app_ctx.repos.diary, &state.app_ctx.repos.diary,
&state.app_ctx.services.diary_exporter, &state.app_ctx.services.diary_exporter,
query, query,
) );
.await let stream = stream.map(|r| {
{ if let Err(ref e) = r {
Ok(bytes) => ( tracing::error!("diary export stream error: {e}");
StatusCode::OK,
[
(axum::http::header::CONTENT_TYPE, content_type.to_string()),
(
axum::http::header::CONTENT_DISPOSITION,
format!("attachment; filename=\"{}\"", filename),
),
],
bytes,
)
.into_response(),
Err(e) => {
tracing::error!("export error: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR.into_response()
} }
} r
});
(
StatusCode::OK,
[
(axum::http::header::CONTENT_TYPE, content_type.to_string()),
(
axum::http::header::CONTENT_DISPOSITION,
format!("attachment; filename=\"{}\"", filename),
),
],
Body::from_stream(stream),
)
.into_response()
} }
#[utoipa::path( #[utoipa::path(
@@ -314,27 +315,29 @@ pub async fn get_export_html(
user_id: user_id.value(), user_id: user_id.value(),
format, format,
}; };
match export_diary_uc::execute( let stream = export_diary_uc::execute(
&state.app_ctx.repos.diary, &state.app_ctx.repos.diary,
&state.app_ctx.services.diary_exporter, &state.app_ctx.services.diary_exporter,
query, query,
);
let stream = stream.map(|r| {
if let Err(ref e) = r {
tracing::error!("diary export stream error: {e}");
}
r
});
(
StatusCode::OK,
[
(axum::http::header::CONTENT_TYPE, content_type.to_string()),
(
axum::http::header::CONTENT_DISPOSITION,
format!("attachment; filename=\"{}\"", filename),
),
],
Body::from_stream(stream),
) )
.await .into_response()
{
Ok(bytes) => (
StatusCode::OK,
[
(axum::http::header::CONTENT_TYPE, content_type.to_string()),
(
axum::http::header::CONTENT_DISPOSITION,
format!("attachment; filename=\"{}\"", filename),
),
],
bytes,
)
.into_response(),
Err(e) => crate::errors::domain_error_response(e),
}
} }
pub async fn get_activity_feed_html( pub async fn get_activity_feed_html(

View File

@@ -120,6 +120,12 @@ impl DiaryRepository for Panic {
async fn get_user_history(&self, _: &UserId) -> Result<Vec<DiaryEntry>, DomainError> { async fn get_user_history(&self, _: &UserId) -> Result<Vec<DiaryEntry>, DomainError> {
panic!() panic!()
} }
fn stream_user_history(
&self,
_: UserId,
) -> futures::stream::BoxStream<'static, Result<DiaryEntry, DomainError>> {
panic!()
}
async fn get_movie_stats( async fn get_movie_stats(
&self, &self,
_: &MovieId, _: &MovieId,
@@ -379,14 +385,17 @@ impl domain::ports::MovieProfileRepository for Panic {
Ok(vec![]) Ok(vec![])
} }
} }
#[async_trait::async_trait]
impl domain::ports::DiaryExporter for Panic { impl domain::ports::DiaryExporter for Panic {
async fn serialize_entries( fn stream_entries(
&self, &self,
_: &[domain::models::DiaryEntry], _stream: futures::stream::BoxStream<
_: domain::models::ExportFormat, 'static,
) -> Result<Vec<u8>, domain::errors::DomainError> { Result<domain::models::DiaryEntry, domain::errors::DomainError>,
panic!() >,
_format: domain::models::ExportFormat,
) -> futures::stream::BoxStream<'static, Result<bytes::Bytes, domain::errors::DomainError>>
{
panic!("Panic DiaryExporter called")
} }
} }

View File

@@ -165,14 +165,16 @@ impl domain::ports::UserProfileFieldsRepository for PanicProfileFields {
} }
struct PanicExporter; struct PanicExporter;
#[async_trait]
impl domain::ports::DiaryExporter for PanicExporter { impl domain::ports::DiaryExporter for PanicExporter {
async fn serialize_entries( fn stream_entries(
&self, &self,
_: &[domain::models::DiaryEntry], _stream: futures::stream::BoxStream<
_: domain::models::ExportFormat, 'static,
) -> Result<Vec<u8>, DomainError> { Result<domain::models::DiaryEntry, DomainError>,
panic!() >,
_format: domain::models::ExportFormat,
) -> futures::stream::BoxStream<'static, Result<bytes::Bytes, DomainError>> {
panic!("PanicExporter::stream_entries")
} }
} }