338 lines
13 KiB
Rust
338 lines
13 KiB
Rust
mod db;
|
|
mod event_bus;
|
|
mod follow_backfill_handler;
|
|
|
|
use std::sync::Arc;
|
|
|
|
use anyhow::Context;
|
|
use application::{
|
|
MovieDiscoveryIndexer, SearchCleanupHandler, SearchReindexHandler,
|
|
config::AppConfig,
|
|
context::{AppContext, Repositories, Services},
|
|
worker::WorkerService,
|
|
};
|
|
use export::ExportAdapter;
|
|
use importer::ImporterDocumentParser;
|
|
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
|
|
|
use domain::ports::{
|
|
DiaryExporter, DocumentParser, EventHandler, MovieEnrichmentClient, PeriodicJob,
|
|
PersonEnrichmentClient,
|
|
};
|
|
|
|
#[cfg(not(any(feature = "sqlite", feature = "postgres")))]
|
|
compile_error!(
|
|
"At least one database backend must be enabled. Use --features sqlite or --features postgres"
|
|
);
|
|
|
|
#[tokio::main]
|
|
async fn main() -> anyhow::Result<()> {
|
|
dotenvy::dotenv().ok();
|
|
init_tracing();
|
|
|
|
let database_url = std::env::var("DATABASE_URL").context("DATABASE_URL must be set")?;
|
|
let backend = std::env::var("DATABASE_BACKEND").unwrap_or_else(|_| "sqlite".to_string());
|
|
let app_config = AppConfig::from_env();
|
|
|
|
let (auth_service, password_hasher) = auth::create()?;
|
|
let metadata_client = metadata::create()?;
|
|
let poster_fetcher = poster_fetcher::create()?;
|
|
let object_storage = object_storage::create()?;
|
|
|
|
let db = db::connect(&database_url, &backend).await?;
|
|
let (event_publisher_arc, consumer_arc) = event_bus::create(&db.db_pool).await?;
|
|
|
|
let image_ref_command = Arc::clone(&db.image_ref_command);
|
|
let image_ref_query = Arc::clone(&db.image_ref_query);
|
|
|
|
#[cfg(feature = "federation")]
|
|
let (fed_ap_content, fed_user_repo, base_url, allow_registration) = (
|
|
Arc::clone(&db.ap_content),
|
|
Arc::clone(&db.user),
|
|
app_config.base_url.clone(),
|
|
app_config.allow_registration,
|
|
);
|
|
// Wire federation repos early to get remote_watchlist_repo for AppContext.
|
|
#[cfg(feature = "federation")]
|
|
let (
|
|
fed_activity_repo,
|
|
fed_follow_repo,
|
|
fed_actor_repo,
|
|
fed_blocklist_repo,
|
|
fed_social_query,
|
|
fed_review_store,
|
|
fed_remote_watchlist_repo,
|
|
) = match &db.db_pool {
|
|
#[cfg(feature = "sqlite-federation")]
|
|
db::DbPool::Sqlite(pool) => sqlite_federation::wire(pool.clone()),
|
|
#[cfg(feature = "postgres-federation")]
|
|
db::DbPool::Postgres(pool) => postgres_federation::wire(pool.clone()),
|
|
};
|
|
|
|
let review_logger = Arc::new(application::diary::review_logger::DefaultReviewLogger::new(
|
|
Arc::clone(&db.movie),
|
|
Arc::clone(&db.review),
|
|
Arc::clone(&db.watchlist),
|
|
Arc::clone(&metadata_client),
|
|
Arc::clone(&event_publisher_arc),
|
|
));
|
|
|
|
let mut ctx = AppContext {
|
|
repos: Repositories {
|
|
movie: db.movie,
|
|
review: db.review,
|
|
diary: db.diary,
|
|
stats: db.stats,
|
|
user: db.user,
|
|
import_session: db.import_session,
|
|
import_profile: db.import_profile,
|
|
movie_profile: db.movie_profile,
|
|
watchlist: db.watchlist,
|
|
watch_event: db.watch_event,
|
|
webhook_token: db.webhook_token,
|
|
profile_fields: db.profile_fields,
|
|
person_command: db.person_command,
|
|
person_query: db.person_query,
|
|
search_port: db.search_port,
|
|
search_command: db.search_command,
|
|
#[cfg(feature = "federation")]
|
|
remote_watchlist: fed_remote_watchlist_repo.clone(),
|
|
#[cfg(not(feature = "federation"))]
|
|
remote_watchlist: Arc::new(domain::testing::NoopRemoteWatchlistRepository),
|
|
#[cfg(feature = "federation")]
|
|
social_query: fed_social_query,
|
|
#[cfg(not(feature = "federation"))]
|
|
social_query: Arc::new(domain::testing::NoopSocialQueryPort),
|
|
wrapup_stats: db.wrapup_stats,
|
|
wrapup_repo: db.wrapup_repo,
|
|
goal: db.goal,
|
|
user_settings: db.user_settings,
|
|
remote_goal: db.remote_goal,
|
|
refresh_session: db.refresh_session,
|
|
},
|
|
services: Services {
|
|
auth: auth_service,
|
|
password_hasher,
|
|
metadata: metadata_client,
|
|
poster_fetcher,
|
|
object_storage,
|
|
event_publisher: event_publisher_arc,
|
|
diary_exporter: Arc::new(ExportAdapter) as Arc<dyn DiaryExporter>,
|
|
document_parser: Arc::new(ImporterDocumentParser) as Arc<dyn DocumentParser>,
|
|
review_logger,
|
|
person_enrichment: None,
|
|
},
|
|
config: app_config,
|
|
};
|
|
|
|
// ── Enrichment ────────────────────────────────────────────────────────────
|
|
// Both the event handler and the staleness job are gated on TMDB_API_KEY.
|
|
// Without a key, no MovieEnrichmentRequested events are produced or handled.
|
|
|
|
type EnrichmentParts = (
|
|
Option<Arc<dyn EventHandler>>,
|
|
Option<Arc<dyn EventHandler>>,
|
|
Option<Arc<dyn PeriodicJob>>,
|
|
);
|
|
let (enrichment_handler, person_enrichment_handler, enrichment_job): EnrichmentParts =
|
|
match tmdb_enrichment::TmdbEnrichmentClient::from_env() {
|
|
Ok(client) => {
|
|
tracing::info!("TMDb enrichment enabled");
|
|
let client = Arc::new(client);
|
|
let handler = Arc::new(tmdb_enrichment::MovieEnrichmentHandler::new(
|
|
Arc::clone(&client) as Arc<dyn MovieEnrichmentClient>,
|
|
Arc::clone(&ctx.repos.movie),
|
|
Arc::clone(&ctx.repos.movie_profile),
|
|
Arc::clone(&ctx.repos.person_command),
|
|
Arc::clone(&ctx.repos.search_command),
|
|
Arc::clone(&ctx.services.object_storage),
|
|
)) as Arc<dyn EventHandler>;
|
|
ctx.services.person_enrichment =
|
|
Some(Arc::clone(&client) as Arc<dyn PersonEnrichmentClient>);
|
|
let person_handler =
|
|
Arc::new(tmdb_enrichment::PersonEnrichmentHandler::new(ctx.clone()))
|
|
as Arc<dyn EventHandler>;
|
|
let job = Arc::new(application::jobs::EnrichmentStalenessJob::new(ctx.clone()))
|
|
as Arc<dyn PeriodicJob>;
|
|
(Some(handler), Some(person_handler), Some(job))
|
|
}
|
|
Err(e) => {
|
|
tracing::warn!("TMDb enrichment disabled: {e}");
|
|
(None, None, None)
|
|
}
|
|
};
|
|
|
|
// ── Image conversion ──────────────────────────────────────────────────────
|
|
|
|
let conversion = image_converter::build(
|
|
Arc::clone(&ctx.services.object_storage),
|
|
image_ref_command,
|
|
image_ref_query,
|
|
Arc::clone(&ctx.services.event_publisher),
|
|
)?;
|
|
|
|
// ── Periodic jobs ─────────────────────────────────────────────────────────
|
|
|
|
let mut periodic_jobs: Vec<Arc<dyn PeriodicJob>> = vec![
|
|
Arc::new(application::jobs::ImportSessionCleanupJob::new(ctx.repos.import_session.clone())),
|
|
Arc::new(application::jobs::WatchEventCleanupJob::new(ctx.clone())),
|
|
Arc::new(application::jobs::WrapUpAutoGenerateJob::new(ctx.clone())),
|
|
Arc::new(application::jobs::WrapUpCleanupJob::new(ctx.clone())),
|
|
Arc::new(application::jobs::RefreshSessionCleanupJob::new(
|
|
ctx.clone(),
|
|
)),
|
|
];
|
|
if let Some(job) = enrichment_job {
|
|
periodic_jobs.push(job);
|
|
}
|
|
if let Some((_, ref conv_job)) = conversion {
|
|
periodic_jobs.push(Arc::clone(conv_job));
|
|
}
|
|
|
|
for job in periodic_jobs {
|
|
tokio::spawn(async move {
|
|
let mut tick = tokio::time::interval(job.interval());
|
|
loop {
|
|
tick.tick().await;
|
|
if let Err(e) = job.run().await {
|
|
tracing::error!("periodic job failed: {e}");
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
// ── Event handlers ────────────────────────────────────────────────────────
|
|
|
|
let handlers: Vec<Arc<dyn EventHandler>> = {
|
|
let poster = Arc::new(poster_sync::PosterSyncHandler::new(
|
|
Arc::clone(&ctx.repos.movie),
|
|
Arc::clone(&ctx.services.metadata),
|
|
Arc::clone(&ctx.services.poster_fetcher),
|
|
Arc::clone(&ctx.services.object_storage),
|
|
Arc::clone(&ctx.services.event_publisher),
|
|
3,
|
|
)) as Arc<dyn EventHandler>;
|
|
|
|
let cleanup = Arc::new(object_storage::ImageCleanupHandler::new(Arc::clone(
|
|
&ctx.services.object_storage,
|
|
))) as Arc<dyn EventHandler>;
|
|
|
|
#[cfg(not(feature = "federation"))]
|
|
{
|
|
let search_cleanup = Arc::new(SearchCleanupHandler::new(
|
|
Arc::clone(&ctx.repos.search_command),
|
|
Arc::clone(&ctx.repos.person_query),
|
|
)) as Arc<dyn EventHandler>;
|
|
let discovery_indexer = Arc::new(MovieDiscoveryIndexer::new(
|
|
Arc::clone(&ctx.repos.movie),
|
|
Arc::clone(&ctx.repos.search_command),
|
|
)) as Arc<dyn EventHandler>;
|
|
let wrapup_handler = Arc::new(
|
|
application::wrapup::event_handler::WrapUpEventHandler::new(ctx.clone()),
|
|
) as Arc<dyn EventHandler>;
|
|
let reindex_handler =
|
|
Arc::new(SearchReindexHandler::new(ctx.clone())) as Arc<dyn EventHandler>;
|
|
let mut h = vec![
|
|
poster,
|
|
cleanup,
|
|
search_cleanup,
|
|
discovery_indexer,
|
|
wrapup_handler,
|
|
reindex_handler,
|
|
];
|
|
if let Some(e) = enrichment_handler {
|
|
h.push(e);
|
|
}
|
|
if let Some(e) = person_enrichment_handler {
|
|
h.push(e);
|
|
}
|
|
if let Some((ref conv_handler, _)) = conversion {
|
|
h.push(Arc::clone(conv_handler));
|
|
}
|
|
h
|
|
}
|
|
|
|
#[cfg(feature = "federation")]
|
|
{
|
|
let ap_wire = activitypub::wire(activitypub::ActivityPubDeps {
|
|
activity_repo: fed_activity_repo,
|
|
follow_repo: fed_follow_repo,
|
|
actor_repo: fed_actor_repo,
|
|
blocklist_repo: fed_blocklist_repo,
|
|
review_store: fed_review_store,
|
|
remote_watchlist_repo: fed_remote_watchlist_repo,
|
|
remote_goal_repo: Arc::clone(&ctx.repos.remote_goal),
|
|
local_ap_content: fed_ap_content,
|
|
user_repo: fed_user_repo,
|
|
base_url,
|
|
allow_registration,
|
|
event_publisher: Arc::clone(&ctx.services.event_publisher),
|
|
})
|
|
.await?;
|
|
|
|
let ap_event_handler = ap_wire.event_handler;
|
|
let backfill = Arc::new(follow_backfill_handler::FollowBackfillHandler {
|
|
ap_service: ap_wire.service,
|
|
}) as Arc<dyn EventHandler>;
|
|
|
|
let search_cleanup = Arc::new(SearchCleanupHandler::new(
|
|
Arc::clone(&ctx.repos.search_command),
|
|
Arc::clone(&ctx.repos.person_query),
|
|
)) as Arc<dyn EventHandler>;
|
|
let discovery_indexer = Arc::new(MovieDiscoveryIndexer::new(
|
|
Arc::clone(&ctx.repos.movie),
|
|
Arc::clone(&ctx.repos.search_command),
|
|
)) as Arc<dyn EventHandler>;
|
|
tracing::info!("federation event handler registered");
|
|
let wrapup_handler = Arc::new(
|
|
application::wrapup::event_handler::WrapUpEventHandler::new(ctx.clone()),
|
|
) as Arc<dyn EventHandler>;
|
|
let reindex_handler =
|
|
Arc::new(SearchReindexHandler::new(ctx.clone())) as Arc<dyn EventHandler>;
|
|
let mut h = vec![
|
|
poster,
|
|
cleanup,
|
|
ap_event_handler,
|
|
backfill,
|
|
search_cleanup,
|
|
discovery_indexer,
|
|
wrapup_handler,
|
|
reindex_handler,
|
|
];
|
|
if let Some(e) = enrichment_handler {
|
|
h.push(e);
|
|
}
|
|
if let Some(e) = person_enrichment_handler {
|
|
h.push(e);
|
|
}
|
|
if let Some((ref conv_handler, _)) = conversion {
|
|
h.push(Arc::clone(conv_handler));
|
|
}
|
|
h
|
|
}
|
|
};
|
|
|
|
// ── Run ───────────────────────────────────────────────────────────────────
|
|
|
|
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
|
|
tokio::spawn(async move {
|
|
tokio::signal::ctrl_c().await.ok();
|
|
let _ = shutdown_tx.send(true);
|
|
});
|
|
|
|
let worker = WorkerService::new(consumer_arc, handlers);
|
|
tracing::info!("worker started");
|
|
worker.run(shutdown_rx).await;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn init_tracing() {
|
|
let filter = std::env::var("RUST_LOG")
|
|
.unwrap_or_else(|_| "worker=info,application=info,k_ap=info".to_string());
|
|
tracing_subscriber::registry()
|
|
.with(tracing_subscriber::EnvFilter::new(filter))
|
|
.with(tracing_subscriber::fmt::layer())
|
|
.init();
|
|
}
|