From 8ac87a3735c2418ede17368c9b10bbf9acaeb96b Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 11 Jun 2026 14:44:23 +0200 Subject: [PATCH] refactor: split jobs.rs into per-context modules --- crates/adapters/postgres/src/lib.rs | 2 +- .../adapters/postgres/src/refresh_sessions.rs | 4 +- crates/adapters/sqlite/src/lib.rs | 2 +- .../adapters/sqlite/src/refresh_sessions.rs | 4 +- .../src/jobs/enrichment_staleness.rs | 39 +++++++ crates/application/src/jobs/import_cleanup.rs | 29 +++++ crates/application/src/jobs/mod.rs | 11 ++ .../src/jobs/refresh_session_cleanup.rs | 31 +++++ .../src/jobs/watch_event_cleanup.rs | 31 +++++ .../src/{jobs.rs => jobs/wrapup.rs} | 110 +----------------- crates/application/src/test_helpers.rs | 8 +- crates/domain/src/models/mod.rs | 2 +- crates/domain/src/ports.rs | 4 +- crates/domain/src/testing/in_memory.rs | 5 +- crates/presentation/src/factory.rs | 7 +- crates/presentation/src/handlers/auth.rs | 4 +- crates/worker/src/db.rs | 7 +- crates/worker/src/main.rs | 4 +- 18 files changed, 170 insertions(+), 134 deletions(-) create mode 100644 crates/application/src/jobs/enrichment_staleness.rs create mode 100644 crates/application/src/jobs/import_cleanup.rs create mode 100644 crates/application/src/jobs/mod.rs create mode 100644 crates/application/src/jobs/refresh_session_cleanup.rs create mode 100644 crates/application/src/jobs/watch_event_cleanup.rs rename crates/application/src/{jobs.rs => jobs/wrapup.rs} (54%) diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs index 321f0cd..1c8bf7d 100644 --- a/crates/adapters/postgres/src/lib.rs +++ b/crates/adapters/postgres/src/lib.rs @@ -40,8 +40,8 @@ pub use import_profile::PostgresImportProfileRepository; pub use import_session::PostgresImportSessionRepository; pub use persons::{PostgresPersonAdapter, create_person_adapter}; pub use profile::PostgresMovieProfileRepository; -pub use refresh_sessions::PostgresRefreshSessionAdapter; pub use profile_fields::PostgresProfileFieldsRepository; +pub use refresh_sessions::PostgresRefreshSessionAdapter; pub use users::PostgresUserRepository; pub use watch_event::{PostgresWatchEventRepository, PostgresWebhookTokenRepository}; pub use watchlist::PostgresWatchlistRepository; diff --git a/crates/adapters/postgres/src/refresh_sessions.rs b/crates/adapters/postgres/src/refresh_sessions.rs index 4f916fd..48e97f1 100644 --- a/crates/adapters/postgres/src/refresh_sessions.rs +++ b/crates/adapters/postgres/src/refresh_sessions.rs @@ -1,9 +1,7 @@ use async_trait::async_trait; use chrono::DateTime; use domain::{ - errors::DomainError, - models::RefreshSession, - ports::RefreshSessionRepository, + errors::DomainError, models::RefreshSession, ports::RefreshSessionRepository, value_objects::UserId, }; use sqlx::PgPool; diff --git a/crates/adapters/sqlite/src/lib.rs b/crates/adapters/sqlite/src/lib.rs index 89f97ab..89f65ea 100644 --- a/crates/adapters/sqlite/src/lib.rs +++ b/crates/adapters/sqlite/src/lib.rs @@ -41,8 +41,8 @@ pub use import_profile::SqliteImportProfileRepository; pub use import_session::SqliteImportSessionRepository; pub use persons::{SqlitePersonAdapter, create_person_adapter}; pub use profile::SqliteMovieProfileRepository; -pub use refresh_sessions::SqliteRefreshSessionAdapter; pub use profile_fields::SqliteProfileFieldsRepository; +pub use refresh_sessions::SqliteRefreshSessionAdapter; pub use users::SqliteUserRepository; pub use watch_event::{SqliteWatchEventRepository, SqliteWebhookTokenRepository}; pub use watchlist::SqliteWatchlistRepository; diff --git a/crates/adapters/sqlite/src/refresh_sessions.rs b/crates/adapters/sqlite/src/refresh_sessions.rs index dd6cd8d..e17c0ee 100644 --- a/crates/adapters/sqlite/src/refresh_sessions.rs +++ b/crates/adapters/sqlite/src/refresh_sessions.rs @@ -1,9 +1,7 @@ use async_trait::async_trait; use chrono::DateTime; use domain::{ - errors::DomainError, - models::RefreshSession, - ports::RefreshSessionRepository, + errors::DomainError, models::RefreshSession, ports::RefreshSessionRepository, value_objects::UserId, }; use sqlx::SqlitePool; diff --git a/crates/application/src/jobs/enrichment_staleness.rs b/crates/application/src/jobs/enrichment_staleness.rs new file mode 100644 index 0000000..c7cc8ef --- /dev/null +++ b/crates/application/src/jobs/enrichment_staleness.rs @@ -0,0 +1,39 @@ +use std::time::Duration; + +use async_trait::async_trait; +use domain::{errors::DomainError, events::DomainEvent, ports::PeriodicJob}; + +use crate::context::AppContext; + +pub struct EnrichmentStalenessJob { + ctx: AppContext, +} + +impl EnrichmentStalenessJob { + pub fn new(ctx: AppContext) -> Self { + Self { ctx } + } +} + +#[async_trait] +impl PeriodicJob for EnrichmentStalenessJob { + fn interval(&self) -> Duration { + Duration::from_secs(3600) + } + + async fn run(&self) -> Result<(), DomainError> { + let stale = self.ctx.repos.movie_profile.list_stale().await?; + if stale.is_empty() { + return Ok(()); + } + tracing::info!("enrichment scan: {} stale movies", stale.len()); + for (movie_id, external_metadata_id) in stale { + let event = DomainEvent::MovieEnrichmentRequested { + movie_id, + external_metadata_id, + }; + self.ctx.services.event_publisher.publish(&event).await?; + } + Ok(()) + } +} diff --git a/crates/application/src/jobs/import_cleanup.rs b/crates/application/src/jobs/import_cleanup.rs new file mode 100644 index 0000000..4daf018 --- /dev/null +++ b/crates/application/src/jobs/import_cleanup.rs @@ -0,0 +1,29 @@ +use std::time::Duration; + +use async_trait::async_trait; +use domain::{errors::DomainError, ports::PeriodicJob}; + +use crate::context::AppContext; + +pub struct ImportSessionCleanupJob { + ctx: AppContext, +} + +impl ImportSessionCleanupJob { + pub fn new(ctx: AppContext) -> Self { + Self { ctx } + } +} + +#[async_trait] +impl PeriodicJob for ImportSessionCleanupJob { + fn interval(&self) -> Duration { + Duration::from_secs(3600) + } + + async fn run(&self) -> Result<(), DomainError> { + let n = crate::import::cleanup::execute(&self.ctx).await?; + tracing::info!("import session cleanup: removed {} expired sessions", n); + Ok(()) + } +} diff --git a/crates/application/src/jobs/mod.rs b/crates/application/src/jobs/mod.rs new file mode 100644 index 0000000..5b088bd --- /dev/null +++ b/crates/application/src/jobs/mod.rs @@ -0,0 +1,11 @@ +mod enrichment_staleness; +mod import_cleanup; +mod refresh_session_cleanup; +mod watch_event_cleanup; +mod wrapup; + +pub use enrichment_staleness::EnrichmentStalenessJob; +pub use import_cleanup::ImportSessionCleanupJob; +pub use refresh_session_cleanup::RefreshSessionCleanupJob; +pub use watch_event_cleanup::WatchEventCleanupJob; +pub use wrapup::{WrapUpAutoGenerateJob, WrapUpCleanupJob}; diff --git a/crates/application/src/jobs/refresh_session_cleanup.rs b/crates/application/src/jobs/refresh_session_cleanup.rs new file mode 100644 index 0000000..48b35d1 --- /dev/null +++ b/crates/application/src/jobs/refresh_session_cleanup.rs @@ -0,0 +1,31 @@ +use std::time::Duration; + +use async_trait::async_trait; +use domain::{errors::DomainError, ports::PeriodicJob}; + +use crate::context::AppContext; + +pub struct RefreshSessionCleanupJob { + ctx: AppContext, +} + +impl RefreshSessionCleanupJob { + pub fn new(ctx: AppContext) -> Self { + Self { ctx } + } +} + +#[async_trait] +impl PeriodicJob for RefreshSessionCleanupJob { + fn interval(&self) -> Duration { + Duration::from_secs(86400) + } + + async fn run(&self) -> Result<(), DomainError> { + let n = self.ctx.repos.refresh_session.delete_expired().await?; + if n > 0 { + tracing::info!("refresh session cleanup: removed {n} expired sessions"); + } + Ok(()) + } +} diff --git a/crates/application/src/jobs/watch_event_cleanup.rs b/crates/application/src/jobs/watch_event_cleanup.rs new file mode 100644 index 0000000..a29aeb4 --- /dev/null +++ b/crates/application/src/jobs/watch_event_cleanup.rs @@ -0,0 +1,31 @@ +use std::time::Duration; + +use async_trait::async_trait; +use domain::{errors::DomainError, ports::PeriodicJob}; + +use crate::context::AppContext; + +pub struct WatchEventCleanupJob { + ctx: AppContext, +} + +impl WatchEventCleanupJob { + pub fn new(ctx: AppContext) -> Self { + Self { ctx } + } +} + +#[async_trait] +impl PeriodicJob for WatchEventCleanupJob { + fn interval(&self) -> Duration { + Duration::from_secs(86400) + } + + async fn run(&self) -> Result<(), DomainError> { + let n = crate::integrations::cleanup::execute(&self.ctx).await?; + if n > 0 { + tracing::info!("watch event cleanup: removed {n} old entries"); + } + Ok(()) + } +} diff --git a/crates/application/src/jobs.rs b/crates/application/src/jobs/wrapup.rs similarity index 54% rename from crates/application/src/jobs.rs rename to crates/application/src/jobs/wrapup.rs index fe0041f..12e3e86 100644 --- a/crates/application/src/jobs.rs +++ b/crates/application/src/jobs/wrapup.rs @@ -2,91 +2,10 @@ use std::time::Duration; use async_trait::async_trait; use chrono::Datelike; -use domain::{errors::DomainError, events::DomainEvent, ports::PeriodicJob}; +use domain::{errors::DomainError, ports::PeriodicJob}; use crate::context::AppContext; -pub struct ImportSessionCleanupJob { - ctx: AppContext, -} - -impl ImportSessionCleanupJob { - pub fn new(ctx: AppContext) -> Self { - Self { ctx } - } -} - -#[async_trait] -impl PeriodicJob for ImportSessionCleanupJob { - fn interval(&self) -> Duration { - Duration::from_secs(3600) - } - - async fn run(&self) -> Result<(), DomainError> { - let n = crate::import::cleanup::execute(&self.ctx).await?; - tracing::info!("import session cleanup: removed {} expired sessions", n); - Ok(()) - } -} - -pub struct WatchEventCleanupJob { - ctx: AppContext, -} - -impl WatchEventCleanupJob { - pub fn new(ctx: AppContext) -> Self { - Self { ctx } - } -} - -#[async_trait] -impl PeriodicJob for WatchEventCleanupJob { - fn interval(&self) -> Duration { - Duration::from_secs(86400) - } - - async fn run(&self) -> Result<(), DomainError> { - let n = crate::integrations::cleanup::execute(&self.ctx).await?; - if n > 0 { - tracing::info!("watch event cleanup: removed {n} old entries"); - } - Ok(()) - } -} - -pub struct EnrichmentStalenessJob { - ctx: AppContext, -} - -impl EnrichmentStalenessJob { - pub fn new(ctx: AppContext) -> Self { - Self { ctx } - } -} - -#[async_trait] -impl PeriodicJob for EnrichmentStalenessJob { - fn interval(&self) -> Duration { - Duration::from_secs(3600) - } - - async fn run(&self) -> Result<(), DomainError> { - let stale = self.ctx.repos.movie_profile.list_stale().await?; - if stale.is_empty() { - return Ok(()); - } - tracing::info!("enrichment scan: {} stale movies", stale.len()); - for (movie_id, external_metadata_id) in stale { - let event = DomainEvent::MovieEnrichmentRequested { - movie_id, - external_metadata_id, - }; - self.ctx.services.event_publisher.publish(&event).await?; - } - Ok(()) - } -} - pub struct WrapUpAutoGenerateJob { ctx: AppContext, } @@ -105,7 +24,6 @@ impl PeriodicJob for WrapUpAutoGenerateJob { async fn run(&self) -> Result<(), DomainError> { let now = chrono::Utc::now().naive_utc(); - // Only run in January if now.month() != 1 { return Ok(()); } @@ -140,7 +58,6 @@ impl PeriodicJob for WrapUpAutoGenerateJob { } } - // Global wrap-up let existing = self .ctx .repos @@ -162,31 +79,6 @@ impl PeriodicJob for WrapUpAutoGenerateJob { } } -pub struct RefreshSessionCleanupJob { - ctx: AppContext, -} - -impl RefreshSessionCleanupJob { - pub fn new(ctx: AppContext) -> Self { - Self { ctx } - } -} - -#[async_trait] -impl PeriodicJob for RefreshSessionCleanupJob { - fn interval(&self) -> Duration { - Duration::from_secs(86400) - } - - async fn run(&self) -> Result<(), DomainError> { - let n = self.ctx.repos.refresh_session.delete_expired().await?; - if n > 0 { - tracing::info!("refresh session cleanup: removed {n} expired sessions"); - } - Ok(()) - } -} - pub struct WrapUpCleanupJob { ctx: AppContext, } diff --git a/crates/application/src/test_helpers.rs b/crates/application/src/test_helpers.rs index 25cdcbc..557fb19 100644 --- a/crates/application/src/test_helpers.rs +++ b/crates/application/src/test_helpers.rs @@ -11,8 +11,8 @@ use domain::{ MovieProfileRepository, MovieRepository, ObjectStorage, PasswordHasher, PersonCommand, PersonQuery, PosterFetcherClient, RefreshSessionRepository, ReviewRepository, SearchCommand, SearchPort, StatsRepository, UserProfileFieldsRepository, UserRepository, - UserSettingsRepository, WatchEventRepository, WatchlistRepository, - WebhookTokenRepository, WrapUpRepository, WrapUpStatsQuery, + UserSettingsRepository, WatchEventRepository, WatchlistRepository, WebhookTokenRepository, + WrapUpRepository, WrapUpStatsQuery, }, testing::{ FakeAuthService, FakeDiaryRepository, FakeDocumentParser, FakeMetadataClient, @@ -21,8 +21,8 @@ use domain::{ InMemoryMovieProfileRepository, InMemoryMovieRepository, InMemoryProfileFieldsRepo, InMemoryRefreshSessionRepository, InMemoryReviewRepository, InMemoryUserRepository, InMemoryUserSettingsRepository, InMemoryWatchEventRepository, InMemoryWatchlistRepository, - InMemoryWebhookTokenRepository, NoopEventPublisher, NoopObjectStorage, - PanicDiaryExporter, PanicPersonCommand, + InMemoryWebhookTokenRepository, NoopEventPublisher, NoopObjectStorage, PanicDiaryExporter, + PanicPersonCommand, }, }; diff --git a/crates/domain/src/models/mod.rs b/crates/domain/src/models/mod.rs index 621eb72..b12b929 100644 --- a/crates/domain/src/models/mod.rs +++ b/crates/domain/src/models/mod.rs @@ -44,10 +44,10 @@ pub use import::{ }; pub use import_profile::ImportProfile; pub use import_session::ImportSession; -pub use refresh_session::RefreshSession; pub use person::{ CastCredit, CrewCredit, ExternalPersonId, Person, PersonCredits, PersonEnrichmentData, PersonId, }; +pub use refresh_session::RefreshSession; pub use search::{ EntityType, IndexableDocument, MovieSearchHit, PersonSearchHit, SearchFilters, SearchQuery, SearchResults, diff --git a/crates/domain/src/ports.rs b/crates/domain/src/ports.rs index 78ef750..14d848e 100644 --- a/crates/domain/src/ports.rs +++ b/crates/domain/src/ports.rs @@ -8,9 +8,9 @@ use crate::{ models::wrapup::WrapUpReport, models::{ AnnotatedRow, DiaryEntry, DiaryFilter, EntityType, ExportFormat, ExternalPersonId, - FeedEntry, FieldMapping, FileFormat, Goal, ImportError, ImportProfile, ImportSession, RefreshSession, + FeedEntry, FieldMapping, FileFormat, Goal, ImportError, ImportProfile, ImportSession, IndexableDocument, Movie, MovieFilter, MovieProfile, MovieStats, MovieSummary, ParsedFile, - ParsedPlaybackEvent, Person, PersonCredits, PersonEnrichmentData, PersonId, + ParsedPlaybackEvent, Person, PersonCredits, PersonEnrichmentData, PersonId, RefreshSession, RemoteGoalEntry, RemoteWatchlistEntry, Review, ReviewHistory, SearchQuery, SearchResults, User, UserSettings, UserStats, UserSummary, UserTrends, WatchEvent, WatchEventStatus, WatchlistEntry, WatchlistWithMovie, WebhookToken, diff --git a/crates/domain/src/testing/in_memory.rs b/crates/domain/src/testing/in_memory.rs index 22a8b79..5c6f909 100644 --- a/crates/domain/src/testing/in_memory.rs +++ b/crates/domain/src/testing/in_memory.rs @@ -818,10 +818,7 @@ impl RefreshSessionRepository for InMemoryRefreshSessionRepository { } async fn revoke_all_for_user(&self, user_id: &UserId) -> Result<(), DomainError> { - self.store - .lock() - .unwrap() - .retain(|s| s.user_id != *user_id); + self.store.lock().unwrap().retain(|s| s.user_id != *user_id); Ok(()) } diff --git a/crates/presentation/src/factory.rs b/crates/presentation/src/factory.rs index 1757956..0d5a05a 100644 --- a/crates/presentation/src/factory.rs +++ b/crates/presentation/src/factory.rs @@ -79,7 +79,9 @@ pub async fn build_database_adapters(backend: &str, url: &str) -> anyhow::Result goal: w.goal, user_settings: w.user_settings, remote_goal: w.remote_goal, - refresh_session: Arc::new(postgres::PostgresRefreshSessionAdapter::new(w.pool.clone())) as _, + refresh_session: Arc::new(postgres::PostgresRefreshSessionAdapter::new( + w.pool.clone(), + )) as _, db_pool: DbPool::Postgres(w.pool), }) } @@ -118,7 +120,8 @@ pub async fn build_database_adapters(backend: &str, url: &str) -> anyhow::Result goal: w.goal, user_settings: w.user_settings, remote_goal: w.remote_goal, - refresh_session: Arc::new(sqlite::SqliteRefreshSessionAdapter::new(w.pool.clone())) as _, + refresh_session: Arc::new(sqlite::SqliteRefreshSessionAdapter::new(w.pool.clone())) + as _, db_pool: DbPool::Sqlite(w.pool), }) } diff --git a/crates/presentation/src/handlers/auth.rs b/crates/presentation/src/handlers/auth.rs index 9e40715..1f493b1 100644 --- a/crates/presentation/src/handlers/auth.rs +++ b/crates/presentation/src/handlers/auth.rs @@ -17,7 +17,9 @@ use crate::{ render::render_page, state::AppState, }; -use api_types::{LoginRequest, LoginResponse, LogoutRequest, RefreshRequest, RefreshResponse, RegisterRequest}; +use api_types::{ + LoginRequest, LoginResponse, LogoutRequest, RefreshRequest, RefreshResponse, RegisterRequest, +}; use application::ports::HtmlPageContext; use template_askama::{LoginTemplate, RegisterTemplate}; diff --git a/crates/worker/src/db.rs b/crates/worker/src/db.rs index 76efd91..207010f 100644 --- a/crates/worker/src/db.rs +++ b/crates/worker/src/db.rs @@ -87,7 +87,9 @@ pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result anyhow::Result anyhow::Result<()> { Arc::new(application::jobs::WatchEventCleanupJob::new(ctx.clone())), Arc::new(application::jobs::WrapUpAutoGenerateJob::new(ctx.clone())), Arc::new(application::jobs::WrapUpCleanupJob::new(ctx.clone())), - Arc::new(application::jobs::RefreshSessionCleanupJob::new(ctx.clone())), + Arc::new(application::jobs::RefreshSessionCleanupJob::new( + ctx.clone(), + )), ]; if let Some(job) = enrichment_job { periodic_jobs.push(job);