diff --git a/src/data.rs b/src/data.rs index 7238271..e85957d 100644 --- a/src/data.rs +++ b/src/data.rs @@ -8,26 +8,39 @@ use crate::user::ApUserRepository; /// Typed event emitted by the federation layer. /// -/// When an [`EventPublisher`] is configured, outbound activities are NOT -/// delivered directly — instead a [`FederationEvent::DeliveryRequested`] event -/// is published per inbox. The consumer's job queue should: -/// 1. Persist the event. -/// 2. Call [`crate::service::ActivityPubService::deliver_to_inbox`] when processing. +/// **Delivery:** When an [`EventPublisher`] is configured, outbound activities +/// are published as [`FederationEvent::DeliveryRequested`] instead of being sent +/// directly. Process them by calling +/// [`crate::service::ActivityPubService::deliver_to_inbox`]. /// -/// Without a publisher, the library falls back to `tokio::spawn` delivery. +/// **Backfill:** When a follower is accepted and an [`EventPublisher`] is +/// configured, [`FederationEvent::BackfillRequested`] is published instead of +/// spawning an in-process task. Process it by calling +/// [`crate::service::ActivityPubService::run_backfill_for_follower`]. +/// +/// Without a publisher, both fall back to `tokio::spawn`. #[derive(Debug, Clone)] pub enum FederationEvent { + /// An outbound activity must be delivered to `inbox`. + /// Call `ActivityPubService::deliver_to_inbox(inbox, activity, signing_actor_id)`. DeliveryRequested { inbox: url::Url, activity: serde_json::Value, signing_actor_id: uuid::Uuid, }, + /// Delivery to `inbox` failed permanently after all in-process retries. DeliveryFailed { inbox: url::Url, activity: serde_json::Value, signing_actor_id: uuid::Uuid, error: String, }, + /// A new follower was accepted and their inbox needs backfilling. + /// Call `ActivityPubService::run_backfill_for_follower(owner_user_id, follower_inbox_url)`. + BackfillRequested { + owner_user_id: uuid::Uuid, + follower_inbox_url: String, + }, } /// Receives typed federation events. diff --git a/src/service/backfill.rs b/src/service/backfill.rs index 114d2e8..a4e70ea 100644 --- a/src/service/backfill.rs +++ b/src/service/backfill.rs @@ -85,27 +85,70 @@ impl ActivityPubService { Ok(()) } - /// Spawns the backfill task in the background. + /// Route backfill through [`EventPublisher`] (if configured) or fall back + /// to a fire-and-forget `tokio::spawn`. + /// + /// When `EventPublisher` is set, a [`FederationEvent::BackfillRequested`] + /// event is published so the consumer's job queue can process it — allowing + /// backfill to run in a separate worker process rather than in the API server. + /// The worker calls [`ActivityPubService::run_backfill_for_follower`] to execute. + /// /// `pub(crate)` so `service::follow` can call it from `accept_follower`. pub(crate) fn spawn_backfill(&self, owner_user_id: uuid::Uuid, follower_inbox_url: String) { - let config = self.federation_config.clone(); - let base_url = self.base_url.clone(); - let max_attempts = self.delivery_max_attempts; - let initial_delay = self.delivery_initial_delay_secs; - tokio::spawn(async move { - if let Err(e) = ActivityPubService::run_backfill( - config, - base_url, + let data = self.federation_config.to_request_data(); + if let Some(publisher) = data.event_publisher.as_ref() { + let publisher = publisher.clone(); + let event = crate::data::FederationEvent::BackfillRequested { owner_user_id, follower_inbox_url, - max_attempts, - initial_delay, - ) - .await - { - tracing::warn!(error = %e, "backfill: task failed"); - } - }); + }; + tokio::spawn(async move { + if let Err(e) = publisher.publish(event).await { + tracing::warn!(error = %e, "failed to enqueue BackfillRequested event"); + } + }); + } else { + let config = self.federation_config.clone(); + let base_url = self.base_url.clone(); + let max_attempts = self.delivery_max_attempts; + let initial_delay = self.delivery_initial_delay_secs; + tokio::spawn(async move { + if let Err(e) = ActivityPubService::run_backfill( + config, + base_url, + owner_user_id, + follower_inbox_url, + max_attempts, + initial_delay, + ) + .await + { + tracing::warn!(error = %e, "backfill: task failed"); + } + }); + } + } + + /// Execute backfill for a single follower inbox. Call this from a job-queue + /// consumer that received a [`FederationEvent::BackfillRequested`] event. + /// + /// Sends all of `owner_user_id`'s locally-authored content to `follower_inbox_url`, + /// oldest-to-newest, with a small sleep between batches to avoid overwhelming + /// the remote server. + pub async fn run_backfill_for_follower( + &self, + owner_user_id: uuid::Uuid, + follower_inbox_url: String, + ) -> anyhow::Result<()> { + ActivityPubService::run_backfill( + self.federation_config.clone(), + self.base_url.clone(), + owner_user_id, + follower_inbox_url, + self.delivery_max_attempts, + self.delivery_initial_delay_secs, + ) + .await } async fn run_backfill(