Async Delivery and EventPublisher
Gabriel Kaszewski edited this page 2026-05-29 02:14:20 +00:00
By default, k-ap delivers activities and runs backfill in-process via `tokio::spawn`. For production scale, implement `EventPublisher` to route these operations through your job queue.
Default mode
No configuration needed. k-ap spawns delivery and backfill tasks with `tokio::spawn`. Delivery is retried up to `delivery_max_attempts` times with exponential backoff starting at `delivery_initial_delay_secs`.
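To get a feel for how long a failing delivery stays in flight, the retry schedule can be sketched as below. This is an illustration, not k-ap's code: it assumes the delay doubles after each failed attempt (the text above only says "exponential"), and `backoff_delay` and `backoff_schedule` are hypothetical helper names.

```rust
use std::time::Duration;

/// Delay before retry number `retry` (1-based).
/// ASSUMPTION: the delay doubles after every failed attempt.
fn backoff_delay(initial_delay_secs: u64, retry: u32) -> Duration {
    let factor = 2u64.saturating_pow(retry.saturating_sub(1));
    Duration::from_secs(initial_delay_secs.saturating_mul(factor))
}

/// Every wait that happens when all attempts fail.
/// `max_attempts` attempts are separated by `max_attempts - 1` waits.
fn backoff_schedule(initial_delay_secs: u64, max_attempts: u32) -> Vec<Duration> {
    (1..max_attempts)
        .map(|retry| backoff_delay(initial_delay_secs, retry))
        .collect()
}

fn main() {
    // Hypothetical values standing in for delivery_initial_delay_secs
    // and delivery_max_attempts.
    for (i, delay) in backoff_schedule(5, 4).iter().enumerate() {
        println!("wait before attempt {}: {:?}", i + 2, delay);
    }
}
```

Under these assumptions, an initial delay of 5 seconds and 4 attempts give waits of 5, 10, and 20 seconds before the delivery is treated as permanently failed.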
EventPublisher
```rust
use async_trait::async_trait;
use k_ap::{EventPublisher, FederationEvent};

#[async_trait]
impl EventPublisher for MyQueue {
    async fn publish(&self, event: FederationEvent) -> anyhow::Result<()> {
        match event {
            FederationEvent::DeliveryRequested { inbox, activity, signing_actor_id } => {
                // Persist to your job queue.
                // Your worker calls service.deliver_to_inbox(inbox, activity, signing_actor_id).
                self.enqueue_delivery(inbox, activity, signing_actor_id).await?;
            }
            FederationEvent::DeliveryFailed { inbox, error, .. } => {
                // Nothing to enqueue: retries are already exhausted, so just log it.
                tracing::error!(%inbox, %error, "permanent delivery failure");
            }
            FederationEvent::BackfillRequested { owner_user_id, follower_inbox_url } => {
                // Persist to your job queue.
                // Your worker calls service.run_backfill_for_follower(owner_user_id, follower_inbox_url).
                self.enqueue_backfill(owner_user_id, follower_inbox_url).await?;
            }
        }
        Ok(())
    }
}
```
Register in the builder:
```rust
use std::sync::Arc;

let service = ActivityPubService::builder("https://example.com")
    // ... repos ...
    .event_publisher(Arc::new(my_queue))
    .build()
    .await?;
```
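The publisher only enqueues; a separate worker has to pull jobs off the queue and execute them. The std-only sketch below shows that hand-off using an in-memory `std::sync::mpsc` channel in place of a durable job queue. Everything in it is hypothetical: `Job`, `run_worker`, and `demo` are illustrative names, the field types are simplified to `String` and `u64`, and the worker records a string where a real one would call `service.deliver_to_inbox(..)` or `service.run_backfill_for_follower(..)`.

```rust
use std::sync::mpsc;
use std::thread;

/// Stand-in for the payload a publisher would persist.
/// ASSUMPTION: simplified field types, not the real Url/Value/Uuid.
#[derive(Debug, Clone, PartialEq)]
enum Job {
    Delivery { inbox: String, activity: String, signing_actor_id: u64 },
    Backfill { owner_user_id: u64, follower_inbox_url: String },
}

/// Hypothetical worker loop: handles jobs in FIFO order and returns
/// once every sender has been dropped.
fn run_worker(rx: mpsc::Receiver<Job>) -> Vec<String> {
    let mut handled = Vec::new();
    for job in rx {
        match job {
            // A real worker would call service.deliver_to_inbox(..) here.
            Job::Delivery { inbox, activity, signing_actor_id } => handled.push(format!(
                "deliver {activity} to {inbox} as actor {signing_actor_id}"
            )),
            // A real worker would call service.run_backfill_for_follower(..) here.
            Job::Backfill { owner_user_id, follower_inbox_url } => handled.push(format!(
                "backfill user {owner_user_id} to {follower_inbox_url}"
            )),
        }
    }
    handled
}

/// Enqueues one job of each kind and lets the worker drain the queue.
fn demo() -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || run_worker(rx));
    tx.send(Job::Delivery {
        inbox: "https://remote.example/inbox".to_string(),
        activity: "Create".to_string(),
        signing_actor_id: 1,
    })
    .unwrap();
    tx.send(Job::Backfill {
        owner_user_id: 1,
        follower_inbox_url: "https://remote.example/inbox".to_string(),
    })
    .unwrap();
    drop(tx); // Closing the channel ends the worker loop.
    worker.join().unwrap()
}

fn main() {
    for line in demo() {
        println!("{line}");
    }
}
```

An in-memory channel loses its jobs on restart, so it offers nothing over the default mode; the point of a real implementation is that the queue persists jobs and applies its own retry policy.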
FederationEvent variants
| Variant | Fields | When published |
|---|---|---|
| `DeliveryRequested` | `inbox: Url`, `activity: Value`, `signing_actor_id: Uuid` | Every outbound activity delivery |
| `DeliveryFailed` | `inbox: Url`, `activity: Value`, `signing_actor_id: Uuid`, `error: String` | After all in-process retries exhausted |
| `BackfillRequested` | `owner_user_id: Uuid`, `follower_inbox_url: String` | When a new follower is accepted |
Worker methods
```rust
// Execute a single delivery (called from a DeliveryRequested worker).
service.deliver_to_inbox(inbox, activity, signing_actor_id).await?;

// Send local content to a new follower's inbox (called from a BackfillRequested worker).
// Distinct from import_remote_outbox, which imports FROM a remote server.
service.run_backfill_for_follower(owner_user_id, follower_inbox_url).await?;
```
`run_backfill_for_follower` paginates through the local user's post history and delivers oldest-to-newest, with a small sleep between batches to avoid overwhelming the remote server.
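The batching behaviour described above can be sketched as follows. This is a simplified, synchronous illustration rather than k-ap's implementation: `Post` and `backfill_in_batches` are hypothetical names, the posts are held in memory instead of being paginated from storage, and the `deliver` closure stands in for the real per-post delivery.

```rust
use std::thread;
use std::time::Duration;

/// Stand-in for a stored post; `created_at` is any sortable timestamp.
#[derive(Debug, Clone, PartialEq)]
struct Post {
    id: u32,
    created_at: u64,
}

/// Delivers posts oldest-to-newest in batches of `batch_size`, sleeping
/// for `pause` between batches. Returns the number of batches sent.
fn backfill_in_batches<F: FnMut(&Post)>(
    mut posts: Vec<Post>,
    batch_size: usize,
    pause: Duration,
    mut deliver: F,
) -> usize {
    // Oldest first, so the follower receives history in order.
    posts.sort_by_key(|p| p.created_at);
    let mut batches = 0;
    // `chunks` panics on a size of 0, so clamp to at least 1.
    for batch in posts.chunks(batch_size.max(1)) {
        if batches > 0 {
            // Pause between batches, but not before the first one.
            thread::sleep(pause);
        }
        for post in batch {
            deliver(post);
        }
        batches += 1;
    }
    batches
}

fn main() {
    let posts = vec![
        Post { id: 1, created_at: 30 },
        Post { id: 2, created_at: 10 },
        Post { id: 3, created_at: 20 },
    ];
    let batches = backfill_in_batches(posts, 2, Duration::from_millis(10), |p| {
        println!("delivering post {}", p.id);
    });
    println!("sent {batches} batches");
}
```

The pause trades backfill latency for politeness toward the remote server; the batch size and sleep duration shown here are placeholders, not k-ap's values.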