Compare commits
7 Commits
bf272bf8d9
...
aec5f6b058
| Author | SHA1 | Date | |
|---|---|---|---|
| aec5f6b058 | |||
| d9234ecd11 | |||
| 010ee404c8 | |||
| d4c42f8567 | |||
| 9c44330f14 | |||
| 2fa118570f | |||
| ded7517a8a |
32
Cargo.lock
generated
32
Cargo.lock
generated
@@ -314,6 +314,7 @@ name = "application"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
"bytes",
|
||||||
"chrono",
|
"chrono",
|
||||||
"domain",
|
"domain",
|
||||||
"futures",
|
"futures",
|
||||||
@@ -567,6 +568,28 @@ dependencies = [
|
|||||||
"windows-sys 0.61.2",
|
"windows-sys 0.61.2",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "async-stream"
|
||||||
|
version = "0.3.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
|
||||||
|
dependencies = [
|
||||||
|
"async-stream-impl",
|
||||||
|
"futures-core",
|
||||||
|
"pin-project-lite",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "async-stream-impl"
|
||||||
|
version = "0.3.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn 2.0.117",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "async-task"
|
name = "async-task"
|
||||||
version = "4.7.1"
|
version = "4.7.1"
|
||||||
@@ -1822,9 +1845,12 @@ dependencies = [
|
|||||||
name = "export"
|
name = "export"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"async-stream",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
"bytes",
|
||||||
"chrono",
|
"chrono",
|
||||||
"domain",
|
"domain",
|
||||||
|
"futures",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"tokio",
|
"tokio",
|
||||||
"uuid",
|
"uuid",
|
||||||
@@ -3848,9 +3874,12 @@ name = "postgres"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
|
"async-stream",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
"bytes",
|
||||||
"chrono",
|
"chrono",
|
||||||
"domain",
|
"domain",
|
||||||
|
"futures",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"sqlx",
|
"sqlx",
|
||||||
@@ -5124,9 +5153,12 @@ name = "sqlite"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
|
"async-stream",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
"bytes",
|
||||||
"chrono",
|
"chrono",
|
||||||
"domain",
|
"domain",
|
||||||
|
"futures",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"sqlx",
|
"sqlx",
|
||||||
|
|||||||
@@ -38,6 +38,7 @@ resolver = "2"
|
|||||||
tokio = { version = "1.0", features = ["macros", "net", "rt", "rt-multi-thread", "sync", "time"] }
|
tokio = { version = "1.0", features = ["macros", "net", "rt", "rt-multi-thread", "sync", "time"] }
|
||||||
bytes = "1"
|
bytes = "1"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
|
async-stream = "0.3"
|
||||||
dotenvy = "0.15"
|
dotenvy = "0.15"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
|
|||||||
@@ -8,6 +8,9 @@ domain = { workspace = true }
|
|||||||
async-trait = { workspace = true }
|
async-trait = { workspace = true }
|
||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
chrono = { workspace = true }
|
chrono = { workspace = true }
|
||||||
|
futures = { workspace = true }
|
||||||
|
bytes = { workspace = true }
|
||||||
|
async-stream = { workspace = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
uuid = { workspace = true }
|
uuid = { workspace = true }
|
||||||
|
|||||||
@@ -1,30 +1,70 @@
|
|||||||
use async_trait::async_trait;
|
use bytes::Bytes;
|
||||||
use domain::{
|
use domain::{
|
||||||
errors::DomainError,
|
errors::DomainError,
|
||||||
models::{DiaryEntry, ExportFormat},
|
models::{DiaryEntry, ExportFormat},
|
||||||
ports::DiaryExporter,
|
ports::DiaryExporter,
|
||||||
};
|
};
|
||||||
|
use futures::stream::BoxStream;
|
||||||
|
|
||||||
pub struct ExportAdapter;
|
pub struct ExportAdapter;
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl DiaryExporter for ExportAdapter {
|
impl DiaryExporter for ExportAdapter {
|
||||||
async fn serialize_entries(
|
fn stream_entries(
|
||||||
&self,
|
&self,
|
||||||
entries: &[DiaryEntry],
|
stream: BoxStream<'static, Result<DiaryEntry, DomainError>>,
|
||||||
format: ExportFormat,
|
format: ExportFormat,
|
||||||
) -> Result<Vec<u8>, DomainError> {
|
) -> BoxStream<'static, Result<Bytes, DomainError>> {
|
||||||
match format {
|
match format {
|
||||||
ExportFormat::Csv => serialize_csv(entries),
|
ExportFormat::Csv => stream_csv(stream),
|
||||||
ExportFormat::Json => serialize_json(entries),
|
ExportFormat::Json => stream_json(stream),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn serialize_csv(entries: &[DiaryEntry]) -> Result<Vec<u8>, DomainError> {
|
fn stream_csv(
|
||||||
let mut out =
|
entries: BoxStream<'static, Result<DiaryEntry, DomainError>>,
|
||||||
String::from("title,year,director,rating,comment,watched_at,external_metadata_id\n");
|
) -> BoxStream<'static, Result<Bytes, DomainError>> {
|
||||||
for e in entries {
|
use futures::StreamExt;
|
||||||
|
let header = futures::stream::once(async {
|
||||||
|
Ok(Bytes::from_static(
|
||||||
|
b"title,year,director,rating,comment,watched_at,external_metadata_id\n",
|
||||||
|
))
|
||||||
|
});
|
||||||
|
let rows = entries.map(|r| r.map(|e| Bytes::from(csv_row(&e))));
|
||||||
|
Box::pin(header.chain(rows))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn stream_json(
|
||||||
|
stream: BoxStream<'static, Result<DiaryEntry, DomainError>>,
|
||||||
|
) -> BoxStream<'static, Result<Bytes, DomainError>> {
|
||||||
|
Box::pin(async_stream::stream! {
|
||||||
|
futures::pin_mut!(stream);
|
||||||
|
let mut is_first = true;
|
||||||
|
while let Some(r) = futures::StreamExt::next(&mut stream).await {
|
||||||
|
match r {
|
||||||
|
Err(e) => { yield Err(e); return; }
|
||||||
|
Ok(entry) => {
|
||||||
|
let json = serde_json::to_string(&entry_to_json(&entry))
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()));
|
||||||
|
let json = match json {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(e) => { yield Err(e); return; }
|
||||||
|
};
|
||||||
|
let prefix = if is_first { "[" } else { "," };
|
||||||
|
is_first = false;
|
||||||
|
yield Ok(Bytes::from(format!("{}{}", prefix, json)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if is_first {
|
||||||
|
yield Ok(Bytes::from_static(b"[]"));
|
||||||
|
} else {
|
||||||
|
yield Ok(Bytes::from_static(b"]"));
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn csv_row(e: &DiaryEntry) -> String {
|
||||||
let title = csv_escape(e.movie().title().value());
|
let title = csv_escape(e.movie().title().value());
|
||||||
let year = e.movie().release_year().value();
|
let year = e.movie().release_year().value();
|
||||||
let director = e.movie().director().map(csv_escape).unwrap_or_default();
|
let director = e.movie().director().map(csv_escape).unwrap_or_default();
|
||||||
@@ -40,12 +80,10 @@ fn serialize_csv(entries: &[DiaryEntry]) -> Result<Vec<u8>, DomainError> {
|
|||||||
.external_metadata_id()
|
.external_metadata_id()
|
||||||
.map(|id| id.value().to_string())
|
.map(|id| id.value().to_string())
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
out.push_str(&format!(
|
format!(
|
||||||
"{},{},{},{},{},{},{}\n",
|
"{},{},{},{},{},{},{}\n",
|
||||||
title, year, director, rating, comment, watched_at, ext_id
|
title, year, director, rating, comment, watched_at, ext_id
|
||||||
));
|
)
|
||||||
}
|
|
||||||
Ok(out.into_bytes())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn csv_escape(s: &str) -> String {
|
fn csv_escape(s: &str) -> String {
|
||||||
@@ -56,22 +94,16 @@ fn csv_escape(s: &str) -> String {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn serialize_json(entries: &[DiaryEntry]) -> Result<Vec<u8>, DomainError> {
|
fn entry_to_json(e: &DiaryEntry) -> serde_json::Value {
|
||||||
let arr: Vec<serde_json::Value> = entries
|
|
||||||
.iter()
|
|
||||||
.map(|e| {
|
|
||||||
serde_json::json!({
|
serde_json::json!({
|
||||||
"title": e.movie().title().value(),
|
"title": e.movie().title().value(),
|
||||||
"year": e.movie().release_year().value(),
|
"year": e.movie().release_year().value(),
|
||||||
"director": e.movie().director(),
|
"director": e.movie().director(),
|
||||||
"rating": e.review().rating().value(),
|
"rating": e.review().rating().value(),
|
||||||
"comment": e.review().comment().map(|c| c.value()),
|
"comment": e.review().comment().map(|c| c.value().to_string()),
|
||||||
"watched_at": e.review().watched_at().format("%Y-%m-%d").to_string(),
|
"watched_at": e.review().watched_at().format("%Y-%m-%d").to_string(),
|
||||||
"external_metadata_id": e.movie().external_metadata_id().map(|id| id.value()),
|
"external_metadata_id": e.movie().external_metadata_id().map(|id| id.value().to_string()),
|
||||||
})
|
})
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
serde_json::to_vec_pretty(&arr).map_err(|e| DomainError::InfrastructureError(e.to_string()))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@@ -5,6 +5,27 @@ use domain::{
|
|||||||
value_objects::{ExternalMetadataId, MovieTitle, Rating, ReleaseYear},
|
value_objects::{ExternalMetadataId, MovieTitle, Rating, ReleaseYear},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
async fn collect_stream(
|
||||||
|
stream: futures::stream::BoxStream<'static, Result<bytes::Bytes, domain::errors::DomainError>>,
|
||||||
|
) -> Vec<u8> {
|
||||||
|
use futures::StreamExt;
|
||||||
|
let mut out = Vec::new();
|
||||||
|
futures::pin_mut!(stream);
|
||||||
|
while let Some(chunk) = stream.next().await {
|
||||||
|
out.extend_from_slice(&chunk.unwrap());
|
||||||
|
}
|
||||||
|
out
|
||||||
|
}
|
||||||
|
|
||||||
|
fn entry_stream(
|
||||||
|
entries: Vec<domain::models::DiaryEntry>,
|
||||||
|
) -> futures::stream::BoxStream<
|
||||||
|
'static,
|
||||||
|
Result<domain::models::DiaryEntry, domain::errors::DomainError>,
|
||||||
|
> {
|
||||||
|
Box::pin(futures::stream::iter(entries.into_iter().map(Ok)))
|
||||||
|
}
|
||||||
|
|
||||||
fn make_entry(
|
fn make_entry(
|
||||||
title: &str,
|
title: &str,
|
||||||
year: u16,
|
year: u16,
|
||||||
@@ -55,10 +76,8 @@ async fn csv_has_header_and_one_row() {
|
|||||||
5,
|
5,
|
||||||
Some("great"),
|
Some("great"),
|
||||||
);
|
);
|
||||||
let bytes = adapter
|
let bytes =
|
||||||
.serialize_entries(&[entry], ExportFormat::Csv)
|
collect_stream(adapter.stream_entries(entry_stream(vec![entry]), ExportFormat::Csv)).await;
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let text = String::from_utf8(bytes).unwrap();
|
let text = String::from_utf8(bytes).unwrap();
|
||||||
assert!(
|
assert!(
|
||||||
text.starts_with("title,year,director,rating,comment,watched_at,external_metadata_id\n")
|
text.starts_with("title,year,director,rating,comment,watched_at,external_metadata_id\n")
|
||||||
@@ -75,10 +94,8 @@ async fn csv_has_header_and_one_row() {
|
|||||||
async fn csv_escapes_commas_in_title() {
|
async fn csv_escapes_commas_in_title() {
|
||||||
let adapter = ExportAdapter;
|
let adapter = ExportAdapter;
|
||||||
let entry = make_entry("Tár, A Film", 2022, None, 4, None);
|
let entry = make_entry("Tár, A Film", 2022, None, 4, None);
|
||||||
let bytes = adapter
|
let bytes =
|
||||||
.serialize_entries(&[entry], ExportFormat::Csv)
|
collect_stream(adapter.stream_entries(entry_stream(vec![entry]), ExportFormat::Csv)).await;
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let text = String::from_utf8(bytes).unwrap();
|
let text = String::from_utf8(bytes).unwrap();
|
||||||
assert!(text.contains("\"Tár, A Film\""));
|
assert!(text.contains("\"Tár, A Film\""));
|
||||||
}
|
}
|
||||||
@@ -87,10 +104,8 @@ async fn csv_escapes_commas_in_title() {
|
|||||||
async fn json_is_valid_array() {
|
async fn json_is_valid_array() {
|
||||||
let adapter = ExportAdapter;
|
let adapter = ExportAdapter;
|
||||||
let entry = make_entry("Dune", 2021, Some("Denis Villeneuve"), 5, None);
|
let entry = make_entry("Dune", 2021, Some("Denis Villeneuve"), 5, None);
|
||||||
let bytes = adapter
|
let bytes =
|
||||||
.serialize_entries(&[entry], ExportFormat::Json)
|
collect_stream(adapter.stream_entries(entry_stream(vec![entry]), ExportFormat::Json)).await;
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let arr: Vec<serde_json::Value> = serde_json::from_slice(&bytes).unwrap();
|
let arr: Vec<serde_json::Value> = serde_json::from_slice(&bytes).unwrap();
|
||||||
assert_eq!(arr.len(), 1);
|
assert_eq!(arr.len(), 1);
|
||||||
assert_eq!(arr[0]["title"], "Dune");
|
assert_eq!(arr[0]["title"], "Dune");
|
||||||
@@ -104,27 +119,23 @@ async fn json_is_valid_array() {
|
|||||||
async fn external_metadata_id_included_when_present() {
|
async fn external_metadata_id_included_when_present() {
|
||||||
let adapter = ExportAdapter;
|
let adapter = ExportAdapter;
|
||||||
let entry = make_entry_full("Alien", 1979, None, 5, None, Some("tt0078748"));
|
let entry = make_entry_full("Alien", 1979, None, 5, None, Some("tt0078748"));
|
||||||
let bytes = adapter
|
let bytes =
|
||||||
.serialize_entries(&[entry], ExportFormat::Json)
|
collect_stream(adapter.stream_entries(entry_stream(vec![entry]), ExportFormat::Json)).await;
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let arr: Vec<serde_json::Value> = serde_json::from_slice(&bytes).unwrap();
|
let arr: Vec<serde_json::Value> = serde_json::from_slice(&bytes).unwrap();
|
||||||
assert_eq!(arr[0]["external_metadata_id"], "tt0078748");
|
assert_eq!(arr[0]["external_metadata_id"], "tt0078748");
|
||||||
|
|
||||||
let bytes = adapter
|
let bytes = collect_stream(adapter.stream_entries(
|
||||||
.serialize_entries(
|
entry_stream(vec![make_entry_full(
|
||||||
&[make_entry_full(
|
|
||||||
"Alien",
|
"Alien",
|
||||||
1979,
|
1979,
|
||||||
None,
|
None,
|
||||||
5,
|
5,
|
||||||
None,
|
None,
|
||||||
Some("tt0078748"),
|
Some("tt0078748"),
|
||||||
)],
|
)]),
|
||||||
ExportFormat::Csv,
|
ExportFormat::Csv,
|
||||||
)
|
))
|
||||||
.await
|
.await;
|
||||||
.unwrap();
|
|
||||||
let text = String::from_utf8(bytes).unwrap();
|
let text = String::from_utf8(bytes).unwrap();
|
||||||
assert!(text.contains("tt0078748"));
|
assert!(text.contains("tt0078748"));
|
||||||
}
|
}
|
||||||
@@ -132,13 +143,20 @@ async fn external_metadata_id_included_when_present() {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn empty_entries_returns_csv_header_only() {
|
async fn empty_entries_returns_csv_header_only() {
|
||||||
let adapter = ExportAdapter;
|
let adapter = ExportAdapter;
|
||||||
let bytes = adapter
|
let bytes =
|
||||||
.serialize_entries(&[], ExportFormat::Csv)
|
collect_stream(adapter.stream_entries(entry_stream(vec![]), ExportFormat::Csv)).await;
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let text = String::from_utf8(bytes).unwrap();
|
let text = String::from_utf8(bytes).unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
text,
|
text,
|
||||||
"title,year,director,rating,comment,watched_at,external_metadata_id\n"
|
"title,year,director,rating,comment,watched_at,external_metadata_id\n"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn empty_json_is_valid_empty_array() {
|
||||||
|
let adapter = ExportAdapter;
|
||||||
|
let bytes =
|
||||||
|
collect_stream(adapter.stream_entries(entry_stream(vec![]), ExportFormat::Json)).await;
|
||||||
|
let arr: Vec<serde_json::Value> = serde_json::from_slice(&bytes).unwrap();
|
||||||
|
assert!(arr.is_empty());
|
||||||
|
}
|
||||||
|
|||||||
@@ -20,3 +20,6 @@ async-trait = { workspace = true }
|
|||||||
tokio = { workspace = true }
|
tokio = { workspace = true }
|
||||||
serde = { workspace = true, features = ["derive"] }
|
serde = { workspace = true, features = ["derive"] }
|
||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
|
futures = { workspace = true }
|
||||||
|
bytes = { workspace = true }
|
||||||
|
async-stream = { workspace = true }
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ use domain::{
|
|||||||
ports::DiaryRepository,
|
ports::DiaryRepository,
|
||||||
value_objects::{MovieId, UserId},
|
value_objects::{MovieId, UserId},
|
||||||
};
|
};
|
||||||
|
use futures::stream::BoxStream;
|
||||||
use sqlx::PgPool;
|
use sqlx::PgPool;
|
||||||
|
|
||||||
use crate::models::{DiaryRow, FeedRow, MovieRow, MovieStatsRow, ReviewRow};
|
use crate::models::{DiaryRow, FeedRow, MovieRow, MovieStatsRow, ReviewRow};
|
||||||
@@ -427,6 +428,35 @@ impl DiaryRepository for PostgresDiaryRepository {
|
|||||||
rows.into_iter().map(DiaryRow::into_domain).collect()
|
rows.into_iter().map(DiaryRow::into_domain).collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn stream_user_history(
|
||||||
|
&self,
|
||||||
|
user_id: UserId,
|
||||||
|
) -> BoxStream<'static, Result<DiaryEntry, DomainError>> {
|
||||||
|
let pool = self.pool.clone();
|
||||||
|
let uid = user_id.value().to_string();
|
||||||
|
Box::pin(async_stream::stream! {
|
||||||
|
let mut rows = sqlx::query_as::<_, DiaryRow>(
|
||||||
|
"SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,
|
||||||
|
r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment,
|
||||||
|
to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at,
|
||||||
|
to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at,
|
||||||
|
r.remote_actor_url
|
||||||
|
FROM reviews r
|
||||||
|
INNER JOIN movies m ON m.id = r.movie_id
|
||||||
|
WHERE r.user_id = $1
|
||||||
|
ORDER BY r.watched_at DESC",
|
||||||
|
)
|
||||||
|
.bind(&uid)
|
||||||
|
.fetch(&pool);
|
||||||
|
while let Some(row) = futures::StreamExt::next(&mut rows).await {
|
||||||
|
yield match row {
|
||||||
|
Ok(r) => r.into_domain(),
|
||||||
|
Err(e) => Err(Self::map_err(e)),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
async fn get_movie_stats(&self, movie_id: &MovieId) -> Result<MovieStats, DomainError> {
|
async fn get_movie_stats(&self, movie_id: &MovieId) -> Result<MovieStats, DomainError> {
|
||||||
let id_str = movie_id.value().to_string();
|
let id_str = movie_id.value().to_string();
|
||||||
sqlx::query_as::<_, MovieStatsRow>(
|
sqlx::query_as::<_, MovieStatsRow>(
|
||||||
|
|||||||
@@ -20,3 +20,6 @@ chrono = { workspace = true }
|
|||||||
tracing = { workspace = true }
|
tracing = { workspace = true }
|
||||||
async-trait = { workspace = true }
|
async-trait = { workspace = true }
|
||||||
tokio = { workspace = true }
|
tokio = { workspace = true }
|
||||||
|
futures = { workspace = true }
|
||||||
|
bytes = { workspace = true }
|
||||||
|
async-stream = { workspace = true }
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ use domain::{
|
|||||||
ports::DiaryRepository,
|
ports::DiaryRepository,
|
||||||
value_objects::{MovieId, UserId},
|
value_objects::{MovieId, UserId},
|
||||||
};
|
};
|
||||||
|
use futures::stream::BoxStream;
|
||||||
use sqlx::SqlitePool;
|
use sqlx::SqlitePool;
|
||||||
|
|
||||||
use crate::models::{DiaryRow, FeedRow, MovieRow, MovieStatsRow, ReviewRow};
|
use crate::models::{DiaryRow, FeedRow, MovieRow, MovieStatsRow, ReviewRow};
|
||||||
@@ -389,6 +390,32 @@ impl DiaryRepository for SqliteDiaryRepository {
|
|||||||
rows.into_iter().map(DiaryRow::into_domain).collect()
|
rows.into_iter().map(DiaryRow::into_domain).collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn stream_user_history(
|
||||||
|
&self,
|
||||||
|
user_id: UserId,
|
||||||
|
) -> BoxStream<'static, Result<DiaryEntry, DomainError>> {
|
||||||
|
let pool = self.pool.clone();
|
||||||
|
let uid = user_id.value().to_string();
|
||||||
|
Box::pin(async_stream::stream! {
|
||||||
|
let mut rows = sqlx::query_as::<_, DiaryRow>(
|
||||||
|
"SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,
|
||||||
|
r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, r.watched_at, r.created_at, r.remote_actor_url
|
||||||
|
FROM reviews r
|
||||||
|
INNER JOIN movies m ON m.id = r.movie_id
|
||||||
|
WHERE r.user_id = ?
|
||||||
|
ORDER BY r.watched_at DESC",
|
||||||
|
)
|
||||||
|
.bind(&uid)
|
||||||
|
.fetch(&pool);
|
||||||
|
while let Some(row) = futures::StreamExt::next(&mut rows).await {
|
||||||
|
yield match row {
|
||||||
|
Ok(r) => r.into_domain(),
|
||||||
|
Err(e) => Err(Self::map_err(e)),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
async fn get_movie_stats(&self, movie_id: &MovieId) -> Result<MovieStats, DomainError> {
|
async fn get_movie_stats(&self, movie_id: &MovieId) -> Result<MovieStats, DomainError> {
|
||||||
let id_str = movie_id.value().to_string();
|
let id_str = movie_id.value().to_string();
|
||||||
sqlx::query_as::<_, MovieStatsRow>(
|
sqlx::query_as::<_, MovieStatsRow>(
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ sha2 = { workspace = true }
|
|||||||
rand = { workspace = true }
|
rand = { workspace = true }
|
||||||
hex = { workspace = true }
|
hex = { workspace = true }
|
||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
|
bytes = { workspace = true }
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
xlsx = []
|
xlsx = []
|
||||||
|
|||||||
@@ -1,22 +1,21 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use bytes::Bytes;
|
||||||
use domain::{
|
use domain::{
|
||||||
errors::DomainError,
|
errors::DomainError,
|
||||||
ports::{DiaryExporter, DiaryRepository},
|
ports::{DiaryExporter, DiaryRepository},
|
||||||
value_objects::UserId,
|
value_objects::UserId,
|
||||||
};
|
};
|
||||||
|
use futures::stream::BoxStream;
|
||||||
|
|
||||||
use crate::diary::queries::ExportQuery;
|
use crate::diary::queries::ExportQuery;
|
||||||
|
|
||||||
pub async fn execute(
|
pub fn execute(
|
||||||
diary: &Arc<dyn DiaryRepository>,
|
diary: &Arc<dyn DiaryRepository>,
|
||||||
diary_exporter: &Arc<dyn DiaryExporter>,
|
diary_exporter: &Arc<dyn DiaryExporter>,
|
||||||
query: ExportQuery,
|
query: ExportQuery,
|
||||||
) -> Result<Vec<u8>, DomainError> {
|
) -> BoxStream<'static, Result<Bytes, DomainError>> {
|
||||||
let entries = diary
|
let user_id = UserId::from_uuid(query.user_id);
|
||||||
.get_user_history(&UserId::from_uuid(query.user_id))
|
let entry_stream = diary.stream_user_history(user_id);
|
||||||
.await?;
|
diary_exporter.stream_entries(entry_stream, query.format)
|
||||||
diary_exporter
|
|
||||||
.serialize_entries(&entries, query.format)
|
|
||||||
.await
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -144,6 +144,10 @@ pub trait DiaryRepository: Send + Sync {
|
|||||||
) -> Result<Paginated<FeedEntry>, DomainError>;
|
) -> Result<Paginated<FeedEntry>, DomainError>;
|
||||||
async fn get_review_history(&self, movie_id: &MovieId) -> Result<ReviewHistory, DomainError>;
|
async fn get_review_history(&self, movie_id: &MovieId) -> Result<ReviewHistory, DomainError>;
|
||||||
async fn get_user_history(&self, user_id: &UserId) -> Result<Vec<DiaryEntry>, DomainError>;
|
async fn get_user_history(&self, user_id: &UserId) -> Result<Vec<DiaryEntry>, DomainError>;
|
||||||
|
fn stream_user_history(
|
||||||
|
&self,
|
||||||
|
user_id: UserId,
|
||||||
|
) -> futures::stream::BoxStream<'static, Result<DiaryEntry, DomainError>>;
|
||||||
async fn get_movie_stats(&self, movie_id: &MovieId) -> Result<MovieStats, DomainError>;
|
async fn get_movie_stats(&self, movie_id: &MovieId) -> Result<MovieStats, DomainError>;
|
||||||
async fn get_movie_social_feed(
|
async fn get_movie_social_feed(
|
||||||
&self,
|
&self,
|
||||||
@@ -253,13 +257,12 @@ pub trait PasswordHasher: Send + Sync {
|
|||||||
async fn verify(&self, plain_password: &str, hash: &PasswordHash) -> Result<bool, DomainError>;
|
async fn verify(&self, plain_password: &str, hash: &PasswordHash) -> Result<bool, DomainError>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
pub trait DiaryExporter: Send + Sync {
|
pub trait DiaryExporter: Send + Sync {
|
||||||
async fn serialize_entries(
|
fn stream_entries(
|
||||||
&self,
|
&self,
|
||||||
entries: &[DiaryEntry],
|
stream: futures::stream::BoxStream<'static, Result<DiaryEntry, DomainError>>,
|
||||||
format: ExportFormat,
|
format: ExportFormat,
|
||||||
) -> Result<Vec<u8>, DomainError>;
|
) -> futures::stream::BoxStream<'static, Result<bytes::Bytes, DomainError>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
|||||||
@@ -154,6 +154,13 @@ impl DiaryRepository for FakeDiaryRepository {
|
|||||||
Ok(vec![])
|
Ok(vec![])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn stream_user_history(
|
||||||
|
&self,
|
||||||
|
_user_id: UserId,
|
||||||
|
) -> futures::stream::BoxStream<'static, Result<DiaryEntry, DomainError>> {
|
||||||
|
Box::pin(futures::stream::empty())
|
||||||
|
}
|
||||||
|
|
||||||
async fn get_movie_stats(&self, _movie_id: &MovieId) -> Result<MovieStats, DomainError> {
|
async fn get_movie_stats(&self, _movie_id: &MovieId) -> Result<MovieStats, DomainError> {
|
||||||
Ok(MovieStats {
|
Ok(MovieStats {
|
||||||
total_count: 0,
|
total_count: 0,
|
||||||
|
|||||||
@@ -49,6 +49,12 @@ impl DiaryRepository for PanicDiaryRepository {
|
|||||||
async fn get_user_history(&self, _: &UserId) -> Result<Vec<DiaryEntry>, DomainError> {
|
async fn get_user_history(&self, _: &UserId) -> Result<Vec<DiaryEntry>, DomainError> {
|
||||||
panic!("PanicDiaryRepository called")
|
panic!("PanicDiaryRepository called")
|
||||||
}
|
}
|
||||||
|
fn stream_user_history(
|
||||||
|
&self,
|
||||||
|
_: UserId,
|
||||||
|
) -> futures::stream::BoxStream<'static, Result<DiaryEntry, DomainError>> {
|
||||||
|
panic!("PanicDiaryRepository called")
|
||||||
|
}
|
||||||
async fn get_movie_stats(&self, _: &MovieId) -> Result<MovieStats, DomainError> {
|
async fn get_movie_stats(&self, _: &MovieId) -> Result<MovieStats, DomainError> {
|
||||||
panic!("PanicDiaryRepository called")
|
panic!("PanicDiaryRepository called")
|
||||||
}
|
}
|
||||||
@@ -250,13 +256,12 @@ impl PosterFetcherClient for PanicPosterFetcher {
|
|||||||
|
|
||||||
pub struct PanicDiaryExporter;
|
pub struct PanicDiaryExporter;
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl DiaryExporter for PanicDiaryExporter {
|
impl DiaryExporter for PanicDiaryExporter {
|
||||||
async fn serialize_entries(
|
fn stream_entries(
|
||||||
&self,
|
&self,
|
||||||
_: &[DiaryEntry],
|
_stream: futures::stream::BoxStream<'static, Result<DiaryEntry, DomainError>>,
|
||||||
_: ExportFormat,
|
_format: ExportFormat,
|
||||||
) -> Result<Vec<u8>, DomainError> {
|
) -> futures::stream::BoxStream<'static, Result<bytes::Bytes, DomainError>> {
|
||||||
panic!("PanicDiaryExporter called")
|
panic!("PanicDiaryExporter called")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -42,6 +42,7 @@ dotenvy = { workspace = true }
|
|||||||
uuid = { workspace = true }
|
uuid = { workspace = true }
|
||||||
chrono = { workspace = true }
|
chrono = { workspace = true }
|
||||||
async-trait = { workspace = true }
|
async-trait = { workspace = true }
|
||||||
|
futures = { workspace = true }
|
||||||
|
|
||||||
api-types = { workspace = true }
|
api-types = { workspace = true }
|
||||||
domain = { workspace = true, features = ["test-helpers"] }
|
domain = { workspace = true, features = ["test-helpers"] }
|
||||||
|
|||||||
@@ -1,9 +1,11 @@
|
|||||||
use axum::{
|
use axum::{
|
||||||
Form, Json,
|
Form, Json,
|
||||||
|
body::Body,
|
||||||
extract::{Extension, Path, Query, State},
|
extract::{Extension, Path, Query, State},
|
||||||
http::StatusCode,
|
http::StatusCode,
|
||||||
response::{IntoResponse, Redirect},
|
response::{IntoResponse, Redirect},
|
||||||
};
|
};
|
||||||
|
use futures::StreamExt;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use application::diary::{
|
use application::diary::{
|
||||||
@@ -147,14 +149,18 @@ pub async fn export_diary(
|
|||||||
user_id: user.0.value(),
|
user_id: user.0.value(),
|
||||||
format,
|
format,
|
||||||
};
|
};
|
||||||
match export_diary_uc::execute(
|
let stream = export_diary_uc::execute(
|
||||||
&state.app_ctx.repos.diary,
|
&state.app_ctx.repos.diary,
|
||||||
&state.app_ctx.services.diary_exporter,
|
&state.app_ctx.services.diary_exporter,
|
||||||
query,
|
query,
|
||||||
)
|
);
|
||||||
.await
|
let stream = stream.map(|r| {
|
||||||
{
|
if let Err(ref e) = r {
|
||||||
Ok(bytes) => (
|
tracing::error!("diary export stream error: {e}");
|
||||||
|
}
|
||||||
|
r
|
||||||
|
});
|
||||||
|
(
|
||||||
StatusCode::OK,
|
StatusCode::OK,
|
||||||
[
|
[
|
||||||
(axum::http::header::CONTENT_TYPE, content_type.to_string()),
|
(axum::http::header::CONTENT_TYPE, content_type.to_string()),
|
||||||
@@ -163,14 +169,9 @@ pub async fn export_diary(
|
|||||||
format!("attachment; filename=\"{}\"", filename),
|
format!("attachment; filename=\"{}\"", filename),
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
bytes,
|
Body::from_stream(stream),
|
||||||
)
|
)
|
||||||
.into_response(),
|
.into_response()
|
||||||
Err(e) => {
|
|
||||||
tracing::error!("export error: {:?}", e);
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR.into_response()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[utoipa::path(
|
#[utoipa::path(
|
||||||
@@ -314,14 +315,18 @@ pub async fn get_export_html(
|
|||||||
user_id: user_id.value(),
|
user_id: user_id.value(),
|
||||||
format,
|
format,
|
||||||
};
|
};
|
||||||
match export_diary_uc::execute(
|
let stream = export_diary_uc::execute(
|
||||||
&state.app_ctx.repos.diary,
|
&state.app_ctx.repos.diary,
|
||||||
&state.app_ctx.services.diary_exporter,
|
&state.app_ctx.services.diary_exporter,
|
||||||
query,
|
query,
|
||||||
)
|
);
|
||||||
.await
|
let stream = stream.map(|r| {
|
||||||
{
|
if let Err(ref e) = r {
|
||||||
Ok(bytes) => (
|
tracing::error!("diary export stream error: {e}");
|
||||||
|
}
|
||||||
|
r
|
||||||
|
});
|
||||||
|
(
|
||||||
StatusCode::OK,
|
StatusCode::OK,
|
||||||
[
|
[
|
||||||
(axum::http::header::CONTENT_TYPE, content_type.to_string()),
|
(axum::http::header::CONTENT_TYPE, content_type.to_string()),
|
||||||
@@ -330,11 +335,9 @@ pub async fn get_export_html(
|
|||||||
format!("attachment; filename=\"{}\"", filename),
|
format!("attachment; filename=\"{}\"", filename),
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
bytes,
|
Body::from_stream(stream),
|
||||||
)
|
)
|
||||||
.into_response(),
|
.into_response()
|
||||||
Err(e) => crate::errors::domain_error_response(e),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_activity_feed_html(
|
pub async fn get_activity_feed_html(
|
||||||
|
|||||||
@@ -120,6 +120,12 @@ impl DiaryRepository for Panic {
|
|||||||
async fn get_user_history(&self, _: &UserId) -> Result<Vec<DiaryEntry>, DomainError> {
|
async fn get_user_history(&self, _: &UserId) -> Result<Vec<DiaryEntry>, DomainError> {
|
||||||
panic!()
|
panic!()
|
||||||
}
|
}
|
||||||
|
fn stream_user_history(
|
||||||
|
&self,
|
||||||
|
_: UserId,
|
||||||
|
) -> futures::stream::BoxStream<'static, Result<DiaryEntry, DomainError>> {
|
||||||
|
panic!()
|
||||||
|
}
|
||||||
async fn get_movie_stats(
|
async fn get_movie_stats(
|
||||||
&self,
|
&self,
|
||||||
_: &MovieId,
|
_: &MovieId,
|
||||||
@@ -379,14 +385,17 @@ impl domain::ports::MovieProfileRepository for Panic {
|
|||||||
Ok(vec![])
|
Ok(vec![])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl domain::ports::DiaryExporter for Panic {
|
impl domain::ports::DiaryExporter for Panic {
|
||||||
async fn serialize_entries(
|
fn stream_entries(
|
||||||
&self,
|
&self,
|
||||||
_: &[domain::models::DiaryEntry],
|
_stream: futures::stream::BoxStream<
|
||||||
_: domain::models::ExportFormat,
|
'static,
|
||||||
) -> Result<Vec<u8>, domain::errors::DomainError> {
|
Result<domain::models::DiaryEntry, domain::errors::DomainError>,
|
||||||
panic!()
|
>,
|
||||||
|
_format: domain::models::ExportFormat,
|
||||||
|
) -> futures::stream::BoxStream<'static, Result<bytes::Bytes, domain::errors::DomainError>>
|
||||||
|
{
|
||||||
|
panic!("Panic DiaryExporter called")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -165,14 +165,16 @@ impl domain::ports::UserProfileFieldsRepository for PanicProfileFields {
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct PanicExporter;
|
struct PanicExporter;
|
||||||
#[async_trait]
|
|
||||||
impl domain::ports::DiaryExporter for PanicExporter {
|
impl domain::ports::DiaryExporter for PanicExporter {
|
||||||
async fn serialize_entries(
|
fn stream_entries(
|
||||||
&self,
|
&self,
|
||||||
_: &[domain::models::DiaryEntry],
|
_stream: futures::stream::BoxStream<
|
||||||
_: domain::models::ExportFormat,
|
'static,
|
||||||
) -> Result<Vec<u8>, DomainError> {
|
Result<domain::models::DiaryEntry, DomainError>,
|
||||||
panic!()
|
>,
|
||||||
|
_format: domain::models::ExportFormat,
|
||||||
|
) -> futures::stream::BoxStream<'static, Result<bytes::Bytes, DomainError>> {
|
||||||
|
panic!("PanicExporter::stream_entries")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user