feat: refactor database connection handling to use DbPool enum for better abstraction
This commit is contained in:
@@ -87,27 +87,17 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
|||||||
let auth_service: Arc<dyn AuthService> = Arc::new(JwtAuthService::new(auth_config));
|
let auth_service: Arc<dyn AuthService> = Arc::new(JwtAuthService::new(auth_config));
|
||||||
let password_hasher: Arc<dyn PasswordHasher> = Arc::new(Argon2PasswordHasher);
|
let password_hasher: Arc<dyn PasswordHasher> = Arc::new(Argon2PasswordHasher);
|
||||||
|
|
||||||
// Track pools — needed for federation and DB event queue
|
let (movie_repository, review_repository, diary_repository, stats_repository, user_repository, db_pool) =
|
||||||
#[cfg(feature = "sqlite")]
|
|
||||||
let mut sqlite_pool: Option<sqlx::SqlitePool> = None;
|
|
||||||
#[cfg(feature = "postgres")]
|
|
||||||
let mut pg_pool: Option<sqlx::PgPool> = None;
|
|
||||||
|
|
||||||
let (movie_repository, review_repository, diary_repository, stats_repository, user_repository):
|
|
||||||
(Arc<dyn MovieRepository>, Arc<dyn ReviewRepository>, Arc<dyn DiaryRepository>,
|
|
||||||
Arc<dyn StatsRepository>, Arc<dyn UserRepository>) =
|
|
||||||
match backend.as_str() {
|
match backend.as_str() {
|
||||||
#[cfg(feature = "postgres")]
|
#[cfg(feature = "postgres")]
|
||||||
"postgres" => {
|
"postgres" => {
|
||||||
let (pool, m, r, d, s, u) = wire_postgres(&database_url).await?;
|
let (pool, m, r, d, s, u) = wire_postgres(&database_url).await?;
|
||||||
pg_pool = Some(pool);
|
(m, r, d, s, u, DbPool::Postgres(pool))
|
||||||
(m, r, d, s, u)
|
|
||||||
}
|
}
|
||||||
#[cfg(feature = "sqlite")]
|
#[cfg(feature = "sqlite")]
|
||||||
_ => {
|
_ => {
|
||||||
let (pool, m, r, d, s, u) = wire_sqlite(&database_url).await?;
|
let (pool, m, r, d, s, u) = wire_sqlite(&database_url).await?;
|
||||||
sqlite_pool = Some(pool);
|
(m, r, d, s, u, DbPool::Sqlite(pool))
|
||||||
(m, r, d, s, u)
|
|
||||||
}
|
}
|
||||||
#[cfg(not(feature = "sqlite"))]
|
#[cfg(not(feature = "sqlite"))]
|
||||||
_ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build (sqlite feature is not enabled)"),
|
_ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build (sqlite feature is not enabled)"),
|
||||||
@@ -122,17 +112,15 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
|||||||
Arc<dyn activitypub::FederationRepository>,
|
Arc<dyn activitypub::FederationRepository>,
|
||||||
Arc<dyn domain::ports::SocialQueryPort>,
|
Arc<dyn domain::ports::SocialQueryPort>,
|
||||||
Arc<dyn activitypub::RemoteReviewRepository>,
|
Arc<dyn activitypub::RemoteReviewRepository>,
|
||||||
) = match backend.as_str() {
|
) = match &db_pool {
|
||||||
#[cfg(feature = "postgres-federation")]
|
#[cfg(feature = "postgres-federation")]
|
||||||
"postgres" => {
|
DbPool::Postgres(pool) => {
|
||||||
let pool = pg_pool.as_ref().unwrap().clone();
|
let fed = Arc::new(PostgresFederationRepository::new(pool.clone()));
|
||||||
let fed = Arc::new(PostgresFederationRepository::new(pool));
|
|
||||||
(Arc::clone(&fed) as _, Arc::clone(&fed) as _, fed as _)
|
(Arc::clone(&fed) as _, Arc::clone(&fed) as _, fed as _)
|
||||||
}
|
}
|
||||||
#[cfg(feature = "sqlite-federation")]
|
#[cfg(feature = "sqlite-federation")]
|
||||||
_ => {
|
DbPool::Sqlite(pool) => {
|
||||||
let pool = sqlite_pool.as_ref().unwrap().clone();
|
let fed = Arc::new(SqliteFederationRepository::new(pool.clone()));
|
||||||
let fed = Arc::new(SqliteFederationRepository::new(pool));
|
|
||||||
(Arc::clone(&fed) as _, Arc::clone(&fed) as _, fed as _)
|
(Arc::clone(&fed) as _, Arc::clone(&fed) as _, fed as _)
|
||||||
}
|
}
|
||||||
#[cfg(not(feature = "sqlite-federation"))]
|
#[cfg(not(feature = "sqlite-federation"))]
|
||||||
@@ -162,17 +150,15 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
|||||||
let ep: Arc<dyn EventPublisher> = match event_bus {
|
let ep: Arc<dyn EventPublisher> = match event_bus {
|
||||||
EventBusBackend::Db => {
|
EventBusBackend::Db => {
|
||||||
tracing::info!("event bus: DB queue");
|
tracing::info!("event bus: DB queue");
|
||||||
match backend.as_str() {
|
match &db_pool {
|
||||||
#[cfg(feature = "postgres")]
|
#[cfg(feature = "postgres")]
|
||||||
"postgres" => postgres_event_queue::PostgresEventQueue::create_publisher(
|
DbPool::Postgres(pool) => postgres_event_queue::PostgresEventQueue::create_publisher(
|
||||||
pg_pool.as_ref().unwrap().clone()
|
pool.clone()
|
||||||
).await?,
|
).await?,
|
||||||
#[cfg(feature = "sqlite")]
|
#[cfg(feature = "sqlite")]
|
||||||
_ => sqlite_event_queue::SqliteEventQueue::create_publisher(
|
DbPool::Sqlite(pool) => sqlite_event_queue::SqliteEventQueue::create_publisher(
|
||||||
sqlite_pool.as_ref().unwrap().clone()
|
pool.clone()
|
||||||
).await?,
|
).await?,
|
||||||
#[cfg(not(feature = "sqlite"))]
|
|
||||||
_ => anyhow::bail!("EVENT_BUS_BACKEND=db has no adapter for DATABASE_BACKEND={backend}; enable the sqlite or postgres feature"),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#[cfg(feature = "nats")]
|
#[cfg(feature = "nats")]
|
||||||
@@ -311,6 +297,13 @@ async fn wire_postgres(database_url: &str) -> anyhow::Result<(
|
|||||||
Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository))
|
Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum DbPool {
|
||||||
|
#[cfg(feature = "sqlite")]
|
||||||
|
Sqlite(sqlx::SqlitePool),
|
||||||
|
#[cfg(feature = "postgres")]
|
||||||
|
Postgres(sqlx::PgPool),
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Copy)]
|
#[derive(Clone, Copy)]
|
||||||
enum EventBusBackend {
|
enum EventBusBackend {
|
||||||
Db,
|
Db,
|
||||||
|
|||||||
@@ -51,26 +51,17 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
let auth_service: Arc<dyn AuthService> = Arc::new(JwtAuthService::new(auth_config));
|
let auth_service: Arc<dyn AuthService> = Arc::new(JwtAuthService::new(auth_config));
|
||||||
let password_hasher: Arc<dyn PasswordHasher> = Arc::new(Argon2PasswordHasher);
|
let password_hasher: Arc<dyn PasswordHasher> = Arc::new(Argon2PasswordHasher);
|
||||||
|
|
||||||
#[cfg(feature = "sqlite")]
|
let (movie_repository, review_repository, diary_repository, stats_repository, user_repository, db_pool) =
|
||||||
let mut sqlite_pool: Option<sqlx::SqlitePool> = None;
|
|
||||||
#[cfg(feature = "postgres")]
|
|
||||||
let mut pg_pool: Option<sqlx::PgPool> = None;
|
|
||||||
|
|
||||||
let (movie_repository, review_repository, diary_repository, stats_repository, user_repository):
|
|
||||||
(Arc<dyn MovieRepository>, Arc<dyn ReviewRepository>, Arc<dyn DiaryRepository>,
|
|
||||||
Arc<dyn StatsRepository>, Arc<dyn UserRepository>) =
|
|
||||||
match backend.as_str() {
|
match backend.as_str() {
|
||||||
#[cfg(feature = "postgres")]
|
#[cfg(feature = "postgres")]
|
||||||
"postgres" => {
|
"postgres" => {
|
||||||
let (pool, m, r, d, s, u) = wire_postgres(&database_url).await?;
|
let (pool, m, r, d, s, u) = wire_postgres(&database_url).await?;
|
||||||
pg_pool = Some(pool);
|
(m, r, d, s, u, DbPool::Postgres(pool))
|
||||||
(m, r, d, s, u)
|
|
||||||
}
|
}
|
||||||
#[cfg(feature = "sqlite")]
|
#[cfg(feature = "sqlite")]
|
||||||
_ => {
|
_ => {
|
||||||
let (pool, m, r, d, s, u) = wire_sqlite(&database_url).await?;
|
let (pool, m, r, d, s, u) = wire_sqlite(&database_url).await?;
|
||||||
sqlite_pool = Some(pool);
|
(m, r, d, s, u, DbPool::Sqlite(pool))
|
||||||
(m, r, d, s, u)
|
|
||||||
}
|
}
|
||||||
#[cfg(not(feature = "sqlite"))]
|
#[cfg(not(feature = "sqlite"))]
|
||||||
_ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build"),
|
_ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build"),
|
||||||
@@ -82,17 +73,11 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
) = match EventBusBackend::from_env()? {
|
) = match EventBusBackend::from_env()? {
|
||||||
EventBusBackend::Db => {
|
EventBusBackend::Db => {
|
||||||
tracing::info!("event bus: DB queue");
|
tracing::info!("event bus: DB queue");
|
||||||
match backend.as_str() {
|
match db_pool {
|
||||||
#[cfg(feature = "postgres")]
|
#[cfg(feature = "postgres")]
|
||||||
"postgres" => postgres_event_queue::PostgresEventQueue::create_channel(
|
DbPool::Postgres(pool) => postgres_event_queue::PostgresEventQueue::create_channel(pool).await?,
|
||||||
pg_pool.unwrap()
|
|
||||||
).await?,
|
|
||||||
#[cfg(feature = "sqlite")]
|
#[cfg(feature = "sqlite")]
|
||||||
_ => sqlite_event_queue::SqliteEventQueue::create_channel(
|
DbPool::Sqlite(pool) => sqlite_event_queue::SqliteEventQueue::create_channel(pool).await?,
|
||||||
sqlite_pool.unwrap()
|
|
||||||
).await?,
|
|
||||||
#[cfg(not(feature = "sqlite"))]
|
|
||||||
_ => anyhow::bail!("EVENT_BUS_BACKEND=db has no adapter for DATABASE_BACKEND={backend}; enable the sqlite or postgres feature"),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#[cfg(feature = "nats")]
|
#[cfg(feature = "nats")]
|
||||||
@@ -130,6 +115,13 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum DbPool {
|
||||||
|
#[cfg(feature = "sqlite")]
|
||||||
|
Sqlite(sqlx::SqlitePool),
|
||||||
|
#[cfg(feature = "postgres")]
|
||||||
|
Postgres(sqlx::PgPool),
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Copy)]
|
#[derive(Clone, Copy)]
|
||||||
enum EventBusBackend {
|
enum EventBusBackend {
|
||||||
Db,
|
Db,
|
||||||
|
|||||||
Reference in New Issue
Block a user