feat: wire event_publisher through activitypub::wire()
This commit is contained in:
@@ -36,6 +36,7 @@ pub async fn wire(
|
|||||||
diary_repo: std::sync::Arc<dyn domain::ports::DiaryRepository>,
|
diary_repo: std::sync::Arc<dyn domain::ports::DiaryRepository>,
|
||||||
base_url: String,
|
base_url: String,
|
||||||
allow_registration: bool,
|
allow_registration: bool,
|
||||||
|
event_publisher: std::sync::Arc<dyn domain::ports::EventPublisher>,
|
||||||
) -> anyhow::Result<ActivityPubWire> {
|
) -> anyhow::Result<ActivityPubWire> {
|
||||||
let review_handler = std::sync::Arc::new(ReviewObjectHandler {
|
let review_handler = std::sync::Arc::new(ReviewObjectHandler {
|
||||||
movie_repository: std::sync::Arc::clone(&movie_repo),
|
movie_repository: std::sync::Arc::clone(&movie_repo),
|
||||||
@@ -60,7 +61,7 @@ pub async fn wire(
|
|||||||
allow_registration,
|
allow_registration,
|
||||||
"movies-diary".to_string(),
|
"movies-diary".to_string(),
|
||||||
cfg!(debug_assertions),
|
cfg!(debug_assertions),
|
||||||
None, // event_publisher wired in Task 6
|
Some(event_publisher),
|
||||||
)
|
)
|
||||||
.await?,
|
.await?,
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -86,20 +86,6 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
|||||||
_ => anyhow::bail!("DATABASE_BACKEND={backend} federation is not supported by this build"),
|
_ => anyhow::bail!("DATABASE_BACKEND={backend} federation is not supported by this build"),
|
||||||
};
|
};
|
||||||
|
|
||||||
let ap = activitypub::wire(
|
|
||||||
federation_repo,
|
|
||||||
review_store,
|
|
||||||
remote_watchlist_repo.clone(),
|
|
||||||
Arc::clone(&user_repository),
|
|
||||||
Arc::clone(&movie_repository),
|
|
||||||
Arc::clone(&review_repository),
|
|
||||||
Arc::clone(&diary_repository),
|
|
||||||
app_config.base_url.clone(),
|
|
||||||
app_config.allow_registration,
|
|
||||||
).await?;
|
|
||||||
let ap_router = ap.router;
|
|
||||||
let ap_service_arc = ap.service;
|
|
||||||
|
|
||||||
let ep: Arc<dyn EventPublisher> = match event_bus {
|
let ep: Arc<dyn EventPublisher> = match event_bus {
|
||||||
EventBusBackend::Db => {
|
EventBusBackend::Db => {
|
||||||
tracing::info!("event bus: DB queue");
|
tracing::info!("event bus: DB queue");
|
||||||
@@ -122,6 +108,22 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
|||||||
nats::create_publisher(cfg).await?
|
nats::create_publisher(cfg).await?
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let ap = activitypub::wire(
|
||||||
|
federation_repo,
|
||||||
|
review_store,
|
||||||
|
remote_watchlist_repo.clone(),
|
||||||
|
Arc::clone(&user_repository),
|
||||||
|
Arc::clone(&movie_repository),
|
||||||
|
Arc::clone(&review_repository),
|
||||||
|
Arc::clone(&diary_repository),
|
||||||
|
app_config.base_url.clone(),
|
||||||
|
app_config.allow_registration,
|
||||||
|
Arc::clone(&ep),
|
||||||
|
).await?;
|
||||||
|
let ap_router = ap.router;
|
||||||
|
let ap_service_arc = ap.service;
|
||||||
|
|
||||||
(ep, ap_router, ap_service_arc, social_query_arc, remote_watchlist_repo)
|
(ep, ap_router, ap_service_arc, social_query_arc, remote_watchlist_repo)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -166,7 +166,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
|
|
||||||
#[cfg(feature = "federation")]
|
#[cfg(feature = "federation")]
|
||||||
{
|
{
|
||||||
let ap = activitypub::wire(
|
let ap_wire = activitypub::wire(
|
||||||
fed_federation_repo,
|
fed_federation_repo,
|
||||||
fed_review_store,
|
fed_review_store,
|
||||||
fed_remote_watchlist_repo,
|
fed_remote_watchlist_repo,
|
||||||
@@ -176,12 +176,16 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
fed_diary_repo,
|
fed_diary_repo,
|
||||||
base_url,
|
base_url,
|
||||||
allow_registration,
|
allow_registration,
|
||||||
).await?.event_handler;
|
Arc::clone(&ctx.event_publisher),
|
||||||
|
).await?;
|
||||||
|
|
||||||
|
let ap_event_handler = ap_wire.event_handler;
|
||||||
|
let _ap_service = ap_wire.service; // used by FollowBackfillHandler in Task 8
|
||||||
|
|
||||||
let search_cleanup = Arc::new(SearchCleanupHandler::new(Arc::clone(&ctx.search_command), Arc::clone(&ctx.person_query))) as Arc<dyn EventHandler>;
|
let search_cleanup = Arc::new(SearchCleanupHandler::new(Arc::clone(&ctx.search_command), Arc::clone(&ctx.person_query))) as Arc<dyn EventHandler>;
|
||||||
let discovery_indexer = Arc::new(MovieDiscoveryIndexer::new(Arc::clone(&ctx.movie_repository), Arc::clone(&ctx.search_command))) as Arc<dyn EventHandler>;
|
let discovery_indexer = Arc::new(MovieDiscoveryIndexer::new(Arc::clone(&ctx.movie_repository), Arc::clone(&ctx.search_command))) as Arc<dyn EventHandler>;
|
||||||
tracing::info!("federation event handler registered");
|
tracing::info!("federation event handler registered");
|
||||||
let mut h = vec![poster, cleanup, ap, search_cleanup, discovery_indexer];
|
let mut h = vec![poster, cleanup, ap_event_handler, search_cleanup, discovery_indexer];
|
||||||
if let Some(e) = enrichment_handler { h.push(e); }
|
if let Some(e) = enrichment_handler { h.push(e); }
|
||||||
if let Some((ref conv_handler, _)) = conversion { h.push(Arc::clone(conv_handler)); }
|
if let Some((ref conv_handler, _)) = conversion { h.push(Arc::clone(conv_handler)); }
|
||||||
h
|
h
|
||||||
|
|||||||
Reference in New Issue
Block a user