diff --git a/crates/adapters/activitypub/src/lib.rs b/crates/adapters/activitypub/src/lib.rs index 0ea746c..1e0babc 100644 --- a/crates/adapters/activitypub/src/lib.rs +++ b/crates/adapters/activitypub/src/lib.rs @@ -36,6 +36,7 @@ pub async fn wire( diary_repo: std::sync::Arc, base_url: String, allow_registration: bool, + event_publisher: std::sync::Arc, ) -> anyhow::Result { let review_handler = std::sync::Arc::new(ReviewObjectHandler { movie_repository: std::sync::Arc::clone(&movie_repo), @@ -60,7 +61,7 @@ pub async fn wire( allow_registration, "movies-diary".to_string(), cfg!(debug_assertions), - None, // event_publisher wired in Task 6 + Some(event_publisher), ) .await?, ); diff --git a/crates/presentation/src/main.rs b/crates/presentation/src/main.rs index 32a1fe5..c397643 100644 --- a/crates/presentation/src/main.rs +++ b/crates/presentation/src/main.rs @@ -86,20 +86,6 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { _ => anyhow::bail!("DATABASE_BACKEND={backend} federation is not supported by this build"), }; - let ap = activitypub::wire( - federation_repo, - review_store, - remote_watchlist_repo.clone(), - Arc::clone(&user_repository), - Arc::clone(&movie_repository), - Arc::clone(&review_repository), - Arc::clone(&diary_repository), - app_config.base_url.clone(), - app_config.allow_registration, - ).await?; - let ap_router = ap.router; - let ap_service_arc = ap.service; - let ep: Arc = match event_bus { EventBusBackend::Db => { tracing::info!("event bus: DB queue"); @@ -122,6 +108,22 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { nats::create_publisher(cfg).await? } }; + + let ap = activitypub::wire( + federation_repo, + review_store, + remote_watchlist_repo.clone(), + Arc::clone(&user_repository), + Arc::clone(&movie_repository), + Arc::clone(&review_repository), + Arc::clone(&diary_repository), + app_config.base_url.clone(), + app_config.allow_registration, + Arc::clone(&ep), + ).await?; + let ap_router = ap.router; + let ap_service_arc = ap.service; + (ep, ap_router, ap_service_arc, social_query_arc, remote_watchlist_repo) }; diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 488f5f3..df406a9 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -166,7 +166,7 @@ async fn main() -> anyhow::Result<()> { #[cfg(feature = "federation")] { - let ap = activitypub::wire( + let ap_wire = activitypub::wire( fed_federation_repo, fed_review_store, fed_remote_watchlist_repo, @@ -176,12 +176,16 @@ async fn main() -> anyhow::Result<()> { fed_diary_repo, base_url, allow_registration, - ).await?.event_handler; + Arc::clone(&ctx.event_publisher), + ).await?; + + let ap_event_handler = ap_wire.event_handler; + let _ap_service = ap_wire.service; // used by FollowBackfillHandler in Task 8 let search_cleanup = Arc::new(SearchCleanupHandler::new(Arc::clone(&ctx.search_command), Arc::clone(&ctx.person_query))) as Arc; let discovery_indexer = Arc::new(MovieDiscoveryIndexer::new(Arc::clone(&ctx.movie_repository), Arc::clone(&ctx.search_command))) as Arc; tracing::info!("federation event handler registered"); - let mut h = vec![poster, cleanup, ap, search_cleanup, discovery_indexer]; + let mut h = vec![poster, cleanup, ap_event_handler, search_cleanup, discovery_indexer]; if let Some(e) = enrichment_handler { h.push(e); } if let Some((ref conv_handler, _)) = conversion { h.push(Arc::clone(conv_handler)); } h