diff --git a/crates/worker/src/db.rs b/crates/worker/src/db.rs new file mode 100644 index 0000000..3b83a1c --- /dev/null +++ b/crates/worker/src/db.rs @@ -0,0 +1,44 @@ +use std::sync::Arc; + +use anyhow::Context; +use domain::ports::{ + DiaryRepository, ImportProfileRepository, ImportSessionRepository, MovieProfileRepository, + MovieRepository, ReviewRepository, StatsRepository, UserRepository, +}; + +pub enum DbPool { + #[cfg(feature = "sqlite")] + Sqlite(sqlx::SqlitePool), + #[cfg(feature = "postgres")] + Postgres(sqlx::PgPool), +} + +pub struct Repos { + pub movie: Arc, + pub review: Arc, + pub diary: Arc, + pub stats: Arc, + pub user: Arc, + pub import_session: Arc, + pub import_profile: Arc, + pub movie_profile: Arc, +} + +pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<(Repos, DbPool)> { + match backend { + #[cfg(feature = "postgres")] + "postgres" => { + let (pool, m, r, d, s, u, is, ip, mp) = + postgres::wire(database_url).await.context("PostgreSQL connection failed")?; + Ok((Repos { movie: m, review: r, diary: d, stats: s, user: u, import_session: is, import_profile: ip, movie_profile: mp }, DbPool::Postgres(pool))) + } + #[cfg(feature = "sqlite")] + _ => { + let (pool, m, r, d, s, u, is, ip, mp) = + sqlite::wire(database_url).await.context("SQLite connection failed")?; + Ok((Repos { movie: m, review: r, diary: d, stats: s, user: u, import_session: is, import_profile: ip, movie_profile: mp }, DbPool::Sqlite(pool))) + } + #[cfg(not(feature = "sqlite"))] + _ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build"), + } +} diff --git a/crates/worker/src/event_bus.rs b/crates/worker/src/event_bus.rs new file mode 100644 index 0000000..923fa9a --- /dev/null +++ b/crates/worker/src/event_bus.rs @@ -0,0 +1,58 @@ +use std::sync::Arc; + +use anyhow::Context; +use domain::ports::{EventConsumer, EventPublisher}; + +use crate::db::DbPool; + +#[derive(Clone, Copy)] +pub enum EventBusBackend { + Db, + #[cfg(feature = "nats")] + Nats, +} + +impl EventBusBackend { + pub fn from_env() -> anyhow::Result { + match std::env::var("EVENT_BUS_BACKEND") + .unwrap_or_else(|_| "db".to_string()) + .as_str() + { + "db" => Ok(Self::Db), + #[cfg(feature = "nats")] + "nats" => Ok(Self::Nats), + #[cfg(not(feature = "nats"))] + "nats" => anyhow::bail!( + "EVENT_BUS_BACKEND=nats requires the nats feature to be compiled in" + ), + other => anyhow::bail!("unknown EVENT_BUS_BACKEND={other}, expected 'db' or 'nats'"), + } + } +} + +pub async fn create( + db_pool: &DbPool, +) -> anyhow::Result<(Arc, Arc)> { + match EventBusBackend::from_env()? { + EventBusBackend::Db => { + tracing::info!("event bus: DB queue"); + match db_pool { + #[cfg(feature = "postgres")] + DbPool::Postgres(pool) => { + Ok(postgres_event_queue::PostgresEventQueue::create_channel(pool.clone()).await?) + } + #[cfg(feature = "sqlite")] + DbPool::Sqlite(pool) => { + Ok(sqlite_event_queue::SqliteEventQueue::create_channel(pool.clone()).await?) + } + } + } + #[cfg(feature = "nats")] + EventBusBackend::Nats => { + let cfg = nats::NatsConfig::from_env() + .context("EVENT_BUS_BACKEND=nats requires NATS_URL to be set")?; + tracing::info!("event bus: NATS ({})", cfg.url); + Ok(nats::create_channel(cfg).await?) + } + } +} diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 2a6ddaa..3d8a458 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -1,3 +1,6 @@ +mod db; +mod event_bus; + use std::sync::Arc; use anyhow::Context; @@ -25,84 +28,49 @@ async fn main() -> anyhow::Result<()> { let poster_fetcher = poster_fetcher::create()?; let image_storage = image_storage::create()?; - let (movie_repository, review_repository, diary_repository, stats_repository, user_repository, import_session_repository, import_profile_repository, movie_profile_repository, db_pool) = - match backend.as_str() { - #[cfg(feature = "postgres")] - "postgres" => { - let (pool, m, r, d, s, u, is, ip, mp) = postgres::wire(&database_url).await?; - (m, r, d, s, u, is, ip, mp, DbPool::Postgres(pool)) - } - #[cfg(feature = "sqlite")] - _ => { - let (pool, m, r, d, s, u, is, ip, mp) = sqlite::wire(&database_url).await?; - (m, r, d, s, u, is, ip, mp, DbPool::Sqlite(pool)) - } - #[cfg(not(feature = "sqlite"))] - _ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build"), - }; + let (repos, db_pool) = db::connect(&database_url, &backend).await?; + let (event_publisher_arc, consumer_arc) = event_bus::create(&db_pool).await?; - let (event_publisher_arc, consumer_arc): ( - Arc, - Arc, - ) = match EventBusBackend::from_env()? { - EventBusBackend::Db => { - tracing::info!("event bus: DB queue"); - match &db_pool { - #[cfg(feature = "postgres")] - DbPool::Postgres(pool) => postgres_event_queue::PostgresEventQueue::create_channel(pool.clone()).await?, - #[cfg(feature = "sqlite")] - DbPool::Sqlite(pool) => sqlite_event_queue::SqliteEventQueue::create_channel(pool.clone()).await?, - } - } - #[cfg(feature = "nats")] - EventBusBackend::Nats => { - let cfg = nats::NatsConfig::from_env() - .context("EVENT_BUS_BACKEND=nats requires NATS_URL to be set")?; - tracing::info!("event bus: NATS ({})", cfg.url); - nats::create_channel(cfg).await? - } - }; - - let profile_repo = movie_profile_repository; - - // Clone what federation handler needs before ctx and app_config are consumed. + // Clone refs federation handler needs before ctx consumes them. #[cfg(feature = "federation")] let (fed_movie_repo, fed_review_repo, fed_diary_repo, fed_user_repo, base_url, allow_registration) = ( - Arc::clone(&movie_repository), - Arc::clone(&review_repository), - Arc::clone(&diary_repository), - Arc::clone(&user_repository), + Arc::clone(&repos.movie), + Arc::clone(&repos.review), + Arc::clone(&repos.diary), + Arc::clone(&repos.user), app_config.base_url.clone(), app_config.allow_registration, ); let ctx = AppContext { - movie_repository, - review_repository, - diary_repository, - diary_exporter: Arc::new(ExportAdapter) as Arc, - document_parser: Arc::new(ImporterDocumentParser) as Arc, - stats_repository, + movie_repository: repos.movie, + review_repository: repos.review, + diary_repository: repos.diary, + diary_exporter: Arc::new(ExportAdapter) as Arc, + document_parser: Arc::new(ImporterDocumentParser) as Arc, + stats_repository: repos.stats, metadata_client, poster_fetcher, image_storage, - event_publisher: event_publisher_arc, + event_publisher: event_publisher_arc, auth_service, password_hasher, - user_repository, - import_session_repository, - import_profile_repository, - movie_profile_repository: Arc::clone(&profile_repo) as _, + user_repository: repos.user, + import_session_repository: repos.import_session, + import_profile_repository: repos.import_profile, + movie_profile_repository: repos.movie_profile, config: app_config, }; + // ── Enrichment ──────────────────────────────────────────────────────────── + let enrichment_handler: Option> = match tmdb_enrichment::TmdbEnrichmentClient::from_env() { Ok(client) => { tracing::info!("TMDb enrichment enabled"); Some(Arc::new(tmdb_enrichment::EnrichmentHandler { enrichment_client: Arc::new(client), - profile_repo: Arc::clone(&profile_repo), + profile_repo: Arc::clone(&ctx.movie_profile_repository), })) } Err(e) => { @@ -111,6 +79,8 @@ async fn main() -> anyhow::Result<()> { } }; + // ── Periodic jobs ───────────────────────────────────────────────────────── + let periodic_jobs: Vec> = vec![ Arc::new(application::jobs::ImportSessionCleanupJob::new(ctx.clone())), Arc::new(application::jobs::EnrichmentStalenessJob::new(ctx.clone())), @@ -128,6 +98,8 @@ async fn main() -> anyhow::Result<()> { }); } + // ── Event handlers ──────────────────────────────────────────────────────── + let handlers: Vec> = { let poster = Arc::new(poster_sync::PosterSyncHandler::new( Arc::clone(&ctx.movie_repository), @@ -141,12 +113,10 @@ async fn main() -> anyhow::Result<()> { Arc::clone(&ctx.image_storage), )) as Arc; - let enrichment = enrichment_handler; - #[cfg(not(feature = "federation"))] { - let mut h: Vec> = vec![poster, cleanup]; - if let Some(e) = enrichment { h.push(e); } + let mut h = vec![poster, cleanup]; + if let Some(e) = enrichment_handler { h.push(e); } h } @@ -154,9 +124,9 @@ async fn main() -> anyhow::Result<()> { { let (federation_repo, _social_query, review_store) = match &db_pool { #[cfg(feature = "sqlite-federation")] - DbPool::Sqlite(pool) => sqlite_federation::wire(pool.clone()), + db::DbPool::Sqlite(pool) => sqlite_federation::wire(pool.clone()), #[cfg(feature = "postgres-federation")] - DbPool::Postgres(pool) => postgres_federation::wire(pool.clone()), + db::DbPool::Postgres(pool) => postgres_federation::wire(pool.clone()), }; let ap = activitypub::wire( @@ -171,14 +141,15 @@ async fn main() -> anyhow::Result<()> { ).await?.event_handler; tracing::info!("federation event handler registered"); - let mut h: Vec> = vec![poster, cleanup, ap]; - if let Some(e) = enrichment { h.push(e); } + let mut h = vec![poster, cleanup, ap]; + if let Some(e) = enrichment_handler { h.push(e); } h } }; - let worker = WorkerService::new(consumer_arc, handlers); + // ── Run ─────────────────────────────────────────────────────────────────── + let worker = WorkerService::new(consumer_arc, handlers); tracing::info!("worker started"); worker.run().await; tracing::info!("worker stopped"); @@ -186,36 +157,6 @@ async fn main() -> anyhow::Result<()> { Ok(()) } -enum DbPool { - #[cfg(feature = "sqlite")] - Sqlite(sqlx::SqlitePool), - #[cfg(feature = "postgres")] - Postgres(sqlx::PgPool), -} - -#[derive(Clone, Copy)] -enum EventBusBackend { - Db, - #[cfg(feature = "nats")] - Nats, -} - -impl EventBusBackend { - fn from_env() -> anyhow::Result { - match std::env::var("EVENT_BUS_BACKEND") - .unwrap_or_else(|_| "db".to_string()) - .as_str() - { - "db" => Ok(Self::Db), - #[cfg(feature = "nats")] - "nats" => Ok(Self::Nats), - #[cfg(not(feature = "nats"))] - "nats" => anyhow::bail!("EVENT_BUS_BACKEND=nats requires the nats feature to be compiled in"), - other => anyhow::bail!("unknown EVENT_BUS_BACKEND={other}, expected 'db' or 'nats'"), - } - } -} - fn init_tracing() { let filter = std::env::var("RUST_LOG") .unwrap_or_else(|_| "worker=info,application=info".to_string()); @@ -224,4 +165,3 @@ fn init_tracing() { .with(tracing_subscriber::fmt::layer()) .init(); } -