Refactor event payload handling across adapters
- Introduced `event-payload` crate to centralize event payload definitions. - Updated NATS and PostgreSQL adapters to use the new `EventPayload` type. - Removed redundant event payload definitions and conversion implementations from NATS and PostgreSQL adapters. - Simplified SQLite event queue to utilize the new `EventPayload`. - Refactored wiring functions for PostgreSQL and SQLite to improve database connection handling and migration. - Cleaned up presentation and worker crates by removing unused event publisher dependencies and related wiring functions.
This commit is contained in:
@@ -1,5 +1,4 @@
|
||||
use std::sync::Arc;
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::Context;
|
||||
use application::{config::AppConfig, context::AppContext, event_handlers::PosterSyncHandler, worker::WorkerService};
|
||||
@@ -10,12 +9,6 @@ use poster_fetcher::{PosterFetcherConfig, ReqwestPosterFetcher};
|
||||
use poster_storage::{PosterStorageAdapter, StorageConfig};
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
use sqlite::{SqliteMovieRepository, SqliteUserRepository};
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
use postgres::{PostgresRepository, PostgresUserRepository};
|
||||
|
||||
#[cfg(feature = "sqlite-federation")]
|
||||
use sqlite_federation::SqliteFederationRepository;
|
||||
#[cfg(feature = "postgres-federation")]
|
||||
@@ -27,9 +20,8 @@ use activitypub::{
|
||||
};
|
||||
|
||||
use domain::ports::{
|
||||
AuthService, DiaryExporter, DiaryRepository, EventHandler, MetadataClient, MovieRepository,
|
||||
PasswordHasher, PosterFetcherClient, PosterStorage, ReviewRepository, StatsRepository,
|
||||
UserRepository,
|
||||
AuthService, DiaryExporter, EventHandler, MetadataClient,
|
||||
PasswordHasher, PosterFetcherClient, PosterStorage,
|
||||
};
|
||||
|
||||
#[cfg(not(any(feature = "sqlite", feature = "postgres")))]
|
||||
@@ -65,12 +57,12 @@ async fn main() -> anyhow::Result<()> {
|
||||
match backend.as_str() {
|
||||
#[cfg(feature = "postgres")]
|
||||
"postgres" => {
|
||||
let (pool, m, r, d, s, u) = wire_postgres(&database_url).await?;
|
||||
let (pool, m, r, d, s, u) = postgres::wire(&database_url).await?;
|
||||
(m, r, d, s, u, DbPool::Postgres(pool))
|
||||
}
|
||||
#[cfg(feature = "sqlite")]
|
||||
_ => {
|
||||
let (pool, m, r, d, s, u) = wire_sqlite(&database_url).await?;
|
||||
let (pool, m, r, d, s, u) = sqlite::wire(&database_url).await?;
|
||||
(m, r, d, s, u, DbPool::Sqlite(pool))
|
||||
}
|
||||
#[cfg(not(feature = "sqlite"))]
|
||||
@@ -125,50 +117,57 @@ async fn main() -> anyhow::Result<()> {
|
||||
config: app_config,
|
||||
};
|
||||
|
||||
let mut handlers: Vec<Arc<dyn EventHandler>> = vec![Arc::new(PosterSyncHandler::new(ctx, 3))];
|
||||
let handlers: Vec<Arc<dyn EventHandler>> = {
|
||||
let poster = Arc::new(PosterSyncHandler::new(ctx, 3)) as Arc<dyn EventHandler>;
|
||||
|
||||
#[cfg(feature = "federation")]
|
||||
{
|
||||
let (federation_repo, review_store): (
|
||||
Arc<dyn activitypub::FederationRepository>,
|
||||
Arc<dyn activitypub::RemoteReviewRepository>,
|
||||
) = match &db_pool {
|
||||
#[cfg(feature = "sqlite-federation")]
|
||||
DbPool::Sqlite(pool) => {
|
||||
let fed = Arc::new(SqliteFederationRepository::new(pool.clone()));
|
||||
(Arc::clone(&fed) as _, fed as _)
|
||||
}
|
||||
#[cfg(feature = "postgres-federation")]
|
||||
DbPool::Postgres(pool) => {
|
||||
let fed = Arc::new(PostgresFederationRepository::new(pool.clone()));
|
||||
(Arc::clone(&fed) as _, fed as _)
|
||||
}
|
||||
};
|
||||
#[cfg(not(feature = "federation"))]
|
||||
{ vec![poster] }
|
||||
|
||||
let ap_service = Arc::new(
|
||||
ActivityPubService::new(
|
||||
federation_repo,
|
||||
Arc::new(DomainUserRepoAdapter(fed_user_repo)),
|
||||
Arc::new(ReviewObjectHandler {
|
||||
movie_repository: Arc::clone(&fed_movie_repo),
|
||||
diary_repository: fed_diary_repo,
|
||||
review_store,
|
||||
base_url: base_url.clone(),
|
||||
}),
|
||||
base_url.clone(),
|
||||
cfg!(debug_assertions),
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
#[cfg(feature = "federation")]
|
||||
{
|
||||
let (federation_repo, review_store): (
|
||||
Arc<dyn activitypub::FederationRepository>,
|
||||
Arc<dyn activitypub::RemoteReviewRepository>,
|
||||
) = match &db_pool {
|
||||
#[cfg(feature = "sqlite-federation")]
|
||||
DbPool::Sqlite(pool) => {
|
||||
let fed = Arc::new(SqliteFederationRepository::new(pool.clone()));
|
||||
(Arc::clone(&fed) as _, fed as _)
|
||||
}
|
||||
#[cfg(feature = "postgres-federation")]
|
||||
DbPool::Postgres(pool) => {
|
||||
let fed = Arc::new(PostgresFederationRepository::new(pool.clone()));
|
||||
(Arc::clone(&fed) as _, fed as _)
|
||||
}
|
||||
};
|
||||
|
||||
handlers.push(Arc::new(ActivityPubEventHandler::new(
|
||||
ap_service,
|
||||
fed_movie_repo,
|
||||
fed_review_repo,
|
||||
base_url,
|
||||
)));
|
||||
tracing::info!("federation event handler registered");
|
||||
}
|
||||
let ap_service = Arc::new(
|
||||
ActivityPubService::new(
|
||||
federation_repo,
|
||||
Arc::new(DomainUserRepoAdapter(fed_user_repo)),
|
||||
Arc::new(ReviewObjectHandler {
|
||||
movie_repository: Arc::clone(&fed_movie_repo),
|
||||
diary_repository: fed_diary_repo,
|
||||
review_store,
|
||||
base_url: base_url.clone(),
|
||||
}),
|
||||
base_url.clone(),
|
||||
cfg!(debug_assertions),
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
|
||||
let ap = Arc::new(ActivityPubEventHandler::new(
|
||||
ap_service,
|
||||
fed_movie_repo,
|
||||
fed_review_repo,
|
||||
base_url,
|
||||
)) as Arc<dyn EventHandler>;
|
||||
|
||||
tracing::info!("federation event handler registered");
|
||||
vec![poster, ap]
|
||||
}
|
||||
};
|
||||
|
||||
let worker = WorkerService::new(consumer_arc, handlers);
|
||||
|
||||
@@ -218,69 +217,3 @@ fn init_tracing() {
|
||||
.init();
|
||||
}
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
async fn wire_sqlite(database_url: &str) -> anyhow::Result<(
|
||||
sqlx::SqlitePool,
|
||||
Arc<dyn MovieRepository>,
|
||||
Arc<dyn ReviewRepository>,
|
||||
Arc<dyn DiaryRepository>,
|
||||
Arc<dyn StatsRepository>,
|
||||
Arc<dyn UserRepository>,
|
||||
)> {
|
||||
use sqlx::sqlite::SqliteConnectOptions;
|
||||
|
||||
let opts = SqliteConnectOptions::from_str(database_url)
|
||||
.context("Invalid DATABASE_URL")?
|
||||
.create_if_missing(true)
|
||||
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
|
||||
.busy_timeout(std::time::Duration::from_secs(5));
|
||||
let pool = sqlx::SqlitePool::connect_with(opts)
|
||||
.await
|
||||
.context("Failed to connect to SQLite database")?;
|
||||
|
||||
let sqlite_repo = Arc::new(SqliteMovieRepository::new(pool.clone()));
|
||||
sqlite_repo
|
||||
.migrate()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{}", e))
|
||||
.context("Database migration failed")?;
|
||||
|
||||
let movie_repository: Arc<dyn MovieRepository> = Arc::clone(&sqlite_repo) as _;
|
||||
let review_repository: Arc<dyn ReviewRepository> = Arc::clone(&sqlite_repo) as _;
|
||||
let diary_repository: Arc<dyn DiaryRepository> = Arc::clone(&sqlite_repo) as _;
|
||||
let stats_repository: Arc<dyn StatsRepository> = Arc::clone(&sqlite_repo) as _;
|
||||
let user_repository: Arc<dyn UserRepository> =
|
||||
Arc::new(SqliteUserRepository::new(pool.clone()));
|
||||
|
||||
Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository))
|
||||
}
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
async fn wire_postgres(database_url: &str) -> anyhow::Result<(
|
||||
sqlx::PgPool,
|
||||
Arc<dyn MovieRepository>,
|
||||
Arc<dyn ReviewRepository>,
|
||||
Arc<dyn DiaryRepository>,
|
||||
Arc<dyn StatsRepository>,
|
||||
Arc<dyn UserRepository>,
|
||||
)> {
|
||||
let pool = sqlx::PgPool::connect(database_url)
|
||||
.await
|
||||
.context("Failed to connect to PostgreSQL database")?;
|
||||
|
||||
let pg_repo = Arc::new(PostgresRepository::new(pool.clone()));
|
||||
pg_repo
|
||||
.migrate()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{}", e))
|
||||
.context("Database migration failed")?;
|
||||
|
||||
let movie_repository: Arc<dyn MovieRepository> = Arc::clone(&pg_repo) as _;
|
||||
let review_repository: Arc<dyn ReviewRepository> = Arc::clone(&pg_repo) as _;
|
||||
let diary_repository: Arc<dyn DiaryRepository> = Arc::clone(&pg_repo) as _;
|
||||
let stats_repository: Arc<dyn StatsRepository> = Arc::clone(&pg_repo) as _;
|
||||
let user_repository: Arc<dyn UserRepository> =
|
||||
Arc::new(PostgresUserRepository::new(pool.clone()));
|
||||
|
||||
Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user