Async Delivery and EventPublisher
Gabriel Kaszewski edited this page 2026-05-29 02:14:20 +00:00

By default, k-ap delivers activities and runs backfill in-process via tokio::spawn. For production scale, implement EventPublisher to route these operations through your job queue.


Default mode

No configuration is needed. k-ap spawns delivery and backfill tasks with tokio::spawn. Each delivery is retried up to delivery_max_attempts times, with exponential backoff starting at a delay of delivery_initial_delay_secs seconds.
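
To make the retry timing concrete, here is a minimal sketch of an exponential backoff schedule. It assumes the delay doubles after each failed attempt and that no jitter is applied; this page does not state the multiplier k-ap actually uses, so treat the factor of 2 and the helper names backoff_delay and backoff_schedule as hypothetical.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based), ASSUMING the delay doubles
/// after every failed attempt. The real multiplier used by k-ap is not
/// documented on this page.
fn backoff_delay(initial_delay_secs: u64, attempt: u32) -> Duration {
    // Saturate instead of overflowing for very large attempt counts.
    let factor = 2u64.checked_pow(attempt).unwrap_or(u64::MAX);
    Duration::from_secs(initial_delay_secs.saturating_mul(factor))
}

/// Waits between `max_attempts` delivery attempts.
/// There is one fewer wait than there are attempts.
fn backoff_schedule(initial_delay_secs: u64, max_attempts: u32) -> Vec<Duration> {
    (0..max_attempts.saturating_sub(1))
        .map(|attempt| backoff_delay(initial_delay_secs, attempt))
        .collect()
}
```

Under these assumptions, an initial delay of 5 seconds and 4 attempts give waits of 5, 10 and 20 seconds, so the last attempt happens about 35 seconds after the first.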


EventPublisher

use async_trait::async_trait;
use k_ap::{EventPublisher, FederationEvent};

#[async_trait]
impl EventPublisher for MyQueue {
    async fn publish(&self, event: FederationEvent) -> anyhow::Result<()> {
        match event {
            FederationEvent::DeliveryRequested { inbox, activity, signing_actor_id } => {
                // Persist to your job queue.
                // Your worker calls service.deliver_to_inbox(inbox, activity, signing_actor_id).
                self.enqueue_delivery(inbox, activity, signing_actor_id).await?;
            }
            FederationEvent::DeliveryFailed { inbox, error, .. } => {
                tracing::error!(%inbox, %error, "permanent delivery failure");
            }
            FederationEvent::BackfillRequested { owner_user_id, follower_inbox_url } => {
                // Persist to your job queue.
                // Your worker calls service.run_backfill_for_follower(owner_user_id, follower_inbox_url).
                self.enqueue_backfill(owner_user_id, follower_inbox_url).await?;
            }
        }
        Ok(())
    }
}

Register in the builder:

use std::sync::Arc;

let service = ActivityPubService::builder("https://example.com")
    // ... repos ...
    .event_publisher(Arc::new(my_queue))
    .build()
    .await?;
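
The queue behind the publisher is left to the application. As a rough illustration of the producer/worker split, the sketch below uses a standard-library channel in place of a real job queue. Everything in it is hypothetical: the Job enum, the JobHandler trait, and the enqueue and run_worker functions are not part of k-ap, and plain strings stand in for the Url, Value and Uuid fields. A production queue would also persist jobs and would typically be async.

```rust
use std::sync::mpsc::{Receiver, Sender};

/// Hypothetical job record mirroring the two events that need a worker.
/// Strings stand in for the real Url, Value and Uuid field types.
#[derive(Debug, Clone, PartialEq)]
enum Job {
    Delivery { inbox: String, activity: String, signing_actor_id: String },
    Backfill { owner_user_id: String, follower_inbox_url: String },
}

/// Hypothetical stand-in for the service methods a worker would call.
trait JobHandler {
    fn deliver(&mut self, inbox: &str, activity: &str, signing_actor_id: &str)
        -> Result<(), String>;
    fn backfill(&mut self, owner_user_id: &str, follower_inbox_url: &str)
        -> Result<(), String>;
}

/// Producer side: what `publish` would do instead of running the work inline.
fn enqueue(tx: &Sender<Job>, job: Job) -> Result<(), String> {
    tx.send(job).map_err(|e| e.to_string())
}

/// Consumer side: drain the queue until every sender has been dropped and
/// return the jobs that failed, so the caller can reschedule them.
fn run_worker<H: JobHandler>(rx: Receiver<Job>, handler: &mut H) -> Vec<Job> {
    let mut failed = Vec::new();
    for job in rx {
        let result = match &job {
            Job::Delivery { inbox, activity, signing_actor_id } => {
                handler.deliver(inbox, activity, signing_actor_id)
            }
            Job::Backfill { owner_user_id, follower_inbox_url } => {
                handler.backfill(owner_user_id, follower_inbox_url)
            }
        };
        if result.is_err() {
            failed.push(job);
        }
    }
    failed
}
```

In a real deployment the handler would forward to service.deliver_to_inbox and service.run_backfill_for_follower, and the retry policy for failed jobs would belong to the queue rather than to k-ap.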

FederationEvent variants

| Variant | Fields | When published |
| --- | --- | --- |
| DeliveryRequested | inbox: Url, activity: Value, signing_actor_id: Uuid | Every outbound activity delivery |
| DeliveryFailed | inbox: Url, activity: Value, signing_actor_id: Uuid, error: String | After all in-process retries exhausted |
| BackfillRequested | owner_user_id: Uuid, follower_inbox_url: String | When a new follower is accepted |

Worker methods

// Execute a single delivery — called from a DeliveryRequested worker.
service.deliver_to_inbox(inbox, activity, signing_actor_id).await?;

// Send local content to a new follower's inbox — called from a BackfillRequested worker.
// Distinct from import_remote_outbox, which imports FROM a remote server.
service.run_backfill_for_follower(owner_user_id, follower_inbox_url).await?;

run_backfill_for_follower paginates through the local user's post history and delivers posts oldest-to-newest, with a short pause between batches to avoid overwhelming the remote server.
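
The batching behaviour described above can be pictured with a small synchronous sketch. It is not k-ap's implementation: the Post struct, the backfill_in_batches function, the batch size and the pause length are all assumptions made for illustration, and an async version would use an async sleep instead of blocking the thread.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical post record; only the creation timestamp matters here.
#[derive(Debug, Clone, PartialEq)]
struct Post {
    id: u32,
    created_at: u64,
}

/// Sort posts oldest-first and hand them to `deliver` one batch at a time,
/// pausing between batches. Returns the number of batches sent.
fn backfill_in_batches<F>(
    mut posts: Vec<Post>,
    batch_size: usize,
    pause: Duration,
    mut deliver: F,
) -> usize
where
    F: FnMut(&[Post]),
{
    // Oldest first, so the follower's timeline fills in chronological order.
    posts.sort_by_key(|p| p.created_at);

    let mut batches = 0;
    // `chunks` panics on a size of 0, so clamp to at least one post per batch.
    for batch in posts.chunks(batch_size.max(1)) {
        if batches > 0 {
            // Give the remote server a moment between batches.
            sleep(pause);
        }
        deliver(batch);
        batches += 1;
    }
    batches
}
```

With five posts and a batch size of 2, this sketch sends three batches and pauses twice, once before each batch after the first.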