From 1fa8389a69e065c6b9e9c58adb32865051e700af Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Thu, 14 May 2026 13:46:31 +0200 Subject: [PATCH] feat(activitypub-base): Announce broadcast + impl OutboundFederationPort for ActivityPubService --- .../activitypub-base/src/activities.rs | 4 + .../adapters/activitypub-base/src/service.rs | 161 ++++++++++++++++++ 2 files changed, 165 insertions(+) diff --git a/crates/adapters/activitypub-base/src/activities.rs b/crates/adapters/activitypub-base/src/activities.rs index a055a63..ee2f369 100644 --- a/crates/adapters/activitypub-base/src/activities.rs +++ b/crates/adapters/activitypub-base/src/activities.rs @@ -439,6 +439,10 @@ pub struct AnnounceActivity { pub(crate) actor: ObjectId, pub(crate) object: Url, pub(crate) published: Option>, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) to: Vec, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub(crate) cc: Vec, } #[async_trait::async_trait] diff --git a/crates/adapters/activitypub-base/src/service.rs b/crates/adapters/activitypub-base/src/service.rs index a536187..f2c185f 100644 --- a/crates/adapters/activitypub-base/src/service.rs +++ b/crates/adapters/activitypub-base/src/service.rs @@ -140,6 +140,66 @@ impl ActivityPubService { .layer(self.federation_config.middleware()) } + /// Fan out an Announce activity to all accepted followers. + pub async fn broadcast_announce_to_followers( + &self, + local_user_id: uuid::Uuid, + object_ap_id: url::Url, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let local_actor = get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let followers = data.federation_repo.get_followers(local_user_id).await?; + let blocked = data.federation_repo.get_blocked_actors(local_user_id).await.unwrap_or_default(); + let blocked_set: std::collections::HashSet = blocked.into_iter().collect(); + let blocked_domains = data.federation_repo.get_blocked_domains().await.unwrap_or_default(); + let blocked_domain_set: std::collections::HashSet = + blocked_domains.into_iter().map(|d| d.domain).collect(); + + let accepted: Vec<_> = followers + .into_iter() + .filter(|f| f.status == FollowerStatus::Accepted) + .filter(|f| !blocked_set.contains(&f.actor.url)) + .filter(|f| { + let domain = url::Url::parse(&f.actor.inbox_url) + .ok() + .and_then(|u| u.host_str().map(|s| s.to_string())) + .unwrap_or_default(); + !blocked_domain_set.contains(&domain) + }) + .collect(); + + if accepted.is_empty() { + return Ok(()); + } + + let announce = crate::activities::AnnounceActivity { + id: crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?, + kind: Default::default(), + actor: activitypub_federation::fetch::object_id::ObjectId::from(local_actor.ap_id.clone()), + object: object_ap_id, + published: Some(chrono::Utc::now()), + to: vec![crate::urls::AS_PUBLIC.to_string()], + cc: vec![local_actor.followers_url.to_string()], + }; + + let inboxes = collect_inboxes(&accepted); + let sends = activitypub_federation::activity_sending::SendActivityTask::prepare( + &activitypub_federation::protocol::context::WithContext::new_default(announce), + &local_actor, + inboxes, + &data, + ) + .await?; + let failures = send_with_retry(sends, &data).await; + if !failures.is_empty() { + tracing::warn!(count = failures.len(), "some Announce deliveries failed"); + } + Ok(()) + } + pub async fn follow(&self, local_user_id: uuid::Uuid, handle: &str) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); @@ -1216,6 +1276,107 @@ impl ActivityPubService { } } +#[async_trait::async_trait] +impl domain::ports::OutboundFederationPort for ActivityPubService { + async fn broadcast_create( + &self, + author_user_id: &domain::value_objects::UserId, + thought: &domain::models::thought::Thought, + _author_username: &str, + ) -> Result<(), domain::errors::DomainError> { + let user_uuid = author_user_id.as_uuid(); + let data = self.federation_config.to_request_data(); + let local_actor = get_local_actor(user_uuid, &data) + .await + .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; + + let ap_id = url::Url::parse(&format!("{}/thoughts/{}", self.base_url, thought.id)) + .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; + + let mut note = serde_json::json!({ + "type": "Note", + "id": ap_id.to_string(), + "attributedTo": local_actor.ap_id.to_string(), + "content": thought.content.as_str(), + "published": thought.created_at.to_rfc3339(), + "to": [crate::urls::AS_PUBLIC], + "cc": [local_actor.followers_url.to_string()], + "sensitive": thought.sensitive, + }); + if let Some(ref cw) = thought.content_warning { + note["summary"] = serde_json::json!(cw); + } + if let Some(ref reply_url) = thought.in_reply_to_url { + note["inReplyTo"] = serde_json::json!(reply_url); + } + + self.broadcast_to_followers(user_uuid, ap_id, note) + .await + .map_err(|e| domain::errors::DomainError::Internal(e.to_string())) + } + + async fn broadcast_delete( + &self, + author_user_id: &domain::value_objects::UserId, + thought_ap_id: &str, + ) -> Result<(), domain::errors::DomainError> { + let user_uuid = author_user_id.as_uuid(); + let ap_id = url::Url::parse(thought_ap_id) + .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; + self.broadcast_delete_to_followers(user_uuid, ap_id) + .await + .map_err(|e| domain::errors::DomainError::Internal(e.to_string())) + } + + async fn broadcast_update( + &self, + author_user_id: &domain::value_objects::UserId, + thought: &domain::models::thought::Thought, + _author_username: &str, + ) -> Result<(), domain::errors::DomainError> { + let user_uuid = author_user_id.as_uuid(); + let data = self.federation_config.to_request_data(); + let local_actor = get_local_actor(user_uuid, &data) + .await + .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; + + let ap_id = format!("{}/thoughts/{}", self.base_url, thought.id); + let mut note = serde_json::json!({ + "type": "Note", + "id": ap_id, + "attributedTo": local_actor.ap_id.to_string(), + "content": thought.content.as_str(), + "published": thought.created_at.to_rfc3339(), + "to": [crate::urls::AS_PUBLIC], + "cc": [local_actor.followers_url.to_string()], + "sensitive": thought.sensitive, + }); + if let Some(ref cw) = thought.content_warning { + note["summary"] = serde_json::json!(cw); + } + if let Some(ref reply_url) = thought.in_reply_to_url { + note["inReplyTo"] = serde_json::json!(reply_url); + } + + self.broadcast_update_to_followers(user_uuid, note) + .await + .map_err(|e| domain::errors::DomainError::Internal(e.to_string())) + } + + async fn broadcast_announce( + &self, + booster_user_id: &domain::value_objects::UserId, + object_ap_id: &str, + ) -> Result<(), domain::errors::DomainError> { + let user_uuid = booster_user_id.as_uuid(); + let ap_id = url::Url::parse(object_ap_id) + .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; + self.broadcast_announce_to_followers(user_uuid, ap_id) + .await + .map_err(|e| domain::errors::DomainError::Internal(e.to_string())) + } +} + #[cfg(test)] #[path = "tests/service.rs"] mod tests;