feat: BoostRemoved → Undo(Announce) fan-out via OutboundFederationPort
This commit is contained in:
@@ -200,6 +200,68 @@ impl ActivityPubService {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Fan out an Undo(Announce) activity to all accepted followers.
|
||||||
|
pub async fn broadcast_undo_announce_to_followers(
|
||||||
|
&self,
|
||||||
|
local_user_id: uuid::Uuid,
|
||||||
|
object_ap_id: url::Url,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let data = self.federation_config.to_request_data();
|
||||||
|
let local_actor = get_local_actor(local_user_id, &data)
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||||
|
|
||||||
|
let followers = data.federation_repo.get_followers(local_user_id).await?;
|
||||||
|
let blocked = data.federation_repo.get_blocked_actors(local_user_id).await.unwrap_or_default();
|
||||||
|
let blocked_set: std::collections::HashSet<String> = blocked.into_iter().collect();
|
||||||
|
let blocked_domains = data.federation_repo.get_blocked_domains().await.unwrap_or_default();
|
||||||
|
let blocked_domain_set: std::collections::HashSet<String> =
|
||||||
|
blocked_domains.into_iter().map(|d| d.domain).collect();
|
||||||
|
|
||||||
|
let accepted: Vec<_> = followers
|
||||||
|
.into_iter()
|
||||||
|
.filter(|f| f.status == FollowerStatus::Accepted)
|
||||||
|
.filter(|f| !blocked_set.contains(&f.actor.url))
|
||||||
|
.filter(|f| {
|
||||||
|
let domain = url::Url::parse(&f.actor.inbox_url)
|
||||||
|
.ok()
|
||||||
|
.and_then(|u| u.host_str().map(|s| s.to_string()))
|
||||||
|
.unwrap_or_default();
|
||||||
|
!blocked_domain_set.contains(&domain)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if accepted.is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let undo_id = crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||||
|
let undo = crate::activities::UndoActivity {
|
||||||
|
id: undo_id,
|
||||||
|
kind: Default::default(),
|
||||||
|
actor: activitypub_federation::fetch::object_id::ObjectId::from(local_actor.ap_id.clone()),
|
||||||
|
object: serde_json::json!({
|
||||||
|
"type": "Announce",
|
||||||
|
"actor": local_actor.ap_id.to_string(),
|
||||||
|
"object": object_ap_id.to_string(),
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
|
||||||
|
let inboxes = collect_inboxes(&accepted);
|
||||||
|
let sends = activitypub_federation::activity_sending::SendActivityTask::prepare(
|
||||||
|
&activitypub_federation::protocol::context::WithContext::new_default(undo),
|
||||||
|
&local_actor,
|
||||||
|
inboxes,
|
||||||
|
&data,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
let failures = send_with_retry(sends, &data).await;
|
||||||
|
if !failures.is_empty() {
|
||||||
|
tracing::warn!(count = failures.len(), "some Undo(Announce) deliveries failed");
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn follow(&self, local_user_id: uuid::Uuid, handle: &str) -> anyhow::Result<()> {
|
pub async fn follow(&self, local_user_id: uuid::Uuid, handle: &str) -> anyhow::Result<()> {
|
||||||
let data = self.federation_config.to_request_data();
|
let data = self.federation_config.to_request_data();
|
||||||
|
|
||||||
@@ -1383,6 +1445,19 @@ impl domain::ports::OutboundFederationPort for ActivityPubService {
|
|||||||
.await
|
.await
|
||||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))
|
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn broadcast_undo_announce(
|
||||||
|
&self,
|
||||||
|
booster_user_id: &domain::value_objects::UserId,
|
||||||
|
object_ap_id: &str,
|
||||||
|
) -> Result<(), domain::errors::DomainError> {
|
||||||
|
let user_uuid = booster_user_id.as_uuid();
|
||||||
|
let ap_id = url::Url::parse(object_ap_id)
|
||||||
|
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||||
|
self.broadcast_undo_announce_to_followers(user_uuid, ap_id)
|
||||||
|
.await
|
||||||
|
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@@ -58,6 +58,17 @@ impl FederationEventService {
|
|||||||
self.ap.broadcast_announce(user_id, &object_ap_id).await
|
self.ap.broadcast_announce(user_id, &object_ap_id).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
DomainEvent::BoostRemoved { user_id, thought_id } => {
|
||||||
|
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
||||||
|
Some(t) => t,
|
||||||
|
None => return Ok(()),
|
||||||
|
};
|
||||||
|
let object_ap_id = thought.ap_id.clone().unwrap_or_else(|| {
|
||||||
|
format!("{}/thoughts/{}", self.base_url, thought_id)
|
||||||
|
});
|
||||||
|
self.ap.broadcast_undo_announce(user_id, &object_ap_id).await
|
||||||
|
}
|
||||||
|
|
||||||
_ => Ok(()),
|
_ => Ok(()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -86,6 +97,7 @@ mod tests {
|
|||||||
deleted: Mutex<Vec<String>>,
|
deleted: Mutex<Vec<String>>,
|
||||||
updated: Mutex<Vec<ThoughtId>>,
|
updated: Mutex<Vec<ThoughtId>>,
|
||||||
announced: Mutex<Vec<String>>,
|
announced: Mutex<Vec<String>>,
|
||||||
|
undo_announced: Mutex<Vec<String>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@@ -106,6 +118,10 @@ mod tests {
|
|||||||
self.announced.lock().unwrap().push(ap_id.to_string());
|
self.announced.lock().unwrap().push(ap_id.to_string());
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
async fn broadcast_undo_announce(&self, _: &UserId, ap_id: &str) -> Result<(), DomainError> {
|
||||||
|
self.undo_announced.lock().unwrap().push(ap_id.to_string());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn alice() -> User {
|
fn alice() -> User {
|
||||||
@@ -354,6 +370,50 @@ mod tests {
|
|||||||
assert!(spy.created.lock().unwrap().is_empty());
|
assert!(spy.created.lock().unwrap().is_empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn boost_removed_sends_undo_announce_for_local_thought() {
|
||||||
|
let store = TestStore::default();
|
||||||
|
let alice = alice();
|
||||||
|
let thought = local_thought(alice.id.clone()); // ap_id = None → constructed URL
|
||||||
|
store.thoughts.lock().unwrap().push(thought.clone());
|
||||||
|
|
||||||
|
let spy = Arc::new(SpyPort::default());
|
||||||
|
svc(&store, spy.clone())
|
||||||
|
.process(&DomainEvent::BoostRemoved {
|
||||||
|
user_id: alice.id.clone(),
|
||||||
|
thought_id: thought.id.clone(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let undo_announced = spy.undo_announced.lock().unwrap();
|
||||||
|
assert_eq!(undo_announced.len(), 1);
|
||||||
|
assert_eq!(undo_announced[0], format!("https://example.com/thoughts/{}", thought.id));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn boost_removed_sends_undo_announce_for_remote_thought() {
|
||||||
|
let store = TestStore::default();
|
||||||
|
let alice = alice();
|
||||||
|
let mut thought = local_thought(alice.id.clone());
|
||||||
|
thought.local = false;
|
||||||
|
thought.ap_id = Some("https://mastodon.social/users/bob/statuses/456".into());
|
||||||
|
store.thoughts.lock().unwrap().push(thought.clone());
|
||||||
|
|
||||||
|
let spy = Arc::new(SpyPort::default());
|
||||||
|
svc(&store, spy.clone())
|
||||||
|
.process(&DomainEvent::BoostRemoved {
|
||||||
|
user_id: alice.id.clone(),
|
||||||
|
thought_id: thought.id.clone(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let undo_announced = spy.undo_announced.lock().unwrap();
|
||||||
|
assert_eq!(undo_announced.len(), 1);
|
||||||
|
assert_eq!(undo_announced[0], "https://mastodon.social/users/bob/statuses/456");
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn thought_updated_does_not_broadcast_if_user_missing() {
|
async fn thought_updated_does_not_broadcast_if_user_missing() {
|
||||||
let store = TestStore::default();
|
let store = TestStore::default();
|
||||||
|
|||||||
@@ -267,4 +267,11 @@ pub trait OutboundFederationPort: Send + Sync {
|
|||||||
booster_user_id: &UserId,
|
booster_user_id: &UserId,
|
||||||
object_ap_id: &str,
|
object_ap_id: &str,
|
||||||
) -> Result<(), DomainError>;
|
) -> Result<(), DomainError>;
|
||||||
|
|
||||||
|
/// Fan out an Undo(Announce) to followers when a boost is removed.
|
||||||
|
async fn broadcast_undo_announce(
|
||||||
|
&self,
|
||||||
|
booster_user_id: &UserId,
|
||||||
|
object_ap_id: &str,
|
||||||
|
) -> Result<(), DomainError>;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user