feat: implement event bus backend configuration for DB and NATS

This commit is contained in:
2026-05-10 17:55:51 +02:00
parent dca50b46d1
commit c2a5541706
7 changed files with 109 additions and 42 deletions

View File

@@ -7,6 +7,7 @@ edition = "2024"
default = ["sqlite", "sqlite-federation"]
sqlite = ["dep:sqlite", "dep:sqlite-event-queue"]
postgres = ["dep:postgres", "dep:postgres-event-queue"]
nats = ["dep:nats"]
# Meta-feature: true when any federation adapter is active — keeps all #[cfg(feature = "federation")] gates working
federation = []
sqlite-federation = [
@@ -49,7 +50,7 @@ poster-fetcher = { workspace = true }
poster-storage = { workspace = true }
template-askama = { workspace = true }
event-publisher = { workspace = true }
nats = { workspace = true }
nats = { workspace = true, optional = true }
rss = { workspace = true }
export = { workspace = true }
doc = { workspace = true }

View File

@@ -114,6 +114,8 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
};
// Wire up event channel, federation service, and ap_router
let event_bus = EventBusBackend::from_env()?;
#[cfg(feature = "federation")]
let (event_publisher_arc, ap_router, ap_service, social_query) = {
let (federation_repo, social_query_arc, review_store): (
@@ -157,10 +159,36 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
let ap_router = concrete_ap_service.router();
let ap_service_arc: Arc<dyn ActivityPubPort> = concrete_ap_service;
let ep: Arc<dyn EventPublisher> = if let Ok(cfg) = nats::NatsConfig::from_env() {
tracing::info!("event bus: NATS ({})", cfg.url);
nats::create_publisher(cfg).await?
} else {
let ep: Arc<dyn EventPublisher> = match event_bus {
EventBusBackend::Db => {
tracing::info!("event bus: DB queue");
match backend.as_str() {
#[cfg(feature = "postgres")]
"postgres" => postgres_event_queue::PostgresEventQueue::create_publisher(
pg_pool.as_ref().unwrap().clone()
).await?,
#[cfg(feature = "sqlite")]
_ => sqlite_event_queue::SqliteEventQueue::create_publisher(
sqlite_pool.as_ref().unwrap().clone()
).await?,
#[cfg(not(feature = "sqlite"))]
_ => anyhow::bail!("EVENT_BUS_BACKEND=db has no adapter for DATABASE_BACKEND={backend}; enable the sqlite or postgres feature"),
}
}
#[cfg(feature = "nats")]
EventBusBackend::Nats => {
let cfg = nats::NatsConfig::from_env()
.context("EVENT_BUS_BACKEND=nats requires NATS_URL to be set")?;
tracing::info!("event bus: NATS ({})", cfg.url);
nats::create_publisher(cfg).await?
}
};
(ep, ap_router, ap_service_arc, social_query_arc)
};
#[cfg(not(feature = "federation"))]
let event_publisher_arc: Arc<dyn EventPublisher> = match event_bus {
EventBusBackend::Db => {
tracing::info!("event bus: DB queue");
match backend.as_str() {
#[cfg(feature = "postgres")]
@@ -172,29 +200,15 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
sqlite_pool.as_ref().unwrap().clone()
).await?,
#[cfg(not(feature = "sqlite"))]
_ => anyhow::bail!("no event bus: NATS_URL not set and sqlite feature not enabled"),
_ => anyhow::bail!("EVENT_BUS_BACKEND=db has no adapter for DATABASE_BACKEND={backend}; enable the sqlite or postgres feature"),
}
};
(ep, ap_router, ap_service_arc, social_query_arc)
};
#[cfg(not(feature = "federation"))]
let event_publisher_arc: Arc<dyn EventPublisher> = if let Ok(cfg) = nats::NatsConfig::from_env() {
tracing::info!("event bus: NATS ({})", cfg.url);
nats::create_publisher(cfg).await?
} else {
tracing::info!("event bus: DB queue");
match backend.as_str() {
#[cfg(feature = "postgres")]
"postgres" => postgres_event_queue::PostgresEventQueue::create_publisher(
pg_pool.as_ref().unwrap().clone()
).await?,
#[cfg(feature = "sqlite")]
_ => sqlite_event_queue::SqliteEventQueue::create_publisher(
sqlite_pool.as_ref().unwrap().clone()
).await?,
#[cfg(not(feature = "sqlite"))]
_ => anyhow::bail!("no event bus: NATS_URL not set and sqlite feature not enabled"),
}
#[cfg(feature = "nats")]
EventBusBackend::Nats => {
let cfg = nats::NatsConfig::from_env()
.context("EVENT_BUS_BACKEND=nats requires NATS_URL to be set")?;
tracing::info!("event bus: NATS ({})", cfg.url);
nats::create_publisher(cfg).await?
}
};
#[cfg(not(feature = "federation"))]
@@ -297,6 +311,29 @@ async fn wire_postgres(database_url: &str) -> anyhow::Result<(
Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository))
}
#[derive(Clone, Copy)]
enum EventBusBackend {
Db,
#[cfg(feature = "nats")]
Nats,
}
impl EventBusBackend {
fn from_env() -> anyhow::Result<Self> {
match std::env::var("EVENT_BUS_BACKEND")
.unwrap_or_else(|_| "db".to_string())
.as_str()
{
"db" => Ok(Self::Db),
#[cfg(feature = "nats")]
"nats" => Ok(Self::Nats),
#[cfg(not(feature = "nats"))]
"nats" => anyhow::bail!("EVENT_BUS_BACKEND=nats requires the nats feature to be compiled in"),
other => anyhow::bail!("unknown EVENT_BUS_BACKEND={other}, expected 'db' or 'nats'"),
}
}
}
fn init_tracing() {
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new(

View File

@@ -7,6 +7,7 @@ edition = "2024"
default = ["sqlite"]
sqlite = ["dep:sqlite", "dep:sqlite-event-queue"]
postgres = ["dep:postgres", "dep:postgres-event-queue"]
nats = ["dep:nats"]
[dependencies]
domain = { workspace = true }
@@ -29,7 +30,7 @@ metadata = { workspace = true }
poster-fetcher = { workspace = true }
poster-storage = { workspace = true }
export = { workspace = true }
nats = { workspace = true }
nats = { workspace = true, optional = true }
sqlx = { workspace = true }
# Optional — database backends

View File

@@ -79,12 +79,8 @@ async fn main() -> anyhow::Result<()> {
let (event_publisher_arc, consumer_arc): (
Arc<dyn domain::ports::EventPublisher>,
Arc<dyn domain::ports::EventConsumer>,
) = match nats::NatsConfig::from_env() {
Ok(cfg) => {
tracing::info!("event bus: NATS ({})", cfg.url);
nats::create_channel(cfg).await?
}
Err(_) => {
) = match EventBusBackend::from_env()? {
EventBusBackend::Db => {
tracing::info!("event bus: DB queue");
match backend.as_str() {
#[cfg(feature = "postgres")]
@@ -96,9 +92,16 @@ async fn main() -> anyhow::Result<()> {
sqlite_pool.unwrap()
).await?,
#[cfg(not(feature = "sqlite"))]
_ => anyhow::bail!("no event bus: NATS_URL not set and sqlite feature not enabled"),
_ => anyhow::bail!("EVENT_BUS_BACKEND=db has no adapter for DATABASE_BACKEND={backend}; enable the sqlite or postgres feature"),
}
}
#[cfg(feature = "nats")]
EventBusBackend::Nats => {
let cfg = nats::NatsConfig::from_env()
.context("EVENT_BUS_BACKEND=nats requires NATS_URL to be set")?;
tracing::info!("event bus: NATS ({})", cfg.url);
nats::create_channel(cfg).await?
}
};
let ctx = AppContext {
@@ -127,6 +130,29 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}
#[derive(Clone, Copy)]
enum EventBusBackend {
Db,
#[cfg(feature = "nats")]
Nats,
}
impl EventBusBackend {
fn from_env() -> anyhow::Result<Self> {
match std::env::var("EVENT_BUS_BACKEND")
.unwrap_or_else(|_| "db".to_string())
.as_str()
{
"db" => Ok(Self::Db),
#[cfg(feature = "nats")]
"nats" => Ok(Self::Nats),
#[cfg(not(feature = "nats"))]
"nats" => anyhow::bail!("EVENT_BUS_BACKEND=nats requires the nats feature to be compiled in"),
other => anyhow::bail!("unknown EVENT_BUS_BACKEND={other}, expected 'db' or 'nats'"),
}
}
}
fn init_tracing() {
let filter = std::env::var("RUST_LOG")
.unwrap_or_else(|_| "worker=info,application=info".to_string());