This commit is contained in:
@@ -16,7 +16,7 @@ pub enum DbPool {
|
||||
Postgres(sqlx::PgPool),
|
||||
}
|
||||
|
||||
pub struct Repos {
|
||||
pub struct WorkerDbOutput {
|
||||
pub movie: Arc<dyn MovieRepository>,
|
||||
pub review: Arc<dyn ReviewRepository>,
|
||||
pub diary: Arc<dyn DiaryRepository>,
|
||||
@@ -26,96 +26,95 @@ pub struct Repos {
|
||||
pub import_profile: Arc<dyn ImportProfileRepository>,
|
||||
pub movie_profile: Arc<dyn MovieProfileRepository>,
|
||||
pub watchlist: Arc<dyn WatchlistRepository>,
|
||||
pub ap_content: Arc<dyn LocalApContentQuery>,
|
||||
pub image_ref_command: Arc<dyn ImageRefCommand>,
|
||||
pub image_ref_query: Arc<dyn ImageRefQuery>,
|
||||
pub watch_event: Arc<dyn WatchEventRepository>,
|
||||
pub webhook_token: Arc<dyn WebhookTokenRepository>,
|
||||
pub person_command: Arc<dyn PersonCommand>,
|
||||
pub person_query: Arc<dyn PersonQuery>,
|
||||
pub search_command: Arc<dyn SearchCommand>,
|
||||
pub search_port: Arc<dyn SearchPort>,
|
||||
pub profile_fields: Arc<dyn UserProfileFieldsRepository>,
|
||||
pub watch_event: Arc<dyn WatchEventRepository>,
|
||||
pub webhook_token: Arc<dyn WebhookTokenRepository>,
|
||||
pub ap_content: Arc<dyn LocalApContentQuery>,
|
||||
pub image_ref_command: Arc<dyn ImageRefCommand>,
|
||||
pub image_ref_query: Arc<dyn ImageRefQuery>,
|
||||
pub db_pool: DbPool,
|
||||
}
|
||||
|
||||
pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<(Repos, DbPool)> {
|
||||
pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<WorkerDbOutput> {
|
||||
match backend {
|
||||
#[cfg(feature = "postgres")]
|
||||
"postgres" => {
|
||||
let (pool, m, r, d, s, u, is, ip, mp, wl, ac) = postgres::wire(database_url)
|
||||
let w = postgres::wire(database_url)
|
||||
.await
|
||||
.context("PostgreSQL connection failed")?;
|
||||
let (image_ref_command, image_ref_query) = postgres::create_image_ref(pool.clone());
|
||||
let (person_command, person_query) = postgres::create_person_adapter(pool.clone());
|
||||
let (image_ref_command, image_ref_query) = postgres::create_image_ref(w.pool.clone());
|
||||
let (person_command, person_query) = postgres::create_person_adapter(w.pool.clone());
|
||||
let (search_command, search_port) =
|
||||
postgres_search::create_search_adapter(pool.clone());
|
||||
let pf = postgres::create_profile_fields_repo(pool.clone());
|
||||
postgres_search::create_search_adapter(w.pool.clone());
|
||||
let pf = postgres::create_profile_fields_repo(w.pool.clone());
|
||||
let we: Arc<dyn WatchEventRepository> =
|
||||
Arc::new(postgres::PostgresWatchEventRepository::new(pool.clone()));
|
||||
let wt: Arc<dyn WebhookTokenRepository> =
|
||||
Arc::new(postgres::PostgresWebhookTokenRepository::new(pool.clone()));
|
||||
Ok((
|
||||
Repos {
|
||||
movie: m,
|
||||
review: r,
|
||||
diary: d,
|
||||
stats: s,
|
||||
user: u,
|
||||
import_session: is,
|
||||
import_profile: ip,
|
||||
movie_profile: mp,
|
||||
watchlist: wl,
|
||||
ap_content: ac,
|
||||
image_ref_command,
|
||||
image_ref_query,
|
||||
person_command,
|
||||
person_query,
|
||||
search_command,
|
||||
search_port,
|
||||
profile_fields: pf,
|
||||
watch_event: we,
|
||||
webhook_token: wt,
|
||||
},
|
||||
DbPool::Postgres(pool),
|
||||
))
|
||||
Arc::new(postgres::PostgresWatchEventRepository::new(w.pool.clone()));
|
||||
let wt: Arc<dyn WebhookTokenRepository> = Arc::new(
|
||||
postgres::PostgresWebhookTokenRepository::new(w.pool.clone()),
|
||||
);
|
||||
Ok(WorkerDbOutput {
|
||||
movie: w.movie,
|
||||
review: w.review,
|
||||
diary: w.diary,
|
||||
stats: w.stats,
|
||||
user: w.user,
|
||||
import_session: w.import_session,
|
||||
import_profile: w.import_profile,
|
||||
movie_profile: w.movie_profile,
|
||||
watchlist: w.watchlist,
|
||||
watch_event: we,
|
||||
webhook_token: wt,
|
||||
person_command,
|
||||
person_query,
|
||||
search_command,
|
||||
search_port,
|
||||
profile_fields: pf,
|
||||
ap_content: w.ap_content,
|
||||
image_ref_command,
|
||||
image_ref_query,
|
||||
db_pool: DbPool::Postgres(w.pool),
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "sqlite")]
|
||||
_ => {
|
||||
let (pool, m, r, d, s, u, is, ip, mp, wl, ac) = sqlite::wire(database_url)
|
||||
let w = sqlite::wire(database_url)
|
||||
.await
|
||||
.context("SQLite connection failed")?;
|
||||
let (image_ref_command, image_ref_query) = sqlite::create_image_ref(pool.clone());
|
||||
let (person_command, person_query) = sqlite::create_person_adapter(pool.clone());
|
||||
let (search_command, search_port) = sqlite_search::create_search_adapter(pool.clone());
|
||||
let pf = sqlite::create_profile_fields_repo(pool.clone());
|
||||
let (image_ref_command, image_ref_query) = sqlite::create_image_ref(w.pool.clone());
|
||||
let (person_command, person_query) = sqlite::create_person_adapter(w.pool.clone());
|
||||
let (search_command, search_port) =
|
||||
sqlite_search::create_search_adapter(w.pool.clone());
|
||||
let pf = sqlite::create_profile_fields_repo(w.pool.clone());
|
||||
let we: Arc<dyn WatchEventRepository> =
|
||||
Arc::new(sqlite::SqliteWatchEventRepository::new(pool.clone()));
|
||||
Arc::new(sqlite::SqliteWatchEventRepository::new(w.pool.clone()));
|
||||
let wt: Arc<dyn WebhookTokenRepository> =
|
||||
Arc::new(sqlite::SqliteWebhookTokenRepository::new(pool.clone()));
|
||||
Ok((
|
||||
Repos {
|
||||
movie: m,
|
||||
review: r,
|
||||
diary: d,
|
||||
stats: s,
|
||||
user: u,
|
||||
import_session: is,
|
||||
import_profile: ip,
|
||||
movie_profile: mp,
|
||||
watchlist: wl,
|
||||
ap_content: ac,
|
||||
image_ref_command,
|
||||
image_ref_query,
|
||||
person_command,
|
||||
person_query,
|
||||
search_command,
|
||||
search_port,
|
||||
profile_fields: pf,
|
||||
watch_event: we,
|
||||
webhook_token: wt,
|
||||
},
|
||||
DbPool::Sqlite(pool),
|
||||
))
|
||||
Arc::new(sqlite::SqliteWebhookTokenRepository::new(w.pool.clone()));
|
||||
Ok(WorkerDbOutput {
|
||||
movie: w.movie,
|
||||
review: w.review,
|
||||
diary: w.diary,
|
||||
stats: w.stats,
|
||||
user: w.user,
|
||||
import_session: w.import_session,
|
||||
import_profile: w.import_profile,
|
||||
movie_profile: w.movie_profile,
|
||||
watchlist: w.watchlist,
|
||||
watch_event: we,
|
||||
webhook_token: wt,
|
||||
person_command,
|
||||
person_query,
|
||||
search_command,
|
||||
search_port,
|
||||
profile_fields: pf,
|
||||
ap_content: w.ap_content,
|
||||
image_ref_command,
|
||||
image_ref_query,
|
||||
db_pool: DbPool::Sqlite(w.pool),
|
||||
})
|
||||
}
|
||||
#[cfg(not(feature = "sqlite"))]
|
||||
_ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build"),
|
||||
|
||||
@@ -6,7 +6,9 @@ use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use application::{
|
||||
MovieDiscoveryIndexer, SearchCleanupHandler, config::AppConfig, context::AppContext,
|
||||
MovieDiscoveryIndexer, SearchCleanupHandler,
|
||||
config::AppConfig,
|
||||
context::{AppContext, Repositories, Services},
|
||||
worker::WorkerService,
|
||||
};
|
||||
use export::ExportAdapter;
|
||||
@@ -34,22 +36,16 @@ async fn main() -> anyhow::Result<()> {
|
||||
let poster_fetcher = poster_fetcher::create()?;
|
||||
let image_storage = image_storage::create()?;
|
||||
|
||||
let (repos, db_pool) = db::connect(&database_url, &backend).await?;
|
||||
let (event_publisher_arc, consumer_arc) = event_bus::create(&db_pool).await?;
|
||||
let db = db::connect(&database_url, &backend).await?;
|
||||
let (event_publisher_arc, consumer_arc) = event_bus::create(&db.db_pool).await?;
|
||||
|
||||
let image_ref_command = Arc::clone(&repos.image_ref_command);
|
||||
let image_ref_query = Arc::clone(&repos.image_ref_query);
|
||||
let person_command = Arc::clone(&repos.person_command);
|
||||
let person_query = Arc::clone(&repos.person_query);
|
||||
let search_command = Arc::clone(&repos.search_command);
|
||||
let search_port = Arc::clone(&repos.search_port);
|
||||
let profile_fields_repo = Arc::clone(&repos.profile_fields);
|
||||
let image_ref_command = Arc::clone(&db.image_ref_command);
|
||||
let image_ref_query = Arc::clone(&db.image_ref_query);
|
||||
|
||||
// Clone refs federation handler needs before ctx consumes them.
|
||||
#[cfg(feature = "federation")]
|
||||
let (fed_ap_content, fed_user_repo, base_url, allow_registration) = (
|
||||
Arc::clone(&repos.ap_content),
|
||||
Arc::clone(&repos.user),
|
||||
Arc::clone(&db.ap_content),
|
||||
Arc::clone(&db.user),
|
||||
app_config.base_url.clone(),
|
||||
app_config.allow_registration,
|
||||
);
|
||||
@@ -63,7 +59,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
fed_social_query,
|
||||
fed_review_store,
|
||||
fed_remote_watchlist_repo,
|
||||
) = match &db_pool {
|
||||
) = match &db.db_pool {
|
||||
#[cfg(feature = "sqlite-federation")]
|
||||
db::DbPool::Sqlite(pool) => sqlite_federation::wire(pool.clone()),
|
||||
#[cfg(feature = "postgres-federation")]
|
||||
@@ -71,34 +67,38 @@ async fn main() -> anyhow::Result<()> {
|
||||
};
|
||||
|
||||
let ctx = AppContext {
|
||||
movie_repository: repos.movie,
|
||||
review_repository: repos.review,
|
||||
diary_repository: repos.diary,
|
||||
diary_exporter: Arc::new(ExportAdapter) as Arc<dyn DiaryExporter>,
|
||||
document_parser: Arc::new(ImporterDocumentParser) as Arc<dyn DocumentParser>,
|
||||
stats_repository: repos.stats,
|
||||
metadata_client,
|
||||
poster_fetcher,
|
||||
image_storage,
|
||||
event_publisher: event_publisher_arc,
|
||||
auth_service,
|
||||
password_hasher,
|
||||
user_repository: repos.user,
|
||||
import_session_repository: repos.import_session,
|
||||
import_profile_repository: repos.import_profile,
|
||||
movie_profile_repository: repos.movie_profile,
|
||||
watchlist_repository: repos.watchlist,
|
||||
watch_event_repository: repos.watch_event,
|
||||
webhook_token_repository: repos.webhook_token,
|
||||
profile_fields_repository: Arc::clone(&profile_fields_repo),
|
||||
#[cfg(feature = "federation")]
|
||||
remote_watchlist_repository: fed_remote_watchlist_repo.clone(),
|
||||
#[cfg(feature = "federation")]
|
||||
social_query: fed_social_query,
|
||||
person_command: Arc::clone(&person_command),
|
||||
person_query: Arc::clone(&person_query),
|
||||
search_port: Arc::clone(&search_port),
|
||||
search_command: Arc::clone(&search_command),
|
||||
repos: Repositories {
|
||||
movie: db.movie,
|
||||
review: db.review,
|
||||
diary: db.diary,
|
||||
stats: db.stats,
|
||||
user: db.user,
|
||||
import_session: db.import_session,
|
||||
import_profile: db.import_profile,
|
||||
movie_profile: db.movie_profile,
|
||||
watchlist: db.watchlist,
|
||||
watch_event: db.watch_event,
|
||||
webhook_token: db.webhook_token,
|
||||
profile_fields: db.profile_fields,
|
||||
person_command: db.person_command,
|
||||
person_query: db.person_query,
|
||||
search_port: db.search_port,
|
||||
search_command: db.search_command,
|
||||
#[cfg(feature = "federation")]
|
||||
remote_watchlist: fed_remote_watchlist_repo.clone(),
|
||||
#[cfg(feature = "federation")]
|
||||
social_query: fed_social_query,
|
||||
},
|
||||
services: Services {
|
||||
auth: auth_service,
|
||||
password_hasher,
|
||||
metadata: metadata_client,
|
||||
poster_fetcher,
|
||||
image_storage,
|
||||
event_publisher: event_publisher_arc,
|
||||
diary_exporter: Arc::new(ExportAdapter) as Arc<dyn DiaryExporter>,
|
||||
document_parser: Arc::new(ImporterDocumentParser) as Arc<dyn DocumentParser>,
|
||||
},
|
||||
config: app_config,
|
||||
};
|
||||
|
||||
@@ -113,10 +113,10 @@ async fn main() -> anyhow::Result<()> {
|
||||
tracing::info!("TMDb enrichment enabled");
|
||||
let handler = Arc::new(tmdb_enrichment::EnrichmentHandler {
|
||||
enrichment_client: Arc::new(client),
|
||||
movie_repository: Arc::clone(&ctx.movie_repository),
|
||||
profile_repo: Arc::clone(&ctx.movie_profile_repository),
|
||||
person_command: Arc::clone(&ctx.person_command),
|
||||
search_command: Arc::clone(&ctx.search_command),
|
||||
movie_repository: Arc::clone(&ctx.repos.movie),
|
||||
profile_repo: Arc::clone(&ctx.repos.movie_profile),
|
||||
person_command: Arc::clone(&ctx.repos.person_command),
|
||||
search_command: Arc::clone(&ctx.repos.search_command),
|
||||
}) as Arc<dyn EventHandler>;
|
||||
let job = Arc::new(application::jobs::EnrichmentStalenessJob::new(ctx.clone()))
|
||||
as Arc<dyn PeriodicJob>;
|
||||
@@ -131,10 +131,10 @@ async fn main() -> anyhow::Result<()> {
|
||||
// ── Image conversion ──────────────────────────────────────────────────────
|
||||
|
||||
let conversion = image_converter::build(
|
||||
Arc::clone(&ctx.image_storage),
|
||||
Arc::clone(&ctx.services.image_storage),
|
||||
image_ref_command,
|
||||
image_ref_query,
|
||||
Arc::clone(&ctx.event_publisher),
|
||||
Arc::clone(&ctx.services.event_publisher),
|
||||
)?;
|
||||
|
||||
// ── Periodic jobs ─────────────────────────────────────────────────────────
|
||||
@@ -166,27 +166,27 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
let handlers: Vec<Arc<dyn EventHandler>> = {
|
||||
let poster = Arc::new(poster_sync::PosterSyncHandler::new(
|
||||
Arc::clone(&ctx.movie_repository),
|
||||
Arc::clone(&ctx.metadata_client),
|
||||
Arc::clone(&ctx.poster_fetcher),
|
||||
Arc::clone(&ctx.image_storage),
|
||||
Arc::clone(&ctx.event_publisher),
|
||||
Arc::clone(&ctx.repos.movie),
|
||||
Arc::clone(&ctx.services.metadata),
|
||||
Arc::clone(&ctx.services.poster_fetcher),
|
||||
Arc::clone(&ctx.services.image_storage),
|
||||
Arc::clone(&ctx.services.event_publisher),
|
||||
3,
|
||||
)) as Arc<dyn EventHandler>;
|
||||
|
||||
let cleanup = Arc::new(image_storage::ImageCleanupHandler::new(Arc::clone(
|
||||
&ctx.image_storage,
|
||||
&ctx.services.image_storage,
|
||||
))) as Arc<dyn EventHandler>;
|
||||
|
||||
#[cfg(not(feature = "federation"))]
|
||||
{
|
||||
let search_cleanup = Arc::new(SearchCleanupHandler::new(
|
||||
Arc::clone(&ctx.search_command),
|
||||
Arc::clone(&ctx.person_query),
|
||||
Arc::clone(&ctx.repos.search_command),
|
||||
Arc::clone(&ctx.repos.person_query),
|
||||
)) as Arc<dyn EventHandler>;
|
||||
let discovery_indexer = Arc::new(MovieDiscoveryIndexer::new(
|
||||
Arc::clone(&ctx.movie_repository),
|
||||
Arc::clone(&ctx.search_command),
|
||||
Arc::clone(&ctx.repos.movie),
|
||||
Arc::clone(&ctx.repos.search_command),
|
||||
)) as Arc<dyn EventHandler>;
|
||||
let mut h = vec![poster, cleanup, search_cleanup, discovery_indexer];
|
||||
if let Some(e) = enrichment_handler {
|
||||
@@ -211,7 +211,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
user_repo: fed_user_repo,
|
||||
base_url,
|
||||
allow_registration,
|
||||
event_publisher: Arc::clone(&ctx.event_publisher),
|
||||
event_publisher: Arc::clone(&ctx.services.event_publisher),
|
||||
})
|
||||
.await?;
|
||||
|
||||
@@ -221,12 +221,12 @@ async fn main() -> anyhow::Result<()> {
|
||||
}) as Arc<dyn EventHandler>;
|
||||
|
||||
let search_cleanup = Arc::new(SearchCleanupHandler::new(
|
||||
Arc::clone(&ctx.search_command),
|
||||
Arc::clone(&ctx.person_query),
|
||||
Arc::clone(&ctx.repos.search_command),
|
||||
Arc::clone(&ctx.repos.person_query),
|
||||
)) as Arc<dyn EventHandler>;
|
||||
let discovery_indexer = Arc::new(MovieDiscoveryIndexer::new(
|
||||
Arc::clone(&ctx.movie_repository),
|
||||
Arc::clone(&ctx.search_command),
|
||||
Arc::clone(&ctx.repos.movie),
|
||||
Arc::clone(&ctx.repos.search_command),
|
||||
)) as Arc<dyn EventHandler>;
|
||||
tracing::info!("federation event handler registered");
|
||||
let mut h = vec![
|
||||
|
||||
Reference in New Issue
Block a user