From 7ef8912d69fbd68e2f645fe81f45d155ac63d994 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Tue, 2 Jun 2026 22:13:08 +0200 Subject: [PATCH] feat: wrapup worker handler + auto-generate job --- crates/application/Cargo.toml | 1 + crates/application/src/jobs.rs | 76 +++++++++++++++++++ .../application/src/wrapup/event_handler.rs | 40 ++++++++++ .../src/wrapup/handle_requested.rs | 51 +++++++++++++ crates/application/src/wrapup/mod.rs | 2 + crates/domain/Cargo.toml | 1 + crates/domain/src/models/wrapup.rs | 25 +++--- crates/domain/src/value_objects.rs | 2 +- crates/worker/src/main.rs | 10 ++- 9 files changed, 194 insertions(+), 14 deletions(-) create mode 100644 crates/application/src/wrapup/event_handler.rs create mode 100644 crates/application/src/wrapup/handle_requested.rs diff --git a/crates/application/Cargo.toml b/crates/application/Cargo.toml index b0a1706..98620b8 100644 --- a/crates/application/Cargo.toml +++ b/crates/application/Cargo.toml @@ -14,6 +14,7 @@ tokio = { workspace = true } sha2 = { workspace = true } rand = { workspace = true } hex = { workspace = true } +serde_json = { workspace = true } [features] xlsx = [] diff --git a/crates/application/src/jobs.rs b/crates/application/src/jobs.rs index 7eeb90a..87abbe9 100644 --- a/crates/application/src/jobs.rs +++ b/crates/application/src/jobs.rs @@ -1,6 +1,7 @@ use std::time::Duration; use async_trait::async_trait; +use chrono::Datelike; use domain::{errors::DomainError, events::DomainEvent, ports::PeriodicJob}; use crate::context::AppContext; @@ -85,3 +86,78 @@ impl PeriodicJob for EnrichmentStalenessJob { Ok(()) } } + +pub struct WrapUpAutoGenerateJob { + ctx: AppContext, +} + +impl WrapUpAutoGenerateJob { + pub fn new(ctx: AppContext) -> Self { + Self { ctx } + } +} + +#[async_trait] +impl PeriodicJob for WrapUpAutoGenerateJob { + fn interval(&self) -> Duration { + Duration::from_secs(86400) + } + + async fn run(&self) -> Result<(), DomainError> { + let now = chrono::Utc::now().naive_utc(); + // Only run in January + if now.month() != 1 { + return Ok(()); + } + let year = now.year() - 1; + let start = chrono::NaiveDate::from_ymd_opt(year, 1, 1) + .ok_or_else(|| DomainError::ValidationError("invalid date".into()))?; + let end = chrono::NaiveDate::from_ymd_opt(year + 1, 1, 1) + .ok_or_else(|| DomainError::ValidationError("invalid date".into()))?; + + let users = self.ctx.repos.user.list_with_stats().await?; + for user in &users { + if user.total_movies > 0 { + let existing = self + .ctx + .repos + .wrapup_repo + .find_existing(Some(user.user_id.value()), start, end) + .await?; + if existing.is_none() { + let cmd = crate::wrapup::commands::RequestWrapUpCommand { + user_id: Some(user.user_id.value()), + start_date: start, + end_date: end, + }; + if let Err(e) = crate::wrapup::generate::execute(&self.ctx, cmd).await { + tracing::warn!( + "auto-generate wrapup for user {} failed: {e}", + user.user_id.value() + ); + } + } + } + } + + // Global wrap-up + let existing = self + .ctx + .repos + .wrapup_repo + .find_existing(None, start, end) + .await?; + if existing.is_none() { + let cmd = crate::wrapup::commands::RequestWrapUpCommand { + user_id: None, + start_date: start, + end_date: end, + }; + if let Err(e) = crate::wrapup::generate::execute(&self.ctx, cmd).await { + tracing::warn!("auto-generate global wrapup failed: {e}"); + } + } + + Ok(()) + } +} diff --git a/crates/application/src/wrapup/event_handler.rs b/crates/application/src/wrapup/event_handler.rs new file mode 100644 index 0000000..c85ab05 --- /dev/null +++ b/crates/application/src/wrapup/event_handler.rs @@ -0,0 +1,40 @@ +use async_trait::async_trait; +use domain::errors::DomainError; +use domain::events::DomainEvent; +use domain::ports::EventHandler; + +use crate::context::AppContext; + +pub struct WrapUpEventHandler { + ctx: AppContext, +} + +impl WrapUpEventHandler { + pub fn new(ctx: AppContext) -> Self { + Self { ctx } + } +} + +#[async_trait] +impl EventHandler for WrapUpEventHandler { + async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { + match event { + DomainEvent::WrapUpRequested { + wrapup_id, + user_id, + start_date, + end_date, + } => { + super::handle_requested::execute( + &self.ctx, + wrapup_id.clone(), + user_id.as_ref().map(|u| u.value()), + *start_date, + *end_date, + ) + .await + } + _ => Ok(()), + } + } +} diff --git a/crates/application/src/wrapup/handle_requested.rs b/crates/application/src/wrapup/handle_requested.rs new file mode 100644 index 0000000..3720f58 --- /dev/null +++ b/crates/application/src/wrapup/handle_requested.rs @@ -0,0 +1,51 @@ +use crate::context::AppContext; +use crate::wrapup::{compute, queries::ComputeWrapUpQuery}; +use domain::errors::DomainError; +use domain::events::DomainEvent; +use domain::models::wrapup::{DateRange, WrapUpScope, WrapUpStatus}; +use domain::value_objects::WrapUpId; + +pub async fn execute( + ctx: &AppContext, + wrapup_id: WrapUpId, + user_id: Option, + start_date: chrono::NaiveDate, + end_date: chrono::NaiveDate, +) -> Result<(), DomainError> { + ctx.repos + .wrapup_repo + .update_status(&wrapup_id, &WrapUpStatus::Generating, None) + .await?; + + let scope = match user_id { + Some(uid) => WrapUpScope::User(uid), + None => WrapUpScope::Global, + }; + let query = ComputeWrapUpQuery { + scope, + date_range: DateRange { + start: start_date, + end: end_date, + }, + }; + + match compute::execute(ctx, query).await { + Ok(report) => { + let json = serde_json::to_string(&report) + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + ctx.repos.wrapup_repo.set_complete(&wrapup_id, &json).await?; + ctx.services + .event_publisher + .publish(&DomainEvent::WrapUpCompleted { wrapup_id }) + .await?; + Ok(()) + } + Err(e) => { + ctx.repos + .wrapup_repo + .update_status(&wrapup_id, &WrapUpStatus::Failed, Some(&e.to_string())) + .await?; + Err(e) + } + } +} diff --git a/crates/application/src/wrapup/mod.rs b/crates/application/src/wrapup/mod.rs index 076a360..62e4c95 100644 --- a/crates/application/src/wrapup/mod.rs +++ b/crates/application/src/wrapup/mod.rs @@ -1,6 +1,8 @@ pub mod commands; pub mod compute; +pub mod event_handler; pub mod generate; pub mod get_wrapup; +pub mod handle_requested; pub mod list_wrapups; pub mod queries; diff --git a/crates/domain/Cargo.toml b/crates/domain/Cargo.toml index 20db610..afcb7dd 100644 --- a/crates/domain/Cargo.toml +++ b/crates/domain/Cargo.toml @@ -9,6 +9,7 @@ chrono = { workspace = true } async-trait = { workspace = true } thiserror = { workspace = true } futures = { workspace = true } +serde = { workspace = true } email_address = "0.2.9" diff --git a/crates/domain/src/models/wrapup.rs b/crates/domain/src/models/wrapup.rs index b7d8375..fe07dcd 100644 --- a/crates/domain/src/models/wrapup.rs +++ b/crates/domain/src/models/wrapup.rs @@ -1,15 +1,16 @@ use chrono::{NaiveDate, NaiveDateTime}; +use serde::{Deserialize, Serialize}; use uuid::Uuid; use crate::value_objects::WrapUpId; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct DateRange { pub start: NaiveDate, pub end: NaiveDate, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct MovieRef { pub title: String, pub year: u16, @@ -17,52 +18,52 @@ pub struct MovieRef { pub poster_path: Option, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct UserRef { pub user_id: Uuid, pub display_name: String, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct PersonStat { pub name: String, pub count: u32, pub avg_rating: f64, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct GenreStat { pub genre: String, pub count: u32, pub avg_rating: f64, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct KeywordStat { pub keyword: String, pub count: u32, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct LangStat { pub language: String, pub count: u32, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct MonthCount { pub year_month: String, pub label: String, pub count: u32, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub enum WrapUpScope { User(Uuid), Global, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct WrapUpReport { pub scope: WrapUpScope, pub date_range: DateRange, @@ -120,7 +121,7 @@ pub struct WrapUpReport { pub top_cast_profile_paths: Vec, } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum WrapUpStatus { Pending, Generating, @@ -128,7 +129,7 @@ pub enum WrapUpStatus { Failed, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct WrapUpRecord { pub id: WrapUpId, pub user_id: Option, diff --git a/crates/domain/src/value_objects.rs b/crates/domain/src/value_objects.rs index a811a2e..b2d789d 100644 --- a/crates/domain/src/value_objects.rs +++ b/crates/domain/src/value_objects.rs @@ -3,7 +3,7 @@ use uuid::Uuid; macro_rules! uuid_id { ($name:ident) => { - #[derive(Clone, Debug, PartialEq, Eq)] + #[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct $name(Uuid); impl $name { diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index ee34db3..5db83d7 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -148,6 +148,7 @@ async fn main() -> anyhow::Result<()> { let mut periodic_jobs: Vec> = vec![ Arc::new(application::jobs::ImportSessionCleanupJob::new(ctx.clone())), Arc::new(application::jobs::WatchEventCleanupJob::new(ctx.clone())), + Arc::new(application::jobs::WrapUpAutoGenerateJob::new(ctx.clone())), ]; if let Some(job) = enrichment_job { periodic_jobs.push(job); @@ -194,7 +195,10 @@ async fn main() -> anyhow::Result<()> { Arc::clone(&ctx.repos.movie), Arc::clone(&ctx.repos.search_command), )) as Arc; - let mut h = vec![poster, cleanup, search_cleanup, discovery_indexer]; + let wrapup_handler = Arc::new(application::wrapup::event_handler::WrapUpEventHandler::new( + ctx.clone(), + )) as Arc; + let mut h = vec![poster, cleanup, search_cleanup, discovery_indexer, wrapup_handler]; if let Some(e) = enrichment_handler { h.push(e); } @@ -235,6 +239,9 @@ async fn main() -> anyhow::Result<()> { Arc::clone(&ctx.repos.search_command), )) as Arc; tracing::info!("federation event handler registered"); + let wrapup_handler = Arc::new(application::wrapup::event_handler::WrapUpEventHandler::new( + ctx.clone(), + )) as Arc; let mut h = vec![ poster, cleanup, @@ -242,6 +249,7 @@ async fn main() -> anyhow::Result<()> { backfill, search_cleanup, discovery_indexer, + wrapup_handler, ]; if let Some(e) = enrichment_handler { h.push(e);