diff --git a/crates/adapters/activitypub-base/src/service.rs b/crates/adapters/activitypub-base/src/service.rs index d0b24a3..92d6177 100644 --- a/crates/adapters/activitypub-base/src/service.rs +++ b/crates/adapters/activitypub-base/src/service.rs @@ -113,6 +113,45 @@ impl ActivityPubService { self.federation_config.to_request_data() } + /// Returns `(local_actor, deduplicated_inboxes)` for all accepted followers, + /// excluding blocked actors and blocked domains. + /// Returns `None` if there are no eligible followers. + async fn accepted_follower_inboxes( + &self, + data: &activitypub_federation::config::Data, + local_user_id: uuid::Uuid, + ) -> anyhow::Result)>> { + let local_actor = get_local_actor(local_user_id, data) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + let followers = data.federation_repo.get_followers(local_user_id).await?; + let blocked = data.federation_repo.get_blocked_actors(local_user_id).await.unwrap_or_default(); + let blocked_set: std::collections::HashSet = blocked.into_iter().collect(); + let blocked_domains = data.federation_repo.get_blocked_domains().await.unwrap_or_default(); + let blocked_domain_set: std::collections::HashSet = + blocked_domains.into_iter().map(|d| d.domain).collect(); + + let accepted: Vec<_> = followers + .into_iter() + .filter(|f| f.status == FollowerStatus::Accepted) + .filter(|f| !blocked_set.contains(&f.actor.url)) + .filter(|f| { + let domain = url::Url::parse(&f.actor.inbox_url) + .ok() + .and_then(|u| u.host_str().map(|s| s.to_string())) + .unwrap_or_default(); + !blocked_domain_set.contains(&domain) + }) + .collect(); + + if accepted.is_empty() { + return Ok(None); + } + + Ok(Some((local_actor, collect_inboxes(&accepted)))) + } + pub async fn actor_json(&self, user_id_str: &str) -> anyhow::Result { use activitypub_federation::traits::Object; let uuid = uuid::Uuid::parse_str(user_id_str)?; @@ -146,35 +185,6 @@ impl ActivityPubService { local_user_id: uuid::Uuid, object_ap_id: url::Url, ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(local_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let followers = data.federation_repo.get_followers(local_user_id).await?; - let blocked = data.federation_repo.get_blocked_actors(local_user_id).await.unwrap_or_default(); - let blocked_set: std::collections::HashSet = blocked.into_iter().collect(); - let blocked_domains = data.federation_repo.get_blocked_domains().await.unwrap_or_default(); - let blocked_domain_set: std::collections::HashSet = - blocked_domains.into_iter().map(|d| d.domain).collect(); - - let accepted: Vec<_> = followers - .into_iter() - .filter(|f| f.status == FollowerStatus::Accepted) - .filter(|f| !blocked_set.contains(&f.actor.url)) - .filter(|f| { - let domain = url::Url::parse(&f.actor.inbox_url) - .ok() - .and_then(|u| u.host_str().map(|s| s.to_string())) - .unwrap_or_default(); - !blocked_domain_set.contains(&domain) - }) - .collect(); - - if accepted.is_empty() { - return Ok(()); - } - // Deterministic ID so Undo(Announce) can reference this same activity. let announce_id = url::Url::parse(&format!( "{}/activities/announce/{}", @@ -186,6 +196,11 @@ impl ActivityPubService { )) .map_err(|e| anyhow::anyhow!("{e}"))?; + let data = self.federation_config.to_request_data(); + let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { + return Ok(()); + }; + let announce = crate::activities::AnnounceActivity { id: announce_id, kind: Default::default(), @@ -196,7 +211,6 @@ impl ActivityPubService { cc: vec![local_actor.followers_url.to_string()], }; - let inboxes = collect_inboxes(&accepted); let sends = activitypub_federation::activity_sending::SendActivityTask::prepare( &activitypub_federation::protocol::context::WithContext::new_default(announce), &local_actor, @@ -217,35 +231,6 @@ impl ActivityPubService { local_user_id: uuid::Uuid, object_ap_id: url::Url, ) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(local_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let followers = data.federation_repo.get_followers(local_user_id).await?; - let blocked = data.federation_repo.get_blocked_actors(local_user_id).await.unwrap_or_default(); - let blocked_set: std::collections::HashSet = blocked.into_iter().collect(); - let blocked_domains = data.federation_repo.get_blocked_domains().await.unwrap_or_default(); - let blocked_domain_set: std::collections::HashSet = - blocked_domains.into_iter().map(|d| d.domain).collect(); - - let accepted: Vec<_> = followers - .into_iter() - .filter(|f| f.status == FollowerStatus::Accepted) - .filter(|f| !blocked_set.contains(&f.actor.url)) - .filter(|f| { - let domain = url::Url::parse(&f.actor.inbox_url) - .ok() - .and_then(|u| u.host_str().map(|s| s.to_string())) - .unwrap_or_default(); - !blocked_domain_set.contains(&domain) - }) - .collect(); - - if accepted.is_empty() { - return Ok(()); - } - // Reconstruct the same deterministic announce ID used when the boost was sent. let announce_id = url::Url::parse(&format!( "{}/activities/announce/{}", @@ -258,6 +243,12 @@ impl ActivityPubService { .map_err(|e| anyhow::anyhow!("{e}"))?; let undo_id = crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; + + let data = self.federation_config.to_request_data(); + let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { + return Ok(()); + }; + let undo = crate::activities::UndoActivity { id: undo_id, kind: Default::default(), @@ -270,7 +261,6 @@ impl ActivityPubService { }), }; - let inboxes = collect_inboxes(&accepted); let sends = activitypub_federation::activity_sending::SendActivityTask::prepare( &activitypub_federation::protocol::context::WithContext::new_default(undo), &local_actor, @@ -604,40 +594,9 @@ impl ActivityPubService { object: serde_json::Value, ) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(local_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let followers = data.federation_repo.get_followers(local_user_id).await?; - let blocked = data - .federation_repo - .get_blocked_actors(local_user_id) - .await - .unwrap_or_default(); - let blocked_set: std::collections::HashSet = blocked.into_iter().collect(); - let blocked_domains = data - .federation_repo - .get_blocked_domains() - .await - .unwrap_or_default(); - let blocked_domain_set: std::collections::HashSet = - blocked_domains.into_iter().map(|d| d.domain).collect(); - let accepted: Vec<_> = followers - .into_iter() - .filter(|f| f.status == FollowerStatus::Accepted) - .filter(|f| !blocked_set.contains(&f.actor.url)) - .filter(|f| { - let domain = url::Url::parse(&f.actor.inbox_url) - .ok() - .and_then(|u| u.host_str().map(|s| s.to_string())) - .unwrap_or_default(); - !blocked_domain_set.contains(&domain) - }) - .collect(); - - if accepted.is_empty() { + let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); - } + }; let create = CreateActivity { id: ap_id.clone(), @@ -649,8 +608,6 @@ impl ActivityPubService { }; let create_with_ctx = WithContext::new_default(create); - let inboxes = collect_inboxes(&accepted); - let sends = SendActivityTask::prepare(&create_with_ctx, &local_actor, inboxes, &data).await?; let failures = send_with_retry(sends, &data).await; @@ -671,40 +628,9 @@ impl ActivityPubService { ap_id: Url, ) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(local_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let followers = data.federation_repo.get_followers(local_user_id).await?; - let blocked = data - .federation_repo - .get_blocked_actors(local_user_id) - .await - .unwrap_or_default(); - let blocked_set: std::collections::HashSet = blocked.into_iter().collect(); - let blocked_domains = data - .federation_repo - .get_blocked_domains() - .await - .unwrap_or_default(); - let blocked_domain_set: std::collections::HashSet = - blocked_domains.into_iter().map(|d| d.domain).collect(); - let accepted: Vec<_> = followers - .into_iter() - .filter(|f| f.status == FollowerStatus::Accepted) - .filter(|f| !blocked_set.contains(&f.actor.url)) - .filter(|f| { - let domain = url::Url::parse(&f.actor.inbox_url) - .ok() - .and_then(|u| u.host_str().map(|s| s.to_string())) - .unwrap_or_default(); - !blocked_domain_set.contains(&domain) - }) - .collect(); - - if accepted.is_empty() { + let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); - } + }; let delete_id = crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; @@ -717,7 +643,6 @@ impl ActivityPubService { cc: vec![local_actor.followers_url.to_string()], }; let delete_with_ctx = WithContext::new_default(delete); - let inboxes = collect_inboxes(&accepted); let sends = SendActivityTask::prepare(&delete_with_ctx, &local_actor, inboxes, &data).await?; let failures = send_with_retry(sends, &data).await; @@ -738,40 +663,9 @@ impl ActivityPubService { object: serde_json::Value, ) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(local_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let followers = data.federation_repo.get_followers(local_user_id).await?; - let blocked = data - .federation_repo - .get_blocked_actors(local_user_id) - .await - .unwrap_or_default(); - let blocked_set: std::collections::HashSet = blocked.into_iter().collect(); - let blocked_domains = data - .federation_repo - .get_blocked_domains() - .await - .unwrap_or_default(); - let blocked_domain_set: std::collections::HashSet = - blocked_domains.into_iter().map(|d| d.domain).collect(); - let accepted: Vec<_> = followers - .into_iter() - .filter(|f| f.status == FollowerStatus::Accepted) - .filter(|f| !blocked_set.contains(&f.actor.url)) - .filter(|f| { - let domain = url::Url::parse(&f.actor.inbox_url) - .ok() - .and_then(|u| u.host_str().map(|s| s.to_string())) - .unwrap_or_default(); - !blocked_domain_set.contains(&domain) - }) - .collect(); - - if accepted.is_empty() { + let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); - } + }; let add = crate::activities::AddActivity { id: ap_id, @@ -782,7 +676,6 @@ impl ActivityPubService { cc: vec![local_actor.followers_url.to_string()], }; let add_with_ctx = WithContext::new_default(add); - let inboxes = collect_inboxes(&accepted); let sends = SendActivityTask::prepare(&add_with_ctx, &local_actor, inboxes, &data).await?; let failures = send_with_retry(sends, &data).await; if !failures.is_empty() { @@ -798,40 +691,9 @@ impl ActivityPubService { watchlist_entry_ap_id: Url, ) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(local_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let followers = data.federation_repo.get_followers(local_user_id).await?; - let blocked = data - .federation_repo - .get_blocked_actors(local_user_id) - .await - .unwrap_or_default(); - let blocked_set: std::collections::HashSet = blocked.into_iter().collect(); - let blocked_domains = data - .federation_repo - .get_blocked_domains() - .await - .unwrap_or_default(); - let blocked_domain_set: std::collections::HashSet = - blocked_domains.into_iter().map(|d| d.domain).collect(); - let accepted: Vec<_> = followers - .into_iter() - .filter(|f| f.status == FollowerStatus::Accepted) - .filter(|f| !blocked_set.contains(&f.actor.url)) - .filter(|f| { - let domain = url::Url::parse(&f.actor.inbox_url) - .ok() - .and_then(|u| u.host_str().map(|s| s.to_string())) - .unwrap_or_default(); - !blocked_domain_set.contains(&domain) - }) - .collect(); - - if accepted.is_empty() { + let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); - } + }; let undo_id = crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; @@ -846,7 +708,6 @@ impl ActivityPubService { }), }; let undo_with_ctx = WithContext::new_default(undo); - let inboxes = collect_inboxes(&accepted); let sends = SendActivityTask::prepare(&undo_with_ctx, &local_actor, inboxes, &data).await?; let failures = send_with_retry(sends, &data).await; if !failures.is_empty() { @@ -862,40 +723,9 @@ impl ActivityPubService { object: serde_json::Value, ) -> anyhow::Result<()> { let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(local_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let followers = data.federation_repo.get_followers(local_user_id).await?; - let blocked = data - .federation_repo - .get_blocked_actors(local_user_id) - .await - .unwrap_or_default(); - let blocked_set: std::collections::HashSet = blocked.into_iter().collect(); - let blocked_domains = data - .federation_repo - .get_blocked_domains() - .await - .unwrap_or_default(); - let blocked_domain_set: std::collections::HashSet = - blocked_domains.into_iter().map(|d| d.domain).collect(); - let accepted: Vec<_> = followers - .into_iter() - .filter(|f| f.status == FollowerStatus::Accepted) - .filter(|f| !blocked_set.contains(&f.actor.url)) - .filter(|f| { - let domain = url::Url::parse(&f.actor.inbox_url) - .ok() - .and_then(|u| u.host_str().map(|s| s.to_string())) - .unwrap_or_default(); - !blocked_domain_set.contains(&domain) - }) - .collect(); - - if accepted.is_empty() { + let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else { return Ok(()); - } + }; let update_id = Url::parse(&format!( "{}/activities/update/{}", @@ -911,7 +741,6 @@ impl ActivityPubService { cc: vec![local_actor.followers_url.to_string()], }; let update_with_ctx = WithContext::new_default(update); - let inboxes = collect_inboxes(&accepted); let sends = SendActivityTask::prepare(&update_with_ctx, &local_actor, inboxes, &data).await?; let failures = send_with_retry(sends, &data).await;