Compare commits

..

10 Commits

39 changed files with 1753 additions and 27 deletions

View File

@@ -2,7 +2,7 @@ use chrono::NaiveDateTime;
use domain::{
errors::DomainError,
events::DomainEvent,
value_objects::{ExternalMetadataId, MovieId, PosterPath, Rating, ReviewId, UserId},
value_objects::{ExternalMetadataId, MovieId, PosterPath, Rating, ReviewId, UserId, WrapUpId},
};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
@@ -77,6 +77,15 @@ pub enum EventPayload {
title: String,
source: String,
},
WrapUpRequested {
wrapup_id: String,
user_id: Option<String>,
start_date: String,
end_date: String,
},
WrapUpCompleted {
wrapup_id: String,
},
}
impl EventPayload {
@@ -96,6 +105,8 @@ impl EventPayload {
EventPayload::BackfillFollower { .. } => "BackfillFollower",
EventPayload::FederationDeliveryRequested { .. } => "FederationDeliveryRequested",
EventPayload::WatchEventIngested { .. } => "WatchEventIngested",
EventPayload::WrapUpRequested { .. } => "WrapUpRequested",
EventPayload::WrapUpCompleted { .. } => "WrapUpCompleted",
}
}
}
@@ -223,6 +234,20 @@ impl From<&DomainEvent> for EventPayload {
title: title.clone(),
source: source.clone(),
},
DomainEvent::WrapUpRequested {
wrapup_id,
user_id,
start_date,
end_date,
} => EventPayload::WrapUpRequested {
wrapup_id: wrapup_id.value().to_string(),
user_id: user_id.as_ref().map(|u| u.value().to_string()),
start_date: start_date.to_string(),
end_date: end_date.to_string(),
},
DomainEvent::WrapUpCompleted { wrapup_id } => EventPayload::WrapUpCompleted {
wrapup_id: wrapup_id.value().to_string(),
},
}
}
}
@@ -348,6 +373,33 @@ impl TryFrom<EventPayload> for DomainEvent {
title,
source,
}),
EventPayload::WrapUpRequested {
wrapup_id,
user_id,
start_date,
end_date,
} => {
let wid = parse_uuid(&wrapup_id, "wrapup_id")?;
let uid = user_id
.map(|s| parse_uuid(&s, "user_id"))
.transpose()?;
let sd = chrono::NaiveDate::parse_from_str(&start_date, "%Y-%m-%d")
.map_err(|e| DomainError::ValidationError(e.to_string()))?;
let ed = chrono::NaiveDate::parse_from_str(&end_date, "%Y-%m-%d")
.map_err(|e| DomainError::ValidationError(e.to_string()))?;
Ok(DomainEvent::WrapUpRequested {
wrapup_id: WrapUpId::from_uuid(wid),
user_id: uid.map(UserId::from_uuid),
start_date: sd,
end_date: ed,
})
}
EventPayload::WrapUpCompleted { wrapup_id } => {
let wid = parse_uuid(&wrapup_id, "wrapup_id")?;
Ok(DomainEvent::WrapUpCompleted {
wrapup_id: WrapUpId::from_uuid(wid),
})
}
}
}
}

View File

@@ -16,6 +16,8 @@ pub fn event_to_subject(prefix: &str, event: &DomainEvent) -> String {
DomainEvent::BackfillFollower { .. } => "backfill.follower",
DomainEvent::FederationDeliveryRequested { .. } => "federation.delivery.requested",
DomainEvent::WatchEventIngested { .. } => "watch.event.ingested",
DomainEvent::WrapUpRequested { .. } => "wrapup.requested",
DomainEvent::WrapUpCompleted { .. } => "wrapup.completed",
};
format!("{prefix}.{suffix}")
}

View File

@@ -0,0 +1,12 @@
CREATE TABLE IF NOT EXISTS wrap_up_records (
id TEXT PRIMARY KEY NOT NULL,
user_id TEXT,
start_date DATE NOT NULL,
end_date DATE NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
report_json TEXT,
error_message TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
completed_at TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS idx_wrap_up_user ON wrap_up_records (user_id, start_date, end_date);

View File

@@ -23,6 +23,7 @@ mod profile_fields;
mod users;
mod watch_event;
mod watchlist;
mod wrapup;
use models::{
DiaryRow, DirectorCountRow, FeedRow, MonthlyRatingRow, MovieRow, MovieStatsRow,
@@ -39,6 +40,7 @@ pub use profile_fields::PostgresProfileFieldsRepository;
pub use users::PostgresUserRepository;
pub use watch_event::{PostgresWatchEventRepository, PostgresWebhookTokenRepository};
pub use watchlist::PostgresWatchlistRepository;
pub use wrapup::{PostgresWrapUpRepository, PostgresWrapUpStatsQuery};
fn format_year_month(ym: &str) -> String {
let parts: Vec<&str> = ym.splitn(2, '-').collect();
@@ -1000,6 +1002,8 @@ pub struct PostgresWireOutput {
pub movie_profile: std::sync::Arc<dyn domain::ports::MovieProfileRepository>,
pub watchlist: std::sync::Arc<dyn domain::ports::WatchlistRepository>,
pub ap_content: std::sync::Arc<dyn domain::ports::LocalApContentQuery>,
pub wrapup_repo: std::sync::Arc<dyn domain::ports::WrapUpRepository>,
pub wrapup_stats: std::sync::Arc<dyn domain::ports::WrapUpStatsQuery>,
}
pub async fn wire(database_url: &str) -> anyhow::Result<PostgresWireOutput> {
@@ -1028,6 +1032,8 @@ pub async fn wire(database_url: &str) -> anyhow::Result<PostgresWireOutput> {
as _,
movie_profile: std::sync::Arc::new(PostgresMovieProfileRepository::new(pool.clone())) as _,
watchlist: std::sync::Arc::new(PostgresWatchlistRepository::new(pool.clone())) as _,
ap_content: std::sync::Arc::new(PostgresApContentQuery::new(pool)) as _,
ap_content: std::sync::Arc::new(PostgresApContentQuery::new(pool.clone())) as _,
wrapup_repo: std::sync::Arc::new(PostgresWrapUpRepository::new(pool.clone())) as _,
wrapup_stats: std::sync::Arc::new(PostgresWrapUpStatsQuery::new(pool)) as _,
})
}

View File

@@ -0,0 +1,430 @@
use std::collections::HashMap;
use async_trait::async_trait;
use chrono::NaiveDate;
use domain::{
errors::DomainError,
models::wrapup::{DateRange, WrapUpRecord, WrapUpScope, WrapUpStatus},
ports::{WrapUpMovieRow, WrapUpRepository, WrapUpStatsQuery},
value_objects::WrapUpId,
};
use sqlx::{PgPool, Row};
use uuid::Uuid;
use crate::models::{parse_datetime, parse_uuid};
fn map_err(e: sqlx::Error) -> DomainError {
tracing::error!("Database error: {:?}", e);
DomainError::InfrastructureError("Database operation failed".into())
}
fn status_to_str(s: &WrapUpStatus) -> &'static str {
match s {
WrapUpStatus::Pending => "pending",
WrapUpStatus::Generating => "generating",
WrapUpStatus::Ready => "ready",
WrapUpStatus::Failed => "failed",
}
}
fn parse_status(s: &str) -> Result<WrapUpStatus, DomainError> {
match s {
"pending" => Ok(WrapUpStatus::Pending),
"generating" => Ok(WrapUpStatus::Generating),
"ready" => Ok(WrapUpStatus::Ready),
"failed" => Ok(WrapUpStatus::Failed),
other => Err(DomainError::InfrastructureError(format!(
"Unknown wrap-up status: {other}"
))),
}
}
// ── WrapUpRepository ─────────────────────────────────────────────────────────
pub struct PostgresWrapUpRepository {
pool: PgPool,
}
impl PostgresWrapUpRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait]
impl WrapUpRepository for PostgresWrapUpRepository {
async fn create(&self, record: &WrapUpRecord) -> Result<(), DomainError> {
let id = record.id.value().to_string();
let user_id = record.user_id.map(|u| u.to_string());
let status = status_to_str(&record.status);
sqlx::query(
"INSERT INTO wrap_up_records \
(id, user_id, start_date, end_date, status, report_json, error_message, created_at, completed_at) \
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
)
.bind(&id)
.bind(&user_id)
.bind(record.start_date)
.bind(record.end_date)
.bind(status)
.bind(&record.report_json)
.bind(&record.error_message)
.bind(record.created_at)
.bind(record.completed_at)
.execute(&self.pool)
.await
.map_err(map_err)?;
Ok(())
}
async fn update_status(
&self,
id: &WrapUpId,
status: &WrapUpStatus,
error: Option<&str>,
) -> Result<(), DomainError> {
let id_str = id.value().to_string();
let status_str = status_to_str(status);
sqlx::query(
"UPDATE wrap_up_records SET status = $1, error_message = $2 WHERE id = $3",
)
.bind(status_str)
.bind(error)
.bind(&id_str)
.execute(&self.pool)
.await
.map_err(map_err)?;
Ok(())
}
async fn set_complete(&self, id: &WrapUpId, report_json: &str) -> Result<(), DomainError> {
let id_str = id.value().to_string();
sqlx::query(
"UPDATE wrap_up_records \
SET status = 'ready', report_json = $1, completed_at = NOW() \
WHERE id = $2",
)
.bind(report_json)
.bind(&id_str)
.execute(&self.pool)
.await
.map_err(map_err)?;
Ok(())
}
async fn get_by_id(&self, id: &WrapUpId) -> Result<Option<WrapUpRecord>, DomainError> {
let id_str = id.value().to_string();
let row = sqlx::query(
"SELECT id, user_id, start_date, end_date, status, report_json, error_message, \
to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, \
to_char(completed_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS completed_at \
FROM wrap_up_records WHERE id = $1",
)
.bind(&id_str)
.fetch_optional(&self.pool)
.await
.map_err(map_err)?;
row.as_ref().map(row_to_record).transpose()
}
async fn list_for_user(&self, user_id: Uuid) -> Result<Vec<WrapUpRecord>, DomainError> {
let uid = user_id.to_string();
let rows = sqlx::query(
"SELECT id, user_id, start_date, end_date, status, report_json, error_message, \
to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, \
to_char(completed_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS completed_at \
FROM wrap_up_records WHERE user_id = $1 ORDER BY created_at DESC",
)
.bind(&uid)
.fetch_all(&self.pool)
.await
.map_err(map_err)?;
rows.iter().map(row_to_record).collect()
}
async fn list_global(&self) -> Result<Vec<WrapUpRecord>, DomainError> {
let rows = sqlx::query(
"SELECT id, user_id, start_date, end_date, status, report_json, error_message, \
to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, \
to_char(completed_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS completed_at \
FROM wrap_up_records WHERE user_id IS NULL ORDER BY created_at DESC",
)
.fetch_all(&self.pool)
.await
.map_err(map_err)?;
rows.iter().map(row_to_record).collect()
}
async fn find_existing(
&self,
user_id: Option<Uuid>,
start: NaiveDate,
end: NaiveDate,
) -> Result<Option<WrapUpRecord>, DomainError> {
let uid = user_id.map(|u| u.to_string());
let row = sqlx::query(
"SELECT id, user_id, start_date, end_date, status, report_json, error_message, \
to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, \
to_char(completed_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS completed_at \
FROM wrap_up_records \
WHERE (($1::text IS NULL AND user_id IS NULL) OR user_id = $1) \
AND start_date = $2 AND end_date = $3 \
LIMIT 1",
)
.bind(&uid)
.bind(start)
.bind(end)
.fetch_optional(&self.pool)
.await
.map_err(map_err)?;
row.as_ref().map(row_to_record).transpose()
}
}
fn row_to_record(row: &sqlx::postgres::PgRow) -> Result<WrapUpRecord, DomainError> {
let id_str: String = row.try_get("id").map_err(map_err)?;
let user_id_str: Option<String> = row.try_get("user_id").map_err(map_err)?;
let start_date: NaiveDate = row.try_get("start_date").map_err(map_err)?;
let end_date: NaiveDate = row.try_get("end_date").map_err(map_err)?;
let status_str: String = row.try_get("status").map_err(map_err)?;
let report_json: Option<String> = row.try_get("report_json").map_err(map_err)?;
let error_message: Option<String> = row.try_get("error_message").map_err(map_err)?;
let created_at_str: String = row.try_get("created_at").map_err(map_err)?;
let completed_at_str: Option<String> = row.try_get("completed_at").map_err(map_err)?;
let user_id = user_id_str
.as_deref()
.map(parse_uuid)
.transpose()?;
Ok(WrapUpRecord {
id: WrapUpId::from_uuid(parse_uuid(&id_str)?),
user_id,
start_date,
end_date,
status: parse_status(&status_str)?,
report_json,
error_message,
created_at: parse_datetime(&created_at_str)?,
completed_at: completed_at_str.as_deref().map(parse_datetime).transpose()?,
})
}
// ── WrapUpStatsQuery ─────────────────────────────────────────────────────────
pub struct PostgresWrapUpStatsQuery {
pool: PgPool,
}
impl PostgresWrapUpStatsQuery {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait]
impl WrapUpStatsQuery for PostgresWrapUpStatsQuery {
async fn get_reviews_with_profiles(
&self,
scope: &WrapUpScope,
range: &DateRange,
) -> Result<Vec<WrapUpMovieRow>, DomainError> {
// 1) Main query: reviews + movies + movie_profiles
let (scope_clause, scope_bind) = match scope {
WrapUpScope::User(uid) => ("AND r.user_id = $3", Some(uid.to_string())),
WrapUpScope::Global => ("", None),
};
let sql = format!(
"SELECT r.movie_id, m.title, m.release_year, m.director, m.poster_path, \
r.rating, \
to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, \
r.user_id, \
p.runtime_minutes, p.budget_usd, p.original_language \
FROM reviews r \
INNER JOIN movies m ON m.id = r.movie_id \
LEFT JOIN movie_profiles p ON p.movie_id = m.id \
WHERE r.watched_at >= $1 AND r.watched_at < $2 {scope_clause} \
ORDER BY r.watched_at ASC"
);
let mut q = sqlx::query(&sql)
.bind(range.start)
.bind(range.end);
if let Some(ref uid) = scope_bind {
q = q.bind(uid);
}
let rows = q.fetch_all(&self.pool).await.map_err(map_err)?;
if rows.is_empty() {
return Ok(vec![]);
}
// Collect unique movie IDs
let mut movie_ids: Vec<String> = Vec::new();
let mut seen = std::collections::HashSet::new();
for row in &rows {
let mid: String = row.try_get("movie_id").map_err(map_err)?;
if seen.insert(mid.clone()) {
movie_ids.push(mid);
}
}
// 2) Batch-fetch genres, keywords, cast
let (genres_map, keywords_map, cast_map) = tokio::try_join!(
fetch_genres_pg(&self.pool, &movie_ids),
fetch_keywords_pg(&self.pool, &movie_ids),
fetch_cast_pg(&self.pool, &movie_ids),
)?;
// 3) Build result
let mut result = Vec::with_capacity(rows.len());
for row in &rows {
let movie_id_str: String = row.try_get("movie_id").map_err(map_err)?;
let title: String = row.try_get("title").map_err(map_err)?;
let release_year: i64 = row.try_get("release_year").map_err(map_err)?;
let director: Option<String> = row.try_get("director").map_err(map_err)?;
let poster_path: Option<String> = row.try_get("poster_path").map_err(map_err)?;
let rating: i64 = row.try_get("rating").map_err(map_err)?;
let watched_at_str: String = row.try_get("watched_at").map_err(map_err)?;
let user_id_str: String = row.try_get("user_id").map_err(map_err)?;
let runtime_minutes: Option<i32> = row.try_get("runtime_minutes").map_err(map_err)?;
let budget_usd: Option<i64> = row.try_get("budget_usd").map_err(map_err)?;
let original_language: Option<String> =
row.try_get("original_language").map_err(map_err)?;
let genres = genres_map
.get(&movie_id_str)
.cloned()
.unwrap_or_default();
let keywords = keywords_map
.get(&movie_id_str)
.cloned()
.unwrap_or_default();
let cast = cast_map
.get(&movie_id_str)
.cloned()
.unwrap_or_default();
let cast_names: Vec<(String, u32)> = cast
.iter()
.map(|c| (c.name.clone(), c.billing_order))
.collect();
let cast_profile_paths: Vec<Option<String>> =
cast.iter().map(|c| c.profile_path.clone()).collect();
result.push(WrapUpMovieRow {
movie_id: parse_uuid(&movie_id_str)?,
title,
release_year: release_year as u16,
director,
poster_path,
rating: rating as u8,
watched_at: parse_datetime(&watched_at_str)?,
user_id: parse_uuid(&user_id_str)?,
runtime_minutes: runtime_minutes.map(|v| v as u32),
budget_usd,
original_language,
genres,
keywords,
cast_names,
cast_profile_paths,
});
}
Ok(result)
}
}
#[derive(Clone)]
struct CastEntry {
name: String,
billing_order: u32,
profile_path: Option<String>,
}
async fn fetch_genres_pg(
pool: &PgPool,
movie_ids: &[String],
) -> Result<HashMap<String, Vec<String>>, DomainError> {
let rows = sqlx::query(
"SELECT movie_id, name FROM movie_genres WHERE movie_id = ANY($1) ORDER BY name",
)
.bind(movie_ids)
.fetch_all(pool)
.await
.map_err(map_err)?;
let mut map: HashMap<String, Vec<String>> = HashMap::new();
for row in rows {
let mid: String = row.try_get("movie_id").map_err(map_err)?;
let name: String = row.try_get("name").map_err(map_err)?;
map.entry(mid).or_default().push(name);
}
Ok(map)
}
async fn fetch_keywords_pg(
pool: &PgPool,
movie_ids: &[String],
) -> Result<HashMap<String, Vec<String>>, DomainError> {
let rows = sqlx::query(
"SELECT movie_id, name FROM movie_keywords WHERE movie_id = ANY($1) ORDER BY name",
)
.bind(movie_ids)
.fetch_all(pool)
.await
.map_err(map_err)?;
let mut map: HashMap<String, Vec<String>> = HashMap::new();
for row in rows {
let mid: String = row.try_get("movie_id").map_err(map_err)?;
let name: String = row.try_get("name").map_err(map_err)?;
map.entry(mid).or_default().push(name);
}
Ok(map)
}
async fn fetch_cast_pg(
pool: &PgPool,
movie_ids: &[String],
) -> Result<HashMap<String, Vec<CastEntry>>, DomainError> {
let rows = sqlx::query(
"SELECT movie_id, name, billing_order, profile_path \
FROM movie_cast \
WHERE movie_id = ANY($1) AND billing_order <= 3 \
ORDER BY billing_order ASC",
)
.bind(movie_ids)
.fetch_all(pool)
.await
.map_err(map_err)?;
let mut map: HashMap<String, Vec<CastEntry>> = HashMap::new();
for row in rows {
let mid: String = row.try_get("movie_id").map_err(map_err)?;
let name: String = row.try_get("name").map_err(map_err)?;
let billing_order: i32 = row.try_get("billing_order").map_err(map_err)?;
let profile_path: Option<String> = row.try_get("profile_path").map_err(map_err)?;
map.entry(mid).or_default().push(CastEntry {
name,
billing_order: billing_order as u32,
profile_path,
});
}
Ok(map)
}

View File

@@ -0,0 +1,12 @@
CREATE TABLE IF NOT EXISTS wrap_up_records (
id TEXT PRIMARY KEY NOT NULL,
user_id TEXT,
start_date TEXT NOT NULL,
end_date TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
report_json TEXT,
error_message TEXT,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%d %H:%M:%S', 'now')),
completed_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_wrap_up_user ON wrap_up_records (user_id, start_date, end_date);

View File

@@ -24,6 +24,7 @@ mod profile_fields;
mod users;
mod watch_event;
mod watchlist;
mod wrapup;
use models::{
DiaryRow, DirectorCountRow, FeedRow, MonthlyRatingRow, MovieRow, MovieStatsRow,
@@ -40,6 +41,7 @@ pub use profile_fields::SqliteProfileFieldsRepository;
pub use users::SqliteUserRepository;
pub use watch_event::{SqliteWatchEventRepository, SqliteWebhookTokenRepository};
pub use watchlist::SqliteWatchlistRepository;
pub use wrapup::{SqliteWrapUpRepository, SqliteWrapUpStatsQuery};
pub fn create_profile_fields_repo(
pool: sqlx::SqlitePool,
@@ -998,6 +1000,8 @@ pub struct SqliteWireOutput {
pub movie_profile: std::sync::Arc<dyn domain::ports::MovieProfileRepository>,
pub watchlist: std::sync::Arc<dyn domain::ports::WatchlistRepository>,
pub ap_content: std::sync::Arc<dyn domain::ports::LocalApContentQuery>,
pub wrapup_repo: std::sync::Arc<dyn domain::ports::WrapUpRepository>,
pub wrapup_stats: std::sync::Arc<dyn domain::ports::WrapUpStatsQuery>,
}
pub async fn wire(database_url: &str) -> anyhow::Result<SqliteWireOutput> {
@@ -1031,7 +1035,9 @@ pub async fn wire(database_url: &str) -> anyhow::Result<SqliteWireOutput> {
import_profile: std::sync::Arc::new(SqliteImportProfileRepository::new(pool.clone())) as _,
movie_profile: std::sync::Arc::new(SqliteMovieProfileRepository::new(pool.clone())) as _,
watchlist: std::sync::Arc::new(SqliteWatchlistRepository::new(pool.clone())) as _,
ap_content: std::sync::Arc::new(SqliteApContentQuery::new(pool)) as _,
ap_content: std::sync::Arc::new(SqliteApContentQuery::new(pool.clone())) as _,
wrapup_repo: std::sync::Arc::new(SqliteWrapUpRepository::new(pool.clone())) as _,
wrapup_stats: std::sync::Arc::new(SqliteWrapUpStatsQuery::new(pool)) as _,
})
}

View File

@@ -0,0 +1,465 @@
use std::collections::HashMap;
use async_trait::async_trait;
use chrono::NaiveDate;
use domain::{
errors::DomainError,
models::wrapup::{DateRange, WrapUpRecord, WrapUpScope, WrapUpStatus},
ports::{WrapUpMovieRow, WrapUpRepository, WrapUpStatsQuery},
value_objects::WrapUpId,
};
use sqlx::{Row, SqlitePool};
use uuid::Uuid;
use crate::models::{parse_datetime, parse_uuid};
fn map_err(e: sqlx::Error) -> DomainError {
tracing::error!("Database error: {:?}", e);
DomainError::InfrastructureError("Database operation failed".into())
}
fn status_to_str(s: &WrapUpStatus) -> &'static str {
match s {
WrapUpStatus::Pending => "pending",
WrapUpStatus::Generating => "generating",
WrapUpStatus::Ready => "ready",
WrapUpStatus::Failed => "failed",
}
}
fn parse_status(s: &str) -> Result<WrapUpStatus, DomainError> {
match s {
"pending" => Ok(WrapUpStatus::Pending),
"generating" => Ok(WrapUpStatus::Generating),
"ready" => Ok(WrapUpStatus::Ready),
"failed" => Ok(WrapUpStatus::Failed),
other => Err(DomainError::InfrastructureError(format!(
"Unknown wrap-up status: {other}"
))),
}
}
fn parse_date(s: &str) -> Result<NaiveDate, DomainError> {
NaiveDate::parse_from_str(s, "%Y-%m-%d")
.map_err(|e| DomainError::InfrastructureError(format!("Invalid date '{s}': {e}")))
}
// ── WrapUpRepository ─────────────────────────────────────────────────────────
pub struct SqliteWrapUpRepository {
pool: SqlitePool,
}
impl SqliteWrapUpRepository {
pub fn new(pool: SqlitePool) -> Self {
Self { pool }
}
}
#[async_trait]
impl WrapUpRepository for SqliteWrapUpRepository {
async fn create(&self, record: &WrapUpRecord) -> Result<(), DomainError> {
let id = record.id.value().to_string();
let user_id = record.user_id.map(|u| u.to_string());
let status = status_to_str(&record.status);
let start = record.start_date.format("%Y-%m-%d").to_string();
let end = record.end_date.format("%Y-%m-%d").to_string();
let created = record.created_at.format("%Y-%m-%d %H:%M:%S").to_string();
let completed = record
.completed_at
.map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string());
sqlx::query(
"INSERT INTO wrap_up_records \
(id, user_id, start_date, end_date, status, report_json, error_message, created_at, completed_at) \
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
)
.bind(&id)
.bind(&user_id)
.bind(&start)
.bind(&end)
.bind(status)
.bind(&record.report_json)
.bind(&record.error_message)
.bind(&created)
.bind(&completed)
.execute(&self.pool)
.await
.map_err(map_err)?;
Ok(())
}
async fn update_status(
&self,
id: &WrapUpId,
status: &WrapUpStatus,
error: Option<&str>,
) -> Result<(), DomainError> {
let id_str = id.value().to_string();
let status_str = status_to_str(status);
sqlx::query("UPDATE wrap_up_records SET status = ?, error_message = ? WHERE id = ?")
.bind(status_str)
.bind(error)
.bind(&id_str)
.execute(&self.pool)
.await
.map_err(map_err)?;
Ok(())
}
async fn set_complete(&self, id: &WrapUpId, report_json: &str) -> Result<(), DomainError> {
let id_str = id.value().to_string();
sqlx::query(
"UPDATE wrap_up_records \
SET status = 'ready', report_json = ?, completed_at = strftime('%Y-%m-%d %H:%M:%S', 'now') \
WHERE id = ?",
)
.bind(report_json)
.bind(&id_str)
.execute(&self.pool)
.await
.map_err(map_err)?;
Ok(())
}
async fn get_by_id(&self, id: &WrapUpId) -> Result<Option<WrapUpRecord>, DomainError> {
let id_str = id.value().to_string();
let row = sqlx::query(
"SELECT id, user_id, start_date, end_date, status, report_json, error_message, \
created_at, completed_at \
FROM wrap_up_records WHERE id = ?",
)
.bind(&id_str)
.fetch_optional(&self.pool)
.await
.map_err(map_err)?;
row.as_ref().map(row_to_record).transpose()
}
async fn list_for_user(&self, user_id: Uuid) -> Result<Vec<WrapUpRecord>, DomainError> {
let uid = user_id.to_string();
let rows = sqlx::query(
"SELECT id, user_id, start_date, end_date, status, report_json, error_message, \
created_at, completed_at \
FROM wrap_up_records WHERE user_id = ? ORDER BY created_at DESC",
)
.bind(&uid)
.fetch_all(&self.pool)
.await
.map_err(map_err)?;
rows.iter().map(row_to_record).collect()
}
async fn list_global(&self) -> Result<Vec<WrapUpRecord>, DomainError> {
let rows = sqlx::query(
"SELECT id, user_id, start_date, end_date, status, report_json, error_message, \
created_at, completed_at \
FROM wrap_up_records WHERE user_id IS NULL ORDER BY created_at DESC",
)
.fetch_all(&self.pool)
.await
.map_err(map_err)?;
rows.iter().map(row_to_record).collect()
}
async fn find_existing(
&self,
user_id: Option<Uuid>,
start: NaiveDate,
end: NaiveDate,
) -> Result<Option<WrapUpRecord>, DomainError> {
let uid = user_id.map(|u| u.to_string());
let start_str = start.format("%Y-%m-%d").to_string();
let end_str = end.format("%Y-%m-%d").to_string();
let row = sqlx::query(
"SELECT id, user_id, start_date, end_date, status, report_json, error_message, \
created_at, completed_at \
FROM wrap_up_records \
WHERE ((? IS NULL AND user_id IS NULL) OR user_id = ?) \
AND start_date = ? AND end_date = ? \
LIMIT 1",
)
.bind(&uid)
.bind(&uid)
.bind(&start_str)
.bind(&end_str)
.fetch_optional(&self.pool)
.await
.map_err(map_err)?;
row.as_ref().map(row_to_record).transpose()
}
}
fn row_to_record(row: &sqlx::sqlite::SqliteRow) -> Result<WrapUpRecord, DomainError> {
let id_str: String = row.try_get("id").map_err(map_err)?;
let user_id_str: Option<String> = row.try_get("user_id").map_err(map_err)?;
let start_date_str: String = row.try_get("start_date").map_err(map_err)?;
let end_date_str: String = row.try_get("end_date").map_err(map_err)?;
let status_str: String = row.try_get("status").map_err(map_err)?;
let report_json: Option<String> = row.try_get("report_json").map_err(map_err)?;
let error_message: Option<String> = row.try_get("error_message").map_err(map_err)?;
let created_at_str: String = row.try_get("created_at").map_err(map_err)?;
let completed_at_str: Option<String> = row.try_get("completed_at").map_err(map_err)?;
let user_id = user_id_str
.as_deref()
.map(parse_uuid)
.transpose()?;
Ok(WrapUpRecord {
id: WrapUpId::from_uuid(parse_uuid(&id_str)?),
user_id,
start_date: parse_date(&start_date_str)?,
end_date: parse_date(&end_date_str)?,
status: parse_status(&status_str)?,
report_json,
error_message,
created_at: parse_datetime(&created_at_str)?,
completed_at: completed_at_str.as_deref().map(parse_datetime).transpose()?,
})
}
// ── WrapUpStatsQuery ─────────────────────────────────────────────────────────
pub struct SqliteWrapUpStatsQuery {
pool: SqlitePool,
}
impl SqliteWrapUpStatsQuery {
pub fn new(pool: SqlitePool) -> Self {
Self { pool }
}
}
#[async_trait]
impl WrapUpStatsQuery for SqliteWrapUpStatsQuery {
async fn get_reviews_with_profiles(
&self,
scope: &WrapUpScope,
range: &DateRange,
) -> Result<Vec<WrapUpMovieRow>, DomainError> {
let start_str = range.start.format("%Y-%m-%d").to_string();
let end_str = range.end.format("%Y-%m-%d").to_string();
// 1) Main query
let (scope_clause, scope_bind) = match scope {
WrapUpScope::User(uid) => ("AND r.user_id = ?", Some(uid.to_string())),
WrapUpScope::Global => ("", None),
};
let sql = format!(
"SELECT r.movie_id, m.title, m.release_year, m.director, m.poster_path, \
r.rating, r.watched_at, r.user_id, \
p.runtime_minutes, p.budget_usd, p.original_language \
FROM reviews r \
INNER JOIN movies m ON m.id = r.movie_id \
LEFT JOIN movie_profiles p ON p.movie_id = m.id \
WHERE r.watched_at >= ? AND r.watched_at < ? {scope_clause} \
ORDER BY r.watched_at ASC"
);
let mut q = sqlx::query(&sql)
.bind(&start_str)
.bind(&end_str);
if let Some(ref uid) = scope_bind {
q = q.bind(uid);
}
let rows = q.fetch_all(&self.pool).await.map_err(map_err)?;
if rows.is_empty() {
return Ok(vec![]);
}
// Collect unique movie IDs
let mut movie_ids: Vec<String> = Vec::new();
let mut seen = std::collections::HashSet::new();
for row in &rows {
let mid: String = row.try_get("movie_id").map_err(map_err)?;
if seen.insert(mid.clone()) {
movie_ids.push(mid);
}
}
// 2) Batch-fetch genres, keywords, cast
let (genres_map, keywords_map, cast_map) = tokio::try_join!(
fetch_genres_sqlite(&self.pool, &movie_ids),
fetch_keywords_sqlite(&self.pool, &movie_ids),
fetch_cast_sqlite(&self.pool, &movie_ids),
)?;
// 3) Build result
let mut result = Vec::with_capacity(rows.len());
for row in &rows {
let movie_id_str: String = row.try_get("movie_id").map_err(map_err)?;
let title: String = row.try_get("title").map_err(map_err)?;
let release_year: i64 = row.try_get("release_year").map_err(map_err)?;
let director: Option<String> = row.try_get("director").map_err(map_err)?;
let poster_path: Option<String> = row.try_get("poster_path").map_err(map_err)?;
let rating: i64 = row.try_get("rating").map_err(map_err)?;
let watched_at_str: String = row.try_get("watched_at").map_err(map_err)?;
let user_id_str: String = row.try_get("user_id").map_err(map_err)?;
let runtime_minutes: Option<i32> = row.try_get("runtime_minutes").map_err(map_err)?;
let budget_usd: Option<i64> = row.try_get("budget_usd").map_err(map_err)?;
let original_language: Option<String> =
row.try_get("original_language").map_err(map_err)?;
let genres = genres_map
.get(&movie_id_str)
.cloned()
.unwrap_or_default();
let keywords = keywords_map
.get(&movie_id_str)
.cloned()
.unwrap_or_default();
let cast = cast_map
.get(&movie_id_str)
.cloned()
.unwrap_or_default();
let cast_names: Vec<(String, u32)> = cast
.iter()
.map(|c| (c.name.clone(), c.billing_order))
.collect();
let cast_profile_paths: Vec<Option<String>> =
cast.iter().map(|c| c.profile_path.clone()).collect();
result.push(WrapUpMovieRow {
movie_id: parse_uuid(&movie_id_str)?,
title,
release_year: release_year as u16,
director,
poster_path,
rating: rating as u8,
watched_at: parse_datetime(&watched_at_str)?,
user_id: parse_uuid(&user_id_str)?,
runtime_minutes: runtime_minutes.map(|v| v as u32),
budget_usd,
original_language,
genres,
keywords,
cast_names,
cast_profile_paths,
});
}
Ok(result)
}
}
#[derive(Clone)]
struct CastEntry {
name: String,
billing_order: u32,
profile_path: Option<String>,
}
fn in_placeholders(n: usize) -> String {
let mut s = String::new();
for i in 0..n {
if i > 0 {
s.push(',');
}
s.push('?');
}
s
}
async fn fetch_genres_sqlite(
pool: &SqlitePool,
movie_ids: &[String],
) -> Result<HashMap<String, Vec<String>>, DomainError> {
if movie_ids.is_empty() {
return Ok(HashMap::new());
}
let sql = format!(
"SELECT movie_id, name FROM movie_genres WHERE movie_id IN ({}) ORDER BY name",
in_placeholders(movie_ids.len())
);
let mut q = sqlx::query(&sql);
for id in movie_ids {
q = q.bind(id);
}
let rows = q.fetch_all(pool).await.map_err(map_err)?;
let mut map: HashMap<String, Vec<String>> = HashMap::new();
for row in rows {
let mid: String = row.try_get("movie_id").map_err(map_err)?;
let name: String = row.try_get("name").map_err(map_err)?;
map.entry(mid).or_default().push(name);
}
Ok(map)
}
async fn fetch_keywords_sqlite(
pool: &SqlitePool,
movie_ids: &[String],
) -> Result<HashMap<String, Vec<String>>, DomainError> {
if movie_ids.is_empty() {
return Ok(HashMap::new());
}
let sql = format!(
"SELECT movie_id, name FROM movie_keywords WHERE movie_id IN ({}) ORDER BY name",
in_placeholders(movie_ids.len())
);
let mut q = sqlx::query(&sql);
for id in movie_ids {
q = q.bind(id);
}
let rows = q.fetch_all(pool).await.map_err(map_err)?;
let mut map: HashMap<String, Vec<String>> = HashMap::new();
for row in rows {
let mid: String = row.try_get("movie_id").map_err(map_err)?;
let name: String = row.try_get("name").map_err(map_err)?;
map.entry(mid).or_default().push(name);
}
Ok(map)
}
async fn fetch_cast_sqlite(
pool: &SqlitePool,
movie_ids: &[String],
) -> Result<HashMap<String, Vec<CastEntry>>, DomainError> {
if movie_ids.is_empty() {
return Ok(HashMap::new());
}
let sql = format!(
"SELECT movie_id, name, billing_order, profile_path \
FROM movie_cast \
WHERE movie_id IN ({}) AND billing_order <= 3 \
ORDER BY billing_order ASC",
in_placeholders(movie_ids.len())
);
let mut q = sqlx::query(&sql);
for id in movie_ids {
q = q.bind(id);
}
let rows = q.fetch_all(pool).await.map_err(map_err)?;
let mut map: HashMap<String, Vec<CastEntry>> = HashMap::new();
for row in rows {
let mid: String = row.try_get("movie_id").map_err(map_err)?;
let name: String = row.try_get("name").map_err(map_err)?;
let billing_order: i32 = row.try_get("billing_order").map_err(map_err)?;
let profile_path: Option<String> = row.try_get("profile_path").map_err(map_err)?;
map.entry(mid).or_default().push(CastEntry {
name,
billing_order: billing_order as u32,
profile_path,
});
}
Ok(map)
}

View File

@@ -8,6 +8,7 @@ pub mod social;
pub mod users;
pub mod watchlist;
pub mod webhook;
pub mod wrapup;
pub use auth::*;
pub use common::*;

View File

@@ -0,0 +1,30 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, utoipa::ToSchema)]
pub struct GenerateWrapUpRequest {
pub start_date: String,
pub end_date: String,
pub global: Option<bool>,
}
#[derive(Debug, Serialize, utoipa::ToSchema)]
pub struct WrapUpGeneratedResponse {
pub id: String,
}
#[derive(Debug, Serialize, utoipa::ToSchema)]
pub struct WrapUpStatusResponse {
pub id: String,
pub user_id: Option<String>,
pub status: String,
pub start_date: String,
pub end_date: String,
pub created_at: String,
pub completed_at: Option<String>,
pub error_message: Option<String>,
}
#[derive(Debug, Serialize, utoipa::ToSchema)]
pub struct WrapUpListResponse {
pub items: Vec<WrapUpStatusResponse>,
}

View File

@@ -14,6 +14,7 @@ tokio = { workspace = true }
sha2 = { workspace = true }
rand = { workspace = true }
hex = { workspace = true }
serde_json = { workspace = true }
[features]
xlsx = []

View File

@@ -6,7 +6,7 @@ use domain::ports::{
MovieRepository, PasswordHasher, PersonCommand, PersonQuery, PosterFetcherClient,
RemoteWatchlistRepository, ReviewRepository, SearchCommand, SearchPort, SocialQueryPort,
StatsRepository, UserProfileFieldsRepository, UserRepository, WatchEventRepository,
WatchlistRepository, WrapUpStatsQuery, WebhookTokenRepository,
WatchlistRepository, WrapUpRepository, WrapUpStatsQuery, WebhookTokenRepository,
};
use crate::config::AppConfig;
@@ -32,6 +32,7 @@ pub struct Repositories {
pub remote_watchlist: Arc<dyn RemoteWatchlistRepository>,
pub social_query: Arc<dyn SocialQueryPort>,
pub wrapup_stats: Arc<dyn WrapUpStatsQuery>,
pub wrapup_repo: Arc<dyn WrapUpRepository>,
}
#[derive(Clone)]

View File

@@ -1,6 +1,7 @@
use std::time::Duration;
use async_trait::async_trait;
use chrono::Datelike;
use domain::{errors::DomainError, events::DomainEvent, ports::PeriodicJob};
use crate::context::AppContext;
@@ -85,3 +86,78 @@ impl PeriodicJob for EnrichmentStalenessJob {
Ok(())
}
}
pub struct WrapUpAutoGenerateJob {
ctx: AppContext,
}
impl WrapUpAutoGenerateJob {
pub fn new(ctx: AppContext) -> Self {
Self { ctx }
}
}
#[async_trait]
impl PeriodicJob for WrapUpAutoGenerateJob {
fn interval(&self) -> Duration {
Duration::from_secs(86400)
}
async fn run(&self) -> Result<(), DomainError> {
let now = chrono::Utc::now().naive_utc();
// Only run in January
if now.month() != 1 {
return Ok(());
}
let year = now.year() - 1;
let start = chrono::NaiveDate::from_ymd_opt(year, 1, 1)
.ok_or_else(|| DomainError::ValidationError("invalid date".into()))?;
let end = chrono::NaiveDate::from_ymd_opt(year + 1, 1, 1)
.ok_or_else(|| DomainError::ValidationError("invalid date".into()))?;
let users = self.ctx.repos.user.list_with_stats().await?;
for user in &users {
if user.total_movies > 0 {
let existing = self
.ctx
.repos
.wrapup_repo
.find_existing(Some(user.user_id.value()), start, end)
.await?;
if existing.is_none() {
let cmd = crate::wrapup::commands::RequestWrapUpCommand {
user_id: Some(user.user_id.value()),
start_date: start,
end_date: end,
};
if let Err(e) = crate::wrapup::generate::execute(&self.ctx, cmd).await {
tracing::warn!(
"auto-generate wrapup for user {} failed: {e}",
user.user_id.value()
);
}
}
}
}
// Global wrap-up
let existing = self
.ctx
.repos
.wrapup_repo
.find_existing(None, start, end)
.await?;
if existing.is_none() {
let cmd = crate::wrapup::commands::RequestWrapUpCommand {
user_id: None,
start_date: start,
end_date: end,
};
if let Err(e) = crate::wrapup::generate::execute(&self.ctx, cmd).await {
tracing::warn!("auto-generate global wrapup failed: {e}");
}
}
Ok(())
}
}

View File

@@ -1,6 +1,6 @@
use std::sync::Arc;
use domain::testing::{InMemoryWrapUpStatsQuery, NoopRemoteWatchlistRepository, NoopSocialQueryPort};
use domain::testing::{InMemoryWrapUpRepository, InMemoryWrapUpStatsQuery, NoopRemoteWatchlistRepository, NoopSocialQueryPort};
use domain::{
ports::{
AuthService, DiaryExporter, DiaryRepository, DocumentParser, EventPublisher, ImageStorage,
@@ -8,7 +8,7 @@ use domain::{
MovieRepository, PasswordHasher, PersonCommand, PersonQuery, PosterFetcherClient,
ReviewRepository, SearchCommand, SearchPort, StatsRepository, UserProfileFieldsRepository,
UserRepository, WatchEventRepository, WatchlistRepository, WebhookTokenRepository,
WrapUpStatsQuery,
WrapUpRepository, WrapUpStatsQuery,
},
testing::{
FakeAuthService, FakeMetadataClient, FakePasswordHasher, InMemoryMovieRepository,
@@ -52,6 +52,7 @@ pub struct TestContextBuilder {
pub search_port: Arc<dyn SearchPort>,
pub search_command: Arc<dyn SearchCommand>,
pub wrapup_stats: Arc<dyn WrapUpStatsQuery>,
pub wrapup_repo: Arc<dyn WrapUpRepository>,
pub config: AppConfig,
}
@@ -83,6 +84,7 @@ impl TestContextBuilder {
search_port: Arc::new(PanicSearchPort),
search_command: Arc::new(PanicSearchCommand),
wrapup_stats: InMemoryWrapUpStatsQuery::new(),
wrapup_repo: InMemoryWrapUpRepository::new(),
config: AppConfig {
allow_registration: true,
base_url: "http://localhost:3000".into(),
@@ -153,6 +155,7 @@ impl TestContextBuilder {
remote_watchlist: Arc::new(NoopRemoteWatchlistRepository),
social_query: Arc::new(NoopSocialQueryPort),
wrapup_stats: self.wrapup_stats,
wrapup_repo: self.wrapup_repo,
},
services: Services {
auth: self.auth_service,

View File

@@ -59,6 +59,8 @@ impl EventHandler for RecordingHandler {
DomainEvent::BackfillFollower { .. } => "backfill_follower",
DomainEvent::FederationDeliveryRequested { .. } => "federation_delivery",
DomainEvent::WatchEventIngested { .. } => "watch_event_ingested",
DomainEvent::WrapUpRequested { .. } => "wrapup_requested",
DomainEvent::WrapUpCompleted { .. } => "wrapup_completed",
};
self.calls.lock().unwrap().push(label);
Ok(())

View File

@@ -0,0 +1,8 @@
use chrono::NaiveDate;
use uuid::Uuid;
pub struct RequestWrapUpCommand {
pub user_id: Option<Uuid>,
pub start_date: NaiveDate,
pub end_date: NaiveDate,
}

View File

@@ -0,0 +1,40 @@
use async_trait::async_trait;
use domain::errors::DomainError;
use domain::events::DomainEvent;
use domain::ports::EventHandler;
use crate::context::AppContext;
pub struct WrapUpEventHandler {
ctx: AppContext,
}
impl WrapUpEventHandler {
pub fn new(ctx: AppContext) -> Self {
Self { ctx }
}
}
#[async_trait]
impl EventHandler for WrapUpEventHandler {
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
match event {
DomainEvent::WrapUpRequested {
wrapup_id,
user_id,
start_date,
end_date,
} => {
super::handle_requested::execute(
&self.ctx,
wrapup_id.clone(),
user_id.as_ref().map(|u| u.value()),
*start_date,
*end_date,
)
.await
}
_ => Ok(()),
}
}
}

View File

@@ -0,0 +1,47 @@
use domain::errors::DomainError;
use domain::events::DomainEvent;
use domain::models::wrapup::WrapUpStatus;
use domain::value_objects::{UserId, WrapUpId};
use crate::context::AppContext;
use crate::wrapup::commands::RequestWrapUpCommand;
pub async fn execute(ctx: &AppContext, cmd: RequestWrapUpCommand) -> Result<WrapUpId, DomainError> {
let existing = ctx
.repos
.wrapup_repo
.find_existing(cmd.user_id, cmd.start_date, cmd.end_date)
.await?;
if let Some(ref rec) = existing
&& (rec.status == WrapUpStatus::Ready || rec.status == WrapUpStatus::Generating)
{
return Ok(rec.id.clone());
}
let id = WrapUpId::generate();
let record = domain::models::wrapup::WrapUpRecord {
id: id.clone(),
user_id: cmd.user_id,
start_date: cmd.start_date,
end_date: cmd.end_date,
status: WrapUpStatus::Pending,
report_json: None,
error_message: None,
created_at: chrono::Utc::now().naive_utc(),
completed_at: None,
};
ctx.repos.wrapup_repo.create(&record).await?;
ctx.services
.event_publisher
.publish(&DomainEvent::WrapUpRequested {
wrapup_id: id.clone(),
user_id: cmd.user_id.map(UserId::from_uuid),
start_date: cmd.start_date,
end_date: cmd.end_date,
})
.await?;
Ok(id)
}

View File

@@ -0,0 +1,9 @@
use domain::errors::DomainError;
use domain::models::wrapup::WrapUpRecord;
use domain::value_objects::WrapUpId;
use crate::context::AppContext;
pub async fn execute(ctx: &AppContext, id: WrapUpId) -> Result<Option<WrapUpRecord>, DomainError> {
ctx.repos.wrapup_repo.get_by_id(&id).await
}

View File

@@ -0,0 +1,51 @@
use crate::context::AppContext;
use crate::wrapup::{compute, queries::ComputeWrapUpQuery};
use domain::errors::DomainError;
use domain::events::DomainEvent;
use domain::models::wrapup::{DateRange, WrapUpScope, WrapUpStatus};
use domain::value_objects::WrapUpId;
pub async fn execute(
ctx: &AppContext,
wrapup_id: WrapUpId,
user_id: Option<uuid::Uuid>,
start_date: chrono::NaiveDate,
end_date: chrono::NaiveDate,
) -> Result<(), DomainError> {
ctx.repos
.wrapup_repo
.update_status(&wrapup_id, &WrapUpStatus::Generating, None)
.await?;
let scope = match user_id {
Some(uid) => WrapUpScope::User(uid),
None => WrapUpScope::Global,
};
let query = ComputeWrapUpQuery {
scope,
date_range: DateRange {
start: start_date,
end: end_date,
},
};
match compute::execute(ctx, query).await {
Ok(report) => {
let json = serde_json::to_string(&report)
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
ctx.repos.wrapup_repo.set_complete(&wrapup_id, &json).await?;
ctx.services
.event_publisher
.publish(&DomainEvent::WrapUpCompleted { wrapup_id })
.await?;
Ok(())
}
Err(e) => {
ctx.repos
.wrapup_repo
.update_status(&wrapup_id, &WrapUpStatus::Failed, Some(&e.to_string()))
.await?;
Err(e)
}
}
}

View File

@@ -0,0 +1,20 @@
use uuid::Uuid;
use domain::errors::DomainError;
use domain::models::wrapup::WrapUpRecord;
use crate::context::AppContext;
pub struct ListWrapUpsQuery {
pub user_id: Option<Uuid>,
}
pub async fn execute(
ctx: &AppContext,
query: ListWrapUpsQuery,
) -> Result<Vec<WrapUpRecord>, DomainError> {
match query.user_id {
Some(uid) => ctx.repos.wrapup_repo.list_for_user(uid).await,
None => ctx.repos.wrapup_repo.list_global().await,
}
}

View File

@@ -1,2 +1,8 @@
pub mod commands;
pub mod compute;
pub mod event_handler;
pub mod generate;
pub mod get_wrapup;
pub mod handle_requested;
pub mod list_wrapups;
pub mod queries;

View File

@@ -9,6 +9,7 @@ chrono = { workspace = true }
async-trait = { workspace = true }
thiserror = { workspace = true }
futures = { workspace = true }
serde = { workspace = true }
email_address = "0.2.9"

View File

@@ -3,7 +3,7 @@ use chrono::NaiveDateTime;
use crate::{
errors::DomainError,
value_objects::{ExternalMetadataId, MovieId, PosterPath, Rating, ReviewId, UserId},
value_objects::{ExternalMetadataId, MovieId, PosterPath, Rating, ReviewId, UserId, WrapUpId},
};
#[derive(Clone, Debug)]
@@ -75,6 +75,15 @@ pub enum DomainEvent {
title: String,
source: String,
},
WrapUpRequested {
wrapup_id: WrapUpId,
user_id: Option<UserId>,
start_date: chrono::NaiveDate,
end_date: chrono::NaiveDate,
},
WrapUpCompleted {
wrapup_id: WrapUpId,
},
}
#[async_trait]

View File

@@ -1,13 +1,16 @@
use chrono::NaiveDate;
use chrono::{NaiveDate, NaiveDateTime};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Clone, Debug)]
use crate::value_objects::WrapUpId;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DateRange {
pub start: NaiveDate,
pub end: NaiveDate,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MovieRef {
pub title: String,
pub year: u16,
@@ -15,52 +18,52 @@ pub struct MovieRef {
pub poster_path: Option<String>,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UserRef {
pub user_id: Uuid,
pub display_name: String,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PersonStat {
pub name: String,
pub count: u32,
pub avg_rating: f64,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GenreStat {
pub genre: String,
pub count: u32,
pub avg_rating: f64,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KeywordStat {
pub keyword: String,
pub count: u32,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LangStat {
pub language: String,
pub count: u32,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MonthCount {
pub year_month: String,
pub label: String,
pub count: u32,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum WrapUpScope {
User(Uuid),
Global,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WrapUpReport {
pub scope: WrapUpScope,
pub date_range: DateRange,
@@ -117,3 +120,24 @@ pub struct WrapUpReport {
pub poster_paths: Vec<String>,
pub top_cast_profile_paths: Vec<String>,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum WrapUpStatus {
Pending,
Generating,
Ready,
Failed,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WrapUpRecord {
pub id: WrapUpId,
pub user_id: Option<Uuid>,
pub start_date: NaiveDate,
pub end_date: NaiveDate,
pub status: WrapUpStatus,
pub report_json: Option<String>,
pub error_message: Option<String>,
pub created_at: NaiveDateTime,
pub completed_at: Option<NaiveDateTime>,
}

View File

@@ -1,5 +1,5 @@
use async_trait::async_trait;
use chrono::{DateTime, NaiveDateTime, Utc};
use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
use uuid::Uuid;
use crate::{
@@ -13,12 +13,12 @@ use crate::{
ReviewHistory, SearchQuery, SearchResults, User, UserStats, UserSummary, UserTrends,
WatchEvent, WatchEventStatus, WatchlistEntry, WatchlistWithMovie, WebhookToken,
collections::{self, PageParams, Paginated},
wrapup::{DateRange, WrapUpScope},
wrapup::{DateRange, WrapUpRecord, WrapUpScope, WrapUpStatus},
},
value_objects::{
Email, ExternalMetadataId, ImportProfileId, ImportSessionId, MovieId, MovieTitle,
PasswordHash, PosterUrl, ReleaseYear, ReviewId, UserId, Username, WatchEventId,
WebhookTokenId,
WebhookTokenId, WrapUpId,
},
};
@@ -471,6 +471,27 @@ pub trait WebhookTokenRepository: Send + Sync {
async fn touch_last_used(&self, id: &WebhookTokenId) -> Result<(), DomainError>;
}
#[async_trait]
pub trait WrapUpRepository: Send + Sync {
async fn create(&self, record: &WrapUpRecord) -> Result<(), DomainError>;
async fn update_status(
&self,
id: &WrapUpId,
status: &WrapUpStatus,
error: Option<&str>,
) -> Result<(), DomainError>;
async fn set_complete(&self, id: &WrapUpId, report_json: &str) -> Result<(), DomainError>;
async fn get_by_id(&self, id: &WrapUpId) -> Result<Option<WrapUpRecord>, DomainError>;
async fn list_for_user(&self, user_id: Uuid) -> Result<Vec<WrapUpRecord>, DomainError>;
async fn list_global(&self) -> Result<Vec<WrapUpRecord>, DomainError>;
async fn find_existing(
&self,
user_id: Option<Uuid>,
start: NaiveDate,
end: NaiveDate,
) -> Result<Option<WrapUpRecord>, DomainError>;
}
// ── Wrap-up / Year-in-Review ─────────────────────────────────────────────────
#[derive(Clone, Debug)]

View File

@@ -24,11 +24,11 @@ use crate::{
ImportSessionRepository, MetadataClient, MetadataSearchCriteria, MovieProfileRepository,
MovieRepository, PasswordHasher, PersonCommand, PersonQuery, PosterFetcherClient,
ReviewRepository, SearchCommand, SearchPort, StatsRepository, UserProfileFieldsRepository,
UserRepository, WatchlistRepository,
UserRepository, WatchlistRepository, WrapUpRepository,
},
value_objects::{
Email, ExternalMetadataId, ImportProfileId, ImportSessionId, MovieId, MovieTitle,
PasswordHash, PosterUrl, ReleaseYear, ReviewId, UserId, Username,
PasswordHash, PosterUrl, ReleaseYear, ReviewId, UserId, Username, WrapUpId,
},
};
@@ -1050,3 +1050,143 @@ impl crate::ports::WrapUpStatsQuery for InMemoryWrapUpStatsQuery {
Ok(filtered)
}
}
// ── InMemoryWrapUpRepository ────────────────────────────────────────────────
pub struct InMemoryWrapUpRepository {
pub store: Mutex<Vec<crate::models::wrapup::WrapUpRecord>>,
}
impl InMemoryWrapUpRepository {
pub fn new() -> Arc<Self> {
Arc::new(Self {
store: Mutex::new(Vec::new()),
})
}
}
#[async_trait]
impl WrapUpRepository for InMemoryWrapUpRepository {
async fn create(
&self,
record: &crate::models::wrapup::WrapUpRecord,
) -> Result<(), DomainError> {
self.store.lock().unwrap().push(record.clone());
Ok(())
}
async fn update_status(
&self,
id: &WrapUpId,
status: &crate::models::wrapup::WrapUpStatus,
error: Option<&str>,
) -> Result<(), DomainError> {
let mut store = self.store.lock().unwrap();
if let Some(rec) = store.iter_mut().find(|r| r.id == *id) {
rec.status = status.clone();
rec.error_message = error.map(|s| s.to_string());
Ok(())
} else {
Err(DomainError::NotFound("wrapup record".into()))
}
}
async fn set_complete(&self, id: &WrapUpId, report_json: &str) -> Result<(), DomainError> {
let mut store = self.store.lock().unwrap();
if let Some(rec) = store.iter_mut().find(|r| r.id == *id) {
rec.status = crate::models::wrapup::WrapUpStatus::Ready;
rec.report_json = Some(report_json.to_string());
rec.completed_at = Some(chrono::Utc::now().naive_utc());
Ok(())
} else {
Err(DomainError::NotFound("wrapup record".into()))
}
}
async fn get_by_id(
&self,
id: &WrapUpId,
) -> Result<Option<crate::models::wrapup::WrapUpRecord>, DomainError> {
let store = self.store.lock().unwrap();
Ok(store.iter().find(|r| r.id == *id).cloned())
}
async fn list_for_user(
&self,
user_id: Uuid,
) -> Result<Vec<crate::models::wrapup::WrapUpRecord>, DomainError> {
let store = self.store.lock().unwrap();
Ok(store
.iter()
.filter(|r| r.user_id == Some(user_id))
.cloned()
.collect())
}
async fn list_global(
&self,
) -> Result<Vec<crate::models::wrapup::WrapUpRecord>, DomainError> {
let store = self.store.lock().unwrap();
Ok(store.iter().filter(|r| r.user_id.is_none()).cloned().collect())
}
async fn find_existing(
&self,
user_id: Option<Uuid>,
start: chrono::NaiveDate,
end: chrono::NaiveDate,
) -> Result<Option<crate::models::wrapup::WrapUpRecord>, DomainError> {
let store = self.store.lock().unwrap();
Ok(store
.iter()
.find(|r| r.user_id == user_id && r.start_date == start && r.end_date == end)
.cloned())
}
}
// ── PanicWrapUpRepository ──────────────────────────────────────────────────
pub struct PanicWrapUpRepository;
#[async_trait]
impl WrapUpRepository for PanicWrapUpRepository {
async fn create(&self, _: &crate::models::wrapup::WrapUpRecord) -> Result<(), DomainError> {
panic!("PanicWrapUpRepository called")
}
async fn update_status(
&self,
_: &WrapUpId,
_: &crate::models::wrapup::WrapUpStatus,
_: Option<&str>,
) -> Result<(), DomainError> {
panic!("PanicWrapUpRepository called")
}
async fn set_complete(&self, _: &WrapUpId, _: &str) -> Result<(), DomainError> {
panic!("PanicWrapUpRepository called")
}
async fn get_by_id(
&self,
_: &WrapUpId,
) -> Result<Option<crate::models::wrapup::WrapUpRecord>, DomainError> {
panic!("PanicWrapUpRepository called")
}
async fn list_for_user(
&self,
_: Uuid,
) -> Result<Vec<crate::models::wrapup::WrapUpRecord>, DomainError> {
panic!("PanicWrapUpRepository called")
}
async fn list_global(
&self,
) -> Result<Vec<crate::models::wrapup::WrapUpRecord>, DomainError> {
panic!("PanicWrapUpRepository called")
}
async fn find_existing(
&self,
_: Option<Uuid>,
_: chrono::NaiveDate,
_: chrono::NaiveDate,
) -> Result<Option<crate::models::wrapup::WrapUpRecord>, DomainError> {
panic!("PanicWrapUpRepository called")
}
}

View File

@@ -3,7 +3,7 @@ use uuid::Uuid;
macro_rules! uuid_id {
($name:ident) => {
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct $name(Uuid);
impl $name {
@@ -28,6 +28,7 @@ uuid_id!(ImportProfileId);
uuid_id!(WatchlistEntryId);
uuid_id!(WatchEventId);
uuid_id!(WebhookTokenId);
uuid_id!(WrapUpId);
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExternalMetadataId(String);

View File

@@ -31,6 +31,8 @@ pub struct DatabaseOutput {
pub search_command: Arc<dyn domain::ports::SearchCommand>,
pub profile_fields: Arc<dyn UserProfileFieldsRepository>,
pub ap_content: Arc<dyn LocalApContentQuery>,
pub wrapup_stats: Arc<dyn domain::ports::WrapUpStatsQuery>,
pub wrapup_repo: Arc<dyn domain::ports::WrapUpRepository>,
pub db_pool: DbPool,
}
@@ -67,6 +69,8 @@ pub async fn build_database_adapters(backend: &str, url: &str) -> anyhow::Result
search_command: sc,
profile_fields: pf,
ap_content: w.ap_content,
wrapup_stats: w.wrapup_stats,
wrapup_repo: w.wrapup_repo,
db_pool: DbPool::Postgres(w.pool),
})
}
@@ -100,6 +104,8 @@ pub async fn build_database_adapters(backend: &str, url: &str) -> anyhow::Result
search_command: sc,
profile_fields: pf,
ap_content: w.ap_content,
wrapup_stats: w.wrapup_stats,
wrapup_repo: w.wrapup_repo,
db_pool: DbPool::Sqlite(w.pool),
})
}

View File

@@ -4,6 +4,7 @@ pub mod images;
pub mod import;
pub mod rss;
pub mod webhook;
pub mod wrapup;
const DEFAULT_PAGE_LIMIT: u32 = 5;
const RSS_FEED_LIMIT: u32 = 50;

View File

@@ -0,0 +1,147 @@
use axum::{
Json,
extract::{Path, State},
http::StatusCode,
response::IntoResponse,
};
use chrono::NaiveDate;
use uuid::Uuid;
use application::wrapup::{
commands::RequestWrapUpCommand,
generate, get_wrapup,
list_wrapups::{self, ListWrapUpsQuery},
};
use domain::errors::DomainError;
use domain::models::wrapup::{WrapUpRecord, WrapUpStatus};
use domain::value_objects::WrapUpId;
use crate::{errors::ApiError, extractors::AuthenticatedUser, state::AppState};
use api_types::wrapup::{
GenerateWrapUpRequest, WrapUpGeneratedResponse, WrapUpListResponse, WrapUpStatusResponse,
};
fn record_to_dto(r: &WrapUpRecord) -> WrapUpStatusResponse {
WrapUpStatusResponse {
id: r.id.value().to_string(),
user_id: r.user_id.map(|u| u.to_string()),
status: format!("{:?}", r.status),
start_date: r.start_date.to_string(),
end_date: r.end_date.to_string(),
created_at: r.created_at.to_string(),
completed_at: r.completed_at.map(|t| t.to_string()),
error_message: r.error_message.clone(),
}
}
#[utoipa::path(
post, path = "/api/v1/wrapups/generate",
request_body = GenerateWrapUpRequest,
responses(
(status = 200, body = WrapUpGeneratedResponse),
(status = 400, description = "Invalid date format"),
(status = 401, description = "Unauthorized"),
),
security(("bearer_auth" = []))
)]
pub async fn post_generate(
State(state): State<AppState>,
user: AuthenticatedUser,
Json(req): Json<GenerateWrapUpRequest>,
) -> Result<Json<WrapUpGeneratedResponse>, ApiError> {
let start = NaiveDate::parse_from_str(&req.start_date, "%Y-%m-%d")
.map_err(|_| DomainError::ValidationError("invalid start_date".into()))?;
let end = NaiveDate::parse_from_str(&req.end_date, "%Y-%m-%d")
.map_err(|_| DomainError::ValidationError("invalid end_date".into()))?;
let user_id = if req.global.unwrap_or(false) {
None
} else {
Some(user.0.value())
};
let cmd = RequestWrapUpCommand {
user_id,
start_date: start,
end_date: end,
};
let id = generate::execute(&state.app_ctx, cmd).await?;
Ok(Json(WrapUpGeneratedResponse {
id: id.value().to_string(),
}))
}
#[utoipa::path(
get, path = "/api/v1/wrapups",
responses(
(status = 200, body = WrapUpListResponse),
(status = 401, description = "Unauthorized"),
),
security(("bearer_auth" = []))
)]
pub async fn get_list(
State(state): State<AppState>,
user: AuthenticatedUser,
) -> Result<Json<WrapUpListResponse>, ApiError> {
let records = list_wrapups::execute(
&state.app_ctx,
ListWrapUpsQuery {
user_id: Some(user.0.value()),
},
)
.await?;
Ok(Json(WrapUpListResponse {
items: records.iter().map(record_to_dto).collect(),
}))
}
#[utoipa::path(
get, path = "/api/v1/wrapups/{id}",
params(("id" = Uuid, Path, description = "Wrap-up ID")),
responses(
(status = 200, body = WrapUpStatusResponse),
(status = 401, description = "Unauthorized"),
(status = 404, description = "Not found"),
),
security(("bearer_auth" = []))
)]
pub async fn get_status(
State(state): State<AppState>,
_user: AuthenticatedUser,
Path(id): Path<Uuid>,
) -> Result<Json<WrapUpStatusResponse>, ApiError> {
let record = get_wrapup::execute(&state.app_ctx, WrapUpId::from_uuid(id))
.await?
.ok_or_else(|| DomainError::NotFound("wrap-up not found".into()))?;
Ok(Json(record_to_dto(&record)))
}
#[utoipa::path(
get, path = "/api/v1/wrapups/{id}/report",
params(("id" = Uuid, Path, description = "Wrap-up ID")),
responses(
(status = 200, description = "Report JSON", content_type = "application/json"),
(status = 202, description = "Still generating"),
(status = 401, description = "Unauthorized"),
(status = 404, description = "Not found"),
),
security(("bearer_auth" = []))
)]
pub async fn get_report(
State(state): State<AppState>,
_user: AuthenticatedUser,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
match get_wrapup::execute(&state.app_ctx, WrapUpId::from_uuid(id)).await {
Ok(Some(record)) if record.status == WrapUpStatus::Ready => match record.report_json {
Some(json) => (
StatusCode::OK,
[("content-type", "application/json")],
json,
)
.into_response(),
None => StatusCode::NOT_FOUND.into_response(),
},
Ok(Some(_)) => StatusCode::ACCEPTED.into_response(),
Ok(None) => StatusCode::NOT_FOUND.into_response(),
Err(e) => crate::errors::domain_error_response(e),
}
}

View File

@@ -193,7 +193,8 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
social_query: social_query.clone(),
#[cfg(not(feature = "federation"))]
social_query: Arc::new(domain::testing::NoopSocialQueryPort),
wrapup_stats: Arc::new(domain::testing::PanicWrapUpStatsQuery) as Arc<dyn domain::ports::WrapUpStatsQuery>,
wrapup_stats: db.wrapup_stats,
wrapup_repo: db.wrapup_repo,
},
services: Services {
auth: auth_service,

View File

@@ -7,6 +7,7 @@ mod social;
mod users;
mod watchlist;
mod webhook;
mod wrapup;
use axum::Router;
use utoipa::{
@@ -42,6 +43,7 @@ fn build() -> utoipa::openapi::OpenApi {
api.merge(search::SearchDoc::openapi());
api.merge(watchlist::WatchlistDoc::openapi());
api.merge(webhook::WebhookDoc::openapi());
api.merge(wrapup::WrapUpDoc::openapi());
#[cfg(feature = "federation")]
api.merge(social::SocialDoc::openapi());
SecurityAddon.modify(&mut api);

View File

@@ -0,0 +1,18 @@
use utoipa::OpenApi;
#[derive(OpenApi)]
#[openapi(
paths(
crate::handlers::wrapup::post_generate,
crate::handlers::wrapup::get_list,
crate::handlers::wrapup::get_status,
crate::handlers::wrapup::get_report,
),
components(schemas(
api_types::wrapup::GenerateWrapUpRequest,
api_types::wrapup::WrapUpGeneratedResponse,
api_types::wrapup::WrapUpStatusResponse,
api_types::wrapup::WrapUpListResponse,
))
)]
pub struct WrapUpDoc;

View File

@@ -346,6 +346,16 @@ fn api_routes(rate_limit: u64) -> Router<AppState> {
.route(
"/watch-queue/dismiss",
routing::post(handlers::webhook::post_dismiss_watch_events),
)
.route(
"/wrapups/generate",
routing::post(handlers::wrapup::post_generate),
)
.route("/wrapups", routing::get(handlers::wrapup::get_list))
.route("/wrapups/{id}", routing::get(handlers::wrapup::get_status))
.route(
"/wrapups/{id}/report",
routing::get(handlers::wrapup::get_report),
);
#[cfg(feature = "federation")]

View File

@@ -568,6 +568,52 @@ impl domain::ports::WebhookTokenRepository for Panic {
}
}
#[async_trait::async_trait]
impl domain::ports::WrapUpStatsQuery for Panic {
async fn get_reviews_with_profiles(
&self,
_: &domain::models::wrapup::WrapUpScope,
_: &domain::models::wrapup::DateRange,
) -> Result<Vec<domain::ports::WrapUpMovieRow>, DomainError> {
panic!()
}
}
#[async_trait::async_trait]
impl domain::ports::WrapUpRepository for Panic {
async fn create(&self, _: &domain::models::wrapup::WrapUpRecord) -> Result<(), DomainError> {
panic!()
}
async fn update_status(
&self,
_: &domain::value_objects::WrapUpId,
_: &domain::models::wrapup::WrapUpStatus,
_: Option<&str>,
) -> Result<(), DomainError> {
panic!()
}
async fn set_complete(&self, _: &domain::value_objects::WrapUpId, _: &str) -> Result<(), DomainError> {
panic!()
}
async fn get_by_id(&self, _: &domain::value_objects::WrapUpId) -> Result<Option<domain::models::wrapup::WrapUpRecord>, DomainError> {
panic!()
}
async fn list_for_user(&self, _: uuid::Uuid) -> Result<Vec<domain::models::wrapup::WrapUpRecord>, DomainError> {
panic!()
}
async fn list_global(&self) -> Result<Vec<domain::models::wrapup::WrapUpRecord>, DomainError> {
panic!()
}
async fn find_existing(
&self,
_: Option<uuid::Uuid>,
_: chrono::NaiveDate,
_: chrono::NaiveDate,
) -> Result<Option<domain::models::wrapup::WrapUpRecord>, DomainError> {
panic!()
}
}
// --- Single state factory — only auth_service varies ---
pub fn make_test_state(auth_service: Arc<dyn AuthService>) -> crate::state::AppState {
@@ -593,6 +639,8 @@ pub fn make_test_state(auth_service: Arc<dyn AuthService>) -> crate::state::AppS
search_command: Arc::clone(&repo) as _,
remote_watchlist: Arc::clone(&repo) as _,
social_query: Arc::clone(&repo) as _,
wrapup_stats: Arc::clone(&repo) as _,
wrapup_repo: Arc::clone(&repo) as _,
},
services: Services {
auth: auth_service,

View File

@@ -415,6 +415,8 @@ async fn test_app() -> Router {
search_command: Arc::new(PanicSearchCommand),
remote_watchlist: Arc::new(PanicRemoteWatchlist),
social_query: Arc::new(PanicSocialQuery),
wrapup_stats: Arc::new(domain::testing::PanicWrapUpStatsQuery) as _,
wrapup_repo: Arc::new(domain::testing::PanicWrapUpRepository) as _,
},
services: Services {
auth: Arc::new(PanicAuth),

View File

@@ -36,6 +36,8 @@ pub struct WorkerDbOutput {
pub ap_content: Arc<dyn LocalApContentQuery>,
pub image_ref_command: Arc<dyn ImageRefCommand>,
pub image_ref_query: Arc<dyn ImageRefQuery>,
pub wrapup_stats: Arc<dyn domain::ports::WrapUpStatsQuery>,
pub wrapup_repo: Arc<dyn domain::ports::WrapUpRepository>,
pub db_pool: DbPool,
}
@@ -76,6 +78,8 @@ pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<Worker
ap_content: w.ap_content,
image_ref_command,
image_ref_query,
wrapup_stats: w.wrapup_stats,
wrapup_repo: w.wrapup_repo,
db_pool: DbPool::Postgres(w.pool),
})
}
@@ -113,6 +117,8 @@ pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<Worker
ap_content: w.ap_content,
image_ref_command,
image_ref_query,
wrapup_stats: w.wrapup_stats,
wrapup_repo: w.wrapup_repo,
db_pool: DbPool::Sqlite(w.pool),
})
}

View File

@@ -92,7 +92,8 @@ async fn main() -> anyhow::Result<()> {
social_query: fed_social_query,
#[cfg(not(feature = "federation"))]
social_query: Arc::new(domain::testing::NoopSocialQueryPort),
wrapup_stats: Arc::new(domain::testing::PanicWrapUpStatsQuery) as Arc<dyn domain::ports::WrapUpStatsQuery>,
wrapup_stats: db.wrapup_stats,
wrapup_repo: db.wrapup_repo,
},
services: Services {
auth: auth_service,
@@ -147,6 +148,7 @@ async fn main() -> anyhow::Result<()> {
let mut periodic_jobs: Vec<Arc<dyn PeriodicJob>> = vec![
Arc::new(application::jobs::ImportSessionCleanupJob::new(ctx.clone())),
Arc::new(application::jobs::WatchEventCleanupJob::new(ctx.clone())),
Arc::new(application::jobs::WrapUpAutoGenerateJob::new(ctx.clone())),
];
if let Some(job) = enrichment_job {
periodic_jobs.push(job);
@@ -193,7 +195,10 @@ async fn main() -> anyhow::Result<()> {
Arc::clone(&ctx.repos.movie),
Arc::clone(&ctx.repos.search_command),
)) as Arc<dyn EventHandler>;
let mut h = vec![poster, cleanup, search_cleanup, discovery_indexer];
let wrapup_handler = Arc::new(application::wrapup::event_handler::WrapUpEventHandler::new(
ctx.clone(),
)) as Arc<dyn EventHandler>;
let mut h = vec![poster, cleanup, search_cleanup, discovery_indexer, wrapup_handler];
if let Some(e) = enrichment_handler {
h.push(e);
}
@@ -234,6 +239,9 @@ async fn main() -> anyhow::Result<()> {
Arc::clone(&ctx.repos.search_command),
)) as Arc<dyn EventHandler>;
tracing::info!("federation event handler registered");
let wrapup_handler = Arc::new(application::wrapup::event_handler::WrapUpEventHandler::new(
ctx.clone(),
)) as Arc<dyn EventHandler>;
let mut h = vec![
poster,
cleanup,
@@ -241,6 +249,7 @@ async fn main() -> anyhow::Result<()> {
backfill,
search_cleanup,
discovery_indexer,
wrapup_handler,
];
if let Some(e) = enrichment_handler {
h.push(e);