feat: implement database connection and event bus handling

This commit is contained in:
2026-05-12 13:28:13 +02:00
parent 38d13fbff1
commit 78c2d9b1d3
3 changed files with 138 additions and 96 deletions

44
crates/worker/src/db.rs Normal file
View File

@@ -0,0 +1,44 @@
use std::sync::Arc;
use anyhow::Context;
use domain::ports::{
DiaryRepository, ImportProfileRepository, ImportSessionRepository, MovieProfileRepository,
MovieRepository, ReviewRepository, StatsRepository, UserRepository,
};
pub enum DbPool {
#[cfg(feature = "sqlite")]
Sqlite(sqlx::SqlitePool),
#[cfg(feature = "postgres")]
Postgres(sqlx::PgPool),
}
pub struct Repos {
pub movie: Arc<dyn MovieRepository>,
pub review: Arc<dyn ReviewRepository>,
pub diary: Arc<dyn DiaryRepository>,
pub stats: Arc<dyn StatsRepository>,
pub user: Arc<dyn UserRepository>,
pub import_session: Arc<dyn ImportSessionRepository>,
pub import_profile: Arc<dyn ImportProfileRepository>,
pub movie_profile: Arc<dyn MovieProfileRepository>,
}
pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<(Repos, DbPool)> {
match backend {
#[cfg(feature = "postgres")]
"postgres" => {
let (pool, m, r, d, s, u, is, ip, mp) =
postgres::wire(database_url).await.context("PostgreSQL connection failed")?;
Ok((Repos { movie: m, review: r, diary: d, stats: s, user: u, import_session: is, import_profile: ip, movie_profile: mp }, DbPool::Postgres(pool)))
}
#[cfg(feature = "sqlite")]
_ => {
let (pool, m, r, d, s, u, is, ip, mp) =
sqlite::wire(database_url).await.context("SQLite connection failed")?;
Ok((Repos { movie: m, review: r, diary: d, stats: s, user: u, import_session: is, import_profile: ip, movie_profile: mp }, DbPool::Sqlite(pool)))
}
#[cfg(not(feature = "sqlite"))]
_ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build"),
}
}

View File

@@ -0,0 +1,58 @@
use std::sync::Arc;
use anyhow::Context;
use domain::ports::{EventConsumer, EventPublisher};
use crate::db::DbPool;
#[derive(Clone, Copy)]
pub enum EventBusBackend {
Db,
#[cfg(feature = "nats")]
Nats,
}
impl EventBusBackend {
pub fn from_env() -> anyhow::Result<Self> {
match std::env::var("EVENT_BUS_BACKEND")
.unwrap_or_else(|_| "db".to_string())
.as_str()
{
"db" => Ok(Self::Db),
#[cfg(feature = "nats")]
"nats" => Ok(Self::Nats),
#[cfg(not(feature = "nats"))]
"nats" => anyhow::bail!(
"EVENT_BUS_BACKEND=nats requires the nats feature to be compiled in"
),
other => anyhow::bail!("unknown EVENT_BUS_BACKEND={other}, expected 'db' or 'nats'"),
}
}
}
pub async fn create(
db_pool: &DbPool,
) -> anyhow::Result<(Arc<dyn EventPublisher>, Arc<dyn EventConsumer>)> {
match EventBusBackend::from_env()? {
EventBusBackend::Db => {
tracing::info!("event bus: DB queue");
match db_pool {
#[cfg(feature = "postgres")]
DbPool::Postgres(pool) => {
Ok(postgres_event_queue::PostgresEventQueue::create_channel(pool.clone()).await?)
}
#[cfg(feature = "sqlite")]
DbPool::Sqlite(pool) => {
Ok(sqlite_event_queue::SqliteEventQueue::create_channel(pool.clone()).await?)
}
}
}
#[cfg(feature = "nats")]
EventBusBackend::Nats => {
let cfg = nats::NatsConfig::from_env()
.context("EVENT_BUS_BACKEND=nats requires NATS_URL to be set")?;
tracing::info!("event bus: NATS ({})", cfg.url);
Ok(nats::create_channel(cfg).await?)
}
}
}

View File

@@ -1,3 +1,6 @@
mod db;
mod event_bus;
use std::sync::Arc; use std::sync::Arc;
use anyhow::Context; use anyhow::Context;
@@ -25,84 +28,49 @@ async fn main() -> anyhow::Result<()> {
let poster_fetcher = poster_fetcher::create()?; let poster_fetcher = poster_fetcher::create()?;
let image_storage = image_storage::create()?; let image_storage = image_storage::create()?;
let (movie_repository, review_repository, diary_repository, stats_repository, user_repository, import_session_repository, import_profile_repository, movie_profile_repository, db_pool) = let (repos, db_pool) = db::connect(&database_url, &backend).await?;
match backend.as_str() { let (event_publisher_arc, consumer_arc) = event_bus::create(&db_pool).await?;
#[cfg(feature = "postgres")]
"postgres" => {
let (pool, m, r, d, s, u, is, ip, mp) = postgres::wire(&database_url).await?;
(m, r, d, s, u, is, ip, mp, DbPool::Postgres(pool))
}
#[cfg(feature = "sqlite")]
_ => {
let (pool, m, r, d, s, u, is, ip, mp) = sqlite::wire(&database_url).await?;
(m, r, d, s, u, is, ip, mp, DbPool::Sqlite(pool))
}
#[cfg(not(feature = "sqlite"))]
_ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build"),
};
let (event_publisher_arc, consumer_arc): ( // Clone refs federation handler needs before ctx consumes them.
Arc<dyn domain::ports::EventPublisher>,
Arc<dyn domain::ports::EventConsumer>,
) = match EventBusBackend::from_env()? {
EventBusBackend::Db => {
tracing::info!("event bus: DB queue");
match &db_pool {
#[cfg(feature = "postgres")]
DbPool::Postgres(pool) => postgres_event_queue::PostgresEventQueue::create_channel(pool.clone()).await?,
#[cfg(feature = "sqlite")]
DbPool::Sqlite(pool) => sqlite_event_queue::SqliteEventQueue::create_channel(pool.clone()).await?,
}
}
#[cfg(feature = "nats")]
EventBusBackend::Nats => {
let cfg = nats::NatsConfig::from_env()
.context("EVENT_BUS_BACKEND=nats requires NATS_URL to be set")?;
tracing::info!("event bus: NATS ({})", cfg.url);
nats::create_channel(cfg).await?
}
};
let profile_repo = movie_profile_repository;
// Clone what federation handler needs before ctx and app_config are consumed.
#[cfg(feature = "federation")] #[cfg(feature = "federation")]
let (fed_movie_repo, fed_review_repo, fed_diary_repo, fed_user_repo, base_url, allow_registration) = ( let (fed_movie_repo, fed_review_repo, fed_diary_repo, fed_user_repo, base_url, allow_registration) = (
Arc::clone(&movie_repository), Arc::clone(&repos.movie),
Arc::clone(&review_repository), Arc::clone(&repos.review),
Arc::clone(&diary_repository), Arc::clone(&repos.diary),
Arc::clone(&user_repository), Arc::clone(&repos.user),
app_config.base_url.clone(), app_config.base_url.clone(),
app_config.allow_registration, app_config.allow_registration,
); );
let ctx = AppContext { let ctx = AppContext {
movie_repository, movie_repository: repos.movie,
review_repository, review_repository: repos.review,
diary_repository, diary_repository: repos.diary,
diary_exporter: Arc::new(ExportAdapter) as Arc<dyn DiaryExporter>, diary_exporter: Arc::new(ExportAdapter) as Arc<dyn DiaryExporter>,
document_parser: Arc::new(ImporterDocumentParser) as Arc<dyn DocumentParser>, document_parser: Arc::new(ImporterDocumentParser) as Arc<dyn DocumentParser>,
stats_repository, stats_repository: repos.stats,
metadata_client, metadata_client,
poster_fetcher, poster_fetcher,
image_storage, image_storage,
event_publisher: event_publisher_arc, event_publisher: event_publisher_arc,
auth_service, auth_service,
password_hasher, password_hasher,
user_repository, user_repository: repos.user,
import_session_repository, import_session_repository: repos.import_session,
import_profile_repository, import_profile_repository: repos.import_profile,
movie_profile_repository: Arc::clone(&profile_repo) as _, movie_profile_repository: repos.movie_profile,
config: app_config, config: app_config,
}; };
// ── Enrichment ────────────────────────────────────────────────────────────
let enrichment_handler: Option<Arc<dyn EventHandler>> = let enrichment_handler: Option<Arc<dyn EventHandler>> =
match tmdb_enrichment::TmdbEnrichmentClient::from_env() { match tmdb_enrichment::TmdbEnrichmentClient::from_env() {
Ok(client) => { Ok(client) => {
tracing::info!("TMDb enrichment enabled"); tracing::info!("TMDb enrichment enabled");
Some(Arc::new(tmdb_enrichment::EnrichmentHandler { Some(Arc::new(tmdb_enrichment::EnrichmentHandler {
enrichment_client: Arc::new(client), enrichment_client: Arc::new(client),
profile_repo: Arc::clone(&profile_repo), profile_repo: Arc::clone(&ctx.movie_profile_repository),
})) }))
} }
Err(e) => { Err(e) => {
@@ -111,6 +79,8 @@ async fn main() -> anyhow::Result<()> {
} }
}; };
// ── Periodic jobs ─────────────────────────────────────────────────────────
let periodic_jobs: Vec<Arc<dyn PeriodicJob>> = vec![ let periodic_jobs: Vec<Arc<dyn PeriodicJob>> = vec![
Arc::new(application::jobs::ImportSessionCleanupJob::new(ctx.clone())), Arc::new(application::jobs::ImportSessionCleanupJob::new(ctx.clone())),
Arc::new(application::jobs::EnrichmentStalenessJob::new(ctx.clone())), Arc::new(application::jobs::EnrichmentStalenessJob::new(ctx.clone())),
@@ -128,6 +98,8 @@ async fn main() -> anyhow::Result<()> {
}); });
} }
// ── Event handlers ────────────────────────────────────────────────────────
let handlers: Vec<Arc<dyn EventHandler>> = { let handlers: Vec<Arc<dyn EventHandler>> = {
let poster = Arc::new(poster_sync::PosterSyncHandler::new( let poster = Arc::new(poster_sync::PosterSyncHandler::new(
Arc::clone(&ctx.movie_repository), Arc::clone(&ctx.movie_repository),
@@ -141,12 +113,10 @@ async fn main() -> anyhow::Result<()> {
Arc::clone(&ctx.image_storage), Arc::clone(&ctx.image_storage),
)) as Arc<dyn EventHandler>; )) as Arc<dyn EventHandler>;
let enrichment = enrichment_handler;
#[cfg(not(feature = "federation"))] #[cfg(not(feature = "federation"))]
{ {
let mut h: Vec<Arc<dyn EventHandler>> = vec![poster, cleanup]; let mut h = vec![poster, cleanup];
if let Some(e) = enrichment { h.push(e); } if let Some(e) = enrichment_handler { h.push(e); }
h h
} }
@@ -154,9 +124,9 @@ async fn main() -> anyhow::Result<()> {
{ {
let (federation_repo, _social_query, review_store) = match &db_pool { let (federation_repo, _social_query, review_store) = match &db_pool {
#[cfg(feature = "sqlite-federation")] #[cfg(feature = "sqlite-federation")]
DbPool::Sqlite(pool) => sqlite_federation::wire(pool.clone()), db::DbPool::Sqlite(pool) => sqlite_federation::wire(pool.clone()),
#[cfg(feature = "postgres-federation")] #[cfg(feature = "postgres-federation")]
DbPool::Postgres(pool) => postgres_federation::wire(pool.clone()), db::DbPool::Postgres(pool) => postgres_federation::wire(pool.clone()),
}; };
let ap = activitypub::wire( let ap = activitypub::wire(
@@ -171,14 +141,15 @@ async fn main() -> anyhow::Result<()> {
).await?.event_handler; ).await?.event_handler;
tracing::info!("federation event handler registered"); tracing::info!("federation event handler registered");
let mut h: Vec<Arc<dyn EventHandler>> = vec![poster, cleanup, ap]; let mut h = vec![poster, cleanup, ap];
if let Some(e) = enrichment { h.push(e); } if let Some(e) = enrichment_handler { h.push(e); }
h h
} }
}; };
let worker = WorkerService::new(consumer_arc, handlers); // ── Run ───────────────────────────────────────────────────────────────────
let worker = WorkerService::new(consumer_arc, handlers);
tracing::info!("worker started"); tracing::info!("worker started");
worker.run().await; worker.run().await;
tracing::info!("worker stopped"); tracing::info!("worker stopped");
@@ -186,36 +157,6 @@ async fn main() -> anyhow::Result<()> {
Ok(()) Ok(())
} }
enum DbPool {
#[cfg(feature = "sqlite")]
Sqlite(sqlx::SqlitePool),
#[cfg(feature = "postgres")]
Postgres(sqlx::PgPool),
}
#[derive(Clone, Copy)]
enum EventBusBackend {
Db,
#[cfg(feature = "nats")]
Nats,
}
impl EventBusBackend {
fn from_env() -> anyhow::Result<Self> {
match std::env::var("EVENT_BUS_BACKEND")
.unwrap_or_else(|_| "db".to_string())
.as_str()
{
"db" => Ok(Self::Db),
#[cfg(feature = "nats")]
"nats" => Ok(Self::Nats),
#[cfg(not(feature = "nats"))]
"nats" => anyhow::bail!("EVENT_BUS_BACKEND=nats requires the nats feature to be compiled in"),
other => anyhow::bail!("unknown EVENT_BUS_BACKEND={other}, expected 'db' or 'nats'"),
}
}
}
fn init_tracing() { fn init_tracing() {
let filter = std::env::var("RUST_LOG") let filter = std::env::var("RUST_LOG")
.unwrap_or_else(|_| "worker=info,application=info".to_string()); .unwrap_or_else(|_| "worker=info,application=info".to_string());
@@ -224,4 +165,3 @@ fn init_tracing() {
.with(tracing_subscriber::fmt::layer()) .with(tracing_subscriber::fmt::layer())
.init(); .init();
} }