feat: route backfill through EventPublisher; add run_backfill_for_follower

This commit is contained in:
2026-05-29 02:05:03 +02:00
parent 0519bed66c
commit d5f75b4b57
2 changed files with 79 additions and 23 deletions

View File

@@ -8,26 +8,39 @@ use crate::user::ApUserRepository;
/// Typed event emitted by the federation layer. /// Typed event emitted by the federation layer.
/// ///
/// When an [`EventPublisher`] is configured, outbound activities are NOT /// **Delivery:** When an [`EventPublisher`] is configured, outbound activities
/// delivered directly — instead a [`FederationEvent::DeliveryRequested`] event /// are published as [`FederationEvent::DeliveryRequested`] instead of being sent
/// is published per inbox. The consumer's job queue should: /// directly. Process them by calling
/// 1. Persist the event. /// [`crate::service::ActivityPubService::deliver_to_inbox`].
/// 2. Call [`crate::service::ActivityPubService::deliver_to_inbox`] when processing.
/// ///
/// Without a publisher, the library falls back to `tokio::spawn` delivery. /// **Backfill:** When a follower is accepted and an [`EventPublisher`] is
/// configured, [`FederationEvent::BackfillRequested`] is published instead of
/// spawning an in-process task. Process it by calling
/// [`crate::service::ActivityPubService::run_backfill_for_follower`].
///
/// Without a publisher, both fall back to `tokio::spawn`.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum FederationEvent { pub enum FederationEvent {
/// An outbound activity must be delivered to `inbox`.
/// Call `ActivityPubService::deliver_to_inbox(inbox, activity, signing_actor_id)`.
DeliveryRequested { DeliveryRequested {
inbox: url::Url, inbox: url::Url,
activity: serde_json::Value, activity: serde_json::Value,
signing_actor_id: uuid::Uuid, signing_actor_id: uuid::Uuid,
}, },
/// Delivery to `inbox` failed permanently after all in-process retries.
DeliveryFailed { DeliveryFailed {
inbox: url::Url, inbox: url::Url,
activity: serde_json::Value, activity: serde_json::Value,
signing_actor_id: uuid::Uuid, signing_actor_id: uuid::Uuid,
error: String, error: String,
}, },
/// A new follower was accepted and their inbox needs backfilling.
/// Call `ActivityPubService::run_backfill_for_follower(owner_user_id, follower_inbox_url)`.
BackfillRequested {
owner_user_id: uuid::Uuid,
follower_inbox_url: String,
},
} }
/// Receives typed federation events. /// Receives typed federation events.

View File

@@ -85,9 +85,29 @@ impl ActivityPubService {
Ok(()) Ok(())
} }
/// Spawns the backfill task in the background. /// Route backfill through [`EventPublisher`] (if configured) or fall back
/// to a fire-and-forget `tokio::spawn`.
///
/// When `EventPublisher` is set, a [`FederationEvent::BackfillRequested`]
/// event is published so the consumer's job queue can process it — allowing
/// backfill to run in a separate worker process rather than in the API server.
/// The worker calls [`ActivityPubService::run_backfill_for_follower`] to execute.
///
/// `pub(crate)` so `service::follow` can call it from `accept_follower`. /// `pub(crate)` so `service::follow` can call it from `accept_follower`.
pub(crate) fn spawn_backfill(&self, owner_user_id: uuid::Uuid, follower_inbox_url: String) { pub(crate) fn spawn_backfill(&self, owner_user_id: uuid::Uuid, follower_inbox_url: String) {
let data = self.federation_config.to_request_data();
if let Some(publisher) = data.event_publisher.as_ref() {
let publisher = publisher.clone();
let event = crate::data::FederationEvent::BackfillRequested {
owner_user_id,
follower_inbox_url,
};
tokio::spawn(async move {
if let Err(e) = publisher.publish(event).await {
tracing::warn!(error = %e, "failed to enqueue BackfillRequested event");
}
});
} else {
let config = self.federation_config.clone(); let config = self.federation_config.clone();
let base_url = self.base_url.clone(); let base_url = self.base_url.clone();
let max_attempts = self.delivery_max_attempts; let max_attempts = self.delivery_max_attempts;
@@ -107,6 +127,29 @@ impl ActivityPubService {
} }
}); });
} }
}
/// Execute backfill for a single follower inbox. Call this from a job-queue
/// consumer that received a [`FederationEvent::BackfillRequested`] event.
///
/// Sends all of `owner_user_id`'s locally-authored content to `follower_inbox_url`,
/// oldest-to-newest, with a small sleep between batches to avoid overwhelming
/// the remote server.
pub async fn run_backfill_for_follower(
&self,
owner_user_id: uuid::Uuid,
follower_inbox_url: String,
) -> anyhow::Result<()> {
ActivityPubService::run_backfill(
self.federation_config.clone(),
self.base_url.clone(),
owner_user_id,
follower_inbox_url,
self.delivery_max_attempts,
self.delivery_initial_delay_secs,
)
.await
}
async fn run_backfill( async fn run_backfill(
config: ApFederationConfig, config: ApFederationConfig,