diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 0ce3938..710d629 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -31,6 +31,7 @@ importer = { workspace = true } image-converter = { workspace = true } nats = { workspace = true, optional = true } sqlx = { workspace = true } +async-trait = { workspace = true } # Optional — database backends sqlite = { workspace = true, optional = true } diff --git a/crates/worker/src/follow_backfill_handler.rs b/crates/worker/src/follow_backfill_handler.rs new file mode 100644 index 0000000..5c99613 --- /dev/null +++ b/crates/worker/src/follow_backfill_handler.rs @@ -0,0 +1,22 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use domain::{errors::DomainError, events::DomainEvent, ports::EventHandler}; + +pub struct FollowBackfillHandler { + pub ap_service: Arc, +} + +#[async_trait] +impl EventHandler for FollowBackfillHandler { + async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> { + let DomainEvent::FollowAccepted { remote_actor_url, outbox_url, .. } = event else { + return Ok(()); + }; + tracing::info!(actor = %remote_actor_url, outbox = %outbox_url, "starting outbox backfill"); + self.ap_service + .backfill_outbox(outbox_url, remote_actor_url) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string())) + } +} diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index df406a9..e25ddc5 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -1,5 +1,6 @@ mod db; mod event_bus; +mod follow_backfill_handler; use std::sync::Arc; @@ -180,12 +181,14 @@ async fn main() -> anyhow::Result<()> { ).await?; let ap_event_handler = ap_wire.event_handler; - let _ap_service = ap_wire.service; // used by FollowBackfillHandler in Task 8 + let backfill = Arc::new(follow_backfill_handler::FollowBackfillHandler { + ap_service: ap_wire.service, + }) as Arc; let search_cleanup = Arc::new(SearchCleanupHandler::new(Arc::clone(&ctx.search_command), Arc::clone(&ctx.person_query))) as Arc; let discovery_indexer = Arc::new(MovieDiscoveryIndexer::new(Arc::clone(&ctx.movie_repository), Arc::clone(&ctx.search_command))) as Arc; tracing::info!("federation event handler registered"); - let mut h = vec![poster, cleanup, ap_event_handler, search_cleanup, discovery_indexer]; + let mut h = vec![poster, cleanup, ap_event_handler, backfill, search_cleanup, discovery_indexer]; if let Some(e) = enrichment_handler { h.push(e); } if let Some((ref conv_handler, _)) = conversion { h.push(Arc::clone(conv_handler)); } h