Files
thoughts/docs/superpowers/plans/2026-05-14-federation-follow-ups.md
Gabriel Kaszewski 004bfb427b
Some checks failed
lint / lint (push) Has been cancelled
test / unit (push) Has been cancelled
test / integration (push) Has been cancelled
lint / lint (pull_request) Failing after 5m8s
test / unit (pull_request) Successful in 16m18s
test / integration (pull_request) Failing after 16m59s
feat: implement merge readiness plan to close gaps between v2 and v1
- Task 1: Fix feed response hydration by adding `to_thought_response` helper and updating feed handlers to return full `ThoughtResponse`.
- Task 2: Wire follower/following REST routes for user feeds.
- Task 3: Add user listing and count endpoints, including `GET /users` and `GET /users/count`.
- Task 4: Implement popular tags feature with `GET /tags/popular`.
- Task 5: Enhance configuration with HOST, CORS_ORIGINS, and optional rate limiting using tower-governor.
2026-05-14 16:28:18 +02:00

14 KiB

Federation Follow-ups Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Two targeted follow-ups from the federation handler implementation: (1) handle BoostRemovedUndo(Announce) fan-out, which was a known missing feature; (2) extract the repeated follower-filtering block in ActivityPubService into a private helper to eliminate duplication across 6 broadcast methods.

Architecture: Both changes are additive and self-contained. Task 1 touches domain/ports.rs, activitypub-base/src/service.rs, and application/src/services/federation_event.rs. Task 2 touches only activitypub-base/src/service.rs.


File Map

Task 1:
  Modify: crates/domain/src/ports.rs               ← add broadcast_undo_announce to OutboundFederationPort
  Modify: crates/adapters/activitypub-base/src/service.rs  ← broadcast_undo_announce_to_followers + impl
  Modify: crates/application/src/services/federation_event.rs  ← handle BoostRemoved + tests

Task 2:
  Modify: crates/adapters/activitypub-base/src/service.rs  ← extract accepted_follower_inboxes helper

Task 1: BoostRemoved → Undo(Announce)

Files:

  • Modify: crates/domain/src/ports.rs
  • Modify: crates/adapters/activitypub-base/src/service.rs
  • Modify: crates/application/src/services/federation_event.rs

Step A: Add broadcast_undo_announce to OutboundFederationPort

  • In crates/domain/src/ports.rs, add one method to OutboundFederationPort after broadcast_announce:
/// Fan out an Undo(Announce) to followers when a boost is removed.
async fn broadcast_undo_announce(
    &self,
    booster_user_id: &UserId,
    object_ap_id: &str,
) -> Result<(), DomainError>;
  • Run: cargo check -p domain — Expected: error in activitypub-base (trait impl missing method). This is expected.

Step B: Add broadcast_undo_announce_to_followers to ActivityPubService and implement the port method

  • In crates/adapters/activitypub-base/src/service.rs, add broadcast_undo_announce_to_followers to impl ActivityPubService — insert after broadcast_announce_to_followers:
/// Fan out an Undo(Announce) activity to all accepted followers.
pub async fn broadcast_undo_announce_to_followers(
    &self,
    local_user_id: uuid::Uuid,
    object_ap_id: url::Url,
) -> anyhow::Result<()> {
    let data = self.federation_config.to_request_data();
    let local_actor = get_local_actor(local_user_id, &data)
        .await
        .map_err(|e| anyhow::anyhow!("{e}"))?;

    let followers = data.federation_repo.get_followers(local_user_id).await?;
    let blocked = data.federation_repo.get_blocked_actors(local_user_id).await.unwrap_or_default();
    let blocked_set: std::collections::HashSet<String> = blocked.into_iter().collect();
    let blocked_domains = data.federation_repo.get_blocked_domains().await.unwrap_or_default();
    let blocked_domain_set: std::collections::HashSet<String> =
        blocked_domains.into_iter().map(|d| d.domain).collect();

    let accepted: Vec<_> = followers
        .into_iter()
        .filter(|f| f.status == FollowerStatus::Accepted)
        .filter(|f| !blocked_set.contains(&f.actor.url))
        .filter(|f| {
            let domain = url::Url::parse(&f.actor.inbox_url)
                .ok()
                .and_then(|u| u.host_str().map(|s| s.to_string()))
                .unwrap_or_default();
            !blocked_domain_set.contains(&domain)
        })
        .collect();

    if accepted.is_empty() {
        return Ok(());
    }

    let undo_id = crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?;
    let undo = crate::activities::UndoActivity {
        id: undo_id,
        kind: Default::default(),
        actor: activitypub_federation::fetch::object_id::ObjectId::from(local_actor.ap_id.clone()),
        object: serde_json::json!({
            "type": "Announce",
            "actor": local_actor.ap_id.to_string(),
            "object": object_ap_id.to_string(),
        }),
    };

    let inboxes = collect_inboxes(&accepted);
    let sends = activitypub_federation::activity_sending::SendActivityTask::prepare(
        &activitypub_federation::protocol::context::WithContext::new_default(undo),
        &local_actor,
        inboxes,
        &data,
    )
    .await?;
    let failures = send_with_retry(sends, &data).await;
    if !failures.is_empty() {
        tracing::warn!(count = failures.len(), "some Undo(Announce) deliveries failed");
    }
    Ok(())
}
  • Add broadcast_undo_announce to the impl domain::ports::OutboundFederationPort for ActivityPubService block:
async fn broadcast_undo_announce(
    &self,
    booster_user_id: &domain::value_objects::UserId,
    object_ap_id: &str,
) -> Result<(), domain::errors::DomainError> {
    let user_uuid = booster_user_id.as_uuid();
    let ap_id = url::Url::parse(object_ap_id)
        .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
    self.broadcast_undo_announce_to_followers(user_uuid, ap_id)
        .await
        .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))
}
  • Run: cargo check -p activitypub-base — Expected: no errors.
  • Run: cargo check --workspace — Expected: no errors.

Step C: Handle BoostRemoved in FederationEventService

  • Write failing test first — add to the #[cfg(test)] mod tests block in crates/application/src/services/federation_event.rs:
#[tokio::test]
async fn boost_removed_sends_undo_announce_for_local_thought() {
    let store = TestStore::default();
    let alice = alice();
    let thought = local_thought(alice.id.clone()); // ap_id = None → constructed URL
    store.thoughts.lock().unwrap().push(thought.clone());

    let spy = Arc::new(SpyPort::default());
    svc(&store, spy.clone())
        .process(&DomainEvent::BoostRemoved {
            user_id: alice.id.clone(),
            thought_id: thought.id.clone(),
        })
        .await
        .unwrap();

    let announced = spy.announced.lock().unwrap();
    assert_eq!(announced.len(), 1);
    assert_eq!(announced[0], format!("https://example.com/thoughts/{}", thought.id));
}

#[tokio::test]
async fn boost_removed_sends_undo_announce_for_remote_thought() {
    let store = TestStore::default();
    let alice = alice();
    let mut thought = local_thought(alice.id.clone());
    thought.local = false;
    thought.ap_id = Some("https://mastodon.social/users/bob/statuses/456".into());
    store.thoughts.lock().unwrap().push(thought.clone());

    let spy = Arc::new(SpyPort::default());
    svc(&store, spy.clone())
        .process(&DomainEvent::BoostRemoved {
            user_id: alice.id.clone(),
            thought_id: thought.id.clone(),
        })
        .await
        .unwrap();

    let announced = spy.announced.lock().unwrap();
    assert_eq!(announced[0], "https://mastodon.social/users/bob/statuses/456");
}

NOTE: The SpyPort tracks broadcast_undo_announce calls in the same announced vec as broadcast_announce (or a new undo_announced vec — your choice, but be consistent in both the spy and the assertions).

Actually, use a separate undo_announced vec for clarity:

#[derive(Default)]
struct SpyPort {
    created:        Mutex<Vec<ThoughtId>>,
    deleted:        Mutex<Vec<String>>,
    updated:        Mutex<Vec<ThoughtId>>,
    announced:      Mutex<Vec<String>>,
    undo_announced: Mutex<Vec<String>>,
}

And add the impl method:

async fn broadcast_undo_announce(&self, _: &UserId, ap_id: &str) -> Result<(), DomainError> {
    self.undo_announced.lock().unwrap().push(ap_id.to_string());
    Ok(())
}

Update the test assertions to use spy.undo_announced.

  • Run: cargo test -p application -- services::federation_event — Expected: 2 new tests FAIL (not implemented).

  • Add BoostRemoved arm to FederationEventService::process — insert after the BoostAdded arm:

DomainEvent::BoostRemoved { user_id, thought_id } => {
    let thought = match self.thoughts.find_by_id(thought_id).await? {
        Some(t) => t,
        None => return Ok(()),
    };
    let object_ap_id = thought.ap_id.clone().unwrap_or_else(|| {
        format!("{}/thoughts/{}", self.base_url, thought_id)
    });
    self.ap.broadcast_undo_announce(user_id, &object_ap_id).await
}
  • Run: cargo test -p application -- services::federation_event — Expected: all tests pass (now 13).

  • Run: cargo test --workspace — Expected: only pre-existing postgres DB failures (require live database).

  • Commit:

git add crates/domain/src/ports.rs crates/adapters/activitypub-base/src/service.rs crates/application/src/services/federation_event.rs
git commit -m "feat: BoostRemoved → Undo(Announce) fan-out via OutboundFederationPort"

Task 2: Follower-filtering DRY extraction in activitypub-base

Files:

  • Modify: crates/adapters/activitypub-base/src/service.rs

The repeated 20-line follower-filtering block appears in 7 methods. Extract it into a private async helper, then call it from the 6 content-broadcast methods. Leave broadcast_actor_update alone — it uses different filtering (no blocked-actor/domain check).

Methods to update: broadcast_to_followers, broadcast_delete_to_followers, broadcast_update_to_followers, broadcast_add_to_followers, broadcast_undo_add_to_followers, broadcast_announce_to_followers, broadcast_undo_announce_to_followers.

Leave unchanged: broadcast_actor_update (filters only on FollowerStatus::Accepted, no blocked checks).

  • Add private helper to impl ActivityPubService — insert near the top of the impl block, after request_data:
/// Returns `(local_actor, deduplicated_inboxes)` for all accepted followers,
/// excluding blocked actors and blocked domains. Returns `None` if there are
/// no eligible followers (caller should early-return `Ok(())`).
async fn accepted_follower_inboxes(
    &self,
    data: &activitypub_federation::config::Data<FederationData>,
    local_user_id: uuid::Uuid,
) -> anyhow::Result<Option<(crate::actors::DbActor, Vec<Url>)>> {
    let local_actor = get_local_actor(local_user_id, data)
        .await
        .map_err(|e| anyhow::anyhow!("{e}"))?;

    let followers = data.federation_repo.get_followers(local_user_id).await?;
    let blocked = data.federation_repo.get_blocked_actors(local_user_id).await.unwrap_or_default();
    let blocked_set: std::collections::HashSet<String> = blocked.into_iter().collect();
    let blocked_domains = data.federation_repo.get_blocked_domains().await.unwrap_or_default();
    let blocked_domain_set: std::collections::HashSet<String> =
        blocked_domains.into_iter().map(|d| d.domain).collect();

    let accepted: Vec<_> = followers
        .into_iter()
        .filter(|f| f.status == FollowerStatus::Accepted)
        .filter(|f| !blocked_set.contains(&f.actor.url))
        .filter(|f| {
            let domain = url::Url::parse(&f.actor.inbox_url)
                .ok()
                .and_then(|u| u.host_str().map(|s| s.to_string()))
                .unwrap_or_default();
            !blocked_domain_set.contains(&domain)
        })
        .collect();

    if accepted.is_empty() {
        return Ok(None);
    }

    Ok(Some((local_actor, collect_inboxes(&accepted))))
}
  • Refactor each of the 7 methods to use accepted_follower_inboxes.

For each method, replace the block that:

  1. Gets local_actor
  2. Gets followers + filtered inboxes

with:

let data = self.federation_config.to_request_data();
let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else {
    return Ok(());
};

Then use local_actor and inboxes directly in the activity construction (same as before).

The 7 methods are at these line numbers (before refactor — check actual lines in the file):

  • broadcast_announce_to_followers

  • broadcast_undo_announce_to_followers (just added in Task 1)

  • broadcast_to_followers

  • broadcast_delete_to_followers

  • broadcast_update_to_followers

  • broadcast_add_to_followers

  • broadcast_undo_add_to_followers

  • Run: cargo check -p activitypub-base — Expected: no errors.

  • Run: cargo check --workspace — Expected: no errors.

  • Run: cargo test --workspace — Expected: same result as before (pre-existing postgres failures only).

  • Commit:

git add crates/adapters/activitypub-base/src/service.rs
git commit -m "refactor(activitypub-base): extract accepted_follower_inboxes helper — eliminate 7x duplicated filtering block"

Self-Review

Spec coverage:

  • broadcast_undo_announce added to OutboundFederationPort (Task 1)
  • broadcast_undo_announce_to_followers sends Undo { object: { type: "Announce", actor, object } } to accepted, non-blocked followers (Task 1)
  • FederationEventService handles BoostRemoved with same ap_id construction as BoostAdded (Task 1)
  • 2 tests: local thought URL constructed, remote thought uses ap_id (Task 1)
  • SpyPort has separate undo_announced vec (Task 1)
  • accepted_follower_inboxes helper extracts the 20-line filtering block (Task 2)
  • Helper used in 7 content-broadcast methods (Task 2)
  • broadcast_actor_update NOT touched — it uses different filtering (Task 2)

Placeholder scan: None.

Type consistency:

  • UndoActivity is already defined in activities.rs with object: serde_json::Value — no new activity type needed
  • broadcast_undo_announce_to_followers(uuid::Uuid, url::Url) — same signature pattern as broadcast_announce_to_followers
  • accepted_follower_inboxes returns Option<(DbActor, Vec<Url>)> — caller destructures with let Some(...) = ... else { return Ok(()) }