diff --git a/crates/adapters/activitypub/src/event_handler.rs b/crates/adapters/activitypub/src/event_handler.rs index 3694db2..147eafd 100644 --- a/crates/adapters/activitypub/src/event_handler.rs +++ b/crates/adapters/activitypub/src/event_handler.rs @@ -80,6 +80,23 @@ impl EventHandler for ActivityPubEventHandler { .on_watchlist_removed(user_id, movie_id) .await .map_err(|e| DomainError::InfrastructureError(e.to_string())), + DomainEvent::FederationDeliveryRequested { + inbox_url, + activity_json, + signing_actor_id, + } => { + let inbox: url::Url = inbox_url + .parse() + .map_err(|e| DomainError::InfrastructureError(format!("bad inbox URL: {e}")))?; + let activity: serde_json::Value = + serde_json::from_str(activity_json).map_err(|e| { + DomainError::InfrastructureError(format!("bad activity JSON: {e}")) + })?; + self.ap_service + .deliver_to_inbox(inbox, activity, *signing_actor_id) + .await + .map_err(|e| DomainError::InfrastructureError(e.to_string())) + } _ => Ok(()), } } diff --git a/crates/adapters/activitypub/src/federation_event_bridge.rs b/crates/adapters/activitypub/src/federation_event_bridge.rs index c350be7..cc920bb 100644 --- a/crates/adapters/activitypub/src/federation_event_bridge.rs +++ b/crates/adapters/activitypub/src/federation_event_bridge.rs @@ -29,8 +29,24 @@ impl k_ap::EventPublisher for FederationEventBridge { }) .await .map_err(|e| anyhow::anyhow!(e.to_string())), - _ => { - tracing::debug!("ignoring federation event: {:?}", event); + FederationEvent::DeliveryRequested { + inbox, + activity, + signing_actor_id, + } => { + let json = serde_json::to_string(&activity) + .map_err(|e| anyhow::anyhow!("serialize activity: {e}"))?; + self.domain_publisher + .publish(&DomainEvent::FederationDeliveryRequested { + inbox_url: inbox.to_string(), + activity_json: json, + signing_actor_id, + }) + .await + .map_err(|e| anyhow::anyhow!(e.to_string())) + } + other => { + tracing::debug!("ignoring federation event: {:?}", other); Ok(()) } } diff --git a/crates/adapters/event-payload/src/lib.rs b/crates/adapters/event-payload/src/lib.rs index 1372e0e..d1c5dc3 100644 --- a/crates/adapters/event-payload/src/lib.rs +++ b/crates/adapters/event-payload/src/lib.rs @@ -67,6 +67,11 @@ pub enum EventPayload { owner_user_id: String, follower_inbox_url: String, }, + FederationDeliveryRequested { + inbox_url: String, + activity_json: String, + signing_actor_id: String, + }, } impl EventPayload { @@ -84,6 +89,7 @@ impl EventPayload { EventPayload::WatchlistEntryRemoved { .. } => "WatchlistEntryRemoved", EventPayload::FollowAccepted { .. } => "FollowAccepted", EventPayload::BackfillFollower { .. } => "BackfillFollower", + EventPayload::FederationDeliveryRequested { .. } => "FederationDeliveryRequested", } } } @@ -193,6 +199,15 @@ impl From<&DomainEvent> for EventPayload { owner_user_id: owner_user_id.value().to_string(), follower_inbox_url: follower_inbox_url.clone(), }, + DomainEvent::FederationDeliveryRequested { + inbox_url, + activity_json, + signing_actor_id, + } => EventPayload::FederationDeliveryRequested { + inbox_url: inbox_url.clone(), + activity_json: activity_json.clone(), + signing_actor_id: signing_actor_id.to_string(), + }, } } } @@ -300,6 +315,15 @@ impl TryFrom for DomainEvent { owner_user_id: UserId::from_uuid(parse_uuid(&owner_user_id, "owner_user_id")?), follower_inbox_url, }), + EventPayload::FederationDeliveryRequested { + inbox_url, + activity_json, + signing_actor_id, + } => Ok(DomainEvent::FederationDeliveryRequested { + inbox_url, + activity_json, + signing_actor_id: parse_uuid(&signing_actor_id, "signing_actor_id")?, + }), } } } diff --git a/crates/adapters/nats/src/subject.rs b/crates/adapters/nats/src/subject.rs index bba796b..007e0d4 100644 --- a/crates/adapters/nats/src/subject.rs +++ b/crates/adapters/nats/src/subject.rs @@ -14,6 +14,7 @@ pub fn event_to_subject(prefix: &str, event: &DomainEvent) -> String { DomainEvent::WatchlistEntryRemoved { .. } => "watchlist.entry.removed", DomainEvent::FollowAccepted { .. } => "follow.accepted", DomainEvent::BackfillFollower { .. } => "backfill.follower", + DomainEvent::FederationDeliveryRequested { .. } => "federation.delivery.requested", }; format!("{prefix}.{suffix}") } diff --git a/crates/application/src/tests/worker.rs b/crates/application/src/tests/worker.rs index 2900793..c8e94f3 100644 --- a/crates/application/src/tests/worker.rs +++ b/crates/application/src/tests/worker.rs @@ -57,6 +57,7 @@ impl EventHandler for RecordingHandler { } DomainEvent::FollowAccepted { .. } => "follow_accepted", DomainEvent::BackfillFollower { .. } => "backfill_follower", + DomainEvent::FederationDeliveryRequested { .. } => "federation_delivery", }; self.calls.lock().unwrap().push(label); Ok(()) diff --git a/crates/domain/src/events.rs b/crates/domain/src/events.rs index f1dad6b..4abc182 100644 --- a/crates/domain/src/events.rs +++ b/crates/domain/src/events.rs @@ -65,6 +65,11 @@ pub enum DomainEvent { owner_user_id: UserId, follower_inbox_url: String, }, + FederationDeliveryRequested { + inbox_url: String, + activity_json: String, + signing_actor_id: uuid::Uuid, + }, } #[async_trait]