# Federation Follow-ups Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Two targeted follow-ups from the federation handler implementation: (1) handle `BoostRemoved` → `Undo(Announce)` fan-out, which was a known missing feature; (2) extract the repeated follower-filtering block in `ActivityPubService` into a private helper to eliminate duplication across the content-broadcast methods (the 6 existing ones plus the one added in Task 1).

**Architecture:** Both changes are additive and self-contained. Task 1 touches `domain/ports.rs`, `activitypub-base/src/service.rs`, and `application/src/services/federation_event.rs`. Task 2 touches only `activitypub-base/src/service.rs`.

---

## File Map

```
Task 1:
  Modify: crates/domain/src/ports.rs                               ← add broadcast_undo_announce to OutboundFederationPort
  Modify: crates/adapters/activitypub-base/src/service.rs          ← broadcast_undo_announce_to_followers + impl
  Modify: crates/application/src/services/federation_event.rs      ← handle BoostRemoved + tests

Task 2:
  Modify: crates/adapters/activitypub-base/src/service.rs          ← extract accepted_follower_inboxes helper
```

---

### Task 1: BoostRemoved → Undo(Announce)

**Files:**
- Modify: `crates/domain/src/ports.rs`
- Modify: `crates/adapters/activitypub-base/src/service.rs`
- Modify: `crates/application/src/services/federation_event.rs`

#### Step A: Add `broadcast_undo_announce` to `OutboundFederationPort`

- [ ] In `crates/domain/src/ports.rs`, add one method to `OutboundFederationPort` after `broadcast_announce`:

```rust
/// Fan out an Undo(Announce) to followers when a boost is removed.
async fn broadcast_undo_announce(
    &self,
    booster_user_id: &UserId,
    object_ap_id: &str,
) -> Result<(), DomainError>;
```

- [ ] **Run:** `cargo check -p domain`. Expected: the `domain` crate itself compiles. Implementors of the trait (the `ActivityPubService` impl in activitypub-base, and the `SpyPort` test double) will fail with "trait impl missing method" until Steps B and C add it. This is expected.

#### Step B: Add `broadcast_undo_announce_to_followers` to `ActivityPubService` and implement the port method

- [ ] In `crates/adapters/activitypub-base/src/service.rs`, add `broadcast_undo_announce_to_followers` to `impl ActivityPubService`; insert after `broadcast_announce_to_followers`:

```rust
/// Fan out an Undo(Announce) activity to all accepted followers.
pub async fn broadcast_undo_announce_to_followers(
    &self,
    local_user_id: uuid::Uuid,
    object_ap_id: url::Url,
) -> anyhow::Result<()> {
    let data = self.federation_config.to_request_data();
    let local_actor = get_local_actor(local_user_id, &data)
        .await
        .map_err(|e| anyhow::anyhow!("{e}"))?;

    let followers = data.federation_repo.get_followers(local_user_id).await?;
    let blocked = data.federation_repo.get_blocked_actors(local_user_id).await.unwrap_or_default();
    let blocked_set: std::collections::HashSet<String> = blocked.into_iter().collect();
    let blocked_domains = data.federation_repo.get_blocked_domains().await.unwrap_or_default();
    let blocked_domain_set: std::collections::HashSet<String> =
        blocked_domains.into_iter().map(|d| d.domain).collect();

    let accepted: Vec<_> = followers
        .into_iter()
        .filter(|f| f.status == FollowerStatus::Accepted)
        .filter(|f| !blocked_set.contains(&f.actor.url))
        .filter(|f| {
            let domain = url::Url::parse(&f.actor.inbox_url)
                .ok()
                .and_then(|u| u.host_str().map(|s| s.to_string()))
                .unwrap_or_default();
            !blocked_domain_set.contains(&domain)
        })
        .collect();

    if accepted.is_empty() {
        return Ok(());
    }

    let undo_id = crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?;
    let undo = crate::activities::UndoActivity {
        id: undo_id,
        kind: Default::default(),
        actor: activitypub_federation::fetch::object_id::ObjectId::from(local_actor.ap_id.clone()),
        object: serde_json::json!({
            "type": "Announce",
            "actor": local_actor.ap_id.to_string(),
            "object": object_ap_id.to_string(),
        }),
    };

    let inboxes = collect_inboxes(&accepted);
    let sends = activitypub_federation::activity_sending::SendActivityTask::prepare(
        &activitypub_federation::protocol::context::WithContext::new_default(undo),
        &local_actor,
        inboxes,
        &data,
    )
    .await?;
    let failures = send_with_retry(sends, &data).await;
    if !failures.is_empty() {
        tracing::warn!(count = failures.len(), "some Undo(Announce) deliveries failed");
    }
    Ok(())
}
```

- [ ] Add `broadcast_undo_announce` to the `impl domain::ports::OutboundFederationPort for ActivityPubService` block:

```rust
async fn broadcast_undo_announce(
    &self,
    booster_user_id: &domain::value_objects::UserId,
    object_ap_id: &str,
) -> Result<(), domain::errors::DomainError> {
    let user_uuid = booster_user_id.as_uuid();
    let ap_id = url::Url::parse(object_ap_id)
        .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
    self.broadcast_undo_announce_to_followers(user_uuid, ap_id)
        .await
        .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))
}
```

- [ ] **Run:** `cargo check -p activitypub-base`. Expected: no errors.
- [ ] **Run:** `cargo check --workspace`. Expected: no errors.

#### Step C: Handle `BoostRemoved` in `FederationEventService`

- [ ] **Write failing test** first: add to the `#[cfg(test)] mod tests` block in `crates/application/src/services/federation_event.rs`:

```rust
#[tokio::test]
async fn boost_removed_sends_undo_announce_for_local_thought() {
    let store = TestStore::default();
    let alice = alice();
    let thought = local_thought(alice.id.clone()); // ap_id = None → constructed URL
    store.thoughts.lock().unwrap().push(thought.clone());

    let spy = Arc::new(SpyPort::default());
    svc(&store, spy.clone())
        .process(&DomainEvent::BoostRemoved {
            user_id: alice.id.clone(),
            thought_id: thought.id.clone(),
        })
        .await
        .unwrap();

    let undo_announced = spy.undo_announced.lock().unwrap();
    assert_eq!(undo_announced.len(), 1);
    assert_eq!(undo_announced[0], format!("https://example.com/thoughts/{}", thought.id));
}

#[tokio::test]
async fn boost_removed_sends_undo_announce_for_remote_thought() {
    let store = TestStore::default();
    let alice = alice();
    let mut thought = local_thought(alice.id.clone());
    thought.local = false;
    thought.ap_id = Some("https://mastodon.social/users/bob/statuses/456".into());
    store.thoughts.lock().unwrap().push(thought.clone());

    let spy = Arc::new(SpyPort::default());
    svc(&store, spy.clone())
        .process(&DomainEvent::BoostRemoved {
            user_id: alice.id.clone(),
            thought_id: thought.id.clone(),
        })
        .await
        .unwrap();

    let undo_announced = spy.undo_announced.lock().unwrap();
    assert_eq!(undo_announced.len(), 1);
    assert_eq!(undo_announced[0], "https://mastodon.social/users/bob/statuses/456");
}
```

NOTE: The tests read from a dedicated `undo_announced` vec on `SpyPort`, kept separate from the `announced` vec used by `broadcast_announce`, so the two kinds of call cannot be confused. Add the field to the spy:

```rust
#[derive(Default)]
struct SpyPort {
    created: Mutex<Vec<ThoughtId>>,
    deleted: Mutex<Vec<String>>,
    updated: Mutex<Vec<ThoughtId>>,
    announced: Mutex<Vec<String>>,
    undo_announced: Mutex<Vec<String>>,
}
```

And add the impl method:

```rust
async fn broadcast_undo_announce(&self, _: &UserId, ap_id: &str) -> Result<(), DomainError> {
    self.undo_announced.lock().unwrap().push(ap_id.to_string());
    Ok(())
}
```
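
As a minimal, synchronous sketch of why the separate vec helps: the `Port` trait, `Spy` struct, and `on_boost_removed` function below are hypothetical simplifications (the real `OutboundFederationPort` is async and returns `Result<(), DomainError>`). They only illustrate that recording each kind of call on its own lets a test assert that an undo was sent and no announce was.

```rust
use std::sync::Mutex;

/// Hypothetical, synchronous stand-in for the outbound port (illustration only).
trait Port {
    fn broadcast_announce(&self, ap_id: &str) -> Result<(), String>;
    fn broadcast_undo_announce(&self, ap_id: &str) -> Result<(), String>;
}

/// Test double that records each kind of call in its own vec.
#[derive(Default)]
struct Spy {
    announced: Mutex<Vec<String>>,
    undo_announced: Mutex<Vec<String>>,
}

impl Port for Spy {
    fn broadcast_announce(&self, ap_id: &str) -> Result<(), String> {
        self.announced.lock().unwrap().push(ap_id.to_string());
        Ok(())
    }

    fn broadcast_undo_announce(&self, ap_id: &str) -> Result<(), String> {
        self.undo_announced.lock().unwrap().push(ap_id.to_string());
        Ok(())
    }
}

/// Hypothetical code under test: removing a boost should only hit the undo path.
fn on_boost_removed(port: &dyn Port, object_ap_id: &str) -> Result<(), String> {
    port.broadcast_undo_announce(object_ap_id)
}
```

With a single shared vec, a handler that mistakenly called `broadcast_announce` would still satisfy a length-and-value assertion; with two vecs the test can also check that `announced` stays empty.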

- [ ] **Run:** `cargo test -p application -- services::federation_event`. Expected: 2 new tests FAIL (not implemented).

- [ ] **Add `BoostRemoved` arm** to `FederationEventService::process`; insert after the `BoostAdded` arm:

```rust
DomainEvent::BoostRemoved { user_id, thought_id } => {
    let thought = match self.thoughts.find_by_id(thought_id).await? {
        Some(t) => t,
        None => return Ok(()),
    };
    let object_ap_id = thought.ap_id.clone().unwrap_or_else(|| {
        format!("{}/thoughts/{}", self.base_url, thought_id)
    });
    self.ap.broadcast_undo_announce(user_id, &object_ap_id).await
}
```
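
The `ap_id` fallback in this arm can be checked in isolation. The sketch below pulls it into a hypothetical free function, `resolve_object_ap_id`, which is not part of the plan; the plain `&str` parameters stand in for the real `Thought` fields and `ThoughtId`. Trimming a trailing slash from the base URL is an added safeguard and an assumption, not something the arm above does.

```rust
/// Returns the ActivityPub object ID to reference in the Undo(Announce).
///
/// Hypothetical free-function version of the fallback in the match arm:
/// a stored `ap_id` (remote thought) wins; otherwise the URL is built
/// from the instance base URL and the thought ID (local thought).
fn resolve_object_ap_id(stored_ap_id: Option<&str>, base_url: &str, thought_id: &str) -> String {
    match stored_ap_id {
        Some(ap_id) => ap_id.to_string(),
        // Assumption: tolerate a trailing slash on the base URL to avoid "//thoughts".
        None => format!("{}/thoughts/{}", base_url.trim_end_matches('/'), thought_id),
    }
}
```

The two branches correspond to the two tests in this step: the local thought gets a constructed URL, the remote thought keeps its original `ap_id`.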

- [ ] **Run:** `cargo test -p application -- services::federation_event`. Expected: all tests pass (now 13).

- [ ] **Run:** `cargo test --workspace`. Expected: only pre-existing postgres DB failures (require live database).

- [ ] **Commit:**

```bash
git add crates/domain/src/ports.rs crates/adapters/activitypub-base/src/service.rs crates/application/src/services/federation_event.rs
git commit -m "feat: BoostRemoved → Undo(Announce) fan-out via OutboundFederationPort"
```

---

### Task 2: Follower-filtering DRY extraction in activitypub-base

**Files:**
- Modify: `crates/adapters/activitypub-base/src/service.rs`

Once Task 1 lands, the repeated 20-line follower-filtering block appears in 7 methods: the 6 existing content-broadcast methods plus `broadcast_undo_announce_to_followers`. Extract it into a private async helper, then call it from all 7. Leave `broadcast_actor_update` alone: it uses different filtering (no blocked-actor/domain check).

**Methods to update:** `broadcast_to_followers`, `broadcast_delete_to_followers`, `broadcast_update_to_followers`, `broadcast_add_to_followers`, `broadcast_undo_add_to_followers`, `broadcast_announce_to_followers`, `broadcast_undo_announce_to_followers`.

**Leave unchanged:** `broadcast_actor_update` (filters only on `FollowerStatus::Accepted`, no blocked checks).

- [ ] **Add private helper** to `impl ActivityPubService`; insert near the top of the impl block, after `request_data`:

```rust
/// Returns `(local_actor, deduplicated_inboxes)` for all accepted followers,
/// excluding blocked actors and blocked domains. Returns `None` if there are
/// no eligible followers (caller should early-return `Ok(())`).
async fn accepted_follower_inboxes(
    &self,
    data: &activitypub_federation::config::Data<FederationData>,
    local_user_id: uuid::Uuid,
) -> anyhow::Result<Option<(crate::actors::DbActor, Vec<url::Url>)>> {
    let local_actor = get_local_actor(local_user_id, data)
        .await
        .map_err(|e| anyhow::anyhow!("{e}"))?;

    let followers = data.federation_repo.get_followers(local_user_id).await?;
    let blocked = data.federation_repo.get_blocked_actors(local_user_id).await.unwrap_or_default();
    let blocked_set: std::collections::HashSet<String> = blocked.into_iter().collect();
    let blocked_domains = data.federation_repo.get_blocked_domains().await.unwrap_or_default();
    let blocked_domain_set: std::collections::HashSet<String> =
        blocked_domains.into_iter().map(|d| d.domain).collect();

    let accepted: Vec<_> = followers
        .into_iter()
        .filter(|f| f.status == FollowerStatus::Accepted)
        .filter(|f| !blocked_set.contains(&f.actor.url))
        .filter(|f| {
            let domain = url::Url::parse(&f.actor.inbox_url)
                .ok()
                .and_then(|u| u.host_str().map(|s| s.to_string()))
                .unwrap_or_default();
            !blocked_domain_set.contains(&domain)
        })
        .collect();

    if accepted.is_empty() {
        return Ok(None);
    }

    Ok(Some((local_actor, collect_inboxes(&accepted))))
}
```
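
The three filtering rules can be exercised without the federation stack. The following is a simplified, synchronous model, not the helper itself: `Status`, `Follower`, `host_of`, and `eligible_inboxes` are hypothetical names, `host_of` is a crude stand-in for `url::Url::host_str`, and deduplication is done inline on the assumption that `collect_inboxes` deduplicates as the doc comment states.

```rust
use std::collections::{BTreeSet, HashSet};

#[derive(Clone, Copy, PartialEq, Eq)]
enum Status {
    Pending,
    Accepted,
}

struct Follower {
    status: Status,
    actor_url: String,
    inbox_url: String,
}

/// Crude host extraction for this sketch; stands in for `url::Url::host_str`.
fn host_of(url: &str) -> Option<&str> {
    let rest = url.split_once("://")?.1;
    let authority = rest.split('/').next().unwrap_or("");
    let host = authority.split(':').next().unwrap_or("");
    if host.is_empty() { None } else { Some(host) }
}

/// Keeps accepted followers that are not blocked by actor URL or inbox domain,
/// and returns their inboxes without duplicates (sorted, for determinism).
/// Returns `None` when no follower is eligible.
fn eligible_inboxes(
    followers: &[Follower],
    blocked_actors: &HashSet<String>,
    blocked_domains: &HashSet<String>,
) -> Option<Vec<String>> {
    let inboxes: BTreeSet<String> = followers
        .iter()
        .filter(|f| f.status == Status::Accepted)
        .filter(|f| !blocked_actors.contains(&f.actor_url))
        .filter(|f| {
            // An unparseable inbox URL yields an empty domain, which never
            // matches a blocked domain, so such a follower passes this filter.
            let domain = host_of(&f.inbox_url).unwrap_or_default();
            !blocked_domains.contains(domain)
        })
        .map(|f| f.inbox_url.clone())
        .collect();

    if inboxes.is_empty() {
        None
    } else {
        Some(inboxes.into_iter().collect())
    }
}
```

Two followers sharing one inbox produce a single delivery target, and a follower list that is fully filtered out yields `None`, which is the case the callers turn into an early `Ok(())`.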

- [ ] **Refactor each of the 7 methods** to use `accepted_follower_inboxes`.

For each method, replace the block that:

1. Gets `local_actor`
2. Gets followers + filtered inboxes

with:

```rust
let data = self.federation_config.to_request_data();
let Some((local_actor, inboxes)) = self.accepted_follower_inboxes(&data, local_user_id).await? else {
    return Ok(());
};
```

Then use `local_actor` and `inboxes` directly in the activity construction (same as before).
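
The caller pattern combines `?` (propagate a lookup failure) with `let ... else` (treat "no eligible followers" as success). A minimal synchronous sketch of that control flow, assuming Rust 1.65 or later for `let ... else`; `lookup_inboxes` and `broadcast` are hypothetical stand-ins, and the actor URL is a placeholder:

```rust
/// Hypothetical stand-in for `accepted_follower_inboxes`: `Ok(None)` means
/// "nothing to do", `Err` means the lookup itself failed.
fn lookup_inboxes(followers: &[&str]) -> Result<Option<(String, Vec<String>)>, String> {
    if followers.iter().any(|f| f.is_empty()) {
        return Err("follower without inbox".to_string());
    }
    if followers.is_empty() {
        return Ok(None);
    }
    let inboxes = followers.iter().map(|f| f.to_string()).collect();
    Ok(Some(("https://example.com/users/alice".to_string(), inboxes)))
}

/// Caller shape for each broadcast method: `?` propagates errors, and
/// `let ... else` turns "no eligible followers" into an early `Ok`.
fn broadcast(followers: &[&str]) -> Result<usize, String> {
    let Some((_local_actor, inboxes)) = lookup_inboxes(followers)? else {
        return Ok(0);
    };
    // A real implementation would build and send the activity here;
    // this sketch only reports how many inboxes would be targeted.
    Ok(inboxes.len())
}
```

Returning `Option` inside `Result` keeps the two outcomes distinct: an empty follower list is not an error, while a failed repository call still surfaces to the caller.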

The 7 methods to refactor (find each by name in the file, since line numbers shift once Task 1 lands):

- `broadcast_announce_to_followers`
- `broadcast_undo_announce_to_followers` (just added in Task 1)
- `broadcast_to_followers`
- `broadcast_delete_to_followers`
- `broadcast_update_to_followers`
- `broadcast_add_to_followers`
- `broadcast_undo_add_to_followers`

- [ ] **Run:** `cargo check -p activitypub-base`. Expected: no errors.

- [ ] **Run:** `cargo check --workspace`. Expected: no errors.

- [ ] **Run:** `cargo test --workspace`. Expected: same result as before (pre-existing postgres failures only).

- [ ] **Commit:**

```bash
git add crates/adapters/activitypub-base/src/service.rs
git commit -m "refactor(activitypub-base): extract accepted_follower_inboxes helper to eliminate 7x duplicated filtering block"
```

---

## Self-Review

**Spec coverage:**
- ✅ `broadcast_undo_announce` added to `OutboundFederationPort` (Task 1)
- ✅ `broadcast_undo_announce_to_followers` sends `Undo { object: { type: "Announce", actor, object } }` to accepted, non-blocked followers (Task 1)
- ✅ `FederationEventService` handles `BoostRemoved` with same ap_id construction as `BoostAdded` (Task 1)
- ✅ 2 tests: local thought URL constructed, remote thought uses ap_id (Task 1)
- ✅ `SpyPort` has separate `undo_announced` vec (Task 1)
- ✅ `accepted_follower_inboxes` helper extracts the 20-line filtering block (Task 2)
- ✅ Helper used in 7 content-broadcast methods (Task 2)
- ✅ `broadcast_actor_update` NOT touched: it uses different filtering (Task 2)

**Placeholder scan:** None.

**Type consistency:**
- `UndoActivity` is already defined in `activities.rs` with `object: serde_json::Value`, so no new activity type is needed
- `broadcast_undo_announce_to_followers(uuid::Uuid, url::Url)` follows the same signature pattern as `broadcast_announce_to_followers`
- `accepted_follower_inboxes` returns `Option<(DbActor, Vec<Url>)>`, which the caller destructures with `let Some(...) = ... else { return Ok(()) }`