feat: implement event bus backend configuration for DB and NATS
This commit is contained in:
@@ -39,17 +39,18 @@ ALLOW_REGISTRATION=false
|
|||||||
RATE_LIMIT=20
|
RATE_LIMIT=20
|
||||||
POSTER_FETCH_TIMEOUT_SECONDS=30
|
POSTER_FETCH_TIMEOUT_SECONDS=30
|
||||||
|
|
||||||
# Event bus — controls background poster sync and other async work.
|
# Event bus — "db" (default) or "nats"
|
||||||
# Priority: NATS (if NATS_URL set) → DB queue (default, uses existing DB)
|
|
||||||
# The worker binary must run alongside the presentation to process events.
|
# The worker binary must run alongside the presentation to process events.
|
||||||
|
EVENT_BUS_BACKEND=db
|
||||||
|
|
||||||
# Option A: DB queue (default — no extra infrastructure needed)
|
# Option A: DB queue (default — no extra infrastructure needed)
|
||||||
# Events are stored in the same database as the app and polled by the worker.
|
# Events are persisted in the same database as the app and polled by the worker.
|
||||||
# EVENT_QUEUE_POLL_INTERVAL_MS=500 # polling interval (default: 500ms)
|
# EVENT_QUEUE_POLL_INTERVAL_MS=500 # polling interval (default: 500ms)
|
||||||
# EVENT_QUEUE_BATCH_SIZE=10 # rows claimed per poll cycle (default: 10)
|
# EVENT_QUEUE_BATCH_SIZE=10 # rows claimed per poll cycle (default: 10)
|
||||||
# EVENT_QUEUE_MAX_ATTEMPTS=5 # retries before dead-lettering (default: 5)
|
# EVENT_QUEUE_MAX_ATTEMPTS=5 # retries before dead-lettering (default: 5)
|
||||||
|
|
||||||
# Option B: NATS (at-least-once delivery, recommended for higher throughput)
|
# Option B: NATS (at-least-once delivery, recommended for higher throughput)
|
||||||
|
# EVENT_BUS_BACKEND=nats
|
||||||
# NATS_URL=nats://localhost:4222
|
# NATS_URL=nats://localhost:4222
|
||||||
# NATS_MODE=jetstream # "jetstream" (default, at-least-once) or "core" (fire-and-forget)
|
# NATS_MODE=jetstream # "jetstream" (default, at-least-once) or "core" (fire-and-forget)
|
||||||
# NATS_SUBJECT_PREFIX=movies-diary.events
|
# NATS_SUBJECT_PREFIX=movies-diary.events
|
||||||
|
|||||||
@@ -45,6 +45,8 @@ COPY crates ./crates
|
|||||||
#
|
#
|
||||||
# To build with PostgreSQL backend instead:
|
# To build with PostgreSQL backend instead:
|
||||||
# --build-arg FEATURES=postgres,postgres-federation
|
# --build-arg FEATURES=postgres,postgres-federation
|
||||||
|
# To add NATS support (EVENT_BUS_BACKEND=nats):
|
||||||
|
# --build-arg FEATURES=sqlite,sqlite-federation,nats
|
||||||
ARG FEATURES=sqlite,sqlite-federation
|
ARG FEATURES=sqlite,sqlite-federation
|
||||||
RUN cargo build --release -p presentation -p worker --no-default-features --features "${FEATURES}"
|
RUN cargo build --release -p presentation -p worker --no-default-features --features "${FEATURES}"
|
||||||
|
|
||||||
|
|||||||
@@ -91,10 +91,9 @@ ALLOW_REGISTRATION=true # set to false to disable new sign-ups
|
|||||||
SECURE_COOKIES=true # set when serving over HTTPS
|
SECURE_COOKIES=true # set when serving over HTTPS
|
||||||
RUST_LOG=presentation=info,tower_http=info,worker=info,application=info
|
RUST_LOG=presentation=info,tower_http=info,worker=info,application=info
|
||||||
|
|
||||||
# Event bus — background poster sync runs in the worker binary.
|
# Event bus — "db" (default, uses same database) or "nats"
|
||||||
# Default: DB queue (uses the same database, no extra infrastructure).
|
EVENT_BUS_BACKEND=db
|
||||||
# Set NATS_URL to use NATS instead (JetStream recommended for at-least-once delivery).
|
# NATS_URL=nats://localhost:4222 # required when EVENT_BUS_BACKEND=nats
|
||||||
# NATS_URL=nats://localhost:4222
|
|
||||||
```
|
```
|
||||||
|
|
||||||
The `worker` binary must run alongside `presentation` to process events:
|
The `worker` binary must run alongside `presentation` to process events:
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ edition = "2024"
|
|||||||
default = ["sqlite", "sqlite-federation"]
|
default = ["sqlite", "sqlite-federation"]
|
||||||
sqlite = ["dep:sqlite", "dep:sqlite-event-queue"]
|
sqlite = ["dep:sqlite", "dep:sqlite-event-queue"]
|
||||||
postgres = ["dep:postgres", "dep:postgres-event-queue"]
|
postgres = ["dep:postgres", "dep:postgres-event-queue"]
|
||||||
|
nats = ["dep:nats"]
|
||||||
# Meta-feature: true when any federation adapter is active — keeps all #[cfg(feature = "federation")] gates working
|
# Meta-feature: true when any federation adapter is active — keeps all #[cfg(feature = "federation")] gates working
|
||||||
federation = []
|
federation = []
|
||||||
sqlite-federation = [
|
sqlite-federation = [
|
||||||
@@ -49,7 +50,7 @@ poster-fetcher = { workspace = true }
|
|||||||
poster-storage = { workspace = true }
|
poster-storage = { workspace = true }
|
||||||
template-askama = { workspace = true }
|
template-askama = { workspace = true }
|
||||||
event-publisher = { workspace = true }
|
event-publisher = { workspace = true }
|
||||||
nats = { workspace = true }
|
nats = { workspace = true, optional = true }
|
||||||
rss = { workspace = true }
|
rss = { workspace = true }
|
||||||
export = { workspace = true }
|
export = { workspace = true }
|
||||||
doc = { workspace = true }
|
doc = { workspace = true }
|
||||||
|
|||||||
@@ -114,6 +114,8 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Wire up event channel, federation service, and ap_router
|
// Wire up event channel, federation service, and ap_router
|
||||||
|
let event_bus = EventBusBackend::from_env()?;
|
||||||
|
|
||||||
#[cfg(feature = "federation")]
|
#[cfg(feature = "federation")]
|
||||||
let (event_publisher_arc, ap_router, ap_service, social_query) = {
|
let (event_publisher_arc, ap_router, ap_service, social_query) = {
|
||||||
let (federation_repo, social_query_arc, review_store): (
|
let (federation_repo, social_query_arc, review_store): (
|
||||||
@@ -157,10 +159,8 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
|||||||
let ap_router = concrete_ap_service.router();
|
let ap_router = concrete_ap_service.router();
|
||||||
let ap_service_arc: Arc<dyn ActivityPubPort> = concrete_ap_service;
|
let ap_service_arc: Arc<dyn ActivityPubPort> = concrete_ap_service;
|
||||||
|
|
||||||
let ep: Arc<dyn EventPublisher> = if let Ok(cfg) = nats::NatsConfig::from_env() {
|
let ep: Arc<dyn EventPublisher> = match event_bus {
|
||||||
tracing::info!("event bus: NATS ({})", cfg.url);
|
EventBusBackend::Db => {
|
||||||
nats::create_publisher(cfg).await?
|
|
||||||
} else {
|
|
||||||
tracing::info!("event bus: DB queue");
|
tracing::info!("event bus: DB queue");
|
||||||
match backend.as_str() {
|
match backend.as_str() {
|
||||||
#[cfg(feature = "postgres")]
|
#[cfg(feature = "postgres")]
|
||||||
@@ -172,17 +172,23 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
|||||||
sqlite_pool.as_ref().unwrap().clone()
|
sqlite_pool.as_ref().unwrap().clone()
|
||||||
).await?,
|
).await?,
|
||||||
#[cfg(not(feature = "sqlite"))]
|
#[cfg(not(feature = "sqlite"))]
|
||||||
_ => anyhow::bail!("no event bus: NATS_URL not set and sqlite feature not enabled"),
|
_ => anyhow::bail!("EVENT_BUS_BACKEND=db has no adapter for DATABASE_BACKEND={backend}; enable the sqlite or postgres feature"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[cfg(feature = "nats")]
|
||||||
|
EventBusBackend::Nats => {
|
||||||
|
let cfg = nats::NatsConfig::from_env()
|
||||||
|
.context("EVENT_BUS_BACKEND=nats requires NATS_URL to be set")?;
|
||||||
|
tracing::info!("event bus: NATS ({})", cfg.url);
|
||||||
|
nats::create_publisher(cfg).await?
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
(ep, ap_router, ap_service_arc, social_query_arc)
|
(ep, ap_router, ap_service_arc, social_query_arc)
|
||||||
};
|
};
|
||||||
|
|
||||||
#[cfg(not(feature = "federation"))]
|
#[cfg(not(feature = "federation"))]
|
||||||
let event_publisher_arc: Arc<dyn EventPublisher> = if let Ok(cfg) = nats::NatsConfig::from_env() {
|
let event_publisher_arc: Arc<dyn EventPublisher> = match event_bus {
|
||||||
tracing::info!("event bus: NATS ({})", cfg.url);
|
EventBusBackend::Db => {
|
||||||
nats::create_publisher(cfg).await?
|
|
||||||
} else {
|
|
||||||
tracing::info!("event bus: DB queue");
|
tracing::info!("event bus: DB queue");
|
||||||
match backend.as_str() {
|
match backend.as_str() {
|
||||||
#[cfg(feature = "postgres")]
|
#[cfg(feature = "postgres")]
|
||||||
@@ -194,7 +200,15 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
|||||||
sqlite_pool.as_ref().unwrap().clone()
|
sqlite_pool.as_ref().unwrap().clone()
|
||||||
).await?,
|
).await?,
|
||||||
#[cfg(not(feature = "sqlite"))]
|
#[cfg(not(feature = "sqlite"))]
|
||||||
_ => anyhow::bail!("no event bus: NATS_URL not set and sqlite feature not enabled"),
|
_ => anyhow::bail!("EVENT_BUS_BACKEND=db has no adapter for DATABASE_BACKEND={backend}; enable the sqlite or postgres feature"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[cfg(feature = "nats")]
|
||||||
|
EventBusBackend::Nats => {
|
||||||
|
let cfg = nats::NatsConfig::from_env()
|
||||||
|
.context("EVENT_BUS_BACKEND=nats requires NATS_URL to be set")?;
|
||||||
|
tracing::info!("event bus: NATS ({})", cfg.url);
|
||||||
|
nats::create_publisher(cfg).await?
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
#[cfg(not(feature = "federation"))]
|
#[cfg(not(feature = "federation"))]
|
||||||
@@ -297,6 +311,29 @@ async fn wire_postgres(database_url: &str) -> anyhow::Result<(
|
|||||||
Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository))
|
Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy)]
|
||||||
|
enum EventBusBackend {
|
||||||
|
Db,
|
||||||
|
#[cfg(feature = "nats")]
|
||||||
|
Nats,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EventBusBackend {
|
||||||
|
fn from_env() -> anyhow::Result<Self> {
|
||||||
|
match std::env::var("EVENT_BUS_BACKEND")
|
||||||
|
.unwrap_or_else(|_| "db".to_string())
|
||||||
|
.as_str()
|
||||||
|
{
|
||||||
|
"db" => Ok(Self::Db),
|
||||||
|
#[cfg(feature = "nats")]
|
||||||
|
"nats" => Ok(Self::Nats),
|
||||||
|
#[cfg(not(feature = "nats"))]
|
||||||
|
"nats" => anyhow::bail!("EVENT_BUS_BACKEND=nats requires the nats feature to be compiled in"),
|
||||||
|
other => anyhow::bail!("unknown EVENT_BUS_BACKEND={other}, expected 'db' or 'nats'"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn init_tracing() {
|
fn init_tracing() {
|
||||||
tracing_subscriber::registry()
|
tracing_subscriber::registry()
|
||||||
.with(tracing_subscriber::EnvFilter::new(
|
.with(tracing_subscriber::EnvFilter::new(
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ edition = "2024"
|
|||||||
default = ["sqlite"]
|
default = ["sqlite"]
|
||||||
sqlite = ["dep:sqlite", "dep:sqlite-event-queue"]
|
sqlite = ["dep:sqlite", "dep:sqlite-event-queue"]
|
||||||
postgres = ["dep:postgres", "dep:postgres-event-queue"]
|
postgres = ["dep:postgres", "dep:postgres-event-queue"]
|
||||||
|
nats = ["dep:nats"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
domain = { workspace = true }
|
domain = { workspace = true }
|
||||||
@@ -29,7 +30,7 @@ metadata = { workspace = true }
|
|||||||
poster-fetcher = { workspace = true }
|
poster-fetcher = { workspace = true }
|
||||||
poster-storage = { workspace = true }
|
poster-storage = { workspace = true }
|
||||||
export = { workspace = true }
|
export = { workspace = true }
|
||||||
nats = { workspace = true }
|
nats = { workspace = true, optional = true }
|
||||||
sqlx = { workspace = true }
|
sqlx = { workspace = true }
|
||||||
|
|
||||||
# Optional — database backends
|
# Optional — database backends
|
||||||
|
|||||||
@@ -79,12 +79,8 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
let (event_publisher_arc, consumer_arc): (
|
let (event_publisher_arc, consumer_arc): (
|
||||||
Arc<dyn domain::ports::EventPublisher>,
|
Arc<dyn domain::ports::EventPublisher>,
|
||||||
Arc<dyn domain::ports::EventConsumer>,
|
Arc<dyn domain::ports::EventConsumer>,
|
||||||
) = match nats::NatsConfig::from_env() {
|
) = match EventBusBackend::from_env()? {
|
||||||
Ok(cfg) => {
|
EventBusBackend::Db => {
|
||||||
tracing::info!("event bus: NATS ({})", cfg.url);
|
|
||||||
nats::create_channel(cfg).await?
|
|
||||||
}
|
|
||||||
Err(_) => {
|
|
||||||
tracing::info!("event bus: DB queue");
|
tracing::info!("event bus: DB queue");
|
||||||
match backend.as_str() {
|
match backend.as_str() {
|
||||||
#[cfg(feature = "postgres")]
|
#[cfg(feature = "postgres")]
|
||||||
@@ -96,9 +92,16 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
sqlite_pool.unwrap()
|
sqlite_pool.unwrap()
|
||||||
).await?,
|
).await?,
|
||||||
#[cfg(not(feature = "sqlite"))]
|
#[cfg(not(feature = "sqlite"))]
|
||||||
_ => anyhow::bail!("no event bus: NATS_URL not set and sqlite feature not enabled"),
|
_ => anyhow::bail!("EVENT_BUS_BACKEND=db has no adapter for DATABASE_BACKEND={backend}; enable the sqlite or postgres feature"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#[cfg(feature = "nats")]
|
||||||
|
EventBusBackend::Nats => {
|
||||||
|
let cfg = nats::NatsConfig::from_env()
|
||||||
|
.context("EVENT_BUS_BACKEND=nats requires NATS_URL to be set")?;
|
||||||
|
tracing::info!("event bus: NATS ({})", cfg.url);
|
||||||
|
nats::create_channel(cfg).await?
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let ctx = AppContext {
|
let ctx = AppContext {
|
||||||
@@ -127,6 +130,29 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy)]
|
||||||
|
enum EventBusBackend {
|
||||||
|
Db,
|
||||||
|
#[cfg(feature = "nats")]
|
||||||
|
Nats,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EventBusBackend {
|
||||||
|
fn from_env() -> anyhow::Result<Self> {
|
||||||
|
match std::env::var("EVENT_BUS_BACKEND")
|
||||||
|
.unwrap_or_else(|_| "db".to_string())
|
||||||
|
.as_str()
|
||||||
|
{
|
||||||
|
"db" => Ok(Self::Db),
|
||||||
|
#[cfg(feature = "nats")]
|
||||||
|
"nats" => Ok(Self::Nats),
|
||||||
|
#[cfg(not(feature = "nats"))]
|
||||||
|
"nats" => anyhow::bail!("EVENT_BUS_BACKEND=nats requires the nats feature to be compiled in"),
|
||||||
|
other => anyhow::bail!("unknown EVENT_BUS_BACKEND={other}, expected 'db' or 'nats'"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn init_tracing() {
|
fn init_tracing() {
|
||||||
let filter = std::env::var("RUST_LOG")
|
let filter = std::env::var("RUST_LOG")
|
||||||
.unwrap_or_else(|_| "worker=info,application=info".to_string());
|
.unwrap_or_else(|_| "worker=info,application=info".to_string());
|
||||||
|
|||||||
Reference in New Issue
Block a user