diff --git a/crates/adapters/activitypub-base/src/service.rs b/crates/adapters/activitypub-base/src/service.rs index d01e953..8089a78 100644 --- a/crates/adapters/activitypub-base/src/service.rs +++ b/crates/adapters/activitypub-base/src/service.rs @@ -200,6 +200,68 @@ impl ActivityPubService { Ok(()) } + /// Fan out an Undo(Announce) activity to all accepted followers. + pub async fn broadcast_undo_announce_to_followers( + &self, + local_user_id: uuid::Uuid, + object_ap_id: url::Url, + ) -> anyhow::Result<()> { + let data = self.federation_config.to_request_data(); + let local_actor = get_local_actor(local_user_id, &data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let followers = data.federation_repo.get_followers(local_user_id).await?; + let blocked = data.federation_repo.get_blocked_actors(local_user_id).await.unwrap_or_default(); + let blocked_set: std::collections::HashSet = blocked.into_iter().collect(); + let blocked_domains = data.federation_repo.get_blocked_domains().await.unwrap_or_default(); + let blocked_domain_set: std::collections::HashSet = + blocked_domains.into_iter().map(|d| d.domain).collect(); + + let accepted: Vec<_> = followers + .into_iter() + .filter(|f| f.status == FollowerStatus::Accepted) + .filter(|f| !blocked_set.contains(&f.actor.url)) + .filter(|f| { + let domain = url::Url::parse(&f.actor.inbox_url) + .ok() + .and_then(|u| u.host_str().map(|s| s.to_string())) + .unwrap_or_default(); + !blocked_domain_set.contains(&domain) + }) + .collect(); + + if accepted.is_empty() { + return Ok(()); + } + + let undo_id = crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; + let undo = crate::activities::UndoActivity { + id: undo_id, + kind: Default::default(), + actor: activitypub_federation::fetch::object_id::ObjectId::from(local_actor.ap_id.clone()), + object: serde_json::json!({ + "type": "Announce", + "actor": local_actor.ap_id.to_string(), + "object": object_ap_id.to_string(), + }), + }; + + let inboxes = collect_inboxes(&accepted); + let sends = activitypub_federation::activity_sending::SendActivityTask::prepare( + &activitypub_federation::protocol::context::WithContext::new_default(undo), + &local_actor, + inboxes, + &data, + ) + .await?; + let failures = send_with_retry(sends, &data).await; + if !failures.is_empty() { + tracing::warn!(count = failures.len(), "some Undo(Announce) deliveries failed"); + } + Ok(()) + } + pub async fn follow(&self, local_user_id: uuid::Uuid, handle: &str) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); @@ -1383,6 +1445,19 @@ impl domain::ports::OutboundFederationPort for ActivityPubService { .await .map_err(|e| domain::errors::DomainError::Internal(e.to_string())) } + + async fn broadcast_undo_announce( + &self, + booster_user_id: &domain::value_objects::UserId, + object_ap_id: &str, + ) -> Result<(), domain::errors::DomainError> { + let user_uuid = booster_user_id.as_uuid(); + let ap_id = url::Url::parse(object_ap_id) + .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; + self.broadcast_undo_announce_to_followers(user_uuid, ap_id) + .await + .map_err(|e| domain::errors::DomainError::Internal(e.to_string())) + } } #[cfg(test)] diff --git a/crates/application/src/services/federation_event.rs b/crates/application/src/services/federation_event.rs index 540d975..181a947 100644 --- a/crates/application/src/services/federation_event.rs +++ b/crates/application/src/services/federation_event.rs @@ -58,6 +58,17 @@ impl FederationEventService { self.ap.broadcast_announce(user_id, &object_ap_id).await } + DomainEvent::BoostRemoved { user_id, thought_id } => { + let thought = match self.thoughts.find_by_id(thought_id).await? { + Some(t) => t, + None => return Ok(()), + }; + let object_ap_id = thought.ap_id.clone().unwrap_or_else(|| { + format!("{}/thoughts/{}", self.base_url, thought_id) + }); + self.ap.broadcast_undo_announce(user_id, &object_ap_id).await + } + _ => Ok(()), } } @@ -82,10 +93,11 @@ mod tests { #[derive(Default)] struct SpyPort { - created: Mutex>, - deleted: Mutex>, - updated: Mutex>, - announced: Mutex>, + created: Mutex>, + deleted: Mutex>, + updated: Mutex>, + announced: Mutex>, + undo_announced: Mutex>, } #[async_trait] @@ -106,6 +118,10 @@ mod tests { self.announced.lock().unwrap().push(ap_id.to_string()); Ok(()) } + async fn broadcast_undo_announce(&self, _: &UserId, ap_id: &str) -> Result<(), DomainError> { + self.undo_announced.lock().unwrap().push(ap_id.to_string()); + Ok(()) + } } fn alice() -> User { @@ -354,6 +370,50 @@ mod tests { assert!(spy.created.lock().unwrap().is_empty()); } + #[tokio::test] + async fn boost_removed_sends_undo_announce_for_local_thought() { + let store = TestStore::default(); + let alice = alice(); + let thought = local_thought(alice.id.clone()); // ap_id = None → constructed URL + store.thoughts.lock().unwrap().push(thought.clone()); + + let spy = Arc::new(SpyPort::default()); + svc(&store, spy.clone()) + .process(&DomainEvent::BoostRemoved { + user_id: alice.id.clone(), + thought_id: thought.id.clone(), + }) + .await + .unwrap(); + + let undo_announced = spy.undo_announced.lock().unwrap(); + assert_eq!(undo_announced.len(), 1); + assert_eq!(undo_announced[0], format!("https://example.com/thoughts/{}", thought.id)); + } + + #[tokio::test] + async fn boost_removed_sends_undo_announce_for_remote_thought() { + let store = TestStore::default(); + let alice = alice(); + let mut thought = local_thought(alice.id.clone()); + thought.local = false; + thought.ap_id = Some("https://mastodon.social/users/bob/statuses/456".into()); + store.thoughts.lock().unwrap().push(thought.clone()); + + let spy = Arc::new(SpyPort::default()); + svc(&store, spy.clone()) + .process(&DomainEvent::BoostRemoved { + user_id: alice.id.clone(), + thought_id: thought.id.clone(), + }) + .await + .unwrap(); + + let undo_announced = spy.undo_announced.lock().unwrap(); + assert_eq!(undo_announced.len(), 1); + assert_eq!(undo_announced[0], "https://mastodon.social/users/bob/statuses/456"); + } + #[tokio::test] async fn thought_updated_does_not_broadcast_if_user_missing() { let store = TestStore::default(); diff --git a/crates/domain/src/ports.rs b/crates/domain/src/ports.rs index 36f8dda..2dcd12f 100644 --- a/crates/domain/src/ports.rs +++ b/crates/domain/src/ports.rs @@ -267,4 +267,11 @@ pub trait OutboundFederationPort: Send + Sync { booster_user_id: &UserId, object_ap_id: &str, ) -> Result<(), DomainError>; + + /// Fan out an Undo(Announce) to followers when a boost is removed. + async fn broadcast_undo_announce( + &self, + booster_user_id: &UserId, + object_ap_id: &str, + ) -> Result<(), DomainError>; }