Compare commits
10 Commits
4c75113c4f
...
c0b3fb6940
| Author | SHA1 | Date | |
|---|---|---|---|
| c0b3fb6940 | |||
| ea43911984 | |||
| 5a15bea3d4 | |||
| 5e740ba2a1 | |||
| 7ef8912d69 | |||
| ac05cdfeaf | |||
| b171d2d1e2 | |||
| 59b42ce810 | |||
| 5a6abdcc23 | |||
| a95d831fd1 |
@@ -2,7 +2,7 @@ use chrono::NaiveDateTime;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
value_objects::{ExternalMetadataId, MovieId, PosterPath, Rating, ReviewId, UserId},
|
||||
value_objects::{ExternalMetadataId, MovieId, PosterPath, Rating, ReviewId, UserId, WrapUpId},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
@@ -77,6 +77,15 @@ pub enum EventPayload {
|
||||
title: String,
|
||||
source: String,
|
||||
},
|
||||
WrapUpRequested {
|
||||
wrapup_id: String,
|
||||
user_id: Option<String>,
|
||||
start_date: String,
|
||||
end_date: String,
|
||||
},
|
||||
WrapUpCompleted {
|
||||
wrapup_id: String,
|
||||
},
|
||||
}
|
||||
|
||||
impl EventPayload {
|
||||
@@ -96,6 +105,8 @@ impl EventPayload {
|
||||
EventPayload::BackfillFollower { .. } => "BackfillFollower",
|
||||
EventPayload::FederationDeliveryRequested { .. } => "FederationDeliveryRequested",
|
||||
EventPayload::WatchEventIngested { .. } => "WatchEventIngested",
|
||||
EventPayload::WrapUpRequested { .. } => "WrapUpRequested",
|
||||
EventPayload::WrapUpCompleted { .. } => "WrapUpCompleted",
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -223,6 +234,20 @@ impl From<&DomainEvent> for EventPayload {
|
||||
title: title.clone(),
|
||||
source: source.clone(),
|
||||
},
|
||||
DomainEvent::WrapUpRequested {
|
||||
wrapup_id,
|
||||
user_id,
|
||||
start_date,
|
||||
end_date,
|
||||
} => EventPayload::WrapUpRequested {
|
||||
wrapup_id: wrapup_id.value().to_string(),
|
||||
user_id: user_id.as_ref().map(|u| u.value().to_string()),
|
||||
start_date: start_date.to_string(),
|
||||
end_date: end_date.to_string(),
|
||||
},
|
||||
DomainEvent::WrapUpCompleted { wrapup_id } => EventPayload::WrapUpCompleted {
|
||||
wrapup_id: wrapup_id.value().to_string(),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -348,6 +373,33 @@ impl TryFrom<EventPayload> for DomainEvent {
|
||||
title,
|
||||
source,
|
||||
}),
|
||||
EventPayload::WrapUpRequested {
|
||||
wrapup_id,
|
||||
user_id,
|
||||
start_date,
|
||||
end_date,
|
||||
} => {
|
||||
let wid = parse_uuid(&wrapup_id, "wrapup_id")?;
|
||||
let uid = user_id
|
||||
.map(|s| parse_uuid(&s, "user_id"))
|
||||
.transpose()?;
|
||||
let sd = chrono::NaiveDate::parse_from_str(&start_date, "%Y-%m-%d")
|
||||
.map_err(|e| DomainError::ValidationError(e.to_string()))?;
|
||||
let ed = chrono::NaiveDate::parse_from_str(&end_date, "%Y-%m-%d")
|
||||
.map_err(|e| DomainError::ValidationError(e.to_string()))?;
|
||||
Ok(DomainEvent::WrapUpRequested {
|
||||
wrapup_id: WrapUpId::from_uuid(wid),
|
||||
user_id: uid.map(UserId::from_uuid),
|
||||
start_date: sd,
|
||||
end_date: ed,
|
||||
})
|
||||
}
|
||||
EventPayload::WrapUpCompleted { wrapup_id } => {
|
||||
let wid = parse_uuid(&wrapup_id, "wrapup_id")?;
|
||||
Ok(DomainEvent::WrapUpCompleted {
|
||||
wrapup_id: WrapUpId::from_uuid(wid),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,8 @@ pub fn event_to_subject(prefix: &str, event: &DomainEvent) -> String {
|
||||
DomainEvent::BackfillFollower { .. } => "backfill.follower",
|
||||
DomainEvent::FederationDeliveryRequested { .. } => "federation.delivery.requested",
|
||||
DomainEvent::WatchEventIngested { .. } => "watch.event.ingested",
|
||||
DomainEvent::WrapUpRequested { .. } => "wrapup.requested",
|
||||
DomainEvent::WrapUpCompleted { .. } => "wrapup.completed",
|
||||
};
|
||||
format!("{prefix}.{suffix}")
|
||||
}
|
||||
|
||||
12
crates/adapters/postgres/migrations/0024_wrap_up.sql
Normal file
12
crates/adapters/postgres/migrations/0024_wrap_up.sql
Normal file
@@ -0,0 +1,12 @@
|
||||
CREATE TABLE IF NOT EXISTS wrap_up_records (
|
||||
id TEXT PRIMARY KEY NOT NULL,
|
||||
user_id TEXT,
|
||||
start_date DATE NOT NULL,
|
||||
end_date DATE NOT NULL,
|
||||
status TEXT NOT NULL DEFAULT 'pending',
|
||||
report_json TEXT,
|
||||
error_message TEXT,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
completed_at TIMESTAMPTZ
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_wrap_up_user ON wrap_up_records (user_id, start_date, end_date);
|
||||
@@ -23,6 +23,7 @@ mod profile_fields;
|
||||
mod users;
|
||||
mod watch_event;
|
||||
mod watchlist;
|
||||
mod wrapup;
|
||||
|
||||
use models::{
|
||||
DiaryRow, DirectorCountRow, FeedRow, MonthlyRatingRow, MovieRow, MovieStatsRow,
|
||||
@@ -39,6 +40,7 @@ pub use profile_fields::PostgresProfileFieldsRepository;
|
||||
pub use users::PostgresUserRepository;
|
||||
pub use watch_event::{PostgresWatchEventRepository, PostgresWebhookTokenRepository};
|
||||
pub use watchlist::PostgresWatchlistRepository;
|
||||
pub use wrapup::{PostgresWrapUpRepository, PostgresWrapUpStatsQuery};
|
||||
|
||||
fn format_year_month(ym: &str) -> String {
|
||||
let parts: Vec<&str> = ym.splitn(2, '-').collect();
|
||||
@@ -1000,6 +1002,8 @@ pub struct PostgresWireOutput {
|
||||
pub movie_profile: std::sync::Arc<dyn domain::ports::MovieProfileRepository>,
|
||||
pub watchlist: std::sync::Arc<dyn domain::ports::WatchlistRepository>,
|
||||
pub ap_content: std::sync::Arc<dyn domain::ports::LocalApContentQuery>,
|
||||
pub wrapup_repo: std::sync::Arc<dyn domain::ports::WrapUpRepository>,
|
||||
pub wrapup_stats: std::sync::Arc<dyn domain::ports::WrapUpStatsQuery>,
|
||||
}
|
||||
|
||||
pub async fn wire(database_url: &str) -> anyhow::Result<PostgresWireOutput> {
|
||||
@@ -1028,6 +1032,8 @@ pub async fn wire(database_url: &str) -> anyhow::Result<PostgresWireOutput> {
|
||||
as _,
|
||||
movie_profile: std::sync::Arc::new(PostgresMovieProfileRepository::new(pool.clone())) as _,
|
||||
watchlist: std::sync::Arc::new(PostgresWatchlistRepository::new(pool.clone())) as _,
|
||||
ap_content: std::sync::Arc::new(PostgresApContentQuery::new(pool)) as _,
|
||||
ap_content: std::sync::Arc::new(PostgresApContentQuery::new(pool.clone())) as _,
|
||||
wrapup_repo: std::sync::Arc::new(PostgresWrapUpRepository::new(pool.clone())) as _,
|
||||
wrapup_stats: std::sync::Arc::new(PostgresWrapUpStatsQuery::new(pool)) as _,
|
||||
})
|
||||
}
|
||||
|
||||
430
crates/adapters/postgres/src/wrapup.rs
Normal file
430
crates/adapters/postgres/src/wrapup.rs
Normal file
@@ -0,0 +1,430 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::NaiveDate;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
models::wrapup::{DateRange, WrapUpRecord, WrapUpScope, WrapUpStatus},
|
||||
ports::{WrapUpMovieRow, WrapUpRepository, WrapUpStatsQuery},
|
||||
value_objects::WrapUpId,
|
||||
};
|
||||
use sqlx::{PgPool, Row};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::models::{parse_datetime, parse_uuid};
|
||||
|
||||
fn map_err(e: sqlx::Error) -> DomainError {
|
||||
tracing::error!("Database error: {:?}", e);
|
||||
DomainError::InfrastructureError("Database operation failed".into())
|
||||
}
|
||||
|
||||
fn status_to_str(s: &WrapUpStatus) -> &'static str {
|
||||
match s {
|
||||
WrapUpStatus::Pending => "pending",
|
||||
WrapUpStatus::Generating => "generating",
|
||||
WrapUpStatus::Ready => "ready",
|
||||
WrapUpStatus::Failed => "failed",
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_status(s: &str) -> Result<WrapUpStatus, DomainError> {
|
||||
match s {
|
||||
"pending" => Ok(WrapUpStatus::Pending),
|
||||
"generating" => Ok(WrapUpStatus::Generating),
|
||||
"ready" => Ok(WrapUpStatus::Ready),
|
||||
"failed" => Ok(WrapUpStatus::Failed),
|
||||
other => Err(DomainError::InfrastructureError(format!(
|
||||
"Unknown wrap-up status: {other}"
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
// ── WrapUpRepository ─────────────────────────────────────────────────────────
|
||||
|
||||
pub struct PostgresWrapUpRepository {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PostgresWrapUpRepository {
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl WrapUpRepository for PostgresWrapUpRepository {
|
||||
async fn create(&self, record: &WrapUpRecord) -> Result<(), DomainError> {
|
||||
let id = record.id.value().to_string();
|
||||
let user_id = record.user_id.map(|u| u.to_string());
|
||||
let status = status_to_str(&record.status);
|
||||
|
||||
sqlx::query(
|
||||
"INSERT INTO wrap_up_records \
|
||||
(id, user_id, start_date, end_date, status, report_json, error_message, created_at, completed_at) \
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
|
||||
)
|
||||
.bind(&id)
|
||||
.bind(&user_id)
|
||||
.bind(record.start_date)
|
||||
.bind(record.end_date)
|
||||
.bind(status)
|
||||
.bind(&record.report_json)
|
||||
.bind(&record.error_message)
|
||||
.bind(record.created_at)
|
||||
.bind(record.completed_at)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(map_err)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn update_status(
|
||||
&self,
|
||||
id: &WrapUpId,
|
||||
status: &WrapUpStatus,
|
||||
error: Option<&str>,
|
||||
) -> Result<(), DomainError> {
|
||||
let id_str = id.value().to_string();
|
||||
let status_str = status_to_str(status);
|
||||
|
||||
sqlx::query(
|
||||
"UPDATE wrap_up_records SET status = $1, error_message = $2 WHERE id = $3",
|
||||
)
|
||||
.bind(status_str)
|
||||
.bind(error)
|
||||
.bind(&id_str)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(map_err)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn set_complete(&self, id: &WrapUpId, report_json: &str) -> Result<(), DomainError> {
|
||||
let id_str = id.value().to_string();
|
||||
|
||||
sqlx::query(
|
||||
"UPDATE wrap_up_records \
|
||||
SET status = 'ready', report_json = $1, completed_at = NOW() \
|
||||
WHERE id = $2",
|
||||
)
|
||||
.bind(report_json)
|
||||
.bind(&id_str)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(map_err)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_by_id(&self, id: &WrapUpId) -> Result<Option<WrapUpRecord>, DomainError> {
|
||||
let id_str = id.value().to_string();
|
||||
|
||||
let row = sqlx::query(
|
||||
"SELECT id, user_id, start_date, end_date, status, report_json, error_message, \
|
||||
to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, \
|
||||
to_char(completed_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS completed_at \
|
||||
FROM wrap_up_records WHERE id = $1",
|
||||
)
|
||||
.bind(&id_str)
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(map_err)?;
|
||||
|
||||
row.as_ref().map(row_to_record).transpose()
|
||||
}
|
||||
|
||||
async fn list_for_user(&self, user_id: Uuid) -> Result<Vec<WrapUpRecord>, DomainError> {
|
||||
let uid = user_id.to_string();
|
||||
|
||||
let rows = sqlx::query(
|
||||
"SELECT id, user_id, start_date, end_date, status, report_json, error_message, \
|
||||
to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, \
|
||||
to_char(completed_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS completed_at \
|
||||
FROM wrap_up_records WHERE user_id = $1 ORDER BY created_at DESC",
|
||||
)
|
||||
.bind(&uid)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(map_err)?;
|
||||
|
||||
rows.iter().map(row_to_record).collect()
|
||||
}
|
||||
|
||||
async fn list_global(&self) -> Result<Vec<WrapUpRecord>, DomainError> {
|
||||
let rows = sqlx::query(
|
||||
"SELECT id, user_id, start_date, end_date, status, report_json, error_message, \
|
||||
to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, \
|
||||
to_char(completed_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS completed_at \
|
||||
FROM wrap_up_records WHERE user_id IS NULL ORDER BY created_at DESC",
|
||||
)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(map_err)?;
|
||||
|
||||
rows.iter().map(row_to_record).collect()
|
||||
}
|
||||
|
||||
async fn find_existing(
|
||||
&self,
|
||||
user_id: Option<Uuid>,
|
||||
start: NaiveDate,
|
||||
end: NaiveDate,
|
||||
) -> Result<Option<WrapUpRecord>, DomainError> {
|
||||
let uid = user_id.map(|u| u.to_string());
|
||||
|
||||
let row = sqlx::query(
|
||||
"SELECT id, user_id, start_date, end_date, status, report_json, error_message, \
|
||||
to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, \
|
||||
to_char(completed_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS completed_at \
|
||||
FROM wrap_up_records \
|
||||
WHERE (($1::text IS NULL AND user_id IS NULL) OR user_id = $1) \
|
||||
AND start_date = $2 AND end_date = $3 \
|
||||
LIMIT 1",
|
||||
)
|
||||
.bind(&uid)
|
||||
.bind(start)
|
||||
.bind(end)
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(map_err)?;
|
||||
|
||||
row.as_ref().map(row_to_record).transpose()
|
||||
}
|
||||
}
|
||||
|
||||
fn row_to_record(row: &sqlx::postgres::PgRow) -> Result<WrapUpRecord, DomainError> {
|
||||
let id_str: String = row.try_get("id").map_err(map_err)?;
|
||||
let user_id_str: Option<String> = row.try_get("user_id").map_err(map_err)?;
|
||||
let start_date: NaiveDate = row.try_get("start_date").map_err(map_err)?;
|
||||
let end_date: NaiveDate = row.try_get("end_date").map_err(map_err)?;
|
||||
let status_str: String = row.try_get("status").map_err(map_err)?;
|
||||
let report_json: Option<String> = row.try_get("report_json").map_err(map_err)?;
|
||||
let error_message: Option<String> = row.try_get("error_message").map_err(map_err)?;
|
||||
let created_at_str: String = row.try_get("created_at").map_err(map_err)?;
|
||||
let completed_at_str: Option<String> = row.try_get("completed_at").map_err(map_err)?;
|
||||
|
||||
let user_id = user_id_str
|
||||
.as_deref()
|
||||
.map(parse_uuid)
|
||||
.transpose()?;
|
||||
|
||||
Ok(WrapUpRecord {
|
||||
id: WrapUpId::from_uuid(parse_uuid(&id_str)?),
|
||||
user_id,
|
||||
start_date,
|
||||
end_date,
|
||||
status: parse_status(&status_str)?,
|
||||
report_json,
|
||||
error_message,
|
||||
created_at: parse_datetime(&created_at_str)?,
|
||||
completed_at: completed_at_str.as_deref().map(parse_datetime).transpose()?,
|
||||
})
|
||||
}
|
||||
|
||||
// ── WrapUpStatsQuery ─────────────────────────────────────────────────────────
|
||||
|
||||
pub struct PostgresWrapUpStatsQuery {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PostgresWrapUpStatsQuery {
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl WrapUpStatsQuery for PostgresWrapUpStatsQuery {
|
||||
async fn get_reviews_with_profiles(
|
||||
&self,
|
||||
scope: &WrapUpScope,
|
||||
range: &DateRange,
|
||||
) -> Result<Vec<WrapUpMovieRow>, DomainError> {
|
||||
// 1) Main query: reviews + movies + movie_profiles
|
||||
let (scope_clause, scope_bind) = match scope {
|
||||
WrapUpScope::User(uid) => ("AND r.user_id = $3", Some(uid.to_string())),
|
||||
WrapUpScope::Global => ("", None),
|
||||
};
|
||||
|
||||
let sql = format!(
|
||||
"SELECT r.movie_id, m.title, m.release_year, m.director, m.poster_path, \
|
||||
r.rating, \
|
||||
to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, \
|
||||
r.user_id, \
|
||||
p.runtime_minutes, p.budget_usd, p.original_language \
|
||||
FROM reviews r \
|
||||
INNER JOIN movies m ON m.id = r.movie_id \
|
||||
LEFT JOIN movie_profiles p ON p.movie_id = m.id \
|
||||
WHERE r.watched_at >= $1 AND r.watched_at < $2 {scope_clause} \
|
||||
ORDER BY r.watched_at ASC"
|
||||
);
|
||||
|
||||
let mut q = sqlx::query(&sql)
|
||||
.bind(range.start)
|
||||
.bind(range.end);
|
||||
if let Some(ref uid) = scope_bind {
|
||||
q = q.bind(uid);
|
||||
}
|
||||
|
||||
let rows = q.fetch_all(&self.pool).await.map_err(map_err)?;
|
||||
|
||||
if rows.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
// Collect unique movie IDs
|
||||
let mut movie_ids: Vec<String> = Vec::new();
|
||||
let mut seen = std::collections::HashSet::new();
|
||||
for row in &rows {
|
||||
let mid: String = row.try_get("movie_id").map_err(map_err)?;
|
||||
if seen.insert(mid.clone()) {
|
||||
movie_ids.push(mid);
|
||||
}
|
||||
}
|
||||
|
||||
// 2) Batch-fetch genres, keywords, cast
|
||||
let (genres_map, keywords_map, cast_map) = tokio::try_join!(
|
||||
fetch_genres_pg(&self.pool, &movie_ids),
|
||||
fetch_keywords_pg(&self.pool, &movie_ids),
|
||||
fetch_cast_pg(&self.pool, &movie_ids),
|
||||
)?;
|
||||
|
||||
// 3) Build result
|
||||
let mut result = Vec::with_capacity(rows.len());
|
||||
for row in &rows {
|
||||
let movie_id_str: String = row.try_get("movie_id").map_err(map_err)?;
|
||||
let title: String = row.try_get("title").map_err(map_err)?;
|
||||
let release_year: i64 = row.try_get("release_year").map_err(map_err)?;
|
||||
let director: Option<String> = row.try_get("director").map_err(map_err)?;
|
||||
let poster_path: Option<String> = row.try_get("poster_path").map_err(map_err)?;
|
||||
let rating: i64 = row.try_get("rating").map_err(map_err)?;
|
||||
let watched_at_str: String = row.try_get("watched_at").map_err(map_err)?;
|
||||
let user_id_str: String = row.try_get("user_id").map_err(map_err)?;
|
||||
let runtime_minutes: Option<i32> = row.try_get("runtime_minutes").map_err(map_err)?;
|
||||
let budget_usd: Option<i64> = row.try_get("budget_usd").map_err(map_err)?;
|
||||
let original_language: Option<String> =
|
||||
row.try_get("original_language").map_err(map_err)?;
|
||||
|
||||
let genres = genres_map
|
||||
.get(&movie_id_str)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
let keywords = keywords_map
|
||||
.get(&movie_id_str)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
let cast = cast_map
|
||||
.get(&movie_id_str)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
|
||||
let cast_names: Vec<(String, u32)> = cast
|
||||
.iter()
|
||||
.map(|c| (c.name.clone(), c.billing_order))
|
||||
.collect();
|
||||
let cast_profile_paths: Vec<Option<String>> =
|
||||
cast.iter().map(|c| c.profile_path.clone()).collect();
|
||||
|
||||
result.push(WrapUpMovieRow {
|
||||
movie_id: parse_uuid(&movie_id_str)?,
|
||||
title,
|
||||
release_year: release_year as u16,
|
||||
director,
|
||||
poster_path,
|
||||
rating: rating as u8,
|
||||
watched_at: parse_datetime(&watched_at_str)?,
|
||||
user_id: parse_uuid(&user_id_str)?,
|
||||
runtime_minutes: runtime_minutes.map(|v| v as u32),
|
||||
budget_usd,
|
||||
original_language,
|
||||
genres,
|
||||
keywords,
|
||||
cast_names,
|
||||
cast_profile_paths,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct CastEntry {
|
||||
name: String,
|
||||
billing_order: u32,
|
||||
profile_path: Option<String>,
|
||||
}
|
||||
|
||||
async fn fetch_genres_pg(
|
||||
pool: &PgPool,
|
||||
movie_ids: &[String],
|
||||
) -> Result<HashMap<String, Vec<String>>, DomainError> {
|
||||
let rows = sqlx::query(
|
||||
"SELECT movie_id, name FROM movie_genres WHERE movie_id = ANY($1) ORDER BY name",
|
||||
)
|
||||
.bind(movie_ids)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
.map_err(map_err)?;
|
||||
|
||||
let mut map: HashMap<String, Vec<String>> = HashMap::new();
|
||||
for row in rows {
|
||||
let mid: String = row.try_get("movie_id").map_err(map_err)?;
|
||||
let name: String = row.try_get("name").map_err(map_err)?;
|
||||
map.entry(mid).or_default().push(name);
|
||||
}
|
||||
Ok(map)
|
||||
}
|
||||
|
||||
async fn fetch_keywords_pg(
|
||||
pool: &PgPool,
|
||||
movie_ids: &[String],
|
||||
) -> Result<HashMap<String, Vec<String>>, DomainError> {
|
||||
let rows = sqlx::query(
|
||||
"SELECT movie_id, name FROM movie_keywords WHERE movie_id = ANY($1) ORDER BY name",
|
||||
)
|
||||
.bind(movie_ids)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
.map_err(map_err)?;
|
||||
|
||||
let mut map: HashMap<String, Vec<String>> = HashMap::new();
|
||||
for row in rows {
|
||||
let mid: String = row.try_get("movie_id").map_err(map_err)?;
|
||||
let name: String = row.try_get("name").map_err(map_err)?;
|
||||
map.entry(mid).or_default().push(name);
|
||||
}
|
||||
Ok(map)
|
||||
}
|
||||
|
||||
async fn fetch_cast_pg(
|
||||
pool: &PgPool,
|
||||
movie_ids: &[String],
|
||||
) -> Result<HashMap<String, Vec<CastEntry>>, DomainError> {
|
||||
let rows = sqlx::query(
|
||||
"SELECT movie_id, name, billing_order, profile_path \
|
||||
FROM movie_cast \
|
||||
WHERE movie_id = ANY($1) AND billing_order <= 3 \
|
||||
ORDER BY billing_order ASC",
|
||||
)
|
||||
.bind(movie_ids)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
.map_err(map_err)?;
|
||||
|
||||
let mut map: HashMap<String, Vec<CastEntry>> = HashMap::new();
|
||||
for row in rows {
|
||||
let mid: String = row.try_get("movie_id").map_err(map_err)?;
|
||||
let name: String = row.try_get("name").map_err(map_err)?;
|
||||
let billing_order: i32 = row.try_get("billing_order").map_err(map_err)?;
|
||||
let profile_path: Option<String> = row.try_get("profile_path").map_err(map_err)?;
|
||||
map.entry(mid).or_default().push(CastEntry {
|
||||
name,
|
||||
billing_order: billing_order as u32,
|
||||
profile_path,
|
||||
});
|
||||
}
|
||||
Ok(map)
|
||||
}
|
||||
12
crates/adapters/sqlite/migrations/0024_wrap_up.sql
Normal file
12
crates/adapters/sqlite/migrations/0024_wrap_up.sql
Normal file
@@ -0,0 +1,12 @@
|
||||
CREATE TABLE IF NOT EXISTS wrap_up_records (
|
||||
id TEXT PRIMARY KEY NOT NULL,
|
||||
user_id TEXT,
|
||||
start_date TEXT NOT NULL,
|
||||
end_date TEXT NOT NULL,
|
||||
status TEXT NOT NULL DEFAULT 'pending',
|
||||
report_json TEXT,
|
||||
error_message TEXT,
|
||||
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%d %H:%M:%S', 'now')),
|
||||
completed_at TEXT
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_wrap_up_user ON wrap_up_records (user_id, start_date, end_date);
|
||||
@@ -24,6 +24,7 @@ mod profile_fields;
|
||||
mod users;
|
||||
mod watch_event;
|
||||
mod watchlist;
|
||||
mod wrapup;
|
||||
|
||||
use models::{
|
||||
DiaryRow, DirectorCountRow, FeedRow, MonthlyRatingRow, MovieRow, MovieStatsRow,
|
||||
@@ -40,6 +41,7 @@ pub use profile_fields::SqliteProfileFieldsRepository;
|
||||
pub use users::SqliteUserRepository;
|
||||
pub use watch_event::{SqliteWatchEventRepository, SqliteWebhookTokenRepository};
|
||||
pub use watchlist::SqliteWatchlistRepository;
|
||||
pub use wrapup::{SqliteWrapUpRepository, SqliteWrapUpStatsQuery};
|
||||
|
||||
pub fn create_profile_fields_repo(
|
||||
pool: sqlx::SqlitePool,
|
||||
@@ -998,6 +1000,8 @@ pub struct SqliteWireOutput {
|
||||
pub movie_profile: std::sync::Arc<dyn domain::ports::MovieProfileRepository>,
|
||||
pub watchlist: std::sync::Arc<dyn domain::ports::WatchlistRepository>,
|
||||
pub ap_content: std::sync::Arc<dyn domain::ports::LocalApContentQuery>,
|
||||
pub wrapup_repo: std::sync::Arc<dyn domain::ports::WrapUpRepository>,
|
||||
pub wrapup_stats: std::sync::Arc<dyn domain::ports::WrapUpStatsQuery>,
|
||||
}
|
||||
|
||||
pub async fn wire(database_url: &str) -> anyhow::Result<SqliteWireOutput> {
|
||||
@@ -1031,7 +1035,9 @@ pub async fn wire(database_url: &str) -> anyhow::Result<SqliteWireOutput> {
|
||||
import_profile: std::sync::Arc::new(SqliteImportProfileRepository::new(pool.clone())) as _,
|
||||
movie_profile: std::sync::Arc::new(SqliteMovieProfileRepository::new(pool.clone())) as _,
|
||||
watchlist: std::sync::Arc::new(SqliteWatchlistRepository::new(pool.clone())) as _,
|
||||
ap_content: std::sync::Arc::new(SqliteApContentQuery::new(pool)) as _,
|
||||
ap_content: std::sync::Arc::new(SqliteApContentQuery::new(pool.clone())) as _,
|
||||
wrapup_repo: std::sync::Arc::new(SqliteWrapUpRepository::new(pool.clone())) as _,
|
||||
wrapup_stats: std::sync::Arc::new(SqliteWrapUpStatsQuery::new(pool)) as _,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
465
crates/adapters/sqlite/src/wrapup.rs
Normal file
465
crates/adapters/sqlite/src/wrapup.rs
Normal file
@@ -0,0 +1,465 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::NaiveDate;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
models::wrapup::{DateRange, WrapUpRecord, WrapUpScope, WrapUpStatus},
|
||||
ports::{WrapUpMovieRow, WrapUpRepository, WrapUpStatsQuery},
|
||||
value_objects::WrapUpId,
|
||||
};
|
||||
use sqlx::{Row, SqlitePool};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::models::{parse_datetime, parse_uuid};
|
||||
|
||||
fn map_err(e: sqlx::Error) -> DomainError {
|
||||
tracing::error!("Database error: {:?}", e);
|
||||
DomainError::InfrastructureError("Database operation failed".into())
|
||||
}
|
||||
|
||||
fn status_to_str(s: &WrapUpStatus) -> &'static str {
|
||||
match s {
|
||||
WrapUpStatus::Pending => "pending",
|
||||
WrapUpStatus::Generating => "generating",
|
||||
WrapUpStatus::Ready => "ready",
|
||||
WrapUpStatus::Failed => "failed",
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_status(s: &str) -> Result<WrapUpStatus, DomainError> {
|
||||
match s {
|
||||
"pending" => Ok(WrapUpStatus::Pending),
|
||||
"generating" => Ok(WrapUpStatus::Generating),
|
||||
"ready" => Ok(WrapUpStatus::Ready),
|
||||
"failed" => Ok(WrapUpStatus::Failed),
|
||||
other => Err(DomainError::InfrastructureError(format!(
|
||||
"Unknown wrap-up status: {other}"
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_date(s: &str) -> Result<NaiveDate, DomainError> {
|
||||
NaiveDate::parse_from_str(s, "%Y-%m-%d")
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("Invalid date '{s}': {e}")))
|
||||
}
|
||||
|
||||
// ── WrapUpRepository ─────────────────────────────────────────────────────────
|
||||
|
||||
pub struct SqliteWrapUpRepository {
|
||||
pool: SqlitePool,
|
||||
}
|
||||
|
||||
impl SqliteWrapUpRepository {
|
||||
pub fn new(pool: SqlitePool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl WrapUpRepository for SqliteWrapUpRepository {
|
||||
async fn create(&self, record: &WrapUpRecord) -> Result<(), DomainError> {
|
||||
let id = record.id.value().to_string();
|
||||
let user_id = record.user_id.map(|u| u.to_string());
|
||||
let status = status_to_str(&record.status);
|
||||
let start = record.start_date.format("%Y-%m-%d").to_string();
|
||||
let end = record.end_date.format("%Y-%m-%d").to_string();
|
||||
let created = record.created_at.format("%Y-%m-%d %H:%M:%S").to_string();
|
||||
let completed = record
|
||||
.completed_at
|
||||
.map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string());
|
||||
|
||||
sqlx::query(
|
||||
"INSERT INTO wrap_up_records \
|
||||
(id, user_id, start_date, end_date, status, report_json, error_message, created_at, completed_at) \
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
)
|
||||
.bind(&id)
|
||||
.bind(&user_id)
|
||||
.bind(&start)
|
||||
.bind(&end)
|
||||
.bind(status)
|
||||
.bind(&record.report_json)
|
||||
.bind(&record.error_message)
|
||||
.bind(&created)
|
||||
.bind(&completed)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(map_err)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn update_status(
|
||||
&self,
|
||||
id: &WrapUpId,
|
||||
status: &WrapUpStatus,
|
||||
error: Option<&str>,
|
||||
) -> Result<(), DomainError> {
|
||||
let id_str = id.value().to_string();
|
||||
let status_str = status_to_str(status);
|
||||
|
||||
sqlx::query("UPDATE wrap_up_records SET status = ?, error_message = ? WHERE id = ?")
|
||||
.bind(status_str)
|
||||
.bind(error)
|
||||
.bind(&id_str)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(map_err)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn set_complete(&self, id: &WrapUpId, report_json: &str) -> Result<(), DomainError> {
|
||||
let id_str = id.value().to_string();
|
||||
|
||||
sqlx::query(
|
||||
"UPDATE wrap_up_records \
|
||||
SET status = 'ready', report_json = ?, completed_at = strftime('%Y-%m-%d %H:%M:%S', 'now') \
|
||||
WHERE id = ?",
|
||||
)
|
||||
.bind(report_json)
|
||||
.bind(&id_str)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(map_err)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_by_id(&self, id: &WrapUpId) -> Result<Option<WrapUpRecord>, DomainError> {
|
||||
let id_str = id.value().to_string();
|
||||
|
||||
let row = sqlx::query(
|
||||
"SELECT id, user_id, start_date, end_date, status, report_json, error_message, \
|
||||
created_at, completed_at \
|
||||
FROM wrap_up_records WHERE id = ?",
|
||||
)
|
||||
.bind(&id_str)
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(map_err)?;
|
||||
|
||||
row.as_ref().map(row_to_record).transpose()
|
||||
}
|
||||
|
||||
async fn list_for_user(&self, user_id: Uuid) -> Result<Vec<WrapUpRecord>, DomainError> {
|
||||
let uid = user_id.to_string();
|
||||
|
||||
let rows = sqlx::query(
|
||||
"SELECT id, user_id, start_date, end_date, status, report_json, error_message, \
|
||||
created_at, completed_at \
|
||||
FROM wrap_up_records WHERE user_id = ? ORDER BY created_at DESC",
|
||||
)
|
||||
.bind(&uid)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(map_err)?;
|
||||
|
||||
rows.iter().map(row_to_record).collect()
|
||||
}
|
||||
|
||||
async fn list_global(&self) -> Result<Vec<WrapUpRecord>, DomainError> {
|
||||
let rows = sqlx::query(
|
||||
"SELECT id, user_id, start_date, end_date, status, report_json, error_message, \
|
||||
created_at, completed_at \
|
||||
FROM wrap_up_records WHERE user_id IS NULL ORDER BY created_at DESC",
|
||||
)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(map_err)?;
|
||||
|
||||
rows.iter().map(row_to_record).collect()
|
||||
}
|
||||
|
||||
async fn find_existing(
|
||||
&self,
|
||||
user_id: Option<Uuid>,
|
||||
start: NaiveDate,
|
||||
end: NaiveDate,
|
||||
) -> Result<Option<WrapUpRecord>, DomainError> {
|
||||
let uid = user_id.map(|u| u.to_string());
|
||||
let start_str = start.format("%Y-%m-%d").to_string();
|
||||
let end_str = end.format("%Y-%m-%d").to_string();
|
||||
|
||||
let row = sqlx::query(
|
||||
"SELECT id, user_id, start_date, end_date, status, report_json, error_message, \
|
||||
created_at, completed_at \
|
||||
FROM wrap_up_records \
|
||||
WHERE ((? IS NULL AND user_id IS NULL) OR user_id = ?) \
|
||||
AND start_date = ? AND end_date = ? \
|
||||
LIMIT 1",
|
||||
)
|
||||
.bind(&uid)
|
||||
.bind(&uid)
|
||||
.bind(&start_str)
|
||||
.bind(&end_str)
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(map_err)?;
|
||||
|
||||
row.as_ref().map(row_to_record).transpose()
|
||||
}
|
||||
}
|
||||
|
||||
fn row_to_record(row: &sqlx::sqlite::SqliteRow) -> Result<WrapUpRecord, DomainError> {
|
||||
let id_str: String = row.try_get("id").map_err(map_err)?;
|
||||
let user_id_str: Option<String> = row.try_get("user_id").map_err(map_err)?;
|
||||
let start_date_str: String = row.try_get("start_date").map_err(map_err)?;
|
||||
let end_date_str: String = row.try_get("end_date").map_err(map_err)?;
|
||||
let status_str: String = row.try_get("status").map_err(map_err)?;
|
||||
let report_json: Option<String> = row.try_get("report_json").map_err(map_err)?;
|
||||
let error_message: Option<String> = row.try_get("error_message").map_err(map_err)?;
|
||||
let created_at_str: String = row.try_get("created_at").map_err(map_err)?;
|
||||
let completed_at_str: Option<String> = row.try_get("completed_at").map_err(map_err)?;
|
||||
|
||||
let user_id = user_id_str
|
||||
.as_deref()
|
||||
.map(parse_uuid)
|
||||
.transpose()?;
|
||||
|
||||
Ok(WrapUpRecord {
|
||||
id: WrapUpId::from_uuid(parse_uuid(&id_str)?),
|
||||
user_id,
|
||||
start_date: parse_date(&start_date_str)?,
|
||||
end_date: parse_date(&end_date_str)?,
|
||||
status: parse_status(&status_str)?,
|
||||
report_json,
|
||||
error_message,
|
||||
created_at: parse_datetime(&created_at_str)?,
|
||||
completed_at: completed_at_str.as_deref().map(parse_datetime).transpose()?,
|
||||
})
|
||||
}
|
||||
|
||||
// ── WrapUpStatsQuery ─────────────────────────────────────────────────────────
|
||||
|
||||
pub struct SqliteWrapUpStatsQuery {
|
||||
pool: SqlitePool,
|
||||
}
|
||||
|
||||
impl SqliteWrapUpStatsQuery {
|
||||
pub fn new(pool: SqlitePool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl WrapUpStatsQuery for SqliteWrapUpStatsQuery {
|
||||
async fn get_reviews_with_profiles(
|
||||
&self,
|
||||
scope: &WrapUpScope,
|
||||
range: &DateRange,
|
||||
) -> Result<Vec<WrapUpMovieRow>, DomainError> {
|
||||
let start_str = range.start.format("%Y-%m-%d").to_string();
|
||||
let end_str = range.end.format("%Y-%m-%d").to_string();
|
||||
|
||||
// 1) Main query
|
||||
let (scope_clause, scope_bind) = match scope {
|
||||
WrapUpScope::User(uid) => ("AND r.user_id = ?", Some(uid.to_string())),
|
||||
WrapUpScope::Global => ("", None),
|
||||
};
|
||||
|
||||
let sql = format!(
|
||||
"SELECT r.movie_id, m.title, m.release_year, m.director, m.poster_path, \
|
||||
r.rating, r.watched_at, r.user_id, \
|
||||
p.runtime_minutes, p.budget_usd, p.original_language \
|
||||
FROM reviews r \
|
||||
INNER JOIN movies m ON m.id = r.movie_id \
|
||||
LEFT JOIN movie_profiles p ON p.movie_id = m.id \
|
||||
WHERE r.watched_at >= ? AND r.watched_at < ? {scope_clause} \
|
||||
ORDER BY r.watched_at ASC"
|
||||
);
|
||||
|
||||
let mut q = sqlx::query(&sql)
|
||||
.bind(&start_str)
|
||||
.bind(&end_str);
|
||||
if let Some(ref uid) = scope_bind {
|
||||
q = q.bind(uid);
|
||||
}
|
||||
|
||||
let rows = q.fetch_all(&self.pool).await.map_err(map_err)?;
|
||||
|
||||
if rows.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
// Collect unique movie IDs
|
||||
let mut movie_ids: Vec<String> = Vec::new();
|
||||
let mut seen = std::collections::HashSet::new();
|
||||
for row in &rows {
|
||||
let mid: String = row.try_get("movie_id").map_err(map_err)?;
|
||||
if seen.insert(mid.clone()) {
|
||||
movie_ids.push(mid);
|
||||
}
|
||||
}
|
||||
|
||||
// 2) Batch-fetch genres, keywords, cast
|
||||
let (genres_map, keywords_map, cast_map) = tokio::try_join!(
|
||||
fetch_genres_sqlite(&self.pool, &movie_ids),
|
||||
fetch_keywords_sqlite(&self.pool, &movie_ids),
|
||||
fetch_cast_sqlite(&self.pool, &movie_ids),
|
||||
)?;
|
||||
|
||||
// 3) Build result
|
||||
let mut result = Vec::with_capacity(rows.len());
|
||||
for row in &rows {
|
||||
let movie_id_str: String = row.try_get("movie_id").map_err(map_err)?;
|
||||
let title: String = row.try_get("title").map_err(map_err)?;
|
||||
let release_year: i64 = row.try_get("release_year").map_err(map_err)?;
|
||||
let director: Option<String> = row.try_get("director").map_err(map_err)?;
|
||||
let poster_path: Option<String> = row.try_get("poster_path").map_err(map_err)?;
|
||||
let rating: i64 = row.try_get("rating").map_err(map_err)?;
|
||||
let watched_at_str: String = row.try_get("watched_at").map_err(map_err)?;
|
||||
let user_id_str: String = row.try_get("user_id").map_err(map_err)?;
|
||||
let runtime_minutes: Option<i32> = row.try_get("runtime_minutes").map_err(map_err)?;
|
||||
let budget_usd: Option<i64> = row.try_get("budget_usd").map_err(map_err)?;
|
||||
let original_language: Option<String> =
|
||||
row.try_get("original_language").map_err(map_err)?;
|
||||
|
||||
let genres = genres_map
|
||||
.get(&movie_id_str)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
let keywords = keywords_map
|
||||
.get(&movie_id_str)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
let cast = cast_map
|
||||
.get(&movie_id_str)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
|
||||
let cast_names: Vec<(String, u32)> = cast
|
||||
.iter()
|
||||
.map(|c| (c.name.clone(), c.billing_order))
|
||||
.collect();
|
||||
let cast_profile_paths: Vec<Option<String>> =
|
||||
cast.iter().map(|c| c.profile_path.clone()).collect();
|
||||
|
||||
result.push(WrapUpMovieRow {
|
||||
movie_id: parse_uuid(&movie_id_str)?,
|
||||
title,
|
||||
release_year: release_year as u16,
|
||||
director,
|
||||
poster_path,
|
||||
rating: rating as u8,
|
||||
watched_at: parse_datetime(&watched_at_str)?,
|
||||
user_id: parse_uuid(&user_id_str)?,
|
||||
runtime_minutes: runtime_minutes.map(|v| v as u32),
|
||||
budget_usd,
|
||||
original_language,
|
||||
genres,
|
||||
keywords,
|
||||
cast_names,
|
||||
cast_profile_paths,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct CastEntry {
|
||||
name: String,
|
||||
billing_order: u32,
|
||||
profile_path: Option<String>,
|
||||
}
|
||||
|
||||
fn in_placeholders(n: usize) -> String {
|
||||
let mut s = String::new();
|
||||
for i in 0..n {
|
||||
if i > 0 {
|
||||
s.push(',');
|
||||
}
|
||||
s.push('?');
|
||||
}
|
||||
s
|
||||
}
|
||||
|
||||
async fn fetch_genres_sqlite(
|
||||
pool: &SqlitePool,
|
||||
movie_ids: &[String],
|
||||
) -> Result<HashMap<String, Vec<String>>, DomainError> {
|
||||
if movie_ids.is_empty() {
|
||||
return Ok(HashMap::new());
|
||||
}
|
||||
let sql = format!(
|
||||
"SELECT movie_id, name FROM movie_genres WHERE movie_id IN ({}) ORDER BY name",
|
||||
in_placeholders(movie_ids.len())
|
||||
);
|
||||
let mut q = sqlx::query(&sql);
|
||||
for id in movie_ids {
|
||||
q = q.bind(id);
|
||||
}
|
||||
let rows = q.fetch_all(pool).await.map_err(map_err)?;
|
||||
|
||||
let mut map: HashMap<String, Vec<String>> = HashMap::new();
|
||||
for row in rows {
|
||||
let mid: String = row.try_get("movie_id").map_err(map_err)?;
|
||||
let name: String = row.try_get("name").map_err(map_err)?;
|
||||
map.entry(mid).or_default().push(name);
|
||||
}
|
||||
Ok(map)
|
||||
}
|
||||
|
||||
async fn fetch_keywords_sqlite(
|
||||
pool: &SqlitePool,
|
||||
movie_ids: &[String],
|
||||
) -> Result<HashMap<String, Vec<String>>, DomainError> {
|
||||
if movie_ids.is_empty() {
|
||||
return Ok(HashMap::new());
|
||||
}
|
||||
let sql = format!(
|
||||
"SELECT movie_id, name FROM movie_keywords WHERE movie_id IN ({}) ORDER BY name",
|
||||
in_placeholders(movie_ids.len())
|
||||
);
|
||||
let mut q = sqlx::query(&sql);
|
||||
for id in movie_ids {
|
||||
q = q.bind(id);
|
||||
}
|
||||
let rows = q.fetch_all(pool).await.map_err(map_err)?;
|
||||
|
||||
let mut map: HashMap<String, Vec<String>> = HashMap::new();
|
||||
for row in rows {
|
||||
let mid: String = row.try_get("movie_id").map_err(map_err)?;
|
||||
let name: String = row.try_get("name").map_err(map_err)?;
|
||||
map.entry(mid).or_default().push(name);
|
||||
}
|
||||
Ok(map)
|
||||
}
|
||||
|
||||
async fn fetch_cast_sqlite(
|
||||
pool: &SqlitePool,
|
||||
movie_ids: &[String],
|
||||
) -> Result<HashMap<String, Vec<CastEntry>>, DomainError> {
|
||||
if movie_ids.is_empty() {
|
||||
return Ok(HashMap::new());
|
||||
}
|
||||
let sql = format!(
|
||||
"SELECT movie_id, name, billing_order, profile_path \
|
||||
FROM movie_cast \
|
||||
WHERE movie_id IN ({}) AND billing_order <= 3 \
|
||||
ORDER BY billing_order ASC",
|
||||
in_placeholders(movie_ids.len())
|
||||
);
|
||||
let mut q = sqlx::query(&sql);
|
||||
for id in movie_ids {
|
||||
q = q.bind(id);
|
||||
}
|
||||
let rows = q.fetch_all(pool).await.map_err(map_err)?;
|
||||
|
||||
let mut map: HashMap<String, Vec<CastEntry>> = HashMap::new();
|
||||
for row in rows {
|
||||
let mid: String = row.try_get("movie_id").map_err(map_err)?;
|
||||
let name: String = row.try_get("name").map_err(map_err)?;
|
||||
let billing_order: i32 = row.try_get("billing_order").map_err(map_err)?;
|
||||
let profile_path: Option<String> = row.try_get("profile_path").map_err(map_err)?;
|
||||
map.entry(mid).or_default().push(CastEntry {
|
||||
name,
|
||||
billing_order: billing_order as u32,
|
||||
profile_path,
|
||||
});
|
||||
}
|
||||
Ok(map)
|
||||
}
|
||||
@@ -8,6 +8,7 @@ pub mod social;
|
||||
pub mod users;
|
||||
pub mod watchlist;
|
||||
pub mod webhook;
|
||||
pub mod wrapup;
|
||||
|
||||
pub use auth::*;
|
||||
pub use common::*;
|
||||
|
||||
30
crates/api-types/src/wrapup.rs
Normal file
30
crates/api-types/src/wrapup.rs
Normal file
@@ -0,0 +1,30 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Deserialize, utoipa::ToSchema)]
|
||||
pub struct GenerateWrapUpRequest {
|
||||
pub start_date: String,
|
||||
pub end_date: String,
|
||||
pub global: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, utoipa::ToSchema)]
|
||||
pub struct WrapUpGeneratedResponse {
|
||||
pub id: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, utoipa::ToSchema)]
|
||||
pub struct WrapUpStatusResponse {
|
||||
pub id: String,
|
||||
pub user_id: Option<String>,
|
||||
pub status: String,
|
||||
pub start_date: String,
|
||||
pub end_date: String,
|
||||
pub created_at: String,
|
||||
pub completed_at: Option<String>,
|
||||
pub error_message: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, utoipa::ToSchema)]
|
||||
pub struct WrapUpListResponse {
|
||||
pub items: Vec<WrapUpStatusResponse>,
|
||||
}
|
||||
@@ -14,6 +14,7 @@ tokio = { workspace = true }
|
||||
sha2 = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
hex = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
|
||||
[features]
|
||||
xlsx = []
|
||||
|
||||
@@ -6,7 +6,7 @@ use domain::ports::{
|
||||
MovieRepository, PasswordHasher, PersonCommand, PersonQuery, PosterFetcherClient,
|
||||
RemoteWatchlistRepository, ReviewRepository, SearchCommand, SearchPort, SocialQueryPort,
|
||||
StatsRepository, UserProfileFieldsRepository, UserRepository, WatchEventRepository,
|
||||
WatchlistRepository, WrapUpStatsQuery, WebhookTokenRepository,
|
||||
WatchlistRepository, WrapUpRepository, WrapUpStatsQuery, WebhookTokenRepository,
|
||||
};
|
||||
|
||||
use crate::config::AppConfig;
|
||||
@@ -32,6 +32,7 @@ pub struct Repositories {
|
||||
pub remote_watchlist: Arc<dyn RemoteWatchlistRepository>,
|
||||
pub social_query: Arc<dyn SocialQueryPort>,
|
||||
pub wrapup_stats: Arc<dyn WrapUpStatsQuery>,
|
||||
pub wrapup_repo: Arc<dyn WrapUpRepository>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::Datelike;
|
||||
use domain::{errors::DomainError, events::DomainEvent, ports::PeriodicJob};
|
||||
|
||||
use crate::context::AppContext;
|
||||
@@ -85,3 +86,78 @@ impl PeriodicJob for EnrichmentStalenessJob {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct WrapUpAutoGenerateJob {
|
||||
ctx: AppContext,
|
||||
}
|
||||
|
||||
impl WrapUpAutoGenerateJob {
|
||||
pub fn new(ctx: AppContext) -> Self {
|
||||
Self { ctx }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl PeriodicJob for WrapUpAutoGenerateJob {
|
||||
fn interval(&self) -> Duration {
|
||||
Duration::from_secs(86400)
|
||||
}
|
||||
|
||||
async fn run(&self) -> Result<(), DomainError> {
|
||||
let now = chrono::Utc::now().naive_utc();
|
||||
// Only run in January
|
||||
if now.month() != 1 {
|
||||
return Ok(());
|
||||
}
|
||||
let year = now.year() - 1;
|
||||
let start = chrono::NaiveDate::from_ymd_opt(year, 1, 1)
|
||||
.ok_or_else(|| DomainError::ValidationError("invalid date".into()))?;
|
||||
let end = chrono::NaiveDate::from_ymd_opt(year + 1, 1, 1)
|
||||
.ok_or_else(|| DomainError::ValidationError("invalid date".into()))?;
|
||||
|
||||
let users = self.ctx.repos.user.list_with_stats().await?;
|
||||
for user in &users {
|
||||
if user.total_movies > 0 {
|
||||
let existing = self
|
||||
.ctx
|
||||
.repos
|
||||
.wrapup_repo
|
||||
.find_existing(Some(user.user_id.value()), start, end)
|
||||
.await?;
|
||||
if existing.is_none() {
|
||||
let cmd = crate::wrapup::commands::RequestWrapUpCommand {
|
||||
user_id: Some(user.user_id.value()),
|
||||
start_date: start,
|
||||
end_date: end,
|
||||
};
|
||||
if let Err(e) = crate::wrapup::generate::execute(&self.ctx, cmd).await {
|
||||
tracing::warn!(
|
||||
"auto-generate wrapup for user {} failed: {e}",
|
||||
user.user_id.value()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Global wrap-up
|
||||
let existing = self
|
||||
.ctx
|
||||
.repos
|
||||
.wrapup_repo
|
||||
.find_existing(None, start, end)
|
||||
.await?;
|
||||
if existing.is_none() {
|
||||
let cmd = crate::wrapup::commands::RequestWrapUpCommand {
|
||||
user_id: None,
|
||||
start_date: start,
|
||||
end_date: end,
|
||||
};
|
||||
if let Err(e) = crate::wrapup::generate::execute(&self.ctx, cmd).await {
|
||||
tracing::warn!("auto-generate global wrapup failed: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use domain::testing::{InMemoryWrapUpStatsQuery, NoopRemoteWatchlistRepository, NoopSocialQueryPort};
|
||||
use domain::testing::{InMemoryWrapUpRepository, InMemoryWrapUpStatsQuery, NoopRemoteWatchlistRepository, NoopSocialQueryPort};
|
||||
use domain::{
|
||||
ports::{
|
||||
AuthService, DiaryExporter, DiaryRepository, DocumentParser, EventPublisher, ImageStorage,
|
||||
@@ -8,7 +8,7 @@ use domain::{
|
||||
MovieRepository, PasswordHasher, PersonCommand, PersonQuery, PosterFetcherClient,
|
||||
ReviewRepository, SearchCommand, SearchPort, StatsRepository, UserProfileFieldsRepository,
|
||||
UserRepository, WatchEventRepository, WatchlistRepository, WebhookTokenRepository,
|
||||
WrapUpStatsQuery,
|
||||
WrapUpRepository, WrapUpStatsQuery,
|
||||
},
|
||||
testing::{
|
||||
FakeAuthService, FakeMetadataClient, FakePasswordHasher, InMemoryMovieRepository,
|
||||
@@ -52,6 +52,7 @@ pub struct TestContextBuilder {
|
||||
pub search_port: Arc<dyn SearchPort>,
|
||||
pub search_command: Arc<dyn SearchCommand>,
|
||||
pub wrapup_stats: Arc<dyn WrapUpStatsQuery>,
|
||||
pub wrapup_repo: Arc<dyn WrapUpRepository>,
|
||||
pub config: AppConfig,
|
||||
}
|
||||
|
||||
@@ -83,6 +84,7 @@ impl TestContextBuilder {
|
||||
search_port: Arc::new(PanicSearchPort),
|
||||
search_command: Arc::new(PanicSearchCommand),
|
||||
wrapup_stats: InMemoryWrapUpStatsQuery::new(),
|
||||
wrapup_repo: InMemoryWrapUpRepository::new(),
|
||||
config: AppConfig {
|
||||
allow_registration: true,
|
||||
base_url: "http://localhost:3000".into(),
|
||||
@@ -153,6 +155,7 @@ impl TestContextBuilder {
|
||||
remote_watchlist: Arc::new(NoopRemoteWatchlistRepository),
|
||||
social_query: Arc::new(NoopSocialQueryPort),
|
||||
wrapup_stats: self.wrapup_stats,
|
||||
wrapup_repo: self.wrapup_repo,
|
||||
},
|
||||
services: Services {
|
||||
auth: self.auth_service,
|
||||
|
||||
@@ -59,6 +59,8 @@ impl EventHandler for RecordingHandler {
|
||||
DomainEvent::BackfillFollower { .. } => "backfill_follower",
|
||||
DomainEvent::FederationDeliveryRequested { .. } => "federation_delivery",
|
||||
DomainEvent::WatchEventIngested { .. } => "watch_event_ingested",
|
||||
DomainEvent::WrapUpRequested { .. } => "wrapup_requested",
|
||||
DomainEvent::WrapUpCompleted { .. } => "wrapup_completed",
|
||||
};
|
||||
self.calls.lock().unwrap().push(label);
|
||||
Ok(())
|
||||
|
||||
8
crates/application/src/wrapup/commands.rs
Normal file
8
crates/application/src/wrapup/commands.rs
Normal file
@@ -0,0 +1,8 @@
|
||||
use chrono::NaiveDate;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub struct RequestWrapUpCommand {
|
||||
pub user_id: Option<Uuid>,
|
||||
pub start_date: NaiveDate,
|
||||
pub end_date: NaiveDate,
|
||||
}
|
||||
40
crates/application/src/wrapup/event_handler.rs
Normal file
40
crates/application/src/wrapup/event_handler.rs
Normal file
@@ -0,0 +1,40 @@
|
||||
use async_trait::async_trait;
|
||||
use domain::errors::DomainError;
|
||||
use domain::events::DomainEvent;
|
||||
use domain::ports::EventHandler;
|
||||
|
||||
use crate::context::AppContext;
|
||||
|
||||
pub struct WrapUpEventHandler {
|
||||
ctx: AppContext,
|
||||
}
|
||||
|
||||
impl WrapUpEventHandler {
|
||||
pub fn new(ctx: AppContext) -> Self {
|
||||
Self { ctx }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl EventHandler for WrapUpEventHandler {
|
||||
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||
match event {
|
||||
DomainEvent::WrapUpRequested {
|
||||
wrapup_id,
|
||||
user_id,
|
||||
start_date,
|
||||
end_date,
|
||||
} => {
|
||||
super::handle_requested::execute(
|
||||
&self.ctx,
|
||||
wrapup_id.clone(),
|
||||
user_id.as_ref().map(|u| u.value()),
|
||||
*start_date,
|
||||
*end_date,
|
||||
)
|
||||
.await
|
||||
}
|
||||
_ => Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
47
crates/application/src/wrapup/generate.rs
Normal file
47
crates/application/src/wrapup/generate.rs
Normal file
@@ -0,0 +1,47 @@
|
||||
use domain::errors::DomainError;
|
||||
use domain::events::DomainEvent;
|
||||
use domain::models::wrapup::WrapUpStatus;
|
||||
use domain::value_objects::{UserId, WrapUpId};
|
||||
|
||||
use crate::context::AppContext;
|
||||
use crate::wrapup::commands::RequestWrapUpCommand;
|
||||
|
||||
pub async fn execute(ctx: &AppContext, cmd: RequestWrapUpCommand) -> Result<WrapUpId, DomainError> {
|
||||
let existing = ctx
|
||||
.repos
|
||||
.wrapup_repo
|
||||
.find_existing(cmd.user_id, cmd.start_date, cmd.end_date)
|
||||
.await?;
|
||||
|
||||
if let Some(ref rec) = existing
|
||||
&& (rec.status == WrapUpStatus::Ready || rec.status == WrapUpStatus::Generating)
|
||||
{
|
||||
return Ok(rec.id.clone());
|
||||
}
|
||||
|
||||
let id = WrapUpId::generate();
|
||||
let record = domain::models::wrapup::WrapUpRecord {
|
||||
id: id.clone(),
|
||||
user_id: cmd.user_id,
|
||||
start_date: cmd.start_date,
|
||||
end_date: cmd.end_date,
|
||||
status: WrapUpStatus::Pending,
|
||||
report_json: None,
|
||||
error_message: None,
|
||||
created_at: chrono::Utc::now().naive_utc(),
|
||||
completed_at: None,
|
||||
};
|
||||
ctx.repos.wrapup_repo.create(&record).await?;
|
||||
|
||||
ctx.services
|
||||
.event_publisher
|
||||
.publish(&DomainEvent::WrapUpRequested {
|
||||
wrapup_id: id.clone(),
|
||||
user_id: cmd.user_id.map(UserId::from_uuid),
|
||||
start_date: cmd.start_date,
|
||||
end_date: cmd.end_date,
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(id)
|
||||
}
|
||||
9
crates/application/src/wrapup/get_wrapup.rs
Normal file
9
crates/application/src/wrapup/get_wrapup.rs
Normal file
@@ -0,0 +1,9 @@
|
||||
use domain::errors::DomainError;
|
||||
use domain::models::wrapup::WrapUpRecord;
|
||||
use domain::value_objects::WrapUpId;
|
||||
|
||||
use crate::context::AppContext;
|
||||
|
||||
pub async fn execute(ctx: &AppContext, id: WrapUpId) -> Result<Option<WrapUpRecord>, DomainError> {
|
||||
ctx.repos.wrapup_repo.get_by_id(&id).await
|
||||
}
|
||||
51
crates/application/src/wrapup/handle_requested.rs
Normal file
51
crates/application/src/wrapup/handle_requested.rs
Normal file
@@ -0,0 +1,51 @@
|
||||
use crate::context::AppContext;
|
||||
use crate::wrapup::{compute, queries::ComputeWrapUpQuery};
|
||||
use domain::errors::DomainError;
|
||||
use domain::events::DomainEvent;
|
||||
use domain::models::wrapup::{DateRange, WrapUpScope, WrapUpStatus};
|
||||
use domain::value_objects::WrapUpId;
|
||||
|
||||
pub async fn execute(
|
||||
ctx: &AppContext,
|
||||
wrapup_id: WrapUpId,
|
||||
user_id: Option<uuid::Uuid>,
|
||||
start_date: chrono::NaiveDate,
|
||||
end_date: chrono::NaiveDate,
|
||||
) -> Result<(), DomainError> {
|
||||
ctx.repos
|
||||
.wrapup_repo
|
||||
.update_status(&wrapup_id, &WrapUpStatus::Generating, None)
|
||||
.await?;
|
||||
|
||||
let scope = match user_id {
|
||||
Some(uid) => WrapUpScope::User(uid),
|
||||
None => WrapUpScope::Global,
|
||||
};
|
||||
let query = ComputeWrapUpQuery {
|
||||
scope,
|
||||
date_range: DateRange {
|
||||
start: start_date,
|
||||
end: end_date,
|
||||
},
|
||||
};
|
||||
|
||||
match compute::execute(ctx, query).await {
|
||||
Ok(report) => {
|
||||
let json = serde_json::to_string(&report)
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
||||
ctx.repos.wrapup_repo.set_complete(&wrapup_id, &json).await?;
|
||||
ctx.services
|
||||
.event_publisher
|
||||
.publish(&DomainEvent::WrapUpCompleted { wrapup_id })
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
ctx.repos
|
||||
.wrapup_repo
|
||||
.update_status(&wrapup_id, &WrapUpStatus::Failed, Some(&e.to_string()))
|
||||
.await?;
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
20
crates/application/src/wrapup/list_wrapups.rs
Normal file
20
crates/application/src/wrapup/list_wrapups.rs
Normal file
@@ -0,0 +1,20 @@
|
||||
use uuid::Uuid;
|
||||
|
||||
use domain::errors::DomainError;
|
||||
use domain::models::wrapup::WrapUpRecord;
|
||||
|
||||
use crate::context::AppContext;
|
||||
|
||||
pub struct ListWrapUpsQuery {
|
||||
pub user_id: Option<Uuid>,
|
||||
}
|
||||
|
||||
pub async fn execute(
|
||||
ctx: &AppContext,
|
||||
query: ListWrapUpsQuery,
|
||||
) -> Result<Vec<WrapUpRecord>, DomainError> {
|
||||
match query.user_id {
|
||||
Some(uid) => ctx.repos.wrapup_repo.list_for_user(uid).await,
|
||||
None => ctx.repos.wrapup_repo.list_global().await,
|
||||
}
|
||||
}
|
||||
@@ -1,2 +1,8 @@
|
||||
pub mod commands;
|
||||
pub mod compute;
|
||||
pub mod event_handler;
|
||||
pub mod generate;
|
||||
pub mod get_wrapup;
|
||||
pub mod handle_requested;
|
||||
pub mod list_wrapups;
|
||||
pub mod queries;
|
||||
|
||||
@@ -9,6 +9,7 @@ chrono = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
|
||||
email_address = "0.2.9"
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ use chrono::NaiveDateTime;
|
||||
|
||||
use crate::{
|
||||
errors::DomainError,
|
||||
value_objects::{ExternalMetadataId, MovieId, PosterPath, Rating, ReviewId, UserId},
|
||||
value_objects::{ExternalMetadataId, MovieId, PosterPath, Rating, ReviewId, UserId, WrapUpId},
|
||||
};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -75,6 +75,15 @@ pub enum DomainEvent {
|
||||
title: String,
|
||||
source: String,
|
||||
},
|
||||
WrapUpRequested {
|
||||
wrapup_id: WrapUpId,
|
||||
user_id: Option<UserId>,
|
||||
start_date: chrono::NaiveDate,
|
||||
end_date: chrono::NaiveDate,
|
||||
},
|
||||
WrapUpCompleted {
|
||||
wrapup_id: WrapUpId,
|
||||
},
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
||||
@@ -1,13 +1,16 @@
|
||||
use chrono::NaiveDate;
|
||||
use chrono::{NaiveDate, NaiveDateTime};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
use crate::value_objects::WrapUpId;
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct DateRange {
|
||||
pub start: NaiveDate,
|
||||
pub end: NaiveDate,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct MovieRef {
|
||||
pub title: String,
|
||||
pub year: u16,
|
||||
@@ -15,52 +18,52 @@ pub struct MovieRef {
|
||||
pub poster_path: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct UserRef {
|
||||
pub user_id: Uuid,
|
||||
pub display_name: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct PersonStat {
|
||||
pub name: String,
|
||||
pub count: u32,
|
||||
pub avg_rating: f64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct GenreStat {
|
||||
pub genre: String,
|
||||
pub count: u32,
|
||||
pub avg_rating: f64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct KeywordStat {
|
||||
pub keyword: String,
|
||||
pub count: u32,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct LangStat {
|
||||
pub language: String,
|
||||
pub count: u32,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct MonthCount {
|
||||
pub year_month: String,
|
||||
pub label: String,
|
||||
pub count: u32,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub enum WrapUpScope {
|
||||
User(Uuid),
|
||||
Global,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct WrapUpReport {
|
||||
pub scope: WrapUpScope,
|
||||
pub date_range: DateRange,
|
||||
@@ -117,3 +120,24 @@ pub struct WrapUpReport {
|
||||
pub poster_paths: Vec<String>,
|
||||
pub top_cast_profile_paths: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub enum WrapUpStatus {
|
||||
Pending,
|
||||
Generating,
|
||||
Ready,
|
||||
Failed,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct WrapUpRecord {
|
||||
pub id: WrapUpId,
|
||||
pub user_id: Option<Uuid>,
|
||||
pub start_date: NaiveDate,
|
||||
pub end_date: NaiveDate,
|
||||
pub status: WrapUpStatus,
|
||||
pub report_json: Option<String>,
|
||||
pub error_message: Option<String>,
|
||||
pub created_at: NaiveDateTime,
|
||||
pub completed_at: Option<NaiveDateTime>,
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, NaiveDateTime, Utc};
|
||||
use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
@@ -13,12 +13,12 @@ use crate::{
|
||||
ReviewHistory, SearchQuery, SearchResults, User, UserStats, UserSummary, UserTrends,
|
||||
WatchEvent, WatchEventStatus, WatchlistEntry, WatchlistWithMovie, WebhookToken,
|
||||
collections::{self, PageParams, Paginated},
|
||||
wrapup::{DateRange, WrapUpScope},
|
||||
wrapup::{DateRange, WrapUpRecord, WrapUpScope, WrapUpStatus},
|
||||
},
|
||||
value_objects::{
|
||||
Email, ExternalMetadataId, ImportProfileId, ImportSessionId, MovieId, MovieTitle,
|
||||
PasswordHash, PosterUrl, ReleaseYear, ReviewId, UserId, Username, WatchEventId,
|
||||
WebhookTokenId,
|
||||
WebhookTokenId, WrapUpId,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -471,6 +471,27 @@ pub trait WebhookTokenRepository: Send + Sync {
|
||||
async fn touch_last_used(&self, id: &WebhookTokenId) -> Result<(), DomainError>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait WrapUpRepository: Send + Sync {
|
||||
async fn create(&self, record: &WrapUpRecord) -> Result<(), DomainError>;
|
||||
async fn update_status(
|
||||
&self,
|
||||
id: &WrapUpId,
|
||||
status: &WrapUpStatus,
|
||||
error: Option<&str>,
|
||||
) -> Result<(), DomainError>;
|
||||
async fn set_complete(&self, id: &WrapUpId, report_json: &str) -> Result<(), DomainError>;
|
||||
async fn get_by_id(&self, id: &WrapUpId) -> Result<Option<WrapUpRecord>, DomainError>;
|
||||
async fn list_for_user(&self, user_id: Uuid) -> Result<Vec<WrapUpRecord>, DomainError>;
|
||||
async fn list_global(&self) -> Result<Vec<WrapUpRecord>, DomainError>;
|
||||
async fn find_existing(
|
||||
&self,
|
||||
user_id: Option<Uuid>,
|
||||
start: NaiveDate,
|
||||
end: NaiveDate,
|
||||
) -> Result<Option<WrapUpRecord>, DomainError>;
|
||||
}
|
||||
|
||||
// ── Wrap-up / Year-in-Review ─────────────────────────────────────────────────
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
|
||||
@@ -24,11 +24,11 @@ use crate::{
|
||||
ImportSessionRepository, MetadataClient, MetadataSearchCriteria, MovieProfileRepository,
|
||||
MovieRepository, PasswordHasher, PersonCommand, PersonQuery, PosterFetcherClient,
|
||||
ReviewRepository, SearchCommand, SearchPort, StatsRepository, UserProfileFieldsRepository,
|
||||
UserRepository, WatchlistRepository,
|
||||
UserRepository, WatchlistRepository, WrapUpRepository,
|
||||
},
|
||||
value_objects::{
|
||||
Email, ExternalMetadataId, ImportProfileId, ImportSessionId, MovieId, MovieTitle,
|
||||
PasswordHash, PosterUrl, ReleaseYear, ReviewId, UserId, Username,
|
||||
PasswordHash, PosterUrl, ReleaseYear, ReviewId, UserId, Username, WrapUpId,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -1050,3 +1050,143 @@ impl crate::ports::WrapUpStatsQuery for InMemoryWrapUpStatsQuery {
|
||||
Ok(filtered)
|
||||
}
|
||||
}
|
||||
|
||||
// ── InMemoryWrapUpRepository ────────────────────────────────────────────────
|
||||
|
||||
pub struct InMemoryWrapUpRepository {
|
||||
pub store: Mutex<Vec<crate::models::wrapup::WrapUpRecord>>,
|
||||
}
|
||||
|
||||
impl InMemoryWrapUpRepository {
|
||||
pub fn new() -> Arc<Self> {
|
||||
Arc::new(Self {
|
||||
store: Mutex::new(Vec::new()),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl WrapUpRepository for InMemoryWrapUpRepository {
|
||||
async fn create(
|
||||
&self,
|
||||
record: &crate::models::wrapup::WrapUpRecord,
|
||||
) -> Result<(), DomainError> {
|
||||
self.store.lock().unwrap().push(record.clone());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn update_status(
|
||||
&self,
|
||||
id: &WrapUpId,
|
||||
status: &crate::models::wrapup::WrapUpStatus,
|
||||
error: Option<&str>,
|
||||
) -> Result<(), DomainError> {
|
||||
let mut store = self.store.lock().unwrap();
|
||||
if let Some(rec) = store.iter_mut().find(|r| r.id == *id) {
|
||||
rec.status = status.clone();
|
||||
rec.error_message = error.map(|s| s.to_string());
|
||||
Ok(())
|
||||
} else {
|
||||
Err(DomainError::NotFound("wrapup record".into()))
|
||||
}
|
||||
}
|
||||
|
||||
async fn set_complete(&self, id: &WrapUpId, report_json: &str) -> Result<(), DomainError> {
|
||||
let mut store = self.store.lock().unwrap();
|
||||
if let Some(rec) = store.iter_mut().find(|r| r.id == *id) {
|
||||
rec.status = crate::models::wrapup::WrapUpStatus::Ready;
|
||||
rec.report_json = Some(report_json.to_string());
|
||||
rec.completed_at = Some(chrono::Utc::now().naive_utc());
|
||||
Ok(())
|
||||
} else {
|
||||
Err(DomainError::NotFound("wrapup record".into()))
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_by_id(
|
||||
&self,
|
||||
id: &WrapUpId,
|
||||
) -> Result<Option<crate::models::wrapup::WrapUpRecord>, DomainError> {
|
||||
let store = self.store.lock().unwrap();
|
||||
Ok(store.iter().find(|r| r.id == *id).cloned())
|
||||
}
|
||||
|
||||
async fn list_for_user(
|
||||
&self,
|
||||
user_id: Uuid,
|
||||
) -> Result<Vec<crate::models::wrapup::WrapUpRecord>, DomainError> {
|
||||
let store = self.store.lock().unwrap();
|
||||
Ok(store
|
||||
.iter()
|
||||
.filter(|r| r.user_id == Some(user_id))
|
||||
.cloned()
|
||||
.collect())
|
||||
}
|
||||
|
||||
async fn list_global(
|
||||
&self,
|
||||
) -> Result<Vec<crate::models::wrapup::WrapUpRecord>, DomainError> {
|
||||
let store = self.store.lock().unwrap();
|
||||
Ok(store.iter().filter(|r| r.user_id.is_none()).cloned().collect())
|
||||
}
|
||||
|
||||
async fn find_existing(
|
||||
&self,
|
||||
user_id: Option<Uuid>,
|
||||
start: chrono::NaiveDate,
|
||||
end: chrono::NaiveDate,
|
||||
) -> Result<Option<crate::models::wrapup::WrapUpRecord>, DomainError> {
|
||||
let store = self.store.lock().unwrap();
|
||||
Ok(store
|
||||
.iter()
|
||||
.find(|r| r.user_id == user_id && r.start_date == start && r.end_date == end)
|
||||
.cloned())
|
||||
}
|
||||
}
|
||||
|
||||
// ── PanicWrapUpRepository ──────────────────────────────────────────────────
|
||||
|
||||
pub struct PanicWrapUpRepository;
|
||||
|
||||
#[async_trait]
|
||||
impl WrapUpRepository for PanicWrapUpRepository {
|
||||
async fn create(&self, _: &crate::models::wrapup::WrapUpRecord) -> Result<(), DomainError> {
|
||||
panic!("PanicWrapUpRepository called")
|
||||
}
|
||||
async fn update_status(
|
||||
&self,
|
||||
_: &WrapUpId,
|
||||
_: &crate::models::wrapup::WrapUpStatus,
|
||||
_: Option<&str>,
|
||||
) -> Result<(), DomainError> {
|
||||
panic!("PanicWrapUpRepository called")
|
||||
}
|
||||
async fn set_complete(&self, _: &WrapUpId, _: &str) -> Result<(), DomainError> {
|
||||
panic!("PanicWrapUpRepository called")
|
||||
}
|
||||
async fn get_by_id(
|
||||
&self,
|
||||
_: &WrapUpId,
|
||||
) -> Result<Option<crate::models::wrapup::WrapUpRecord>, DomainError> {
|
||||
panic!("PanicWrapUpRepository called")
|
||||
}
|
||||
async fn list_for_user(
|
||||
&self,
|
||||
_: Uuid,
|
||||
) -> Result<Vec<crate::models::wrapup::WrapUpRecord>, DomainError> {
|
||||
panic!("PanicWrapUpRepository called")
|
||||
}
|
||||
async fn list_global(
|
||||
&self,
|
||||
) -> Result<Vec<crate::models::wrapup::WrapUpRecord>, DomainError> {
|
||||
panic!("PanicWrapUpRepository called")
|
||||
}
|
||||
async fn find_existing(
|
||||
&self,
|
||||
_: Option<Uuid>,
|
||||
_: chrono::NaiveDate,
|
||||
_: chrono::NaiveDate,
|
||||
) -> Result<Option<crate::models::wrapup::WrapUpRecord>, DomainError> {
|
||||
panic!("PanicWrapUpRepository called")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ use uuid::Uuid;
|
||||
|
||||
macro_rules! uuid_id {
|
||||
($name:ident) => {
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
pub struct $name(Uuid);
|
||||
|
||||
impl $name {
|
||||
@@ -28,6 +28,7 @@ uuid_id!(ImportProfileId);
|
||||
uuid_id!(WatchlistEntryId);
|
||||
uuid_id!(WatchEventId);
|
||||
uuid_id!(WebhookTokenId);
|
||||
uuid_id!(WrapUpId);
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct ExternalMetadataId(String);
|
||||
|
||||
@@ -31,6 +31,8 @@ pub struct DatabaseOutput {
|
||||
pub search_command: Arc<dyn domain::ports::SearchCommand>,
|
||||
pub profile_fields: Arc<dyn UserProfileFieldsRepository>,
|
||||
pub ap_content: Arc<dyn LocalApContentQuery>,
|
||||
pub wrapup_stats: Arc<dyn domain::ports::WrapUpStatsQuery>,
|
||||
pub wrapup_repo: Arc<dyn domain::ports::WrapUpRepository>,
|
||||
pub db_pool: DbPool,
|
||||
}
|
||||
|
||||
@@ -67,6 +69,8 @@ pub async fn build_database_adapters(backend: &str, url: &str) -> anyhow::Result
|
||||
search_command: sc,
|
||||
profile_fields: pf,
|
||||
ap_content: w.ap_content,
|
||||
wrapup_stats: w.wrapup_stats,
|
||||
wrapup_repo: w.wrapup_repo,
|
||||
db_pool: DbPool::Postgres(w.pool),
|
||||
})
|
||||
}
|
||||
@@ -100,6 +104,8 @@ pub async fn build_database_adapters(backend: &str, url: &str) -> anyhow::Result
|
||||
search_command: sc,
|
||||
profile_fields: pf,
|
||||
ap_content: w.ap_content,
|
||||
wrapup_stats: w.wrapup_stats,
|
||||
wrapup_repo: w.wrapup_repo,
|
||||
db_pool: DbPool::Sqlite(w.pool),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ pub mod images;
|
||||
pub mod import;
|
||||
pub mod rss;
|
||||
pub mod webhook;
|
||||
pub mod wrapup;
|
||||
|
||||
const DEFAULT_PAGE_LIMIT: u32 = 5;
|
||||
const RSS_FEED_LIMIT: u32 = 50;
|
||||
|
||||
147
crates/presentation/src/handlers/wrapup.rs
Normal file
147
crates/presentation/src/handlers/wrapup.rs
Normal file
@@ -0,0 +1,147 @@
|
||||
use axum::{
|
||||
Json,
|
||||
extract::{Path, State},
|
||||
http::StatusCode,
|
||||
response::IntoResponse,
|
||||
};
|
||||
use chrono::NaiveDate;
|
||||
use uuid::Uuid;
|
||||
|
||||
use application::wrapup::{
|
||||
commands::RequestWrapUpCommand,
|
||||
generate, get_wrapup,
|
||||
list_wrapups::{self, ListWrapUpsQuery},
|
||||
};
|
||||
use domain::errors::DomainError;
|
||||
use domain::models::wrapup::{WrapUpRecord, WrapUpStatus};
|
||||
use domain::value_objects::WrapUpId;
|
||||
|
||||
use crate::{errors::ApiError, extractors::AuthenticatedUser, state::AppState};
|
||||
use api_types::wrapup::{
|
||||
GenerateWrapUpRequest, WrapUpGeneratedResponse, WrapUpListResponse, WrapUpStatusResponse,
|
||||
};
|
||||
|
||||
fn record_to_dto(r: &WrapUpRecord) -> WrapUpStatusResponse {
|
||||
WrapUpStatusResponse {
|
||||
id: r.id.value().to_string(),
|
||||
user_id: r.user_id.map(|u| u.to_string()),
|
||||
status: format!("{:?}", r.status),
|
||||
start_date: r.start_date.to_string(),
|
||||
end_date: r.end_date.to_string(),
|
||||
created_at: r.created_at.to_string(),
|
||||
completed_at: r.completed_at.map(|t| t.to_string()),
|
||||
error_message: r.error_message.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
post, path = "/api/v1/wrapups/generate",
|
||||
request_body = GenerateWrapUpRequest,
|
||||
responses(
|
||||
(status = 200, body = WrapUpGeneratedResponse),
|
||||
(status = 400, description = "Invalid date format"),
|
||||
(status = 401, description = "Unauthorized"),
|
||||
),
|
||||
security(("bearer_auth" = []))
|
||||
)]
|
||||
pub async fn post_generate(
|
||||
State(state): State<AppState>,
|
||||
user: AuthenticatedUser,
|
||||
Json(req): Json<GenerateWrapUpRequest>,
|
||||
) -> Result<Json<WrapUpGeneratedResponse>, ApiError> {
|
||||
let start = NaiveDate::parse_from_str(&req.start_date, "%Y-%m-%d")
|
||||
.map_err(|_| DomainError::ValidationError("invalid start_date".into()))?;
|
||||
let end = NaiveDate::parse_from_str(&req.end_date, "%Y-%m-%d")
|
||||
.map_err(|_| DomainError::ValidationError("invalid end_date".into()))?;
|
||||
let user_id = if req.global.unwrap_or(false) {
|
||||
None
|
||||
} else {
|
||||
Some(user.0.value())
|
||||
};
|
||||
let cmd = RequestWrapUpCommand {
|
||||
user_id,
|
||||
start_date: start,
|
||||
end_date: end,
|
||||
};
|
||||
let id = generate::execute(&state.app_ctx, cmd).await?;
|
||||
Ok(Json(WrapUpGeneratedResponse {
|
||||
id: id.value().to_string(),
|
||||
}))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
get, path = "/api/v1/wrapups",
|
||||
responses(
|
||||
(status = 200, body = WrapUpListResponse),
|
||||
(status = 401, description = "Unauthorized"),
|
||||
),
|
||||
security(("bearer_auth" = []))
|
||||
)]
|
||||
pub async fn get_list(
|
||||
State(state): State<AppState>,
|
||||
user: AuthenticatedUser,
|
||||
) -> Result<Json<WrapUpListResponse>, ApiError> {
|
||||
let records = list_wrapups::execute(
|
||||
&state.app_ctx,
|
||||
ListWrapUpsQuery {
|
||||
user_id: Some(user.0.value()),
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
Ok(Json(WrapUpListResponse {
|
||||
items: records.iter().map(record_to_dto).collect(),
|
||||
}))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
get, path = "/api/v1/wrapups/{id}",
|
||||
params(("id" = Uuid, Path, description = "Wrap-up ID")),
|
||||
responses(
|
||||
(status = 200, body = WrapUpStatusResponse),
|
||||
(status = 401, description = "Unauthorized"),
|
||||
(status = 404, description = "Not found"),
|
||||
),
|
||||
security(("bearer_auth" = []))
|
||||
)]
|
||||
pub async fn get_status(
|
||||
State(state): State<AppState>,
|
||||
_user: AuthenticatedUser,
|
||||
Path(id): Path<Uuid>,
|
||||
) -> Result<Json<WrapUpStatusResponse>, ApiError> {
|
||||
let record = get_wrapup::execute(&state.app_ctx, WrapUpId::from_uuid(id))
|
||||
.await?
|
||||
.ok_or_else(|| DomainError::NotFound("wrap-up not found".into()))?;
|
||||
Ok(Json(record_to_dto(&record)))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
get, path = "/api/v1/wrapups/{id}/report",
|
||||
params(("id" = Uuid, Path, description = "Wrap-up ID")),
|
||||
responses(
|
||||
(status = 200, description = "Report JSON", content_type = "application/json"),
|
||||
(status = 202, description = "Still generating"),
|
||||
(status = 401, description = "Unauthorized"),
|
||||
(status = 404, description = "Not found"),
|
||||
),
|
||||
security(("bearer_auth" = []))
|
||||
)]
|
||||
pub async fn get_report(
|
||||
State(state): State<AppState>,
|
||||
_user: AuthenticatedUser,
|
||||
Path(id): Path<Uuid>,
|
||||
) -> impl IntoResponse {
|
||||
match get_wrapup::execute(&state.app_ctx, WrapUpId::from_uuid(id)).await {
|
||||
Ok(Some(record)) if record.status == WrapUpStatus::Ready => match record.report_json {
|
||||
Some(json) => (
|
||||
StatusCode::OK,
|
||||
[("content-type", "application/json")],
|
||||
json,
|
||||
)
|
||||
.into_response(),
|
||||
None => StatusCode::NOT_FOUND.into_response(),
|
||||
},
|
||||
Ok(Some(_)) => StatusCode::ACCEPTED.into_response(),
|
||||
Ok(None) => StatusCode::NOT_FOUND.into_response(),
|
||||
Err(e) => crate::errors::domain_error_response(e),
|
||||
}
|
||||
}
|
||||
@@ -193,7 +193,8 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
||||
social_query: social_query.clone(),
|
||||
#[cfg(not(feature = "federation"))]
|
||||
social_query: Arc::new(domain::testing::NoopSocialQueryPort),
|
||||
wrapup_stats: Arc::new(domain::testing::PanicWrapUpStatsQuery) as Arc<dyn domain::ports::WrapUpStatsQuery>,
|
||||
wrapup_stats: db.wrapup_stats,
|
||||
wrapup_repo: db.wrapup_repo,
|
||||
},
|
||||
services: Services {
|
||||
auth: auth_service,
|
||||
|
||||
@@ -7,6 +7,7 @@ mod social;
|
||||
mod users;
|
||||
mod watchlist;
|
||||
mod webhook;
|
||||
mod wrapup;
|
||||
|
||||
use axum::Router;
|
||||
use utoipa::{
|
||||
@@ -42,6 +43,7 @@ fn build() -> utoipa::openapi::OpenApi {
|
||||
api.merge(search::SearchDoc::openapi());
|
||||
api.merge(watchlist::WatchlistDoc::openapi());
|
||||
api.merge(webhook::WebhookDoc::openapi());
|
||||
api.merge(wrapup::WrapUpDoc::openapi());
|
||||
#[cfg(feature = "federation")]
|
||||
api.merge(social::SocialDoc::openapi());
|
||||
SecurityAddon.modify(&mut api);
|
||||
|
||||
18
crates/presentation/src/openapi/wrapup.rs
Normal file
18
crates/presentation/src/openapi/wrapup.rs
Normal file
@@ -0,0 +1,18 @@
|
||||
use utoipa::OpenApi;
|
||||
|
||||
#[derive(OpenApi)]
|
||||
#[openapi(
|
||||
paths(
|
||||
crate::handlers::wrapup::post_generate,
|
||||
crate::handlers::wrapup::get_list,
|
||||
crate::handlers::wrapup::get_status,
|
||||
crate::handlers::wrapup::get_report,
|
||||
),
|
||||
components(schemas(
|
||||
api_types::wrapup::GenerateWrapUpRequest,
|
||||
api_types::wrapup::WrapUpGeneratedResponse,
|
||||
api_types::wrapup::WrapUpStatusResponse,
|
||||
api_types::wrapup::WrapUpListResponse,
|
||||
))
|
||||
)]
|
||||
pub struct WrapUpDoc;
|
||||
@@ -346,6 +346,16 @@ fn api_routes(rate_limit: u64) -> Router<AppState> {
|
||||
.route(
|
||||
"/watch-queue/dismiss",
|
||||
routing::post(handlers::webhook::post_dismiss_watch_events),
|
||||
)
|
||||
.route(
|
||||
"/wrapups/generate",
|
||||
routing::post(handlers::wrapup::post_generate),
|
||||
)
|
||||
.route("/wrapups", routing::get(handlers::wrapup::get_list))
|
||||
.route("/wrapups/{id}", routing::get(handlers::wrapup::get_status))
|
||||
.route(
|
||||
"/wrapups/{id}/report",
|
||||
routing::get(handlers::wrapup::get_report),
|
||||
);
|
||||
|
||||
#[cfg(feature = "federation")]
|
||||
|
||||
@@ -568,6 +568,52 @@ impl domain::ports::WebhookTokenRepository for Panic {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl domain::ports::WrapUpStatsQuery for Panic {
|
||||
async fn get_reviews_with_profiles(
|
||||
&self,
|
||||
_: &domain::models::wrapup::WrapUpScope,
|
||||
_: &domain::models::wrapup::DateRange,
|
||||
) -> Result<Vec<domain::ports::WrapUpMovieRow>, DomainError> {
|
||||
panic!()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl domain::ports::WrapUpRepository for Panic {
|
||||
async fn create(&self, _: &domain::models::wrapup::WrapUpRecord) -> Result<(), DomainError> {
|
||||
panic!()
|
||||
}
|
||||
async fn update_status(
|
||||
&self,
|
||||
_: &domain::value_objects::WrapUpId,
|
||||
_: &domain::models::wrapup::WrapUpStatus,
|
||||
_: Option<&str>,
|
||||
) -> Result<(), DomainError> {
|
||||
panic!()
|
||||
}
|
||||
async fn set_complete(&self, _: &domain::value_objects::WrapUpId, _: &str) -> Result<(), DomainError> {
|
||||
panic!()
|
||||
}
|
||||
async fn get_by_id(&self, _: &domain::value_objects::WrapUpId) -> Result<Option<domain::models::wrapup::WrapUpRecord>, DomainError> {
|
||||
panic!()
|
||||
}
|
||||
async fn list_for_user(&self, _: uuid::Uuid) -> Result<Vec<domain::models::wrapup::WrapUpRecord>, DomainError> {
|
||||
panic!()
|
||||
}
|
||||
async fn list_global(&self) -> Result<Vec<domain::models::wrapup::WrapUpRecord>, DomainError> {
|
||||
panic!()
|
||||
}
|
||||
async fn find_existing(
|
||||
&self,
|
||||
_: Option<uuid::Uuid>,
|
||||
_: chrono::NaiveDate,
|
||||
_: chrono::NaiveDate,
|
||||
) -> Result<Option<domain::models::wrapup::WrapUpRecord>, DomainError> {
|
||||
panic!()
|
||||
}
|
||||
}
|
||||
|
||||
// --- Single state factory — only auth_service varies ---
|
||||
|
||||
pub fn make_test_state(auth_service: Arc<dyn AuthService>) -> crate::state::AppState {
|
||||
@@ -593,6 +639,8 @@ pub fn make_test_state(auth_service: Arc<dyn AuthService>) -> crate::state::AppS
|
||||
search_command: Arc::clone(&repo) as _,
|
||||
remote_watchlist: Arc::clone(&repo) as _,
|
||||
social_query: Arc::clone(&repo) as _,
|
||||
wrapup_stats: Arc::clone(&repo) as _,
|
||||
wrapup_repo: Arc::clone(&repo) as _,
|
||||
},
|
||||
services: Services {
|
||||
auth: auth_service,
|
||||
|
||||
@@ -415,6 +415,8 @@ async fn test_app() -> Router {
|
||||
search_command: Arc::new(PanicSearchCommand),
|
||||
remote_watchlist: Arc::new(PanicRemoteWatchlist),
|
||||
social_query: Arc::new(PanicSocialQuery),
|
||||
wrapup_stats: Arc::new(domain::testing::PanicWrapUpStatsQuery) as _,
|
||||
wrapup_repo: Arc::new(domain::testing::PanicWrapUpRepository) as _,
|
||||
},
|
||||
services: Services {
|
||||
auth: Arc::new(PanicAuth),
|
||||
|
||||
@@ -36,6 +36,8 @@ pub struct WorkerDbOutput {
|
||||
pub ap_content: Arc<dyn LocalApContentQuery>,
|
||||
pub image_ref_command: Arc<dyn ImageRefCommand>,
|
||||
pub image_ref_query: Arc<dyn ImageRefQuery>,
|
||||
pub wrapup_stats: Arc<dyn domain::ports::WrapUpStatsQuery>,
|
||||
pub wrapup_repo: Arc<dyn domain::ports::WrapUpRepository>,
|
||||
pub db_pool: DbPool,
|
||||
}
|
||||
|
||||
@@ -76,6 +78,8 @@ pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<Worker
|
||||
ap_content: w.ap_content,
|
||||
image_ref_command,
|
||||
image_ref_query,
|
||||
wrapup_stats: w.wrapup_stats,
|
||||
wrapup_repo: w.wrapup_repo,
|
||||
db_pool: DbPool::Postgres(w.pool),
|
||||
})
|
||||
}
|
||||
@@ -113,6 +117,8 @@ pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<Worker
|
||||
ap_content: w.ap_content,
|
||||
image_ref_command,
|
||||
image_ref_query,
|
||||
wrapup_stats: w.wrapup_stats,
|
||||
wrapup_repo: w.wrapup_repo,
|
||||
db_pool: DbPool::Sqlite(w.pool),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -92,7 +92,8 @@ async fn main() -> anyhow::Result<()> {
|
||||
social_query: fed_social_query,
|
||||
#[cfg(not(feature = "federation"))]
|
||||
social_query: Arc::new(domain::testing::NoopSocialQueryPort),
|
||||
wrapup_stats: Arc::new(domain::testing::PanicWrapUpStatsQuery) as Arc<dyn domain::ports::WrapUpStatsQuery>,
|
||||
wrapup_stats: db.wrapup_stats,
|
||||
wrapup_repo: db.wrapup_repo,
|
||||
},
|
||||
services: Services {
|
||||
auth: auth_service,
|
||||
@@ -147,6 +148,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
let mut periodic_jobs: Vec<Arc<dyn PeriodicJob>> = vec![
|
||||
Arc::new(application::jobs::ImportSessionCleanupJob::new(ctx.clone())),
|
||||
Arc::new(application::jobs::WatchEventCleanupJob::new(ctx.clone())),
|
||||
Arc::new(application::jobs::WrapUpAutoGenerateJob::new(ctx.clone())),
|
||||
];
|
||||
if let Some(job) = enrichment_job {
|
||||
periodic_jobs.push(job);
|
||||
@@ -193,7 +195,10 @@ async fn main() -> anyhow::Result<()> {
|
||||
Arc::clone(&ctx.repos.movie),
|
||||
Arc::clone(&ctx.repos.search_command),
|
||||
)) as Arc<dyn EventHandler>;
|
||||
let mut h = vec![poster, cleanup, search_cleanup, discovery_indexer];
|
||||
let wrapup_handler = Arc::new(application::wrapup::event_handler::WrapUpEventHandler::new(
|
||||
ctx.clone(),
|
||||
)) as Arc<dyn EventHandler>;
|
||||
let mut h = vec![poster, cleanup, search_cleanup, discovery_indexer, wrapup_handler];
|
||||
if let Some(e) = enrichment_handler {
|
||||
h.push(e);
|
||||
}
|
||||
@@ -234,6 +239,9 @@ async fn main() -> anyhow::Result<()> {
|
||||
Arc::clone(&ctx.repos.search_command),
|
||||
)) as Arc<dyn EventHandler>;
|
||||
tracing::info!("federation event handler registered");
|
||||
let wrapup_handler = Arc::new(application::wrapup::event_handler::WrapUpEventHandler::new(
|
||||
ctx.clone(),
|
||||
)) as Arc<dyn EventHandler>;
|
||||
let mut h = vec![
|
||||
poster,
|
||||
cleanup,
|
||||
@@ -241,6 +249,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
backfill,
|
||||
search_cleanup,
|
||||
discovery_indexer,
|
||||
wrapup_handler,
|
||||
];
|
||||
if let Some(e) = enrichment_handler {
|
||||
h.push(e);
|
||||
|
||||
Reference in New Issue
Block a user