feat: wrapup worker handler + auto-generate job

This commit is contained in:
2026-06-02 22:13:08 +02:00
parent ac05cdfeaf
commit 7ef8912d69
9 changed files with 194 additions and 14 deletions

View File

@@ -14,6 +14,7 @@ tokio = { workspace = true }
sha2 = { workspace = true }
rand = { workspace = true }
hex = { workspace = true }
serde_json = { workspace = true }
[features]
xlsx = []

View File

@@ -1,6 +1,7 @@
use std::time::Duration;
use async_trait::async_trait;
use chrono::Datelike;
use domain::{errors::DomainError, events::DomainEvent, ports::PeriodicJob};
use crate::context::AppContext;
@@ -85,3 +86,78 @@ impl PeriodicJob for EnrichmentStalenessJob {
Ok(())
}
}
pub struct WrapUpAutoGenerateJob {
ctx: AppContext,
}
impl WrapUpAutoGenerateJob {
pub fn new(ctx: AppContext) -> Self {
Self { ctx }
}
}
#[async_trait]
impl PeriodicJob for WrapUpAutoGenerateJob {
fn interval(&self) -> Duration {
Duration::from_secs(86400)
}
async fn run(&self) -> Result<(), DomainError> {
let now = chrono::Utc::now().naive_utc();
// Only run in January
if now.month() != 1 {
return Ok(());
}
let year = now.year() - 1;
let start = chrono::NaiveDate::from_ymd_opt(year, 1, 1)
.ok_or_else(|| DomainError::ValidationError("invalid date".into()))?;
let end = chrono::NaiveDate::from_ymd_opt(year + 1, 1, 1)
.ok_or_else(|| DomainError::ValidationError("invalid date".into()))?;
let users = self.ctx.repos.user.list_with_stats().await?;
for user in &users {
if user.total_movies > 0 {
let existing = self
.ctx
.repos
.wrapup_repo
.find_existing(Some(user.user_id.value()), start, end)
.await?;
if existing.is_none() {
let cmd = crate::wrapup::commands::RequestWrapUpCommand {
user_id: Some(user.user_id.value()),
start_date: start,
end_date: end,
};
if let Err(e) = crate::wrapup::generate::execute(&self.ctx, cmd).await {
tracing::warn!(
"auto-generate wrapup for user {} failed: {e}",
user.user_id.value()
);
}
}
}
}
// Global wrap-up
let existing = self
.ctx
.repos
.wrapup_repo
.find_existing(None, start, end)
.await?;
if existing.is_none() {
let cmd = crate::wrapup::commands::RequestWrapUpCommand {
user_id: None,
start_date: start,
end_date: end,
};
if let Err(e) = crate::wrapup::generate::execute(&self.ctx, cmd).await {
tracing::warn!("auto-generate global wrapup failed: {e}");
}
}
Ok(())
}
}

View File

@@ -0,0 +1,40 @@
use async_trait::async_trait;
use domain::errors::DomainError;
use domain::events::DomainEvent;
use domain::ports::EventHandler;
use crate::context::AppContext;
pub struct WrapUpEventHandler {
ctx: AppContext,
}
impl WrapUpEventHandler {
pub fn new(ctx: AppContext) -> Self {
Self { ctx }
}
}
#[async_trait]
impl EventHandler for WrapUpEventHandler {
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
match event {
DomainEvent::WrapUpRequested {
wrapup_id,
user_id,
start_date,
end_date,
} => {
super::handle_requested::execute(
&self.ctx,
wrapup_id.clone(),
user_id.as_ref().map(|u| u.value()),
*start_date,
*end_date,
)
.await
}
_ => Ok(()),
}
}
}

View File

@@ -0,0 +1,51 @@
use crate::context::AppContext;
use crate::wrapup::{compute, queries::ComputeWrapUpQuery};
use domain::errors::DomainError;
use domain::events::DomainEvent;
use domain::models::wrapup::{DateRange, WrapUpScope, WrapUpStatus};
use domain::value_objects::WrapUpId;
pub async fn execute(
ctx: &AppContext,
wrapup_id: WrapUpId,
user_id: Option<uuid::Uuid>,
start_date: chrono::NaiveDate,
end_date: chrono::NaiveDate,
) -> Result<(), DomainError> {
ctx.repos
.wrapup_repo
.update_status(&wrapup_id, &WrapUpStatus::Generating, None)
.await?;
let scope = match user_id {
Some(uid) => WrapUpScope::User(uid),
None => WrapUpScope::Global,
};
let query = ComputeWrapUpQuery {
scope,
date_range: DateRange {
start: start_date,
end: end_date,
},
};
match compute::execute(ctx, query).await {
Ok(report) => {
let json = serde_json::to_string(&report)
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
ctx.repos.wrapup_repo.set_complete(&wrapup_id, &json).await?;
ctx.services
.event_publisher
.publish(&DomainEvent::WrapUpCompleted { wrapup_id })
.await?;
Ok(())
}
Err(e) => {
ctx.repos
.wrapup_repo
.update_status(&wrapup_id, &WrapUpStatus::Failed, Some(&e.to_string()))
.await?;
Err(e)
}
}
}

View File

@@ -1,6 +1,8 @@
pub mod commands;
pub mod compute;
pub mod event_handler;
pub mod generate;
pub mod get_wrapup;
pub mod handle_requested;
pub mod list_wrapups;
pub mod queries;

View File

@@ -9,6 +9,7 @@ chrono = { workspace = true }
async-trait = { workspace = true }
thiserror = { workspace = true }
futures = { workspace = true }
serde = { workspace = true }
email_address = "0.2.9"

View File

@@ -1,15 +1,16 @@
use chrono::{NaiveDate, NaiveDateTime};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::value_objects::WrapUpId;
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DateRange {
pub start: NaiveDate,
pub end: NaiveDate,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MovieRef {
pub title: String,
pub year: u16,
@@ -17,52 +18,52 @@ pub struct MovieRef {
pub poster_path: Option<String>,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UserRef {
pub user_id: Uuid,
pub display_name: String,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PersonStat {
pub name: String,
pub count: u32,
pub avg_rating: f64,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GenreStat {
pub genre: String,
pub count: u32,
pub avg_rating: f64,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KeywordStat {
pub keyword: String,
pub count: u32,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LangStat {
pub language: String,
pub count: u32,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MonthCount {
pub year_month: String,
pub label: String,
pub count: u32,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum WrapUpScope {
User(Uuid),
Global,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WrapUpReport {
pub scope: WrapUpScope,
pub date_range: DateRange,
@@ -120,7 +121,7 @@ pub struct WrapUpReport {
pub top_cast_profile_paths: Vec<String>,
}
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum WrapUpStatus {
Pending,
Generating,
@@ -128,7 +129,7 @@ pub enum WrapUpStatus {
Failed,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WrapUpRecord {
pub id: WrapUpId,
pub user_id: Option<Uuid>,

View File

@@ -3,7 +3,7 @@ use uuid::Uuid;
macro_rules! uuid_id {
($name:ident) => {
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct $name(Uuid);
impl $name {

View File

@@ -148,6 +148,7 @@ async fn main() -> anyhow::Result<()> {
let mut periodic_jobs: Vec<Arc<dyn PeriodicJob>> = vec![
Arc::new(application::jobs::ImportSessionCleanupJob::new(ctx.clone())),
Arc::new(application::jobs::WatchEventCleanupJob::new(ctx.clone())),
Arc::new(application::jobs::WrapUpAutoGenerateJob::new(ctx.clone())),
];
if let Some(job) = enrichment_job {
periodic_jobs.push(job);
@@ -194,7 +195,10 @@ async fn main() -> anyhow::Result<()> {
Arc::clone(&ctx.repos.movie),
Arc::clone(&ctx.repos.search_command),
)) as Arc<dyn EventHandler>;
let mut h = vec![poster, cleanup, search_cleanup, discovery_indexer];
let wrapup_handler = Arc::new(application::wrapup::event_handler::WrapUpEventHandler::new(
ctx.clone(),
)) as Arc<dyn EventHandler>;
let mut h = vec![poster, cleanup, search_cleanup, discovery_indexer, wrapup_handler];
if let Some(e) = enrichment_handler {
h.push(e);
}
@@ -235,6 +239,9 @@ async fn main() -> anyhow::Result<()> {
Arc::clone(&ctx.repos.search_command),
)) as Arc<dyn EventHandler>;
tracing::info!("federation event handler registered");
let wrapup_handler = Arc::new(application::wrapup::event_handler::WrapUpEventHandler::new(
ctx.clone(),
)) as Arc<dyn EventHandler>;
let mut h = vec![
poster,
cleanup,
@@ -242,6 +249,7 @@ async fn main() -> anyhow::Result<()> {
backfill,
search_cleanup,
discovery_indexer,
wrapup_handler,
];
if let Some(e) = enrichment_handler {
h.push(e);