From d1f9f55d4facbddabdc792fe1a0ee2051e0a5c91 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Fri, 29 May 2026 12:09:02 +0200 Subject: [PATCH] =?UTF-8?q?fix:=20wire=20DeliveryRequested=20federation=20?= =?UTF-8?q?events=20=E2=80=94=20outbound=20AP=20delivery=20was=20broken?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit FederationEventBridge silently dropped DeliveryRequested events from k-ap, so no Create/Delete/Accept activities were pushed to follower inboxes. Reviews only reached remote instances via outbox backfill (pull), and deletes never propagated. Bridge now publishes FederationDeliveryRequested domain events through the event bus; worker calls ap_service.deliver_to_inbox() to send them. --- .../adapters/activitypub/src/event_handler.rs | 17 +++++++++++++ .../src/federation_event_bridge.rs | 20 ++++++++++++++-- crates/adapters/event-payload/src/lib.rs | 24 +++++++++++++++++++ crates/adapters/nats/src/subject.rs | 1 + crates/application/src/tests/worker.rs | 1 + crates/domain/src/events.rs | 5 ++++ 6 files changed, 66 insertions(+), 2 deletions(-) diff --git a/crates/adapters/activitypub/src/event_handler.rs b/crates/adapters/activitypub/src/event_handler.rs index 3694db2..147eafd 100644 --- a/crates/adapters/activitypub/src/event_handler.rs +++ b/crates/adapters/activitypub/src/event_handler.rs @@ -80,6 +80,23 @@ impl EventHandler for ActivityPubEventHandler { .on_watchlist_removed(user_id, movie_id) .await .map_err(|e| DomainError::InfrastructureError(e.to_string())), + DomainEvent::FederationDeliveryRequested { + inbox_url, + activity_json, + signing_actor_id, + } => { + let inbox: url::Url = inbox_url + .parse() + .map_err(|e| DomainError::InfrastructureError(format!("bad inbox URL: {e}")))?; + let activity: serde_json::Value = + serde_json::from_str(activity_json).map_err(|e| { + DomainError::InfrastructureError(format!("bad activity JSON: {e}")) + })?; + self.ap_service + .deliver_to_inbox(inbox, activity, *signing_actor_id) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string())) + } _ => Ok(()), } } diff --git a/crates/adapters/activitypub/src/federation_event_bridge.rs b/crates/adapters/activitypub/src/federation_event_bridge.rs index c350be7..cc920bb 100644 --- a/crates/adapters/activitypub/src/federation_event_bridge.rs +++ b/crates/adapters/activitypub/src/federation_event_bridge.rs @@ -29,8 +29,24 @@ impl k_ap::EventPublisher for FederationEventBridge { }) .await .map_err(|e| anyhow::anyhow!(e.to_string())), - _ => { - tracing::debug!("ignoring federation event: {:?}", event); + FederationEvent::DeliveryRequested { + inbox, + activity, + signing_actor_id, + } => { + let json = serde_json::to_string(&activity) + .map_err(|e| anyhow::anyhow!("serialize activity: {e}"))?; + self.domain_publisher + .publish(&DomainEvent::FederationDeliveryRequested { + inbox_url: inbox.to_string(), + activity_json: json, + signing_actor_id, + }) + .await + .map_err(|e| anyhow::anyhow!(e.to_string())) + } + other => { + tracing::debug!("ignoring federation event: {:?}", other); Ok(()) } } diff --git a/crates/adapters/event-payload/src/lib.rs b/crates/adapters/event-payload/src/lib.rs index 1372e0e..d1c5dc3 100644 --- a/crates/adapters/event-payload/src/lib.rs +++ b/crates/adapters/event-payload/src/lib.rs @@ -67,6 +67,11 @@ pub enum EventPayload { owner_user_id: String, follower_inbox_url: String, }, + FederationDeliveryRequested { + inbox_url: String, + activity_json: String, + signing_actor_id: String, + }, } impl EventPayload { @@ -84,6 +89,7 @@ impl EventPayload { EventPayload::WatchlistEntryRemoved { .. } => "WatchlistEntryRemoved", EventPayload::FollowAccepted { .. } => "FollowAccepted", EventPayload::BackfillFollower { .. } => "BackfillFollower", + EventPayload::FederationDeliveryRequested { .. } => "FederationDeliveryRequested", } } } @@ -193,6 +199,15 @@ impl From<&DomainEvent> for EventPayload { owner_user_id: owner_user_id.value().to_string(), follower_inbox_url: follower_inbox_url.clone(), }, + DomainEvent::FederationDeliveryRequested { + inbox_url, + activity_json, + signing_actor_id, + } => EventPayload::FederationDeliveryRequested { + inbox_url: inbox_url.clone(), + activity_json: activity_json.clone(), + signing_actor_id: signing_actor_id.to_string(), + }, } } } @@ -300,6 +315,15 @@ impl TryFrom for DomainEvent { owner_user_id: UserId::from_uuid(parse_uuid(&owner_user_id, "owner_user_id")?), follower_inbox_url, }), + EventPayload::FederationDeliveryRequested { + inbox_url, + activity_json, + signing_actor_id, + } => Ok(DomainEvent::FederationDeliveryRequested { + inbox_url, + activity_json, + signing_actor_id: parse_uuid(&signing_actor_id, "signing_actor_id")?, + }), } } } diff --git a/crates/adapters/nats/src/subject.rs b/crates/adapters/nats/src/subject.rs index bba796b..007e0d4 100644 --- a/crates/adapters/nats/src/subject.rs +++ b/crates/adapters/nats/src/subject.rs @@ -14,6 +14,7 @@ pub fn event_to_subject(prefix: &str, event: &DomainEvent) -> String { DomainEvent::WatchlistEntryRemoved { .. } => "watchlist.entry.removed", DomainEvent::FollowAccepted { .. } => "follow.accepted", DomainEvent::BackfillFollower { .. } => "backfill.follower", + DomainEvent::FederationDeliveryRequested { .. } => "federation.delivery.requested", }; format!("{prefix}.{suffix}") } diff --git a/crates/application/src/tests/worker.rs b/crates/application/src/tests/worker.rs index 2900793..c8e94f3 100644 --- a/crates/application/src/tests/worker.rs +++ b/crates/application/src/tests/worker.rs @@ -57,6 +57,7 @@ impl EventHandler for RecordingHandler { } DomainEvent::FollowAccepted { .. } => "follow_accepted", DomainEvent::BackfillFollower { .. } => "backfill_follower", + DomainEvent::FederationDeliveryRequested { .. } => "federation_delivery", }; self.calls.lock().unwrap().push(label); Ok(()) diff --git a/crates/domain/src/events.rs b/crates/domain/src/events.rs index f1dad6b..4abc182 100644 --- a/crates/domain/src/events.rs +++ b/crates/domain/src/events.rs @@ -65,6 +65,11 @@ pub enum DomainEvent { owner_user_id: UserId, follower_inbox_url: String, }, + FederationDeliveryRequested { + inbox_url: String, + activity_json: String, + signing_actor_id: uuid::Uuid, + }, } #[async_trait]