From c2a554170663b6e1c7ce582246935a6b342584d3 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Sun, 10 May 2026 17:55:51 +0200 Subject: [PATCH] feat: implement event bus backend configuration for DB and NATS --- .env.example | 7 +-- Dockerfile | 2 + README.md | 7 ++- crates/presentation/Cargo.toml | 3 +- crates/presentation/src/main.rs | 89 +++++++++++++++++++++++---------- crates/worker/Cargo.toml | 3 +- crates/worker/src/main.rs | 40 ++++++++++++--- 7 files changed, 109 insertions(+), 42 deletions(-) diff --git a/.env.example b/.env.example index 6696f43..0eb0039 100644 --- a/.env.example +++ b/.env.example @@ -39,17 +39,18 @@ ALLOW_REGISTRATION=false RATE_LIMIT=20 POSTER_FETCH_TIMEOUT_SECONDS=30 -# Event bus — controls background poster sync and other async work. -# Priority: NATS (if NATS_URL set) → DB queue (default, uses existing DB) +# Event bus — "db" (default) or "nats" # The worker binary must run alongside the presentation to process events. +EVENT_BUS_BACKEND=db # Option A: DB queue (default — no extra infrastructure needed) -# Events are stored in the same database as the app and polled by the worker. +# Events are persisted in the same database as the app and polled by the worker. # EVENT_QUEUE_POLL_INTERVAL_MS=500 # polling interval (default: 500ms) # EVENT_QUEUE_BATCH_SIZE=10 # rows claimed per poll cycle (default: 10) # EVENT_QUEUE_MAX_ATTEMPTS=5 # retries before dead-lettering (default: 5) # Option B: NATS (at-least-once delivery, recommended for higher throughput) +# EVENT_BUS_BACKEND=nats # NATS_URL=nats://localhost:4222 # NATS_MODE=jetstream # "jetstream" (default, at-least-once) or "core" (fire-and-forget) # NATS_SUBJECT_PREFIX=movies-diary.events diff --git a/Dockerfile b/Dockerfile index 7f9de61..0512597 100644 --- a/Dockerfile +++ b/Dockerfile @@ -45,6 +45,8 @@ COPY crates ./crates # # To build with PostgreSQL backend instead: # --build-arg FEATURES=postgres,postgres-federation +# To add NATS support (EVENT_BUS_BACKEND=nats): +# --build-arg FEATURES=sqlite,sqlite-federation,nats ARG FEATURES=sqlite,sqlite-federation RUN cargo build --release -p presentation -p worker --no-default-features --features "${FEATURES}" diff --git a/README.md b/README.md index 0288428..5edaa2a 100644 --- a/README.md +++ b/README.md @@ -91,10 +91,9 @@ ALLOW_REGISTRATION=true # set to false to disable new sign-ups SECURE_COOKIES=true # set when serving over HTTPS RUST_LOG=presentation=info,tower_http=info,worker=info,application=info -# Event bus — background poster sync runs in the worker binary. -# Default: DB queue (uses the same database, no extra infrastructure). -# Set NATS_URL to use NATS instead (JetStream recommended for at-least-once delivery). -# NATS_URL=nats://localhost:4222 +# Event bus — "db" (default, uses same database) or "nats" +EVENT_BUS_BACKEND=db +# NATS_URL=nats://localhost:4222 # required when EVENT_BUS_BACKEND=nats ``` The `worker` binary must run alongside `presentation` to process events: diff --git a/crates/presentation/Cargo.toml b/crates/presentation/Cargo.toml index d955fca..0894edd 100644 --- a/crates/presentation/Cargo.toml +++ b/crates/presentation/Cargo.toml @@ -7,6 +7,7 @@ edition = "2024" default = ["sqlite", "sqlite-federation"] sqlite = ["dep:sqlite", "dep:sqlite-event-queue"] postgres = ["dep:postgres", "dep:postgres-event-queue"] +nats = ["dep:nats"] # Meta-feature: true when any federation adapter is active — keeps all #[cfg(feature = "federation")] gates working federation = [] sqlite-federation = [ @@ -49,7 +50,7 @@ poster-fetcher = { workspace = true } poster-storage = { workspace = true } template-askama = { workspace = true } event-publisher = { workspace = true } -nats = { workspace = true } +nats = { workspace = true, optional = true } rss = { workspace = true } export = { workspace = true } doc = { workspace = true } diff --git a/crates/presentation/src/main.rs b/crates/presentation/src/main.rs index 25f9f53..f030569 100644 --- a/crates/presentation/src/main.rs +++ b/crates/presentation/src/main.rs @@ -114,6 +114,8 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { }; // Wire up event channel, federation service, and ap_router + let event_bus = EventBusBackend::from_env()?; + #[cfg(feature = "federation")] let (event_publisher_arc, ap_router, ap_service, social_query) = { let (federation_repo, social_query_arc, review_store): ( @@ -157,10 +159,36 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { let ap_router = concrete_ap_service.router(); let ap_service_arc: Arc = concrete_ap_service; - let ep: Arc = if let Ok(cfg) = nats::NatsConfig::from_env() { - tracing::info!("event bus: NATS ({})", cfg.url); - nats::create_publisher(cfg).await? - } else { + let ep: Arc = match event_bus { + EventBusBackend::Db => { + tracing::info!("event bus: DB queue"); + match backend.as_str() { + #[cfg(feature = "postgres")] + "postgres" => postgres_event_queue::PostgresEventQueue::create_publisher( + pg_pool.as_ref().unwrap().clone() + ).await?, + #[cfg(feature = "sqlite")] + _ => sqlite_event_queue::SqliteEventQueue::create_publisher( + sqlite_pool.as_ref().unwrap().clone() + ).await?, + #[cfg(not(feature = "sqlite"))] + _ => anyhow::bail!("EVENT_BUS_BACKEND=db has no adapter for DATABASE_BACKEND={backend}; enable the sqlite or postgres feature"), + } + } + #[cfg(feature = "nats")] + EventBusBackend::Nats => { + let cfg = nats::NatsConfig::from_env() + .context("EVENT_BUS_BACKEND=nats requires NATS_URL to be set")?; + tracing::info!("event bus: NATS ({})", cfg.url); + nats::create_publisher(cfg).await? + } + }; + (ep, ap_router, ap_service_arc, social_query_arc) + }; + + #[cfg(not(feature = "federation"))] + let event_publisher_arc: Arc = match event_bus { + EventBusBackend::Db => { tracing::info!("event bus: DB queue"); match backend.as_str() { #[cfg(feature = "postgres")] @@ -172,29 +200,15 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { sqlite_pool.as_ref().unwrap().clone() ).await?, #[cfg(not(feature = "sqlite"))] - _ => anyhow::bail!("no event bus: NATS_URL not set and sqlite feature not enabled"), + _ => anyhow::bail!("EVENT_BUS_BACKEND=db has no adapter for DATABASE_BACKEND={backend}; enable the sqlite or postgres feature"), } - }; - (ep, ap_router, ap_service_arc, social_query_arc) - }; - - #[cfg(not(feature = "federation"))] - let event_publisher_arc: Arc = if let Ok(cfg) = nats::NatsConfig::from_env() { - tracing::info!("event bus: NATS ({})", cfg.url); - nats::create_publisher(cfg).await? - } else { - tracing::info!("event bus: DB queue"); - match backend.as_str() { - #[cfg(feature = "postgres")] - "postgres" => postgres_event_queue::PostgresEventQueue::create_publisher( - pg_pool.as_ref().unwrap().clone() - ).await?, - #[cfg(feature = "sqlite")] - _ => sqlite_event_queue::SqliteEventQueue::create_publisher( - sqlite_pool.as_ref().unwrap().clone() - ).await?, - #[cfg(not(feature = "sqlite"))] - _ => anyhow::bail!("no event bus: NATS_URL not set and sqlite feature not enabled"), + } + #[cfg(feature = "nats")] + EventBusBackend::Nats => { + let cfg = nats::NatsConfig::from_env() + .context("EVENT_BUS_BACKEND=nats requires NATS_URL to be set")?; + tracing::info!("event bus: NATS ({})", cfg.url); + nats::create_publisher(cfg).await? } }; #[cfg(not(feature = "federation"))] @@ -297,6 +311,29 @@ async fn wire_postgres(database_url: &str) -> anyhow::Result<( Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository)) } +#[derive(Clone, Copy)] +enum EventBusBackend { + Db, + #[cfg(feature = "nats")] + Nats, +} + +impl EventBusBackend { + fn from_env() -> anyhow::Result { + match std::env::var("EVENT_BUS_BACKEND") + .unwrap_or_else(|_| "db".to_string()) + .as_str() + { + "db" => Ok(Self::Db), + #[cfg(feature = "nats")] + "nats" => Ok(Self::Nats), + #[cfg(not(feature = "nats"))] + "nats" => anyhow::bail!("EVENT_BUS_BACKEND=nats requires the nats feature to be compiled in"), + other => anyhow::bail!("unknown EVENT_BUS_BACKEND={other}, expected 'db' or 'nats'"), + } + } +} + fn init_tracing() { tracing_subscriber::registry() .with(tracing_subscriber::EnvFilter::new( diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 43d1c3f..fa8169a 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -7,6 +7,7 @@ edition = "2024" default = ["sqlite"] sqlite = ["dep:sqlite", "dep:sqlite-event-queue"] postgres = ["dep:postgres", "dep:postgres-event-queue"] +nats = ["dep:nats"] [dependencies] domain = { workspace = true } @@ -29,7 +30,7 @@ metadata = { workspace = true } poster-fetcher = { workspace = true } poster-storage = { workspace = true } export = { workspace = true } -nats = { workspace = true } +nats = { workspace = true, optional = true } sqlx = { workspace = true } # Optional — database backends diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index c765eb4..13be846 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -79,12 +79,8 @@ async fn main() -> anyhow::Result<()> { let (event_publisher_arc, consumer_arc): ( Arc, Arc, - ) = match nats::NatsConfig::from_env() { - Ok(cfg) => { - tracing::info!("event bus: NATS ({})", cfg.url); - nats::create_channel(cfg).await? - } - Err(_) => { + ) = match EventBusBackend::from_env()? { + EventBusBackend::Db => { tracing::info!("event bus: DB queue"); match backend.as_str() { #[cfg(feature = "postgres")] @@ -96,9 +92,16 @@ async fn main() -> anyhow::Result<()> { sqlite_pool.unwrap() ).await?, #[cfg(not(feature = "sqlite"))] - _ => anyhow::bail!("no event bus: NATS_URL not set and sqlite feature not enabled"), + _ => anyhow::bail!("EVENT_BUS_BACKEND=db has no adapter for DATABASE_BACKEND={backend}; enable the sqlite or postgres feature"), } } + #[cfg(feature = "nats")] + EventBusBackend::Nats => { + let cfg = nats::NatsConfig::from_env() + .context("EVENT_BUS_BACKEND=nats requires NATS_URL to be set")?; + tracing::info!("event bus: NATS ({})", cfg.url); + nats::create_channel(cfg).await? + } }; let ctx = AppContext { @@ -127,6 +130,29 @@ async fn main() -> anyhow::Result<()> { Ok(()) } +#[derive(Clone, Copy)] +enum EventBusBackend { + Db, + #[cfg(feature = "nats")] + Nats, +} + +impl EventBusBackend { + fn from_env() -> anyhow::Result { + match std::env::var("EVENT_BUS_BACKEND") + .unwrap_or_else(|_| "db".to_string()) + .as_str() + { + "db" => Ok(Self::Db), + #[cfg(feature = "nats")] + "nats" => Ok(Self::Nats), + #[cfg(not(feature = "nats"))] + "nats" => anyhow::bail!("EVENT_BUS_BACKEND=nats requires the nats feature to be compiled in"), + other => anyhow::bail!("unknown EVENT_BUS_BACKEND={other}, expected 'db' or 'nats'"), + } + } +} + fn init_tracing() { let filter = std::env::var("RUST_LOG") .unwrap_or_else(|_| "worker=info,application=info".to_string());