wiki: add Async Delivery and EventPublisher page
74
Async-Delivery-and-EventPublisher.md
Normal file
74
Async-Delivery-and-EventPublisher.md
Normal file
@@ -0,0 +1,74 @@
|
|||||||
|
# Async Delivery and EventPublisher
|
||||||
|
|
||||||
|
By default, k-ap delivers activities and runs backfill in-process via `tokio::spawn`. For production scale, implement `EventPublisher` to route these operations through your job queue.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Default mode
|
||||||
|
|
||||||
|
No configuration needed. k-ap spawns delivery and backfill tasks with `tokio::spawn`. Delivery is retried up to `delivery_max_attempts` times with exponential backoff starting at `delivery_initial_delay_secs`.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## EventPublisher
|
||||||
|
|
||||||
|
```rust
|
||||||
|
use k_ap::{EventPublisher, FederationEvent};
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl EventPublisher for MyQueue {
|
||||||
|
async fn publish(&self, event: FederationEvent) -> anyhow::Result<()> {
|
||||||
|
match event {
|
||||||
|
FederationEvent::DeliveryRequested { inbox, activity, signing_actor_id } => {
|
||||||
|
// Persist to your job queue.
|
||||||
|
// Your worker calls service.deliver_to_inbox(inbox, activity, signing_actor_id).
|
||||||
|
self.enqueue_delivery(inbox, activity, signing_actor_id).await?;
|
||||||
|
}
|
||||||
|
FederationEvent::DeliveryFailed { inbox, error, .. } => {
|
||||||
|
tracing::error!(%inbox, %error, "permanent delivery failure");
|
||||||
|
}
|
||||||
|
FederationEvent::BackfillRequested { owner_user_id, follower_inbox_url } => {
|
||||||
|
// Persist to your job queue.
|
||||||
|
// Your worker calls service.run_backfill_for_follower(owner_user_id, follower_inbox_url).
|
||||||
|
self.enqueue_backfill(owner_user_id, follower_inbox_url).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Register in the builder:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
let service = ActivityPubService::builder("https://example.com")
|
||||||
|
// ... repos ...
|
||||||
|
.event_publisher(Arc::new(my_queue))
|
||||||
|
.build()
|
||||||
|
.await?;
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## FederationEvent variants
|
||||||
|
|
||||||
|
| Variant | Fields | When published |
|
||||||
|
|---------|--------|----------------|
|
||||||
|
| `DeliveryRequested` | `inbox: Url`, `activity: Value`, `signing_actor_id: Uuid` | Every outbound activity delivery |
|
||||||
|
| `DeliveryFailed` | `inbox: Url`, `activity: Value`, `signing_actor_id: Uuid`, `error: String` | After all in-process retries exhausted |
|
||||||
|
| `BackfillRequested` | `owner_user_id: Uuid`, `follower_inbox_url: String` | When a new follower is accepted |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Worker methods
|
||||||
|
|
||||||
|
```rust
|
||||||
|
// Execute a single delivery — called from a DeliveryRequested worker.
|
||||||
|
service.deliver_to_inbox(inbox, activity, signing_actor_id).await?;
|
||||||
|
|
||||||
|
// Send local content to a new follower's inbox — called from a BackfillRequested worker.
|
||||||
|
// Distinct from import_remote_outbox, which imports FROM a remote server.
|
||||||
|
service.run_backfill_for_follower(owner_user_id, follower_inbox_url).await?;
|
||||||
|
```
|
||||||
|
|
||||||
|
`run_backfill_for_follower` paginates through the local user's post history and delivers oldest-to-newest with a small sleep between batches to avoid overwhelming the remote server.
|
||||||
Reference in New Issue
Block a user