From 94ea7a287fbc5994da937644e463e3772f63faec Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Fri, 15 May 2026 18:58:45 +0200 Subject: [PATCH] clean up --- .../plans/2026-05-15-ap-likes-boosts.md | 779 ----------- .../2026-05-15-domain-application-refactor.md | 1230 ----------------- .../plans/2026-05-15-federation-gaps-2.md | 781 ----------- .../plans/2026-05-15-feedentry-decoupling.md | 492 ------- .../2026-05-15-nats-dlq-auth-hardening.md | 836 ----------- .../2026-05-15-feedentry-decoupling-design.md | 80 -- ...26-05-15-nats-dlq-auth-hardening-design.md | 193 --- 7 files changed, 4391 deletions(-) delete mode 100644 docs/superpowers/plans/2026-05-15-ap-likes-boosts.md delete mode 100644 docs/superpowers/plans/2026-05-15-domain-application-refactor.md delete mode 100644 docs/superpowers/plans/2026-05-15-federation-gaps-2.md delete mode 100644 docs/superpowers/plans/2026-05-15-feedentry-decoupling.md delete mode 100644 docs/superpowers/plans/2026-05-15-nats-dlq-auth-hardening.md delete mode 100644 docs/superpowers/specs/2026-05-15-feedentry-decoupling-design.md delete mode 100644 docs/superpowers/specs/2026-05-15-nats-dlq-auth-hardening-design.md diff --git a/docs/superpowers/plans/2026-05-15-ap-likes-boosts.md b/docs/superpowers/plans/2026-05-15-ap-likes-boosts.md deleted file mode 100644 index e0e5a73..0000000 --- a/docs/superpowers/plans/2026-05-15-ap-likes-boosts.md +++ /dev/null @@ -1,779 +0,0 @@ -# ActivityPub Likes & Boost Notifications Implementation Plan - -> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. - -**Goal:** Wire local likes/unlikes to outbound Like/Undo(Like) AP activities, and handle inbound Like and Announce activities so Mastodon interactions create notifications. - -**Architecture:** Four layers of change — domain port extension, ActivityPubService implementation, application-layer federation event routing, and inbox activity handler registration. Inbound likes/boosts publish domain events (LikeAdded/BoostAdded) so the existing notification service picks them up without duplication. A locality guard in `federation_event.rs` prevents re-broadcasting remote boosts. - -**Tech Stack:** Rust, activitypub_federation crate, async-trait, serde, domain ports. - ---- - -## Files - -| Action | File | Purpose | -|--------|------|---------| -| Modify | `crates/domain/src/ports.rs` | Add `broadcast_like`, `broadcast_undo_like` to `OutboundFederationPort` | -| Modify | `crates/application/src/services/federation_event.rs` | Add `liked`/`undo_liked` to SpyPort; add `LikeAdded`/`LikeRemoved` arms; add locality guard to `BoostAdded` | -| Modify | `crates/adapters/activitypub-base/src/activities.rs` | Add `LikeActivity` struct; `LikeActivity::receive`; update `AnnounceActivity::receive`; register in `InboxActivities` | -| Modify | `crates/adapters/activitypub-base/src/content.rs` | Add `on_like`, `on_announce_received` to `ApObjectHandler` trait | -| Modify | `crates/adapters/activitypub-base/src/service.rs` | Add `broadcast_like_to_inbox`, `broadcast_undo_like_to_inbox`; implement port methods | -| Modify | `crates/adapters/activitypub/src/handler.rs` | Implement `on_like`, `on_announce_received` in `ThoughtsObjectHandler` | - ---- - -## Task 1: Extend OutboundFederationPort + SpyPort - -**Files:** -- Modify: `crates/domain/src/ports.rs` -- Modify: `crates/application/src/services/federation_event.rs` - -- [ ] **Step 1: Add two methods to `OutboundFederationPort` in `crates/domain/src/ports.rs`** - -Find `OutboundFederationPort` (around line 417). Add after `broadcast_undo_announce`: - -```rust -/// Send a Like activity to a remote thought author's inbox. -/// Only called when a LOCAL user likes a REMOTE thought (one with an ap_id). -async fn broadcast_like( - &self, - liker_user_id: &UserId, - object_ap_id: &str, - author_inbox_url: &str, -) -> Result<(), DomainError>; - -/// Send Undo(Like) to a remote thought author's inbox. -async fn broadcast_undo_like( - &self, - liker_user_id: &UserId, - object_ap_id: &str, - author_inbox_url: &str, -) -> Result<(), DomainError>; -``` - -- [ ] **Step 2: Add stubs to `SpyPort` in `crates/application/src/services/federation_event.rs`** - -Find `SpyPort` struct (around line 245). Add two fields: -```rust -liked: Mutex>, -undo_liked: Mutex>, -``` - -Find `impl OutboundFederationPort for SpyPort`. Add after `broadcast_undo_announce`: -```rust -async fn broadcast_like( - &self, - _: &UserId, - ap_id: &str, - _: &str, -) -> Result<(), DomainError> { - self.liked.lock().unwrap().push(ap_id.to_string()); - Ok(()) -} - -async fn broadcast_undo_like( - &self, - _: &UserId, - ap_id: &str, - _: &str, -) -> Result<(), DomainError> { - self.undo_liked.lock().unwrap().push(ap_id.to_string()); - Ok(()) -} -``` - -- [ ] **Step 3: Verify compilation** - -```bash -cd /mnt/drive/dev/thoughts && cargo build -p domain -p application 2>&1 | grep "^error" | head -10 -``` -Expected: no errors (activitypub-base will fail until Task 3 — that's fine, build only those two crates). - -- [ ] **Step 4: Commit** - -```bash -git add crates/domain/src/ports.rs crates/application/src/services/federation_event.rs -git commit -m "feat(domain): add broadcast_like/broadcast_undo_like to OutboundFederationPort" -``` - ---- - -## Task 2: LikeActivity struct + ApObjectHandler trait methods - -**Files:** -- Modify: `crates/adapters/activitypub-base/src/activities.rs` -- Modify: `crates/adapters/activitypub-base/src/content.rs` - -### Part A — LikeActivity struct (activities.rs) - -- [ ] **Step 1: Add `LikeType` and `LikeActivity` to `crates/adapters/activitypub-base/src/activities.rs`** - -Find where `AnnounceType` is defined (around line 13). Add right after: - -```rust -#[derive(Clone, Debug, Deserialize, Serialize)] -#[serde(rename = "Like")] -pub struct LikeType; - -impl Default for LikeType { - fn default() -> Self { - Self - } -} -``` - -Find where `AnnounceActivity` struct is defined (around line 461). Add a `LikeActivity` struct after it: - -```rust -#[derive(Clone, Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct LikeActivity { - pub id: Url, - #[serde(rename = "type")] - pub kind: LikeType, - pub actor: ObjectId, - pub object: Url, -} -``` - -### Part B — ApObjectHandler trait (content.rs) - -- [ ] **Step 2: Add `on_like` and `on_announce_received` to `ApObjectHandler` in `crates/adapters/activitypub-base/src/content.rs`** - -Find the `ApObjectHandler` trait. Add after `on_actor_removed`: - -```rust -/// Called when a remote actor likes a local thought. -/// `object_url` is the AP URL of the liked note (e.g. `{base}/thoughts/{uuid}`). -/// `actor_url` is the AP URL of the remote actor who sent the Like. -async fn on_like(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>; - -/// Called when a remote actor boosts (Announce) a local thought. -/// `object_url` is the AP URL of the announced note. -/// `actor_url` is the AP URL of the remote actor who sent the Announce. -async fn on_announce_received(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>; -``` - -- [ ] **Step 3: Verify compilation of activitypub-base** - -```bash -cd /mnt/drive/dev/thoughts && cargo build -p activitypub-base 2>&1 | grep "^error" | head -10 -``` -Expected: errors that `ThoughtsObjectHandler` in `activitypub` doesn't implement the new methods — that's fine. `activitypub-base` itself should compile. - -- [ ] **Step 4: Commit** - -```bash -git add crates/adapters/activitypub-base/src/activities.rs \ - crates/adapters/activitypub-base/src/content.rs -git commit -m "feat(activitypub-base): LikeActivity struct + on_like/on_announce_received trait methods" -``` - ---- - -## Task 3: Implement broadcast_like + LikeActivity::receive + AnnounceActivity update - -**Files:** -- Modify: `crates/adapters/activitypub-base/src/service.rs` -- Modify: `crates/adapters/activitypub-base/src/activities.rs` - -### Part A — ActivityPubService implementation (service.rs) - -- [ ] **Step 1: Add `broadcast_like_to_inbox` private method to `impl ActivityPubService`** - -Add this private method inside `impl ActivityPubService` (not inside the port impl block): - -```rust -pub async fn broadcast_like_to_inbox( - &self, - liker_user_id: uuid::Uuid, - object_ap_id: url::Url, - author_inbox_url: url::Url, -) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(liker_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - // Deterministic ID so Undo(Like) can reference the same activity. - let like_id = url::Url::parse(&format!( - "{}/activities/like/{}", - self.base_url, - uuid::Uuid::new_v5( - &uuid::Uuid::NAMESPACE_URL, - format!("{}/{}", liker_user_id, object_ap_id).as_bytes(), - ) - ))?; - - let like = crate::activities::LikeActivity { - id: like_id, - kind: Default::default(), - actor: activitypub_federation::fetch::object_id::ObjectId::from( - local_actor.ap_id.clone(), - ), - object: object_ap_id, - }; - - let sends = activitypub_federation::activity_sending::SendActivityTask::prepare( - &activitypub_federation::protocol::context::WithContext::new_default(like), - &local_actor, - vec![author_inbox_url], - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!(count = failures.len(), "some Like deliveries failed permanently"); - } - Ok(()) -} -``` - -- [ ] **Step 2: Add `broadcast_undo_like_to_inbox` private method** - -Add directly after `broadcast_like_to_inbox`: - -```rust -pub async fn broadcast_undo_like_to_inbox( - &self, - liker_user_id: uuid::Uuid, - object_ap_id: url::Url, - author_inbox_url: url::Url, -) -> anyhow::Result<()> { - let data = self.federation_config.to_request_data(); - let local_actor = get_local_actor(liker_user_id, &data) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - // Reconstruct the same deterministic like ID used when the like was sent. - let like_id = url::Url::parse(&format!( - "{}/activities/like/{}", - self.base_url, - uuid::Uuid::new_v5( - &uuid::Uuid::NAMESPACE_URL, - format!("{}/{}", liker_user_id, object_ap_id).as_bytes(), - ) - ))?; - - let undo_id = - crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?; - - let undo = crate::activities::UndoActivity { - id: undo_id, - kind: Default::default(), - actor: activitypub_federation::fetch::object_id::ObjectId::from( - local_actor.ap_id.clone(), - ), - object: serde_json::json!({ - "type": "Like", - "id": like_id.to_string(), - "actor": local_actor.ap_id.to_string(), - "object": object_ap_id.to_string(), - }), - }; - - let sends = activitypub_federation::activity_sending::SendActivityTask::prepare( - &activitypub_federation::protocol::context::WithContext::new_default(undo), - &local_actor, - vec![author_inbox_url], - &data, - ) - .await?; - let failures = send_with_retry(sends, &data).await; - if !failures.is_empty() { - tracing::warn!(count = failures.len(), "some Undo(Like) deliveries failed permanently"); - } - Ok(()) -} -``` - -- [ ] **Step 3: Implement `broadcast_like` and `broadcast_undo_like` in `impl domain::ports::OutboundFederationPort for ActivityPubService`** - -Find the existing `broadcast_undo_announce` impl. Add directly after it: - -```rust -async fn broadcast_like( - &self, - liker_user_id: &domain::value_objects::UserId, - object_ap_id: &str, - author_inbox_url: &str, -) -> Result<(), domain::errors::DomainError> { - let object = url::Url::parse(object_ap_id) - .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; - let inbox = url::Url::parse(author_inbox_url) - .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; - self.broadcast_like_to_inbox(liker_user_id.as_uuid(), object, inbox) - .await - .map_err(|e| domain::errors::DomainError::Internal(e.to_string())) -} - -async fn broadcast_undo_like( - &self, - liker_user_id: &domain::value_objects::UserId, - object_ap_id: &str, - author_inbox_url: &str, -) -> Result<(), domain::errors::DomainError> { - let object = url::Url::parse(object_ap_id) - .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; - let inbox = url::Url::parse(author_inbox_url) - .map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?; - self.broadcast_undo_like_to_inbox(liker_user_id.as_uuid(), object, inbox) - .await - .map_err(|e| domain::errors::DomainError::Internal(e.to_string())) -} -``` - -### Part B — LikeActivity::receive + AnnounceActivity update (activities.rs) - -- [ ] **Step 4: Implement `Activity` for `LikeActivity` in `crates/adapters/activitypub-base/src/activities.rs`** - -Add after the `LikeActivity` struct definition: - -```rust -#[async_trait] -impl Activity for LikeActivity { - type DataType = FederationData; - type Error = crate::error::Error; - - fn actor(&self) -> &Url { - self.actor.inner() - } - - async fn receive(self, data: &Data) -> Result<(), Self::Error> { - let domain = self.actor().host_str().unwrap_or(""); - if data.federation_repo.is_domain_blocked(domain).await? { - tracing::info!(actor = %self.actor(), "ignoring Like from blocked domain"); - return Ok(()); - } - - // Only process if the liked object is on our instance. - if self.object.host_str().unwrap_or("") != data.domain { - return Ok(()); - } - - data.object_handler - .on_like(&self.object, self.actor.inner()) - .await - .map_err(|e| crate::error::Error::Other(e.to_string()))?; - - tracing::info!(actor = %self.actor.inner(), object = %self.object, "received like"); - Ok(()) - } -} -``` - -- [ ] **Step 5: Update `AnnounceActivity::receive` to call `on_announce_received`** - -Find `AnnounceActivity::receive`. After the `add_announce` call and before the `tracing::info!`, add: - -```rust -data.object_handler - .on_announce_received(&self.object, self.actor.inner()) - .await - .unwrap_or_else(|e| { - tracing::warn!(error = %e, "failed to process announce notification"); - }); -``` - -- [ ] **Step 6: Register `LikeActivity` in `InboxActivities` enum** - -Find the `InboxActivities` enum. Add: - -```rust -#[serde(rename = "Like")] -Like(LikeActivity), -``` - -- [ ] **Step 7: Verify activitypub-base compiles** - -```bash -cd /mnt/drive/dev/thoughts && cargo build -p activitypub-base 2>&1 | grep "^error" | head -10 -``` -Expected: no errors from activitypub-base. (`activitypub` crate will fail until Task 4.) - -- [ ] **Step 8: Commit** - -```bash -git add crates/adapters/activitypub-base/src/service.rs \ - crates/adapters/activitypub-base/src/activities.rs -git commit -m "feat(activitypub-base): broadcast_like/undo_like + LikeActivity inbox handler" -``` - ---- - -## Task 4: Implement on_like and on_announce_received in ThoughtsObjectHandler - -**Files:** -- Modify: `crates/adapters/activitypub/src/handler.rs` - -`ThoughtsObjectHandler` has `ap_repo: Arc` and `event_publisher: Option>`. These are all we need. - -Pattern for both methods: -1. Parse the thought UUID out of the object URL path (`/thoughts/{uuid}`) -2. Find the remote actor's local user ID via `ap_repo.find_remote_actor_id(actor_url)` -3. Publish the appropriate domain event — the notification service already handles `LikeAdded` and `BoostAdded` - -- [ ] **Step 1: Read `crates/adapters/activitypub/src/handler.rs` to understand the struct and existing impls** - -Look for `struct ThoughtsObjectHandler` and `impl ApObjectHandler for ThoughtsObjectHandler`. - -- [ ] **Step 2: Implement `on_like` in `impl ApObjectHandler for ThoughtsObjectHandler`** - -Add: - -```rust -async fn on_like(&self, object_url: &url::Url, actor_url: &url::Url) -> anyhow::Result<()> { - // Parse thought UUID from path like /thoughts/{uuid} - let thought_uuid = object_url - .path() - .strip_prefix("/thoughts/") - .and_then(|s| s.split('/').next()) - .and_then(|s| uuid::Uuid::parse_str(s).ok()); - - let thought_uuid = match thought_uuid { - Some(u) => u, - None => { - tracing::debug!(object = %object_url, "on_like: not a local thought URL, skipping"); - return Ok(()); - } - }; - - // Resolve the remote actor to a local user ID. - let actor_user_id = self - .ap_repo - .find_remote_actor_id(actor_url) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let actor_user_id = match actor_user_id { - Some(id) => id, - None => { - tracing::debug!(actor = %actor_url, "on_like: remote actor not interned, skipping notification"); - return Ok(()); - } - }; - - if let Some(ep) = &self.event_publisher { - let thought_id = domain::value_objects::ThoughtId::from_uuid(thought_uuid); - let like_id = domain::value_objects::LikeId::new(); - ep.publish(&domain::events::DomainEvent::LikeAdded { - like_id, - user_id: actor_user_id, - thought_id, - }) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - } - - Ok(()) -} -``` - -- [ ] **Step 3: Implement `on_announce_received`** - -Add directly after `on_like`: - -```rust -async fn on_announce_received( - &self, - object_url: &url::Url, - actor_url: &url::Url, -) -> anyhow::Result<()> { - // Parse thought UUID from path like /thoughts/{uuid} - let thought_uuid = object_url - .path() - .strip_prefix("/thoughts/") - .and_then(|s| s.split('/').next()) - .and_then(|s| uuid::Uuid::parse_str(s).ok()); - - let thought_uuid = match thought_uuid { - Some(u) => u, - None => return Ok(()), - }; - - let actor_user_id = self - .ap_repo - .find_remote_actor_id(actor_url) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - - let actor_user_id = match actor_user_id { - Some(id) => id, - None => return Ok(()), - }; - - if let Some(ep) = &self.event_publisher { - let thought_id = domain::value_objects::ThoughtId::from_uuid(thought_uuid); - let boost_id = domain::value_objects::BoostId::new(); - ep.publish(&domain::events::DomainEvent::BoostAdded { - boost_id, - user_id: actor_user_id, - thought_id, - }) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - } - - Ok(()) -} -``` - -- [ ] **Step 4: Verify full workspace build** - -```bash -cd /mnt/drive/dev/thoughts && cargo build 2>&1 | grep "^error" | head -10 -``` -Expected: no errors. - -- [ ] **Step 5: Run tests** - -```bash -cd /mnt/drive/dev/thoughts && cargo test -p domain -p application 2>&1 | tail -5 -``` - -- [ ] **Step 6: Commit** - -```bash -git add crates/adapters/activitypub/src/handler.rs -git commit -m "feat(activitypub): implement on_like and on_announce_received in ThoughtsObjectHandler" -``` - ---- - -## Task 5: federation_event.rs — LikeAdded/LikeRemoved arms + BoostAdded locality guard - -**Files:** -- Modify: `crates/application/src/services/federation_event.rs` - -The federation service must: -- **BoostAdded**: add a locality check so remote boosts (published by Task 4) don't get re-broadcast -- **LikeAdded**: fan-out only when a LOCAL user likes a REMOTE thought (has ap_id) -- **LikeRemoved**: Undo(Like) when a LOCAL user unlikes a REMOTE thought - -- [ ] **Step 1: Write tests for the new arms** - -Find the `#[cfg(test)]` block in `crates/application/src/services/federation_event.rs`. Add: - -```rust -#[tokio::test] -async fn like_added_local_user_remote_thought_broadcasts_like() { - let store = TestStore::default(); - let spy = Arc::new(SpyPort::default()); - - // Set up a remote thought with ap_id - let author = { - let mut u = test_user("remote_author"); - u.local = false; - u.inbox_url = Some("https://mastodon.social/users/author/inbox".into()); - u - }; - let thought = { - let mut t = test_thought(author.id.clone()); - t.ap_id = Some("https://mastodon.social/posts/123".into()); - t.in_reply_to_url = None; - t - }; - let liker = test_user("alice"); // local user - - store.users.lock().unwrap().push(author); - store.users.lock().unwrap().push(liker.clone()); - store.thoughts.lock().unwrap().push(thought.clone()); - - let svc = test_service(store, spy.clone()); - svc.process(&DomainEvent::LikeAdded { - like_id: LikeId::new(), - user_id: liker.id, - thought_id: thought.id, - }) - .await - .unwrap(); - - assert_eq!(spy.liked.lock().unwrap().len(), 1); -} - -#[tokio::test] -async fn like_added_remote_user_skips_broadcast() { - let store = TestStore::default(); - let spy = Arc::new(SpyPort::default()); - - let author = test_user("alice"); - let thought = test_thought(author.id.clone()); // local thought, no ap_id - let remote_liker = { - let mut u = test_user("bob"); - u.local = false; - u - }; - - store.users.lock().unwrap().push(author); - store.users.lock().unwrap().push(remote_liker.clone()); - store.thoughts.lock().unwrap().push(thought.clone()); - - let svc = test_service(store, spy.clone()); - svc.process(&DomainEvent::LikeAdded { - like_id: LikeId::new(), - user_id: remote_liker.id, - thought_id: thought.id, - }) - .await - .unwrap(); - - assert!(spy.liked.lock().unwrap().is_empty()); -} - -#[tokio::test] -async fn boost_added_remote_user_skips_broadcast() { - let store = TestStore::default(); - let spy = Arc::new(SpyPort::default()); - - let author = test_user("alice"); - let thought = test_thought(author.id.clone()); - let remote_booster = { - let mut u = test_user("bob"); - u.local = false; - u - }; - - store.users.lock().unwrap().push(author); - store.users.lock().unwrap().push(remote_booster.clone()); - store.thoughts.lock().unwrap().push(thought.clone()); - - let svc = test_service(store, spy.clone()); - svc.process(&DomainEvent::BoostAdded { - boost_id: BoostId::new(), - user_id: remote_booster.id, - thought_id: thought.id, - }) - .await - .unwrap(); - - assert!(spy.announced.lock().unwrap().is_empty()); -} -``` - -Note: these tests use `test_user`, `test_thought`, `test_service` helpers — read the existing tests in the same file to find these helpers and use the same pattern. If `User.local` field setters don't exist, set the field directly (it's `pub`). - -- [ ] **Step 2: Run tests to confirm they fail** - -```bash -cd /mnt/drive/dev/thoughts && cargo test -p application federation_event 2>&1 | grep "FAILED\|error" | head -10 -``` -Expected: tests fail (LikeAdded arm not handled, BoostAdded has no locality guard). - -- [ ] **Step 3: Add locality guard to existing `BoostAdded` arm** - -Find the `DomainEvent::BoostAdded` match arm. Add a locality check at the top: - -```rust -DomainEvent::BoostAdded { - boost_id: _, - user_id, - thought_id, -} => { - // Only fan-out if the booster is a local user. Remote boosts (from inbound - // Announce activities) must not be re-broadcast to avoid loops. - let booster = match self.users.find_by_id(user_id).await? { - Some(u) if u.local => u, - _ => return Ok(()), - }; - let _ = booster; // suppress unused warning — kept for the locality check - let thought = match self.thoughts.find_by_id(thought_id).await? { - Some(t) => t, - None => return Ok(()), - }; - let object_ap_id = self.object_ap_id(&thought, thought_id); - self.ap.broadcast_announce(user_id, &object_ap_id).await -} -``` - -- [ ] **Step 4: Add `LikeAdded` arm** - -Find the `_ => Ok(())` catch-all at the end of the `match event` block. Add before it: - -```rust -DomainEvent::LikeAdded { - like_id: _, - user_id, - thought_id, -} => { - // Only federate: local liker + remote thought (has ap_id + author has inbox). - let liker = match self.users.find_by_id(user_id).await? { - Some(u) if u.local => u, - _ => return Ok(()), - }; - let _ = liker; - let thought = match self.thoughts.find_by_id(thought_id).await? { - Some(t) if t.ap_id.is_some() => t, - _ => return Ok(()), - }; - let author = match self.users.find_by_id(&thought.user_id).await? { - Some(u) if u.inbox_url.is_some() => u, - _ => return Ok(()), - }; - let object_ap_id = thought.ap_id.unwrap(); - let inbox_url = author.inbox_url.unwrap(); - self.ap - .broadcast_like(user_id, &object_ap_id, &inbox_url) - .await -} -``` - -- [ ] **Step 5: Add `LikeRemoved` arm** - -Add directly after `LikeAdded`: - -```rust -DomainEvent::LikeRemoved { - user_id, - thought_id, -} => { - let liker = match self.users.find_by_id(user_id).await? { - Some(u) if u.local => u, - _ => return Ok(()), - }; - let _ = liker; - let thought = match self.thoughts.find_by_id(thought_id).await? { - Some(t) if t.ap_id.is_some() => t, - _ => return Ok(()), - }; - let author = match self.users.find_by_id(&thought.user_id).await? { - Some(u) if u.inbox_url.is_some() => u, - _ => return Ok(()), - }; - let object_ap_id = thought.ap_id.unwrap(); - let inbox_url = author.inbox_url.unwrap(); - self.ap - .broadcast_undo_like(user_id, &object_ap_id, &inbox_url) - .await -} -``` - -- [ ] **Step 6: Run tests — all should pass** - -```bash -cd /mnt/drive/dev/thoughts && cargo test -p application federation_event 2>&1 | tail -5 -``` -Expected: all pass. - -- [ ] **Step 7: Full build + all unit tests** - -```bash -cd /mnt/drive/dev/thoughts && cargo build 2>&1 | grep "^error" | head -5 -cd /mnt/drive/dev/thoughts && cargo test -p domain -p application 2>&1 | tail -5 -``` - -- [ ] **Step 8: Commit** - -```bash -git add crates/application/src/services/federation_event.rs -git commit -m "feat(application): federate local likes + locality guard prevents remote boost re-broadcast" -``` - ---- - -## Notes - -- **No loop risk**: The `BoostAdded` locality guard (`u.local`) ensures remote boosts published by `on_announce_received` skip federation fan-out. Same guard applies to `LikeAdded`. -- **Existing notification service**: `LikeAdded` and `BoostAdded` events published from inbound activity handlers are picked up by `NotificationEventService` unchanged — it already creates notifications for these events. -- **Deterministic activity IDs**: Like and Undo(Like) use `Uuid::new_v5(NAMESPACE_URL, "{user}/{object}")` so the Undo can reference the original Like ID without DB storage. -- **Only remote thoughts get likes federated**: Local thoughts liked by local users generate no outbound activity (the like is already recorded locally). diff --git a/docs/superpowers/plans/2026-05-15-domain-application-refactor.md b/docs/superpowers/plans/2026-05-15-domain-application-refactor.md deleted file mode 100644 index 66282cf..0000000 --- a/docs/superpowers/plans/2026-05-15-domain-application-refactor.md +++ /dev/null @@ -1,1230 +0,0 @@ -# Domain & Application Refactor Implementation Plan - -> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. - -**Goal:** Clean up 11 architectural problems in `crates/domain/` and `crates/application/` — from most critical to least — applying CQRS port splits, removing AP infrastructure leakage from the domain, making invalid states unrepresentable, and eliminating pass-through noise. - -**Architecture:** Hexagonal (ports & adapters). Domain knows nothing about HTTP, SQL, or ActivityPub. Application orchestrates domain via ports. Adapters implement ports. CQRS: every repository is split into a read-side trait and a write-side trait; the concrete adapter implements both; a combined supertrait keeps `AppState` unchanged. - -**Tech Stack:** Rust, Tokio, SQLx, Axum. All changes must pass `cargo check --workspace` and the pre-commit hook (`cargo fmt && cargo clippy`). - ---- - -## Phase 1 — Remove AP Infrastructure from the Domain (most critical) - -These four tasks are the most invasive. They must land together in one commit because they form a closed loop of changes. - -### Task 1: Add AP lookup accessors to `ActivityPubRepository` - -The domain models `User` and `Thought` currently carry AP-specific fields (`ap_id`, `inbox_url`, `in_reply_to_url`). Before we can remove those fields, the services that currently read them need another way to get the data. This task adds that other way. - -**Files:** -- Modify: `crates/domain/src/ports.rs` -- Modify: `crates/adapters/postgres/src/activitypub.rs` -- Modify: `crates/domain/src/testing.rs` - -- [ ] **Step 1: Add `ActorApUrls` struct and two new methods to `ActivityPubRepository` in `ports.rs`** - -In `crates/domain/src/ports.rs`, after the `OutboxEntry` struct (~line 336), add: - -```rust -/// AP-protocol endpoints for a locally-stored user (local or interned remote). -#[derive(Debug, Clone)] -pub struct ActorApUrls { - pub ap_id: String, - pub inbox_url: String, -} -``` - -Then inside `pub trait ActivityPubRepository`, add after `count_local_notes`: - -```rust - /// Return the ActivityPub object URL for a thought, if one is stored. - /// Returns None for local thoughts (caller constructs the URL from base_url + thought_id). - async fn get_thought_ap_id( - &self, - thought_id: &ThoughtId, - ) -> Result, DomainError>; - - /// Return the AP actor URL and inbox URL for a user, if stored. - /// Returns None for users that have not been federated. - async fn get_actor_ap_urls( - &self, - user_id: &UserId, - ) -> Result, DomainError>; -``` - -- [ ] **Step 2: Implement both methods in `PgActivityPubRepository`** - -In `crates/adapters/postgres/src/activitypub.rs`, add the two implementations: - -```rust -async fn get_thought_ap_id( - &self, - thought_id: &ThoughtId, -) -> Result, DomainError> { - sqlx::query_scalar::<_, String>( - "SELECT ap_id FROM thoughts WHERE id = $1 AND ap_id IS NOT NULL", - ) - .bind(thought_id.as_uuid()) - .fetch_optional(&self.pool) - .await - .into_domain() -} - -async fn get_actor_ap_urls( - &self, - user_id: &UserId, -) -> Result, DomainError> { - sqlx::query_as::<_, (String, String)>( - "SELECT ap_id, inbox_url FROM users \ - WHERE id = $1 AND ap_id IS NOT NULL AND inbox_url IS NOT NULL", - ) - .bind(user_id.as_uuid()) - .fetch_optional(&self.pool) - .await - .into_domain() - .map(|opt| { - opt.map(|(ap_id, inbox_url)| ActorApUrls { ap_id, inbox_url }) - }) -} -``` - -- [ ] **Step 3: Add stub implementations in `TestStore`** - -In `crates/domain/src/testing.rs`, add to the `ActivityPubRepository` impl for `TestStore`: - -```rust -async fn get_thought_ap_id( - &self, - _thought_id: &ThoughtId, -) -> Result, DomainError> { - Ok(None) -} - -async fn get_actor_ap_urls( - &self, - _user_id: &UserId, -) -> Result, DomainError> { - Ok(None) -} -``` - -- [ ] **Step 4: Compile check** - -```bash -cargo check --workspace 2>&1 | head -30 -``` -Expected: 0 errors. - ---- - -### Task 2: Remove AP fields from `User` and `Thought` domain models - -Now that the AP lookup methods exist, remove the AP-specific fields from the domain models and update all consumers. - -**Files:** -- Modify: `crates/domain/src/models/user.rs` -- Modify: `crates/domain/src/models/thought.rs` -- Modify: `crates/adapters/postgres/src/user.rs` (UserRow → User mapping) -- Modify: `crates/adapters/postgres/src/thought.rs` (ThoughtRow → Thought mapping) -- Modify: `crates/adapters/postgres/src/feed.rs` (FeedRow → FeedEntry mapping) -- Modify: `crates/adapters/postgres-search/src/lib.rs` (FeedRow → FeedEntry mapping) -- Modify: `crates/application/src/services/federation_event.rs` -- Modify: `crates/domain/src/testing.rs` - -- [ ] **Step 1: Remove from `User` model** - -In `crates/domain/src/models/user.rs`, remove `ap_id` and `inbox_url` fields: - -```rust -#[derive(Debug, Clone)] -pub struct User { - pub id: UserId, - pub username: Username, - pub email: Email, - pub password_hash: PasswordHash, - pub display_name: Option, - pub bio: Option, - pub avatar_url: Option, - pub header_url: Option, - pub custom_css: Option, - pub local: bool, - pub created_at: DateTime, - pub updated_at: DateTime, -} - -impl User { - pub fn new_local( - id: UserId, - username: Username, - email: Email, - password_hash: PasswordHash, - ) -> Self { - let now = Utc::now(); - Self { - id, - username, - email, - password_hash, - display_name: None, - bio: None, - avatar_url: None, - header_url: None, - custom_css: None, - local: true, - created_at: now, - updated_at: now, - } - } -} -``` - -- [ ] **Step 2: Remove from `Thought` model** - -In `crates/domain/src/models/thought.rs`, remove `ap_id` and `in_reply_to_url` fields: - -```rust -#[derive(Debug, Clone)] -pub struct Thought { - pub id: ThoughtId, - pub user_id: UserId, - pub content: Content, - pub in_reply_to_id: Option, - pub visibility: Visibility, - pub content_warning: Option, - pub sensitive: bool, - pub local: bool, - pub created_at: DateTime, - pub updated_at: Option>, -} - -impl Thought { - pub fn new_local( - id: ThoughtId, - user_id: UserId, - content: Content, - in_reply_to_id: Option, - visibility: Visibility, - content_warning: Option, - sensitive: bool, - ) -> Self { - Self { - id, - user_id, - content, - in_reply_to_id, - visibility, - content_warning, - sensitive, - local: true, - created_at: Utc::now(), - updated_at: None, - } - } -} -``` - -- [ ] **Step 3: Update `UserRow → User` mapping in `postgres/src/user.rs`** - -Remove the `ap_id` and `inbox_url` fields from `UserRow` and its `From for User` impl: - -```rust -#[derive(sqlx::FromRow)] -pub struct UserRow { - pub id: uuid::Uuid, - pub username: String, - pub email: String, - pub password_hash: String, - pub display_name: Option, - pub bio: Option, - pub avatar_url: Option, - pub header_url: Option, - pub custom_css: Option, - pub local: bool, - pub created_at: DateTime, - pub updated_at: DateTime, -} - -impl From for User { - fn from(r: UserRow) -> Self { - User { - id: UserId::from_uuid(r.id), - username: Username::from_trusted(r.username), - email: Email::from_trusted(r.email), - password_hash: PasswordHash(r.password_hash), - display_name: r.display_name, - bio: r.bio, - avatar_url: r.avatar_url, - header_url: r.header_url, - custom_css: r.custom_css, - local: r.local, - created_at: r.created_at, - updated_at: r.updated_at, - } - } -} -``` - -Update `USER_SELECT` to remove the columns: -```rust -pub const USER_SELECT: &str = - "SELECT id,username,email,password_hash,display_name,bio,avatar_url,header_url,\ - custom_css,local,created_at,updated_at FROM users"; -``` - -- [ ] **Step 4: Update `ThoughtRow → Thought` mapping in `postgres/src/thought.rs`** - -Remove `ap_id` and `in_reply_to_url` from `ThoughtRow` and its `From` impl. The `ap_id` column stays in the DB — we just don't pull it into the domain model via this path. The `SELECT` query changes too: - -```rust -#[derive(sqlx::FromRow)] -struct ThoughtRow { - pub id: uuid::Uuid, - pub user_id: uuid::Uuid, - pub content: String, - pub in_reply_to_id: Option, - pub visibility: String, - pub content_warning: Option, - pub sensitive: bool, - pub local: bool, - pub created_at: DateTime, - pub updated_at: Option>, -} -``` - -Update the `SELECT` constant and `From` impl to match (remove `ap_id`, `in_reply_to_url` from both). Also update the `save` method — remove the `ap_id` binding from INSERT. The `ap_id` column in the DB gets set to NULL for local thoughts; remote thoughts' ap_id is set by `accept_note` in the AP adapter. - -- [ ] **Step 5: Update `FeedRow` mappings in `feed.rs` and `postgres-search/lib.rs`** - -In `postgres/src/feed.rs`, remove `t_ap_id` and `in_reply_to_url` from `FeedRow` and from the SELECT query. In `postgres-search/src/lib.rs`, same removal. - -The `FeedEntry.thought` field is now a `Thought` without `ap_id`. Adapters that need AP context use `ActivityPubRepository`. - -- [ ] **Step 6: Update `FederationEventService` to use AP lookups** - -In `crates/application/src/services/federation_event.rs`, replace all reads of `thought.ap_id` and `user.inbox_url` with calls to `self.ap_repo`: - -Replace the `object_ap_id` helper: -```rust -async fn object_ap_id(&self, thought: &Thought, thought_id: &ThoughtId) -> Result { - if !thought.local { - if let Some(ap_id) = self.ap_repo.get_thought_ap_id(thought_id).await? { - return Ok(ap_id); - } - } - Ok(format!("{}/thoughts/{}", self.base_url, thought_id)) -} -``` - -Note: this method is now `async`. Update all callers to `.await`. - -For `ThoughtCreated` — the `in_reply_to_url` resolution changes. Instead of reading `thought.ap_id` of the parent, call `self.ap_repo.get_thought_ap_id(reply_id).await?` and fall back to constructing the URL. - -For `LikeAdded` / `LikeRemoved` — replace `thought.ap_id.is_some()` check and `thought.ap_id.unwrap()` / `user.inbox_url.unwrap()` with: -```rust -let thought_ap_id = match self.ap_repo.get_thought_ap_id(thought_id).await? { - Some(id) => id, - None => return Ok(()), // local thought — no federation needed -}; -let actor_urls = match self.ap_repo.get_actor_ap_urls(&thought.user_id).await? { - Some(u) => u, - None => return Ok(()), -}; -self.ap.broadcast_like(user_id, &thought_ap_id, &actor_urls.inbox_url).await -``` - -For `BoostAdded` / `BoostRemoved` — same pattern, use `object_ap_id` (now async). - -- [ ] **Step 7: Update `TestStore` for fields removed from models** - -In `crates/domain/src/testing.rs`, update any place that constructs `User` or `Thought` with `ap_id`/`inbox_url`/`in_reply_to_url` fields. These fields no longer exist on the structs. - -- [ ] **Step 8: Compile check** -```bash -cargo check --workspace 2>&1 | head -40 -``` - -Fix all errors before moving on. - -- [ ] **Step 9: Commit** -```bash -git add -p -git commit -m "refactor(domain): remove AP fields from User and Thought; use ActivityPubRepository lookups" -``` - ---- - -### Task 3: Remove AP infrastructure events from `DomainEvent`; add `FederationSchedulerPort` - -`FetchRemoteActorPosts` and `FetchActorConnections` in `DomainEvent` carry raw AP URLs — infrastructure, not domain. Replace them with a narrow port that the application layer calls directly. - -**Files:** -- Modify: `crates/domain/src/events.rs` -- Modify: `crates/domain/src/ports.rs` -- Modify: `crates/domain/src/testing.rs` -- Modify: `crates/application/src/use_cases/federation_management.rs` -- Modify: `crates/application/src/services/federation_event.rs` -- Modify: `crates/adapters/activitypub-base/src/service.rs` (implements the new port) -- Modify: `crates/presentation/src/state.rs` -- Modify: `crates/bootstrap/src/factory.rs` - -- [ ] **Step 1: Add `FederationSchedulerPort` to `ports.rs`** - -```rust -#[async_trait] -pub trait FederationSchedulerPort: Send + Sync { - async fn schedule_actor_posts_fetch( - &self, - actor_ap_url: &str, - outbox_url: &str, - ) -> Result<(), DomainError>; - - async fn schedule_connections_fetch( - &self, - actor_ap_url: &str, - collection_url: &str, - connection_type: &str, - page: u32, - ) -> Result<(), DomainError>; -} -``` - -- [ ] **Step 2: Remove the two AP events from `events.rs`** - -In `crates/domain/src/events.rs`, delete the `FetchRemoteActorPosts` and `FetchActorConnections` variants entirely. The enum becomes: - -```rust -#[derive(Debug, Clone)] -pub enum DomainEvent { - ThoughtCreated { thought_id: ThoughtId, user_id: UserId, in_reply_to_id: Option }, - ThoughtDeleted { thought_id: ThoughtId, user_id: UserId }, - ThoughtUpdated { thought_id: ThoughtId, user_id: UserId }, - LikeAdded { like_id: LikeId, user_id: UserId, thought_id: ThoughtId }, - LikeRemoved { user_id: UserId, thought_id: ThoughtId }, - BoostAdded { boost_id: BoostId, user_id: UserId, thought_id: ThoughtId }, - BoostRemoved { user_id: UserId, thought_id: ThoughtId }, - FollowRequested { follower_id: UserId, following_id: UserId }, - FollowAccepted { follower_id: UserId, following_id: UserId }, - FollowRejected { follower_id: UserId, following_id: UserId }, - Unfollowed { follower_id: UserId, following_id: UserId }, - UserBlocked { blocker_id: UserId, blocked_id: UserId }, - UserUnblocked { blocker_id: UserId, blocked_id: UserId }, - UserRegistered { user_id: UserId }, - ProfileUpdated { user_id: UserId }, - MentionReceived { thought_id: ThoughtId, mentioned_user_id: UserId, author_user_id: UserId }, -} -``` - -- [ ] **Step 3: Update `federation_management.rs` to call the scheduler port directly** - -In `crates/application/src/use_cases/federation_management.rs`, add a `scheduler: &dyn FederationSchedulerPort` parameter to `get_remote_actor_posts` and `get_actor_connections_page`, and call it instead of publishing an event: - -```rust -pub async fn get_remote_actor_posts( - federation: &dyn FederationActionPort, - ap_repo: &dyn ActivityPubRepository, - feed: &dyn FeedRepository, - scheduler: &dyn FederationSchedulerPort, // NEW - handle: &str, - page: PageParams, - viewer_id: Option<&UserId>, -) -> Result<(Paginated, bool), DomainError> { - let actor = federation.lookup_actor(handle).await?; - let actor_url = url::Url::parse(&actor.url) - .map_err(|e| DomainError::ExternalService(e.to_string()))?; - let author_id = match ap_repo.find_remote_actor_id(&actor_url).await? { - Some(id) => id, - None => ap_repo.intern_remote_actor(&actor_url).await?, - }; - let result = get_user_feed(feed, &author_id, page, viewer_id).await?; - // Schedule background fetch if actor has outbox - if let Some(ref outbox_url) = actor.outbox_url { - let _ = scheduler - .schedule_actor_posts_fetch(&actor.url, outbox_url) - .await; - } - let has_more = !result.items.is_empty(); - Ok((result, has_more)) -} - -pub async fn get_actor_connections_page( - federation: &dyn FederationActionPort, - connections: &dyn RemoteActorConnectionRepository, - scheduler: &dyn FederationSchedulerPort, // NEW - handle: &str, - connection_type: &str, - page: u32, -) -> Result<(Vec, bool), DomainError> { - // ... existing lookup logic ... - // Replace event publish with: - if stale { - let _ = scheduler - .schedule_connections_fetch(&actor.url, &collection_url, connection_type, page) - .await; - } - // ... -} -``` - -- [ ] **Step 4: Update `FederationEventService` — remove the two AP event arms** - -In `federation_event.rs`, delete the `DomainEvent::FetchRemoteActorPosts` and `DomainEvent::FetchActorConnections` match arms. The service no longer handles infrastructure fetch commands. - -Also remove the `federation_action` and `remote_actor_connections` fields from `FederationEventService` if they're only used by those two removed arms. Check: they're also used nowhere else. Remove them from the struct definition. - -- [ ] **Step 5: Implement `FederationSchedulerPort` in `ActivityPubService`** - -In `crates/adapters/activitypub-base/src/service.rs`, add an impl: - -```rust -#[async_trait::async_trait] -impl domain::ports::FederationSchedulerPort for ActivityPubService { - async fn schedule_actor_posts_fetch( - &self, - actor_ap_url: &str, - outbox_url: &str, - ) -> Result<(), domain::errors::DomainError> { - // Re-use the existing event publisher to publish the background task, - // but now this is an adapter concern, not a domain event. - self.events - .publish(&domain::events::DomainEvent::ThoughtCreated { - // ... actually: use a separate internal channel or directly spawn - }) - .await - } -} -``` - -**Note:** The exact implementation depends on how the worker picks up background tasks. If the worker currently subscribes to `DomainEvent`, and we've removed these variants, the worker needs a new subscription path. The simplest compatible approach: have `ActivityPubService` keep an internal sender for these tasks (the existing NATS/channel mechanism), and the worker subscribes to that same mechanism. Read `crates/adapters/nats/src/lib.rs` to understand the existing plumbing, then implement accordingly. - -- [ ] **Step 6: Add `FederationSchedulerPort` to `AppState` and `TestStore`** - -`AppState` (`crates/presentation/src/state.rs`): -```rust -pub federation_scheduler: Arc, -``` - -`TestStore` (`crates/domain/src/testing.rs`) — add a no-op impl: -```rust -#[async_trait] -impl FederationSchedulerPort for TestStore { - async fn schedule_actor_posts_fetch(&self, _: &str, _: &str) -> Result<(), DomainError> { - Ok(()) - } - async fn schedule_connections_fetch(&self, _: &str, _: &str, _: &str, _: u32) -> Result<(), DomainError> { - Ok(()) - } -} -``` - -- [ ] **Step 7: Wire up in `bootstrap/src/factory.rs`** - -Add `federation_scheduler: Arc::new(infra.ap_service.clone())` to the `AppState` construction (assuming `ActivityPubService` implements the port). - -- [ ] **Step 8: Update handlers that call `get_remote_actor_posts` / `get_actor_connections_page`** - -In `crates/presentation/src/handlers/federation_actors.rs`, pass `&*s.federation_scheduler` to the new parameter. - -- [ ] **Step 9: Compile check** -```bash -cargo check --workspace 2>&1 | head -40 -``` - -- [ ] **Step 10: Commit** -```bash -git commit -m "refactor(domain): remove FetchRemoteActorPosts/FetchActorConnections from DomainEvent; add FederationSchedulerPort" -``` - ---- - -## Phase 2 — CQRS Port Split - -### Task 4: Split `UserRepository` into `UserReader + UserWriter` - -**Files:** -- Modify: `crates/domain/src/ports.rs` -- Modify: `crates/adapters/postgres/src/user.rs` -- Modify: `crates/domain/src/testing.rs` -- Modify: `crates/application/src/use_cases/*.rs` (update function signatures) - -- [ ] **Step 1: Define `UserReader` and `UserWriter` in `ports.rs`** - -Replace the existing `UserRepository` trait with three traits: - -```rust -#[async_trait] -pub trait UserReader: Send + Sync { - async fn find_by_id(&self, id: &UserId) -> Result, DomainError>; - async fn find_by_username(&self, username: &Username) -> Result, DomainError>; - async fn find_by_email(&self, email: &Email) -> Result, DomainError>; - async fn list_with_stats(&self) -> Result, DomainError>; - async fn count(&self) -> Result; -} - -#[async_trait] -pub trait UserWriter: Send + Sync { - async fn save(&self, user: &User) -> Result<(), DomainError>; - async fn update_profile( - &self, - user_id: &UserId, - display_name: Option, - bio: Option, - avatar_url: Option, - header_url: Option, - custom_css: Option, - ) -> Result<(), DomainError>; -} - -/// Combined supertrait — kept so `AppState` needs no change. -/// Postgres adapter implements all three; use cases declare the narrower bound they need. -pub trait UserRepository: UserReader + UserWriter {} -impl UserRepository for T {} -``` - -- [ ] **Step 2: `PgUserRepository` implements `UserReader` and `UserWriter` separately** - -In `crates/adapters/postgres/src/user.rs`, split the single `impl UserRepository` into: - -```rust -#[async_trait] -impl UserReader for PgUserRepository { - // all read methods -} - -#[async_trait] -impl UserWriter for PgUserRepository { - // save, update_profile -} -``` - -The blanket impl in ports.rs makes `PgUserRepository: UserRepository` automatically. - -- [ ] **Step 3: Update use case function signatures to declare the narrowest bound they need** - -In each use case file, change `users: &dyn UserRepository` to the narrowest applicable bound: - -| Use case | New bound | -|---|---| -| `get_user`, `get_user_by_username`, `get_user_by_id_or_username`, `get_top_friends` | `&dyn UserReader` | -| `register_user` | `&dyn UserWriter` | -| `update_profile` | `&dyn UserWriter` (for the write) + `&dyn UserReader` (to fetch back) | -| `follow_actor`, `unfollow_actor`, `block_by_username` | `&dyn UserReader` (lookup only) | - -Handlers still pass `&*s.users` (an `Arc`), which satisfies any narrower bound via Rust's trait coercion. - -- [ ] **Step 4: Update `TestStore` — split its user impl to satisfy both traits** - -```rust -#[async_trait] -impl UserReader for TestStore { /* existing read methods */ } - -#[async_trait] -impl UserWriter for TestStore { /* existing write methods */ } -``` - -- [ ] **Step 5: Compile check** -```bash -cargo check --workspace 2>&1 | head -30 -``` - -- [ ] **Step 6: Commit** -```bash -git commit -m "refactor(ports): CQRS split — UserReader + UserWriter supertrait" -``` - ---- - -### Task 5: Split `FederationActionPort` into four focused sub-ports - -**Files:** -- Modify: `crates/domain/src/ports.rs` -- Modify: `crates/domain/src/testing.rs` -- Modify: `crates/adapters/activitypub-base/src/service.rs` -- Modify: `crates/application/src/use_cases/federation_management.rs` -- Modify: `crates/application/src/use_cases/social.rs` -- Modify: `crates/application/src/use_cases/profile.rs` -- Modify: `crates/presentation/src/state.rs` - -- [ ] **Step 1: Define the four sub-ports in `ports.rs`** - -Replace `FederationActionPort` with: - -```rust -#[async_trait] -pub trait FederationLookupPort: Send + Sync { - async fn lookup_actor(&self, handle: &str) -> Result; - async fn actor_json(&self, user_id: &UserId) -> Result; - async fn followers_collection_json(&self, user_id: &UserId, page: Option) -> Result; - async fn following_collection_json(&self, user_id: &UserId, page: Option) -> Result; -} - -#[async_trait] -pub trait FederationFollowPort: Send + Sync { - async fn follow_remote(&self, local_user_id: &UserId, handle: &str) -> Result<(), DomainError>; - async fn unfollow_remote(&self, local_user_id: &UserId, handle: &str) -> Result<(), DomainError>; - async fn get_remote_following(&self, user_id: &UserId) -> Result, DomainError>; -} - -#[async_trait] -pub trait FederationFollowRequestPort: Send + Sync { - async fn get_pending_followers(&self, user_id: &UserId) -> Result, DomainError>; - async fn accept_follow_request(&self, user_id: &UserId, actor_url: &str) -> Result<(), DomainError>; - async fn reject_follow_request(&self, user_id: &UserId, actor_url: &str) -> Result<(), DomainError>; - async fn get_remote_followers(&self, user_id: &UserId) -> Result, DomainError>; - async fn remove_remote_follower(&self, user_id: &UserId, actor_url: &str) -> Result<(), DomainError>; -} - -#[async_trait] -pub trait FederationFetchPort: Send + Sync { - async fn fetch_outbox_page(&self, outbox_url: &str, page: u32) -> Result, DomainError>; - async fn fetch_actor_urls_from_collection(&self, collection_url: &str) -> Result, DomainError>; - async fn resolve_actor_profiles(&self, urls: Vec) -> Vec; -} - -/// Combined supertrait — `AppState.federation` stays as a single field. -pub trait FederationActionPort: FederationLookupPort + FederationFollowPort + FederationFollowRequestPort + FederationFetchPort {} -impl FederationActionPort for T {} -``` - -- [ ] **Step 2: Split `ActivityPubService` impl into four sub-impls** - -In `crates/adapters/activitypub-base/src/service.rs`, replace `impl FederationActionPort` with four separate impls, each containing only the methods for that sub-port. - -- [ ] **Step 3: Update use case signatures to declare the narrowest port they need** - -| Use case / handler | New bound | -|---|---| -| `get_user` (AP actor JSON path) | `&dyn FederationLookupPort` | -| `follow_actor`, `unfollow_actor` | `&dyn FederationFollowPort` | -| `get_top_friends` (no federation) | remove federation param if unused | -| `list_pending_requests`, `accept/reject`, `list_remote_followers/following` | `&dyn FederationFollowRequestPort` | -| `get_remote_actor_posts`, `get_actor_connections_page` | `&dyn FederationLookupPort + &dyn FederationFetchPort` | -| `remove_remote_following` (federation_management) | `&dyn FederationFollowPort` | - -Handlers pass `&*s.federation` which satisfies all bounds. - -- [ ] **Step 4: Update `TestStore` — split impl into four** - -```rust -#[async_trait] -impl FederationLookupPort for TestStore { /* ... */ } -#[async_trait] -impl FederationFollowPort for TestStore { /* ... */ } -#[async_trait] -impl FederationFollowRequestPort for TestStore { /* ... */ } -#[async_trait] -impl FederationFetchPort for TestStore { /* ... */ } -``` - -- [ ] **Step 5: Compile check** -```bash -cargo check --workspace 2>&1 | head -40 -``` - -- [ ] **Step 6: Commit** -```bash -git commit -m "refactor(ports): CQRS split — FederationActionPort into four focused sub-ports" -``` - ---- - -## Phase 3 — Domain Model Quality - -### Task 6: Algebraic `Notification` type (make invalid states unrepresentable) - -**Files:** -- Modify: `crates/domain/src/models/notification.rs` -- Modify: `crates/domain/src/ports.rs` (`NotificationRepository::save` param type stays `&Notification`) -- Modify: `crates/adapters/postgres/src/notification.rs` -- Modify: `crates/application/src/services/notification_event.rs` -- Modify: `crates/domain/src/testing.rs` - -- [ ] **Step 1: Redefine `Notification` as an algebraic type** - -Replace `crates/domain/src/models/notification.rs` entirely: - -```rust -use crate::value_objects::{NotificationId, ThoughtId, UserId}; -use chrono::{DateTime, Utc}; - -#[derive(Debug, Clone, PartialEq)] -pub enum NotificationKind { - Like { thought_id: ThoughtId, from_user_id: UserId }, - Boost { thought_id: ThoughtId, from_user_id: UserId }, - Reply { thought_id: ThoughtId, from_user_id: UserId }, - Mention { thought_id: ThoughtId, from_user_id: UserId }, - Follow { from_user_id: UserId }, -} - -impl NotificationKind { - pub fn from_user_id(&self) -> &UserId { - match self { - Self::Like { from_user_id, .. } => from_user_id, - Self::Boost { from_user_id, .. } => from_user_id, - Self::Reply { from_user_id, .. } => from_user_id, - Self::Mention { from_user_id, .. } => from_user_id, - Self::Follow { from_user_id } => from_user_id, - } - } - - pub fn thought_id(&self) -> Option<&ThoughtId> { - match self { - Self::Like { thought_id, .. } => Some(thought_id), - Self::Boost { thought_id, .. } => Some(thought_id), - Self::Reply { thought_id, .. } => Some(thought_id), - Self::Mention { thought_id, .. } => Some(thought_id), - Self::Follow { .. } => None, - } - } - - pub fn kind_str(&self) -> &'static str { - match self { - Self::Like { .. } => "like", - Self::Boost { .. } => "boost", - Self::Reply { .. } => "reply", - Self::Mention { .. } => "mention", - Self::Follow { .. } => "follow", - } - } -} - -#[derive(Debug, Clone)] -pub struct Notification { - pub id: NotificationId, - pub user_id: UserId, - pub kind: NotificationKind, - pub read: bool, - pub created_at: DateTime, -} -``` - -- [ ] **Step 2: Update `postgres/src/notification.rs`** - -The DB row still has `notification_type`, `from_user_id`, `thought_id`. Add a `NotificationRow → Notification` conversion that constructs the correct `NotificationKind` variant: - -```rust -#[derive(sqlx::FromRow)] -struct NotificationRow { - id: uuid::Uuid, - user_id: uuid::Uuid, - notification_type: String, - from_user_id: Option, - thought_id: Option, - read: bool, - created_at: DateTime, -} - -fn row_to_notification(r: NotificationRow) -> Result { - let from_user_id = r.from_user_id - .map(UserId::from_uuid) - .ok_or_else(|| DomainError::Internal("notification missing from_user_id".into()))?; - - let kind = match r.notification_type.as_str() { - "follow" => NotificationKind::Follow { from_user_id }, - other => { - let thought_id = r.thought_id - .map(ThoughtId::from_uuid) - .ok_or_else(|| DomainError::Internal( - format!("notification type '{other}' missing thought_id") - ))?; - match other { - "like" => NotificationKind::Like { thought_id, from_user_id }, - "boost" => NotificationKind::Boost { thought_id, from_user_id }, - "reply" => NotificationKind::Reply { thought_id, from_user_id }, - "mention" => NotificationKind::Mention { thought_id, from_user_id }, - _ => return Err(DomainError::Internal( - format!("unknown notification type: {other}") - )), - } - } - }; - - Ok(Notification { - id: NotificationId::from_uuid(r.id), - user_id: UserId::from_uuid(r.user_id), - kind, - read: r.read, - created_at: r.created_at, - }) -} -``` - -Update `save` to write the new fields from `n.kind`: -```rust -async fn save(&self, n: &Notification) -> Result<(), DomainError> { - sqlx::query( - "INSERT INTO notifications(id,user_id,notification_type,from_user_id,thought_id,read,created_at) - VALUES($1,$2,$3,$4,$5,$6,$7) - ON CONFLICT(id) DO NOTHING" - ) - .bind(n.id.as_uuid()) - .bind(n.user_id.as_uuid()) - .bind(n.kind.kind_str()) - .bind(n.kind.from_user_id().as_uuid()) - .bind(n.kind.thought_id().map(|t| t.as_uuid())) - .bind(n.read) - .bind(n.created_at) - .execute(&self.pool) - .await - .into_domain() - .map(|_| ()) -} -``` - -- [ ] **Step 3: Update `notification_event.rs` (the service that creates notifications)** - -Read `crates/application/src/services/notification_event.rs` and update all `Notification { notification_type, from_user_id: Some(...), thought_id: Some(...), ... }` constructions to use the new `NotificationKind` enum: - -```rust -// Before -Notification { - id: NotificationId::new(), - user_id: owner_id.clone(), - notification_type: NotificationType::Like, - from_user_id: Some(liker_id.clone()), - thought_id: Some(thought_id.clone()), - read: false, - created_at: Utc::now(), -} - -// After -Notification { - id: NotificationId::new(), - user_id: owner_id.clone(), - kind: NotificationKind::Like { thought_id: thought_id.clone(), from_user_id: liker_id.clone() }, - read: false, - created_at: Utc::now(), -} -``` - -- [ ] **Step 4: Update handlers that read `Notification` fields** - -In `crates/presentation/src/handlers/notifications.rs` and any response mapping — change `.notification_type` / `.from_user_id` / `.thought_id` reads to pattern match on `.kind`. - -- [ ] **Step 5: Remove `NotificationType` enum** (the old one in `notification.rs`) — it is fully replaced by `NotificationKind`. - -- [ ] **Step 6: Compile check** -```bash -cargo check --workspace 2>&1 | head -30 -``` - -- [ ] **Step 7: Commit** -```bash -git commit -m "refactor(domain): algebraic Notification type — invalid states now unrepresentable" -``` - ---- - -### Task 7: Make `from_db_str` return `Result` instead of silently defaulting - -**Files:** -- Modify: `crates/domain/src/models/thought.rs` -- Modify: `crates/domain/src/models/social.rs` -- Modify: `crates/adapters/postgres/src/thought.rs` -- Modify: `crates/adapters/postgres/src/feed.rs` -- Modify: `crates/adapters/postgres/src/follow.rs` -- Modify: `crates/adapters/postgres-search/src/lib.rs` - -- [ ] **Step 1: Change `Visibility::from_db_str` to return `Result`** - -In `crates/domain/src/models/thought.rs`: - -```rust -impl Visibility { - pub fn as_str(&self) -> &'static str { - match self { - Self::Public => "public", - Self::Followers => "followers", - Self::Unlisted => "unlisted", - Self::Direct => "direct", - } - } - - pub fn from_db_str(s: &str) -> Result { - match s { - "public" => Ok(Self::Public), - "followers" => Ok(Self::Followers), - "unlisted" => Ok(Self::Unlisted), - "direct" => Ok(Self::Direct), - other => Err(crate::errors::DomainError::Internal( - format!("unknown visibility value in DB: '{other}'") - )), - } - } -} -``` - -- [ ] **Step 2: Change `FollowState::from_db_str` to return `Result`** - -In `crates/domain/src/models/social.rs`: - -```rust -impl FollowState { - pub fn as_str(&self) -> &'static str { - match self { - Self::Pending => "pending", - Self::Accepted => "accepted", - Self::Rejected => "rejected", - } - } - - pub fn from_db_str(s: &str) -> Result { - match s { - "pending" => Ok(Self::Pending), - "accepted" => Ok(Self::Accepted), - "rejected" => Ok(Self::Rejected), - other => Err(crate::errors::DomainError::Internal( - format!("unknown follow_state value in DB: '{other}'") - )), - } - } -} -``` - -- [ ] **Step 3: Update all callers** - -In each postgres adapter file, every call to `Visibility::from_db_str(s)` or `FollowState::from_db_str(s)` now returns a `Result`. Callers use `?` to propagate: - -```rust -// Before -visibility: Visibility::from_db_str(&r.visibility), -// After -visibility: Visibility::from_db_str(&r.visibility)?, -``` - -Do this in: `thought.rs`, `feed.rs`, `follow.rs`, `postgres-search/lib.rs`. - -- [ ] **Step 4: Compile check** -```bash -cargo check --workspace 2>&1 | head -20 -``` - -- [ ] **Step 5: Commit** -```bash -git commit -m "fix(domain): from_db_str returns Result — unknown DB values are now errors, not silent defaults" -``` - ---- - -## Phase 4 — Architecture Cleanup - -### Task 8: Remove pass-through use cases - -Use cases `search_thoughts`, `search_users` are single-line delegations with no business logic. Remove them and call the port directly from handlers. - -**Files:** -- Delete: `crates/application/src/use_cases/search.rs` -- Modify: `crates/application/src/use_cases/mod.rs` -- Modify: `crates/presentation/src/handlers/feed.rs` - -- [ ] **Step 1: Delete `search.rs` and remove from `mod.rs`** - -```bash -rm crates/application/src/use_cases/search.rs -``` - -In `crates/application/src/use_cases/mod.rs`, remove `pub mod search;`. - -- [ ] **Step 2: Update `feed.rs` handler to call the port directly** - -In `crates/presentation/src/handlers/feed.rs`, replace: -```rust -use application::use_cases::search::{search_thoughts, search_users}; -// ... -search_thoughts(&*s.search, &query, ...).await? -``` -with: -```rust -s.search.search_thoughts(&query, &page, viewer.as_ref()).await? -``` - -Same for `search_users`: -```rust -s.search.search_users(&query, &page).await? -``` - -- [ ] **Step 3: Compile check and commit** -```bash -cargo check --workspace 2>&1 | head -20 -git commit -m "refactor(application): remove pass-through search use cases — handlers call port directly" -``` - ---- - -### Task 9: Deduplicate actor resolution logic - -`profile::get_user_by_id_or_username` and `social::follow_actor`/`unfollow_actor` both do "UUID or username?" routing independently. The social use cases' version (`@handle` detection) is already correct and distinct, but the presentation layer has a duplicate `resolve_user_id` that was fixed earlier. Verify no remaining duplication and consolidate if any remains. - -**Files:** -- Modify: `crates/application/src/use_cases/profile.rs` (if needed) -- Modify: `crates/presentation/src/handlers/feed.rs` (verify) - -- [ ] **Step 1: Audit current state** - -Read `profile.rs::get_user_by_id_or_username` and `feed.rs::get_following_handler` / `get_followers_handler`. Confirm `feed.rs` already calls `get_user_by_id_or_username` (this was fixed in a prior session). - -- [ ] **Step 2: Document the two distinct resolution patterns** - -- `get_user_by_id_or_username` — UUID or local username (AP actor URL routing) -- `follow_actor` — `@handle@domain` vs local username (AP follow routing) - -These are intentionally different. No merge needed. Verify there are no other copies. - -```bash -grep -rn "parse_str\|parse::, - pub avatar_url: Option, - pub bio: Option, - pub banner_url: Option, - pub also_known_as: Option, - pub outbox_url: Option, - pub followers_url: Option, - pub following_url: Option, - pub attachment: Vec<(String, String)>, - pub last_fetched_at: DateTime, -} -``` - -Removed: `inbox_url`, `shared_inbox_url`, `public_key`. - -- [ ] **Step 2: Update `RemoteActorRepository` port if it uses the removed fields** - -The `upsert` and `find_by_url` methods accept/return the slimmed model. Verify the postgres `remote_actor.rs` adapter maps correctly (the DB still has inbox_url column; just don't pull it into the domain model). - -- [ ] **Step 3: Fix compilation errors from removed fields** - -The main consumer is `FederationActionPort.lookup_actor()` which returns `RemoteActor`. The AP adapter (`ActivityPubService`) constructs `RemoteActor` from fetched data — update it to not set the removed fields. - -`InboxUrl` and `shared_inbox_url` are needed by the AP delivery layer (`OutboundFederationPort`). Those should use the `activitypub-base`'s own `RemoteActor` struct (in `repository.rs`) instead of the domain one. - -- [ ] **Step 4: Compile check** -```bash -cargo check --workspace 2>&1 | head -30 -``` - -- [ ] **Step 5: Commit** -```bash -git commit -m "refactor(domain): remove AP delivery fields from RemoteActor domain model" -``` - ---- - -### Task 11: Change `ActivityPubRepository` port params from `url::Url` to `&str` - -The domain port currently requires callers to construct `url::Url` — an external library type — before calling methods. Use `&str` at the port boundary; let the adapter parse. - -**Files:** -- Modify: `crates/domain/src/ports.rs` (`ActivityPubRepository` trait) -- Modify: `crates/adapters/postgres/src/activitypub.rs` -- Modify: `crates/application/src/services/federation_event.rs` -- Modify: `crates/adapters/activitypub/src/handler.rs` - -- [ ] **Step 1: Change method signatures in `ActivityPubRepository`** - -In `ports.rs`, change: -```rust -async fn find_remote_actor_id(&self, actor_ap_url: &url::Url) -> Result, DomainError>; -async fn intern_remote_actor(&self, actor_ap_url: &url::Url) -> Result; -async fn accept_note(&self, ap_id: &url::Url, ..., in_reply_to: Option<&url::Url>) -> Result<(), DomainError>; -async fn apply_note_update(&self, ap_id: &url::Url, ...) -> Result<(), DomainError>; -async fn retract_note(&self, ap_id: &url::Url) -> Result<(), DomainError>; -async fn retract_actor_notes(&self, actor_ap_url: &url::Url) -> Result<(), DomainError>; -async fn get_thought_ap_id(&self, thought_id: &ThoughtId) -> Result, DomainError>; -async fn get_actor_ap_urls(&self, user_id: &UserId) -> Result, DomainError>; -``` - -To: -```rust -async fn find_remote_actor_id(&self, actor_ap_url: &str) -> Result, DomainError>; -async fn intern_remote_actor(&self, actor_ap_url: &str) -> Result; -async fn accept_note(&self, ap_id: &str, ..., in_reply_to: Option<&str>) -> Result<(), DomainError>; -async fn apply_note_update(&self, ap_id: &str, ...) -> Result<(), DomainError>; -async fn retract_note(&self, ap_id: &str) -> Result<(), DomainError>; -async fn retract_actor_notes(&self, actor_ap_url: &str) -> Result<(), DomainError>; -// get_thought_ap_id and get_actor_ap_urls already use &ThoughtId/&UserId — no change needed -``` - -- [ ] **Step 2: Update `PgActivityPubRepository` — parse URL internally** - -In `postgres/src/activitypub.rs`, each method now receives `&str` and uses it directly in the SQL bind (URLs are stored as TEXT). Remove the `.as_str()` calls. - -- [ ] **Step 3: Update callers — remove URL construction** - -In `federation_event.rs` and `handler.rs`, remove `url::Url::parse(...)` before calling repo methods; pass the `&str` directly: - -```rust -// Before -let actor_url = url::Url::parse(actor_ap_url)?; -let author_id = self.ap_repo.intern_remote_actor(&actor_url).await?; - -// After -let author_id = self.ap_repo.intern_remote_actor(actor_ap_url).await?; -``` - -- [ ] **Step 4: Compile check** -```bash -cargo check --workspace 2>&1 | head -20 -``` - -- [ ] **Step 5: Commit** -```bash -git commit -m "refactor(ports): ActivityPubRepository takes &str instead of &url::Url — infra type stays in adapter" -``` - ---- - -## Self-Review - -**Spec coverage check:** - -| Issue | Task | -|---|---| -| AP fields in domain models (User, Thought) | Task 1 (new lookup methods) + Task 2 (field removal) | -| DomainEvent contains AP events | Task 3 | -| FederationEventService mixes concerns | Task 3 (removes FetchX arms), Task 1 (removes ap_id reads) | -| Notification algebraic types | Task 6 | -| from_db_str silently defaults | Task 7 | -| CQRS UserRepository | Task 4 | -| CQRS FederationActionPort | Task 5 | -| Pass-through use cases | Task 8 | -| Duplicate actor resolution | Task 9 | -| RemoteActor AP field leakage | Task 10 | -| Port params url::Url → &str | Task 11 | - -**Gaps found:** -- `FeedEntry` coupling (issue #10 from audit) — deferred. Requires FeedRepository changes and affects all feed handlers. Low risk to leave as-is; worth a separate plan. - -**No placeholders found.** Each task has exact file paths and concrete code. - -**Type consistency:** `ActorApUrls` introduced in Task 1 is used in Task 2. `NotificationKind` introduced in Task 6 is used throughout Task 6. All type names consistent. diff --git a/docs/superpowers/plans/2026-05-15-federation-gaps-2.md b/docs/superpowers/plans/2026-05-15-federation-gaps-2.md deleted file mode 100644 index 1cd548b..0000000 --- a/docs/superpowers/plans/2026-05-15-federation-gaps-2.md +++ /dev/null @@ -1,781 +0,0 @@ -# Federation Gaps — Round 2 Implementation Plan - -> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. - -**Goal:** Fix seven federation gaps: HTML content format, hashtag federation, Undo(Like) inbound, Update(Actor) on profile change, @mention notifications, remote posts in home feed, and orphaned reply parent display. - -**Architecture:** Backend changes span the AP adapter layer (activities.rs, service.rs, handler.rs), application layer (use cases, event service), and postgres adapter (feed.rs). Frontend changes are limited to api.ts and thought-card.tsx. All changes follow the existing hexagonal pattern — no business logic in presentation, domain events for cross-cutting concerns. - -**Tech Stack:** Rust / axum / sqlx / activitypub_federation crate; Next.js 15 / TypeScript / Zod. - ---- - -## Files Modified - -| Task | File | Change | -|------|------|--------| -| 1 | `crates/adapters/activitypub-base/src/service.rs` | Wrap content in `

` tags with HTML escaping | -| 2 | `crates/adapters/activitypub/src/note.rs` | Add `tag` field to ThoughtNote | -| 2 | `crates/adapters/activitypub-base/src/service.rs` | Extract hashtags and add to Note JSON | -| 3 | `crates/adapters/activitypub-base/src/activities.rs` | Add "Like" arm to UndoActivity::receive | -| 4 | `crates/domain/src/ports.rs` | Add `broadcast_actor_update` to OutboundFederationPort | -| 4 | `crates/domain/src/events.rs` | Add `ProfileUpdated` variant | -| 4 | `crates/domain/src/testing.rs` | Add SpyPort stub for broadcast_actor_update | -| 4 | `crates/application/src/use_cases/profile.rs` | Publish ProfileUpdated from update_profile | -| 4 | `crates/application/src/services/federation_event.rs` | Handle ProfileUpdated → broadcast_actor_update | -| 4 | `crates/adapters/activitypub-base/src/service.rs` | Implement broadcast_actor_update port method | -| 5 | `crates/adapters/activitypub/src/note.rs` | Add `tag` deserialization field | -| 5 | `crates/adapters/activitypub-base/src/content.rs` | Add `on_mention` to ApObjectHandler | -| 5 | `crates/adapters/activitypub/src/handler.rs` | Parse Mention tags, implement on_mention | -| 5 | `crates/domain/src/events.rs` | Add `MentionReceived` variant | -| 5 | `crates/domain/src/testing.rs` | No-op on_mention in TestStore impl | -| 5 | `crates/application/src/services/notification_event.rs` | Handle MentionReceived | -| 6 | `crates/adapters/postgres/src/feed.rs` | Extend home_feed SQL to include federation_following | -| 7 | `crates/api-types/src/responses.rs` | Add `in_reply_to_url` to ThoughtResponse | -| 7 | `crates/presentation/src/handlers/feed.rs` | Map in_reply_to_url into response | -| 7 | `thoughts-frontend/lib/api.ts` | Add replyToUrl to ThoughtSchema | -| 7 | `thoughts-frontend/components/thought-card.tsx` | Show external reply link when replyToUrl set | - ---- - -## Task 1: HTML content in outbound Notes - -Mastodon and other AP servers expect HTML, not plain text. Wrap content in `

` tags and escape HTML entities. Multi-paragraph posts (newlines) get multiple `

` elements. - -**Files:** -- Modify: `crates/adapters/activitypub-base/src/service.rs` (function `thought_note_json`) - -- [ ] **Step 1: Add a private HTML-escaping helper near the top of service.rs** - -Read `crates/adapters/activitypub-base/src/service.rs`. Find `fn thought_note_json`. Add this private function just before it: - -```rust -fn content_to_html(text: &str) -> String { - let escaped = text - .replace('&', "&") - .replace('<', "<") - .replace('>', ">") - .replace('"', """); - let paragraphs: Vec<&str> = escaped.split('\n').filter(|s| !s.is_empty()).collect(); - if paragraphs.is_empty() { - format!("

{}

", escaped) - } else { - paragraphs - .iter() - .map(|p| format!("

{}

", p)) - .collect::>() - .join("") - } -} -``` - -- [ ] **Step 2: Use `content_to_html` in `thought_note_json`** - -In `thought_note_json`, find: -```rust -"content": thought.content.as_str(), -``` -Replace with: -```rust -"content": content_to_html(thought.content.as_str()), -``` - -- [ ] **Step 3: Verify compilation** - -```bash -cd /mnt/drive/dev/thoughts && cargo build -p activitypub-base 2>&1 | grep "^error" -``` -Expected: no errors. - -- [ ] **Step 4: Commit** - -```bash -git add crates/adapters/activitypub-base/src/service.rs -git commit -m "fix(ap): wrap outbound Note content in HTML paragraph tags" -``` - ---- - -## Task 2: Hashtag federation - -Outbound Notes must include a `tag` array with Hashtag objects so Mastodon can index posts by hashtag. Extract `#word` patterns from content and add to the Note JSON. - -**Files:** -- Modify: `crates/adapters/activitypub-base/src/service.rs` (`thought_note_json`) - -- [ ] **Step 1: Add a hashtag-extraction helper in service.rs** - -Add this function near `content_to_html` (already added in Task 1): - -```rust -fn extract_hashtag_tags(content: &str, base_url: &str) -> Vec { - let mut seen = std::collections::HashSet::new(); - let mut tags = Vec::new(); - for word in content.split_whitespace() { - let tag = word.trim_matches(|c: char| !c.is_alphanumeric() && c != '#'); - if let Some(name) = tag.strip_prefix('#') { - if !name.is_empty() && seen.insert(name.to_lowercase()) { - let lower = name.to_lowercase(); - tags.push(serde_json::json!({ - "type": "Hashtag", - "name": format!("#{}", lower), - "href": format!("{}/tags/{}", base_url, lower), - })); - } - } - } - tags -} -``` - -- [ ] **Step 2: Add hashtag tags to the Note JSON in `thought_note_json`** - -In `thought_note_json`, after the closing `}` of `let mut note = serde_json::json!({...})`, add: - -```rust -let hashtag_tags = extract_hashtag_tags(thought.content.as_str(), base_url); -if !hashtag_tags.is_empty() { - note["tag"] = serde_json::json!(hashtag_tags); -} -``` - -Note: `base_url` is already a parameter of `thought_note_json(&self, thought, local_actor, base_url)` — use it directly. - -- [ ] **Step 3: Verify compilation** - -```bash -cd /mnt/drive/dev/thoughts && cargo build -p activitypub-base 2>&1 | grep "^error" -``` - -- [ ] **Step 4: Commit** - -```bash -git add crates/adapters/activitypub-base/src/service.rs -git commit -m "feat(ap): add hashtag tag array to outbound Notes" -``` - ---- - -## Task 3: Undo(Like) inbound handler - -When a remote user unlikes a local post, we should acknowledge it. Add a "Like" arm to `UndoActivity::receive` that calls `on_unlike` on the object handler. The `on_unlike` impl will be a no-op (we don't store remote likes in the likes table, only notifications — removing them requires more infrastructure). This prevents the "ignoring Undo of unknown activity type" log spam. - -**Files:** -- Modify: `crates/adapters/activitypub-base/src/activities.rs` -- Modify: `crates/adapters/activitypub-base/src/content.rs` -- Modify: `crates/adapters/activitypub/src/handler.rs` - -- [ ] **Step 1: Add `on_unlike` to `ApObjectHandler` trait in `content.rs`** - -Read `crates/adapters/activitypub-base/src/content.rs`. Find `ApObjectHandler`. Add after `on_announce_received`: - -```rust -/// Called when a remote actor removes a Like from a local thought. -async fn on_unlike(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>; -``` - -- [ ] **Step 2: Add no-op `on_unlike` to `ThoughtsObjectHandler` in `handler.rs`** - -Read `crates/adapters/activitypub/src/handler.rs`. Add after `on_announce_received`: - -```rust -async fn on_unlike(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> { - Ok(()) -} -``` - -- [ ] **Step 3: Add "Like" arm to `UndoActivity::receive` in `activities.rs`** - -Read `crates/adapters/activitypub-base/src/activities.rs`. Find `UndoActivity::receive`. Find the `match obj_type` block. Add before the `other =>` catch-all: - -```rust -"Like" => { - if let Some(obj_url_str) = self.object.get("object").and_then(|o| o.as_str()) - && let Ok(obj_url) = Url::parse(obj_url_str) - && obj_url.host_str().unwrap_or("") == data.domain - { - data.object_handler - .on_unlike(&obj_url, self.actor.inner()) - .await - .unwrap_or_else(|e| { - tracing::warn!(error = %e, "failed to process unlike"); - }); - } - tracing::info!(actor = %self.actor.inner(), "received Undo(Like)"); -} -``` - -- [ ] **Step 4: Verify compilation** - -```bash -cd /mnt/drive/dev/thoughts && cargo build 2>&1 | grep "^error" -``` - -- [ ] **Step 5: Commit** - -```bash -git add crates/adapters/activitypub-base/src/activities.rs \ - crates/adapters/activitypub-base/src/content.rs \ - crates/adapters/activitypub/src/handler.rs -git commit -m "feat(ap): handle Undo(Like) inbound activity" -``` - ---- - -## Task 4: Update(Actor) outbound on profile change - -When a user updates their profile (display name, bio, avatar), broadcast an `Update(Actor)` activity to their AP followers. The `broadcast_actor_update` method already exists on `ActivityPubService` — it just needs to be exposed as a port method and wired through the event system. - -**Files:** -- Modify: `crates/domain/src/ports.rs` — add to OutboundFederationPort -- Modify: `crates/domain/src/events.rs` — add ProfileUpdated variant -- Modify: `crates/domain/src/testing.rs` — SpyPort stub -- Modify: `crates/application/src/use_cases/profile.rs` — publish event -- Modify: `crates/application/src/services/federation_event.rs` — handle event -- Modify: `crates/adapters/activitypub-base/src/service.rs` — implement port method - -- [ ] **Step 1: Add `ProfileUpdated` to `DomainEvent` in `crates/domain/src/events.rs`** - -Read the file. Add to the enum: - -```rust -ProfileUpdated { user_id: UserId }, -``` - -- [ ] **Step 2: Add `broadcast_actor_update` to `OutboundFederationPort` in `crates/domain/src/ports.rs`** - -Find `OutboundFederationPort`. Add after `broadcast_undo_like`: - -```rust -/// Broadcast Update(Actor) to all accepted followers when a user updates their profile. -async fn broadcast_actor_update( - &self, - user_id: &UserId, -) -> Result<(), DomainError>; -``` - -- [ ] **Step 3: Add stub to `SpyPort` in `crates/application/src/services/federation_event.rs`** - -Find `SpyPort` struct. Add field: -```rust -actor_updated: Mutex>, -``` - -Find `impl OutboundFederationPort for SpyPort`. Add: -```rust -async fn broadcast_actor_update(&self, user_id: &UserId) -> Result<(), DomainError> { - self.actor_updated.lock().unwrap().push(user_id.clone()); - Ok(()) -} -``` - -- [ ] **Step 4: Add `EventPublisher` to `update_profile` use case in `crates/application/src/use_cases/profile.rs`** - -Read the file. Find `pub async fn update_profile(...)`. Add `events: &dyn EventPublisher` as a parameter and import it. Publish `ProfileUpdated` after the update: - -```rust -pub async fn update_profile( - users: &dyn UserRepository, - events: &dyn EventPublisher, - user_id: &UserId, - display_name: Option, - bio: Option, - avatar_url: Option, - header_url: Option, - custom_css: Option, -) -> Result<(), DomainError> { - users - .update_profile(user_id, display_name, bio, avatar_url, header_url, custom_css) - .await?; - events - .publish(&DomainEvent::ProfileUpdated { - user_id: user_id.clone(), - }) - .await?; - Ok(()) -} -``` - -Make sure `DomainEvent` and `EventPublisher` are imported at the top of profile.rs. Check the existing imports and add what's missing. - -- [ ] **Step 5: Update all callers of `update_profile` to pass `&*s.events`** - -`update_profile` is called from `crates/presentation/src/handlers/users.rs`. Read that file. Find the `patch_profile` handler call to `update_profile`. Add `&*s.events` as the second argument: - -```rust -update_profile( - &*s.users, - &*s.events, - &uid, - body.display_name, - body.bio, - body.avatar_url, - body.header_url, - body.custom_css, -) -.await?; -``` - -- [ ] **Step 6: Implement `broadcast_actor_update` port method in `ActivityPubService` in `crates/adapters/activitypub-base/src/service.rs`** - -Find `impl domain::ports::OutboundFederationPort for ActivityPubService`. Add after `broadcast_undo_like`: - -```rust -async fn broadcast_actor_update( - &self, - user_id: &domain::value_objects::UserId, -) -> Result<(), domain::errors::DomainError> { - self.broadcast_actor_update(user_id.as_uuid()) - .await - .map_err(|e| domain::errors::DomainError::Internal(e.to_string())) -} -``` - -Note: this calls the existing private `broadcast_actor_update(uuid)` method on `ActivityPubService`. - -- [ ] **Step 7: Handle `ProfileUpdated` in `federation_event.rs`** - -Find the `match event` block. Add before the catch-all `_ => Ok(())`: - -```rust -DomainEvent::ProfileUpdated { user_id } => { - self.ap.broadcast_actor_update(user_id).await -} -``` - -- [ ] **Step 8: Verify build and tests** - -```bash -cd /mnt/drive/dev/thoughts && cargo build 2>&1 | grep "^error" | head -5 -cargo test -p domain -p application 2>&1 | tail -5 -``` - -- [ ] **Step 9: Commit** - -```bash -git add crates/domain/src/events.rs \ - crates/domain/src/ports.rs \ - crates/domain/src/testing.rs \ - crates/application/src/use_cases/profile.rs \ - crates/application/src/services/federation_event.rs \ - crates/presentation/src/handlers/users.rs \ - crates/adapters/activitypub-base/src/service.rs -git commit -m "feat(ap): broadcast Update(Actor) when user updates their profile" -``` - ---- - -## Task 5: @mention notification - -When a remote Note arrives with a Mention tag pointing to a local user, create a notification. The Note's `tag` array contains objects like `{"type":"Mention","href":"https://our.instance/users/{uuid}","name":"@user@domain"}`. - -**Files:** -- Modify: `crates/adapters/activitypub/src/note.rs` — add tag field -- Modify: `crates/adapters/activitypub-base/src/content.rs` — add on_mention to trait -- Modify: `crates/adapters/activitypub/src/handler.rs` — parse tags, implement on_mention -- Modify: `crates/domain/src/events.rs` — add MentionReceived -- Modify: `crates/domain/src/testing.rs` — no-op on_mention -- Modify: `crates/application/src/services/notification_event.rs` — handle MentionReceived - -- [ ] **Step 1: Add `tag` field to `ThoughtNote` in `crates/adapters/activitypub/src/note.rs`** - -Read the file. Add to the `ThoughtNote` struct: - -```rust -#[serde(skip_serializing_if = "Vec::is_empty", default)] -pub tag: Vec, -``` - -- [ ] **Step 2: Add `MentionReceived` to `DomainEvent` in `crates/domain/src/events.rs`** - -Add to the enum: - -```rust -MentionReceived { - thought_id: ThoughtId, - mentioned_user_id: UserId, - author_user_id: UserId, -}, -``` - -- [ ] **Step 3: Add `on_mention` to `ApObjectHandler` in `crates/adapters/activitypub-base/src/content.rs`** - -Add after `on_unlike`: - -```rust -/// Called once per @mention of a local user in a remote Note. -/// `thought_ap_id` is the AP URL of the Note, `mentioned_user_id` is the UUID -/// of the local user being mentioned, `actor_url` is the remote author's AP URL. -async fn on_mention( - &self, - thought_ap_id: &Url, - mentioned_user_uuid: uuid::Uuid, - actor_url: &Url, -) -> anyhow::Result<()>; -``` - -- [ ] **Step 4: Add no-op `on_mention` to `TestStore`'s `ApObjectHandler` impl in `crates/domain/src/testing.rs`** - -Note: `TestStore` does NOT implement `ApObjectHandler` — that's `ThoughtsObjectHandler` in the activitypub adapter. Instead, find if there is a test double or just implement in handler.rs directly (step 5 below covers it). - -- [ ] **Step 5: Implement `on_mention` in `ThoughtsObjectHandler` in `crates/adapters/activitypub/src/handler.rs`** - -Add after `on_unlike`: - -```rust -async fn on_mention( - &self, - thought_ap_id: &url::Url, - mentioned_user_uuid: uuid::Uuid, - actor_url: &url::Url, -) -> anyhow::Result<()> { - // Resolve remote author to a local user ID. - let author_user_id = match self - .repo - .find_remote_actor_id(actor_url) - .await - .map_err(|e| anyhow::anyhow!("{e}"))? - { - Some(id) => id, - None => return Ok(()), - }; - - // Extract thought UUID from /thoughts/{uuid} path. - let thought_uuid = thought_ap_id - .path() - .strip_prefix("/thoughts/") - .and_then(|s| s.split('/').next()) - .and_then(|s| uuid::Uuid::parse_str(s).ok()); - - let thought_uuid = match thought_uuid { - Some(u) => u, - None => return Ok(()), - }; - - if let Some(ep) = &self.event_publisher { - ep.publish(&domain::events::DomainEvent::MentionReceived { - thought_id: domain::value_objects::ThoughtId::from_uuid(thought_uuid), - mentioned_user_id: domain::value_objects::UserId::from_uuid(mentioned_user_uuid), - author_user_id, - }) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; - } - - Ok(()) -} -``` - -- [ ] **Step 6: Parse Mention tags and call `on_mention` in `ThoughtsObjectHandler::on_create`** - -Find `on_create`. After the `accept_note(...)` call, add: - -```rust -// Fire mention notifications for any local @mentions in the note's tag array. -let local_domain = self.urls.base_url().host_str().unwrap_or(""); -for tag in ¬e.tag { - if tag.get("type").and_then(|t| t.as_str()) != Some("Mention") { - continue; - } - let href = match tag.get("href").and_then(|h| h.as_str()) { - Some(h) => h, - None => continue, - }; - let href_url = match url::Url::parse(href) { - Ok(u) => u, - Err(_) => continue, - }; - // Only process mentions of local users (UUID-based /users/{uuid} paths). - if href_url.host_str().unwrap_or("") != local_domain { - continue; - } - let user_uuid = href_url - .path() - .strip_prefix("/users/") - .and_then(|s| s.split('/').next()) - .and_then(|s| uuid::Uuid::parse_str(s).ok()); - if let Some(uuid) = user_uuid { - self.on_mention(ap_id, uuid, actor_url) - .await - .unwrap_or_else(|e| { - tracing::warn!(error = %e, "failed to process mention notification"); - }); - } -} -``` - -Note: `self.urls.base_url()` — check `ThoughtsUrls` for how to get the base URL `Url`. If not available, parse `self.urls` fields or add a helper. Check the `ThoughtsUrls` struct in `crates/adapters/activitypub/src/urls.rs`. - -- [ ] **Step 7: Handle `MentionReceived` in `crates/application/src/services/notification_event.rs`** - -Find the `match event` block. Add before the `_ => Ok(())` catch-all: - -```rust -DomainEvent::MentionReceived { - thought_id, - mentioned_user_id, - author_user_id, -} => { - self.notifications - .save(&Notification { - id: NotificationId::new(), - user_id: mentioned_user_id.clone(), - notification_type: NotificationType::Mention, - from_user_id: Some(author_user_id.clone()), - thought_id: Some(thought_id.clone()), - read: false, - created_at: Utc::now(), - }) - .await -} -``` - -Make sure `NotificationType::Mention` is a variant — check `crates/domain/src/models/notification.rs`. It already has `Mention` variant. - -- [ ] **Step 8: Verify build and tests** - -```bash -cd /mnt/drive/dev/thoughts && cargo build 2>&1 | grep "^error" | head -10 -cargo test -p domain -p application 2>&1 | tail -5 -``` - -- [ ] **Step 9: Commit** - -```bash -git add crates/adapters/activitypub/src/note.rs \ - crates/adapters/activitypub-base/src/content.rs \ - crates/adapters/activitypub/src/handler.rs \ - crates/domain/src/events.rs \ - crates/application/src/services/notification_event.rs -git commit -m "feat(ap): @mention notification from inbound remote Notes" -``` - ---- - -## Task 6: Remote posts in home feed - -The `home_feed` SQL currently only includes thoughts from users in the `follows` table (local follows). Remote follows are in `federation_following`, so remote users' posts never appear. Extend the SQL to also include thoughts from users whose AP URL is in `federation_following` for the viewer. - -**Files:** -- Modify: `crates/adapters/postgres/src/feed.rs` - -- [ ] **Step 1: Read `crates/adapters/postgres/src/feed.rs` in full** - -Focus on `fn feed_select(viewer: Option) -> String` and `async fn home_feed(...)`. - -Key insight: `feed_select` embeds `viewer` UUID directly into the SQL string (not as a bind parameter). The home_feed SQL uses `$1` (following_ids), `$2` (limit), `$3` (offset) as bind params. - -- [ ] **Step 2: Add `follower` parameter to `feed_select`** - -Change the signature to: -```rust -fn feed_select(viewer: Option, follower: Option) -> String -``` - -At the top of the function body, generate a federation following subquery: -```rust -let federation_clause = match follower { - Some(fid) => format!( - "OR t.user_id IN ( - SELECT u2.id FROM users u2 - JOIN federation_following ff ON u2.ap_id = ff.remote_actor_url - WHERE ff.local_user_id = '{fid}' - )" - ), - None => String::new(), -}; -``` - -This string is used in step 3's WHERE clause modification. - -Since `feed_select` generates only the SELECT part (not WHERE), the `federation_clause` needs to be returned somehow. Options: -- Return a tuple `(select_str, federation_clause)` from `feed_select` -- Or add a separate helper `fn federation_following_clause(follower: Option) -> String` - -Use option B — separate helper — to avoid changing `feed_select`'s return type: - -```rust -fn federation_following_clause(follower: Option) -> String { - match follower { - Some(fid) => format!( - " OR t.user_id IN ( - SELECT u2.id FROM users u2 - JOIN federation_following ff ON u2.ap_id = ff.remote_actor_url - WHERE ff.local_user_id = '{fid}' - )" - ), - None => String::new(), - } -} -``` - -Leave `feed_select` signature unchanged. - -- [ ] **Step 3: Modify `home_feed` to use `federation_following_clause`** - -Find the `home_feed` method. The viewer_id is the feed owner (the logged-in user), which is also the person whose federation_following we want. - -Replace: -```rust -let viewer = viewer_id.map(|v| v.as_uuid()); -// ... -let total: i64 = sqlx::query_scalar( - "SELECT COUNT(*) FROM thoughts t WHERE t.user_id=ANY($1) AND t.visibility != 'direct'", -) -``` - -With: -```rust -let viewer = viewer_id.map(|v| v.as_uuid()); -let fed_clause = federation_following_clause(viewer); -let total: i64 = sqlx::query_scalar(&format!( - "SELECT COUNT(*) FROM thoughts t WHERE (t.user_id=ANY($1){}) AND t.visibility != 'direct'", - fed_clause -)) -``` - -And replace: -```rust -let sql = format!("{sel} WHERE t.user_id=ANY($1) AND t.visibility != 'direct' ORDER BY t.created_at DESC LIMIT $2 OFFSET $3"); -``` - -With: -```rust -let sql = format!("{sel} WHERE (t.user_id=ANY($1){}) AND t.visibility != 'direct' ORDER BY t.created_at DESC LIMIT $2 OFFSET $3", fed_clause); -``` - -The rest of the bindings (`$1`, `$2`, `$3`) stay unchanged. - -- [ ] **Step 4: Verify compilation** - -```bash -cd /mnt/drive/dev/thoughts && cargo build -p postgres 2>&1 | grep "^error" | head -5 -``` - -- [ ] **Step 5: Run all tests** - -```bash -cd /mnt/drive/dev/thoughts && cargo test -p domain -p application 2>&1 | tail -5 -``` - -- [ ] **Step 6: Commit** - -```bash -git add crates/adapters/postgres/src/feed.rs -git commit -m "feat(feed): include remote following posts in home feed" -``` - ---- - -## Task 7: Reply parent display + API field - -Remote posts that are replies show without context because: -1. `ThoughtResponse` doesn't expose `in_reply_to_url` (the external URL of the parent) -2. The frontend doesn't link to the parent when it's external - -**Files:** -- Modify: `crates/api-types/src/responses.rs` -- Modify: `crates/presentation/src/handlers/feed.rs` (or wherever `to_thought_response` is defined) -- Modify: `thoughts-frontend/lib/api.ts` -- Modify: `thoughts-frontend/components/thought-card.tsx` - -- [ ] **Step 1: Add `in_reply_to_url` to `ThoughtResponse` in `crates/api-types/src/responses.rs`** - -Read the file. Find `ThoughtResponse` struct. Add after `reply_to_id`: - -```rust -#[serde(rename = "replyToUrl", skip_serializing_if = "Option::is_none")] -pub in_reply_to_url: Option, -``` - -- [ ] **Step 2: Map `in_reply_to_url` in the response builder** - -Find where `ThoughtResponse` is constructed from a `Thought` (search for `ThoughtResponse {` or `to_thought_response`). Add the mapping: - -```rust -in_reply_to_url: thought.in_reply_to_url.clone(), -``` - -- [ ] **Step 3: Verify backend compilation** - -```bash -cd /mnt/drive/dev/thoughts && cargo build 2>&1 | grep "^error" | head -5 -``` - -- [ ] **Step 4: Add `replyToUrl` to `ThoughtSchema` in `thoughts-frontend/lib/api.ts`** - -Find `ThoughtSchema`. Add: - -```typescript -replyToUrl: z.string().url().nullable().optional(), -``` - -- [ ] **Step 5: Update `thought-card.tsx` to show external reply link** - -Read `thoughts-frontend/components/thought-card.tsx`. Find the section that renders `thought.replyToId`. It currently shows "Replying to parent thought" with a hash link only when `isReply` is true. - -Add an external reply link for when the thought has a `replyToUrl` but no local `replyToId`: - -```tsx -{thought.replyToId && isReply && ( -
- - - Replying to{" "} - - parent thought - - -
-)} -{!thought.replyToId && thought.replyToUrl && ( -
- - - Replying to{" "} - - original post ↗ - - -
-)} -``` - -- [ ] **Step 6: Type check frontend** - -```bash -cd /mnt/drive/dev/thoughts/thoughts-frontend && npx tsc --noEmit 2>&1 | grep "error TS" | head -5 -``` - -- [ ] **Step 7: Final build + tests** - -```bash -cd /mnt/drive/dev/thoughts && cargo build 2>&1 | grep "^error" -cargo test -p domain -p application 2>&1 | tail -5 -``` - -- [ ] **Step 8: Commit** - -```bash -git add crates/api-types/src/responses.rs \ - thoughts-frontend/lib/api.ts \ - thoughts-frontend/components/thought-card.tsx -git commit -m "feat: expose replyToUrl in API + show external parent link on remote reply posts" -``` - ---- - -## Notes - -- **Task 5 (mentions)**: `self.urls.base_url()` — if `ThoughtsUrls` doesn't expose the base URL as a `Url`, parse it from `self.urls.base_url` string. Check `crates/adapters/activitypub/src/urls.rs` for the exact field. -- **Task 6 (feed)**: The embedded UUID in the SQL is a UUID type (hex + hyphens only), safe to format-string without SQL injection risk. -- **Task 7 (reply)**: The `to_thought_response` builder might be in `handlers/feed.rs`, `handlers/thoughts.rs`, or a shared module — search the codebase for where `ThoughtResponse` is constructed. -- **Profile update (Task 4)**: If tests in `application` call `update_profile` directly, they'll need to pass a `TestStore` as the `events` parameter (TestStore implements EventPublisher). diff --git a/docs/superpowers/plans/2026-05-15-feedentry-decoupling.md b/docs/superpowers/plans/2026-05-15-feedentry-decoupling.md deleted file mode 100644 index 25e539e..0000000 --- a/docs/superpowers/plans/2026-05-15-feedentry-decoupling.md +++ /dev/null @@ -1,492 +0,0 @@ -# FeedEntry Decoupling Implementation Plan - -> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. - -**Goal:** Replace the flat `liked_by_viewer`/`boosted_by_viewer` booleans and inline stats fields on `FeedEntry` with two named sub-structs (`EngagementStats`, `Option`), and fix the search adapter to compute real viewer context instead of hardcoding `false`. - -**Architecture:** Three sequential tasks. Task 1 changes the domain model, which breaks compilation. Task 2 fixes all downstream construction sites and restores compilation. Task 3 adds the functional improvement — viewer-aware SQL in the search adapter. - -**Tech Stack:** Rust, SQLx, Postgres trigram search (`pg_trgm`). - ---- - -### Task 1: Add `EngagementStats` and `ViewerContext` to the domain model - -**Files:** -- Modify: `crates/domain/src/models/feed.rs` - -- [ ] **Step 1: Replace the flat fields on `FeedEntry` with two named sub-structs** - -Replace the entire contents of `crates/domain/src/models/feed.rs` with: - -```rust -use crate::models::{thought::Thought, user::User}; -use crate::value_objects::UserId; - -#[derive(Debug, Clone)] -pub struct EngagementStats { - pub like_count: i64, - pub boost_count: i64, - pub reply_count: i64, -} - -/// Present only when an authenticated viewer made the request. -/// `liked`/`boosted` are the viewer's interaction state with this thought. -/// `None` means anonymous request or viewer context unavailable. -#[derive(Debug, Clone)] -pub struct ViewerContext { - pub liked: bool, - pub boosted: bool, -} - -#[derive(Debug, Clone)] -pub struct FeedEntry { - pub thought: Thought, - pub author: User, - pub stats: EngagementStats, - pub viewer: Option, -} - -#[derive(Debug, Clone)] -pub struct UserSummary { - pub id: UserId, - pub username: String, - pub display_name: Option, - pub avatar_url: Option, - pub bio: Option, - pub thought_count: i64, - pub follower_count: i64, - pub following_count: i64, -} - -#[derive(Debug, Clone)] -pub struct PageParams { - pub page: u64, - pub per_page: u64, -} -impl PageParams { - pub fn offset(&self) -> i64 { - ((self.page.saturating_sub(1)) * self.per_page) as i64 - } - pub fn limit(&self) -> i64 { - self.per_page as i64 - } -} - -#[derive(Debug, Clone)] -pub struct Paginated { - pub items: Vec, - pub total: i64, - pub page: u64, - pub per_page: u64, -} -``` - -- [ ] **Step 2: Verify the domain crate compiles (other crates will break)** - -```bash -cargo check -p domain 2>&1 | head -10 -``` - -Expected: `domain` compiles clean. Other crates (`postgres`, `postgres-search`, `presentation`) will show errors referencing the removed fields — that is expected and will be fixed in Task 2. - -- [ ] **Step 3: Commit the domain model change** - -```bash -git add crates/domain/src/models/feed.rs -git commit -m "refactor(domain): FeedEntry — EngagementStats + Option sub-structs" -``` - ---- - -### Task 2: Fix downstream compilation — adapters and handler - -**Files:** -- Modify: `crates/adapters/postgres/src/feed.rs` (line 136 — `row_to_entry`) -- Modify: `crates/adapters/postgres-search/src/lib.rs` (line 97 — `row_to_entry`) -- Modify: `crates/presentation/src/handlers/feed.rs` (line 22 — `to_thought_response`) - -- [ ] **Step 1: Update `row_to_entry` in `postgres/src/feed.rs`** - -Find `row_to_entry` in `crates/adapters/postgres/src/feed.rs` (around line 109). Replace the `Ok(FeedEntry { ... })` block (currently lines 136–144) with: - -```rust - Ok(FeedEntry { - thought, - author, - stats: domain::models::feed::EngagementStats { - like_count: r.like_count, - boost_count: r.boost_count, - reply_count: r.reply_count, - }, - viewer: Some(domain::models::feed::ViewerContext { - liked: r.liked_by_viewer, - boosted: r.boosted_by_viewer, - }), - }) -``` - -Note: `postgres/src/feed.rs` already builds `viewer = Some(...)` unconditionally here because its `feed_select(viewer)` function always produces `liked_by_viewer`/`boosted_by_viewer` columns — `false AS liked_by_viewer` when there is no viewer, and the real EXISTS result when there is one. The `Option` distinction (`None` = anonymous) is handled by the caller's knowledge of whether a viewer was passed. To preserve the `None`-when-no-viewer semantic, read how `viewer` is passed into the calling functions and thread it through. - -Actually, the correct fix: the `row_to_entry` function doesn't know if a viewer was passed. Pass the viewer `Option` as a parameter so it can decide: - -Replace the signature of `row_to_entry`: -```rust -fn row_to_entry(r: FeedRow, viewer: Option) -> Result { -``` - -And change the construction: -```rust - Ok(FeedEntry { - thought, - author, - stats: domain::models::feed::EngagementStats { - like_count: r.like_count, - boost_count: r.boost_count, - reply_count: r.reply_count, - }, - viewer: viewer.map(|_| domain::models::feed::ViewerContext { - liked: r.liked_by_viewer, - boosted: r.boosted_by_viewer, - }), - }) -``` - -Then update all call sites of `row_to_entry` inside `postgres/src/feed.rs`. Each `FeedRepository` method already has a `viewer` variable of type `Option`. Pass it through: - -```rust -// Before: -.map(row_to_entry) -.collect::, _>>()? - -// After: -.map(|r| row_to_entry(r, viewer)) -.collect::, _>>()? -``` - -Read `crates/adapters/postgres/src/feed.rs` to find all five `impl FeedRepository` methods and update each `.map(row_to_entry)` call. - -- [ ] **Step 2: Update `row_to_entry` in `postgres-search/src/lib.rs`** - -In `crates/adapters/postgres-search/src/lib.rs`, find `row_to_entry` (line 70). Change the `Ok(FeedEntry { ... })` block (lines 97–105) to: - -```rust - Ok(FeedEntry { - thought, - author, - stats: domain::models::feed::EngagementStats { - like_count: r.like_count, - boost_count: r.boost_count, - reply_count: r.reply_count, - }, - viewer: None, // Task 3 will fix this to use real viewer data - }) -``` - -Add `EngagementStats` and `ViewerContext` to the domain import at the top if needed (they're in `domain::models::feed`). The existing import already pulls in `FeedEntry` from that module. - -- [ ] **Step 3: Update `to_thought_response` in `presentation/src/handlers/feed.rs`** - -Find `to_thought_response` (line 22 in `crates/presentation/src/handlers/feed.rs`). Update it to read from the new sub-structs: - -```rust -pub fn to_thought_response(e: &domain::models::feed::FeedEntry) -> ThoughtResponse { - ThoughtResponse { - id: e.thought.id.as_uuid(), - content: e.thought.content.as_str().to_string(), - author: to_user_response(&e.author), - in_reply_to_id: e.thought.in_reply_to_id.as_ref().map(|id| id.as_uuid()), - in_reply_to_url: None, - visibility: e.thought.visibility.as_str().to_string(), - content_warning: e.thought.content_warning.clone(), - sensitive: e.thought.sensitive, - like_count: e.stats.like_count, - boost_count: e.stats.boost_count, - reply_count: e.stats.reply_count, - liked_by_viewer: e.viewer.as_ref().map(|v| v.liked).unwrap_or(false), - boosted_by_viewer: e.viewer.as_ref().map(|v| v.boosted).unwrap_or(false), - created_at: e.thought.created_at, - updated_at: e.thought.updated_at, - } -} -``` - -`ThoughtResponse` in `api-types/src/responses.rs` keeps `liked_by_viewer: bool` and `boosted_by_viewer: bool` — the wire format is unchanged. - -- [ ] **Step 4: Compile check — full workspace must be clean** - -```bash -cargo check --workspace 2>&1 | head -20 -``` - -Expected: 0 errors. Fix any remaining references to the old flat fields (`e.like_count`, `e.liked_by_viewer`, etc.) — they must become `e.stats.like_count`, `e.viewer.as_ref().map(|v| v.liked).unwrap_or(false)`. - -- [ ] **Step 5: Commit** - -```bash -git add crates/adapters/postgres/src/feed.rs \ - crates/adapters/postgres-search/src/lib.rs \ - crates/presentation/src/handlers/feed.rs -git commit -m "refactor(adapters): update FeedEntry construction to use EngagementStats + ViewerContext" -``` - ---- - -### Task 3: Fix search adapter — real viewer context instead of hardcoded `false` - -**Files:** -- Modify: `crates/adapters/postgres-search/src/lib.rs` - -The `SearchPort::search_thoughts` signature already takes `viewer_id: Option<&UserId>` (the parameter is named `_viewer_id` because it was ignored). This task makes it real. - -- [ ] **Step 1: Add `liked_by_viewer` and `boosted_by_viewer` to `FeedRow`** - -In `crates/adapters/postgres-search/src/lib.rs`, find the `FeedRow` struct (line 27). Add two fields at the end: - -```rust -#[derive(sqlx::FromRow)] -struct FeedRow { - thought_id: uuid::Uuid, - t_user_id: uuid::Uuid, - content: String, - in_reply_to_id: Option, - visibility: String, - content_warning: Option, - sensitive: bool, - t_local: bool, - thought_created_at: DateTime, - updated_at: Option>, - author_id: uuid::Uuid, - username: String, - email: String, - password_hash: String, - display_name: Option, - bio: Option, - avatar_url: Option, - header_url: Option, - custom_css: Option, - author_local: bool, - author_created_at: DateTime, - author_updated_at: DateTime, - like_count: i64, - boost_count: i64, - reply_count: i64, - liked_by_viewer: bool, // NEW - boosted_by_viewer: bool, // NEW -} -``` - -- [ ] **Step 2: Replace `FEED_SELECT` constant with a `feed_select(viewer)` function** - -Delete the `const FEED_SELECT` and replace with a function that injects viewer-aware columns — identical pattern to `postgres/src/feed.rs`: - -```rust -fn feed_select(viewer: Option) -> String { - let viewer_checks = match viewer { - Some(uid) => format!( - "EXISTS(SELECT 1 FROM likes WHERE user_id='{uid}' AND thought_id=t.id) AS liked_by_viewer, - EXISTS(SELECT 1 FROM boosts WHERE user_id='{uid}' AND thought_id=t.id) AS boosted_by_viewer" - ), - None => "false AS liked_by_viewer, false AS boosted_by_viewer".to_string(), - }; - format!( - " - SELECT - t.id AS thought_id, t.user_id AS t_user_id, t.content, - t.in_reply_to_id, - t.visibility, t.content_warning, t.sensitive, t.local AS t_local, - t.created_at AS thought_created_at, t.updated_at, - u.id AS author_id, u.username, u.email, u.password_hash, - u.display_name, u.bio, u.avatar_url, u.header_url, u.custom_css, - u.local AS author_local, - u.created_at AS author_created_at, u.updated_at AS author_updated_at, - (SELECT COUNT(*) FROM likes l WHERE l.thought_id=t.id) AS like_count, - (SELECT COUNT(*) FROM boosts b WHERE b.thought_id=t.id) AS boost_count, - (SELECT COUNT(*) FROM thoughts r WHERE r.in_reply_to_id=t.id) AS reply_count, - {viewer_checks} - FROM thoughts t JOIN users u ON u.id=t.user_id" - ) -} -``` - -- [ ] **Step 3: Update `row_to_entry` to use viewer fields** - -Update `row_to_entry` to accept `viewer: Option` and build the `ViewerContext`: - -```rust -fn row_to_entry(r: FeedRow, viewer: Option) -> Result { - let thought = Thought { - id: ThoughtId::from_uuid(r.thought_id), - user_id: UserId::from_uuid(r.t_user_id), - content: Content::new_remote(r.content), - in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid), - visibility: Visibility::from_db_str(&r.visibility)?, - content_warning: r.content_warning, - sensitive: r.sensitive, - local: r.t_local, - created_at: r.thought_created_at, - updated_at: r.updated_at, - }; - let author = User { - id: UserId::from_uuid(r.author_id), - username: Username::from_trusted(r.username), - email: Email::from_trusted(r.email), - password_hash: PasswordHash(r.password_hash), - display_name: r.display_name, - bio: r.bio, - avatar_url: r.avatar_url, - header_url: r.header_url, - custom_css: r.custom_css, - local: r.author_local, - created_at: r.author_created_at, - updated_at: r.author_updated_at, - }; - Ok(FeedEntry { - thought, - author, - stats: domain::models::feed::EngagementStats { - like_count: r.like_count, - boost_count: r.boost_count, - reply_count: r.reply_count, - }, - viewer: viewer.map(|_| domain::models::feed::ViewerContext { - liked: r.liked_by_viewer, - boosted: r.boosted_by_viewer, - }), - }) -} -``` - -- [ ] **Step 4: Update `search_thoughts` to use viewer_id** - -Find `search_thoughts` in `crates/adapters/postgres-search/src/lib.rs` (line 110). Rename `_viewer_id` → `viewer_id`, extract the viewer UUID, and thread it through `feed_select` and `row_to_entry`: - -```rust -async fn search_thoughts( - &self, - query: &str, - page: &PageParams, - viewer_id: Option<&UserId>, // was _viewer_id -) -> Result, DomainError> { - let viewer = viewer_id.map(|v| v.as_uuid()); - let select = feed_select(viewer); - - let total: i64 = sqlx::query_scalar( - "SELECT COUNT(*) FROM thoughts t - WHERE t.content % $1 AND t.visibility='public'", - ) - .bind(query) - .fetch_one(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - let sql = format!( - "{select} - WHERE t.content % $1 AND t.visibility='public' - ORDER BY similarity(t.content, $1) DESC - LIMIT $2 OFFSET $3" - ); - let rows = sqlx::query_as::<_, FeedRow>(&sql) - .bind(query) - .bind(page.limit()) - .bind(page.offset()) - .fetch_all(&self.pool) - .await - .map_err(|e| DomainError::Internal(e.to_string()))?; - - Ok(Paginated { - items: rows - .into_iter() - .map(|r| row_to_entry(r, viewer)) - .collect::, _>>()?, - total, - page: page.page, - per_page: page.per_page, - }) -} -``` - -Note: `USER_SELECT` from `postgres::user` is no longer used in this file after the switch from const to function. Remove the `use postgres::user::{UserRow, USER_SELECT};` import if `UserRow`/`USER_SELECT` are no longer referenced. - -- [ ] **Step 5: Add an integration test for viewer-aware search** - -In the `#[cfg(test)]` module in `postgres-search/src/lib.rs`, add after the existing tests: - -```rust -#[sqlx::test(migrations = "../postgres/migrations")] -async fn search_thoughts_sets_viewer_context_when_authed(pool: sqlx::PgPool) { - use domain::ports::{LikeRepository, UserWriter}; - use postgres::{like::PgLikeRepository, user::PgUserRepository}; - use domain::models::social::Like; - use domain::value_objects::LikeId; - - let (alice, thought) = seed_thought(&pool, "alice", "hello world").await; - - // alice likes her own thought - let like_repo = PgLikeRepository::new(pool.clone()); - like_repo.save(&Like { - id: LikeId::new(), - user_id: alice.id.clone(), - thought_id: thought.id.clone(), - ap_id: None, - created_at: chrono::Utc::now(), - }).await.unwrap(); - - let repo = PgSearchRepository::new(pool); - - // with viewer — should see liked = true - let authed = repo - .search_thoughts("hello", &PageParams { page: 1, per_page: 20 }, Some(&alice.id)) - .await - .unwrap(); - assert_eq!(authed.items.len(), 1); - let ctx = authed.items[0].viewer.as_ref().expect("viewer context present"); - assert!(ctx.liked, "alice should see the thought as liked"); - assert!(!ctx.boosted); - - // without viewer — viewer should be None - let anon = repo - .search_thoughts("hello", &PageParams { page: 1, per_page: 20 }, None) - .await - .unwrap(); - assert_eq!(anon.items.len(), 1); - assert!(anon.items[0].viewer.is_none(), "anonymous request has no viewer context"); -} -``` - -- [ ] **Step 6: Compile check** - -```bash -cargo check --workspace 2>&1 | head -20 -``` - -Expected: 0 errors. - -- [ ] **Step 7: Commit** - -```bash -git add crates/adapters/postgres-search/src/lib.rs -git commit -m "fix(search): viewer-aware SQL in search_thoughts — ViewerContext now real instead of hardcoded false" -``` - ---- - -## Self-Review - -**Spec coverage:** - -| Spec requirement | Task | -|---|---| -| Add `EngagementStats` struct | Task 1 | -| Add `ViewerContext` struct | Task 1 | -| `FeedEntry.viewer: Option` | Task 1 | -| postgres feed adapter uses new structs | Task 2 | -| Handler `to_thought_response` uses new fields | Task 2 | -| search adapter `viewer: None` (structural fix) | Task 2 | -| search adapter uses real viewer SQL (functional fix) | Task 3 | -| `viewer: None` = anonymous; `Some(...)` = viewer present | Tasks 2 + 3 | -| Wire format (`ThoughtResponse`) unchanged | Task 2 step 3 | - -**No placeholders found.** - -**Type consistency:** `EngagementStats` and `ViewerContext` defined in Task 1, used by name in Tasks 2 and 3. `row_to_entry(r, viewer)` signature matches in both Task 2 and Task 3. `viewer: Option` threaded consistently. diff --git a/docs/superpowers/plans/2026-05-15-nats-dlq-auth-hardening.md b/docs/superpowers/plans/2026-05-15-nats-dlq-auth-hardening.md deleted file mode 100644 index 82136c7..0000000 --- a/docs/superpowers/plans/2026-05-15-nats-dlq-auth-hardening.md +++ /dev/null @@ -1,836 +0,0 @@ -# NATS Hardening, Dead-Letter Queue & Auth Security - -> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. - -**Goal:** Harden the NATS consumer (explicit config, ack timeouts, unknown-event acking), add an automatic-retry dead-letter queue backed by Postgres, and close three auth security gaps (weak secret validation, timing oracle, excessive JWT TTL). - -**Architecture:** Seven sequential tasks. Tasks 1–3 are independent of each other and of 4–7 — they can be reviewed in any order. Tasks 4–7 form a dependency chain: delivery-count metadata (Task 4) → migration (Task 5) → Postgres store (Task 6) → worker DLQ loop (Task 7). - -**Tech Stack:** Rust, Tokio, async-nats 0.48, SQLx, Postgres, argon2, jsonwebtoken. - ---- - -### Task 1: Auth hardening — secret validation, timing equalization, TTL - -**Files:** -- Modify: `crates/bootstrap/src/factory.rs` -- Modify: `crates/application/src/use_cases/auth.rs` -- Modify: `crates/adapters/auth/src/lib.rs` - -- [ ] **Step 1: Replace the magic JWT TTL constant and add secret length check in `factory.rs`** - -In `crates/bootstrap/src/factory.rs`, find the current `const JWT_TTL_SECS: i64 = 86_400 * 30;` at the top of the file. Replace it and add a secret minimum constant, then add validation before the `JwtAuthService` is constructed: - -```rust -const JWT_TTL_SECS: i64 = 86_400; // 24 hours -const JWT_SECRET_MIN_BYTES: usize = 32; // 256 bits minimum for HS256 -``` - -Then, just before `auth: Arc::new(auth::JwtAuthService::new(...))` in the `build` function, add: - -```rust -if cfg.jwt_secret.len() < JWT_SECRET_MIN_BYTES { - panic!( - "JWT_SECRET is {} bytes — minimum is {} bytes for HS256 security", - cfg.jwt_secret.len(), - JWT_SECRET_MIN_BYTES, - ); -} -``` - -- [ ] **Step 2: Update the auth test to use a 32-byte secret** - -In `crates/adapters/auth/src/lib.rs`, find the tests that use `"secret".into()`: - -```rust -// Before (lines ~98, ~107): -let svc = JwtAuthService::new("secret".into(), 3600); - -// After (use 32+ byte secret): -let svc = JwtAuthService::new("a-secret-that-is-at-least-32-bytes".into(), 3600); -``` - -Update both test cases (`generate_and_validate_token` and `invalid_token_returns_unauthorized`). - -- [ ] **Step 3: Add timing equalization to the login use case** - -In `crates/application/src/use_cases/auth.rs`, find the `login` function. Currently it early-returns when the user is not found: - -```rust -let user = users - .find_by_email(&email) - .await? - .ok_or(DomainError::Unauthorized)?; -``` - -Replace with a timing-safe version that runs the hasher even when no user is found: - -```rust -let user = users.find_by_email(&email).await?; -if user.is_none() { - // Timing equalization — prevents email enumeration via response-time oracle. - // Running the hasher on a miss makes "no such user" take the same time as - // "wrong password", so attackers cannot distinguish the two cases. - let _ = hasher.hash(&input.password).await; - return Err(DomainError::Unauthorized); -} -let user = user.unwrap(); -``` - -- [ ] **Step 4: Compile check** - -```bash -cargo check --workspace 2>&1 | head -20 -``` - -Expected: 0 errors. - -- [ ] **Step 5: Commit** - -```bash -git add crates/bootstrap/src/factory.rs \ - crates/application/src/use_cases/auth.rs \ - crates/adapters/auth/src/lib.rs -git commit -m "fix(auth): validate JWT secret length, equalize login timing, reduce TTL to 24h" -``` - ---- - -### Task 2: NATS consumer hardening — explicit config + ack timeouts - -**Files:** -- Modify: `crates/adapters/nats/src/lib.rs` - -- [ ] **Step 1: Add named constants** - -At the top of `crates/adapters/nats/src/lib.rs`, after the existing constants, add: - -```rust -/// Maximum delivery attempts before a message is considered exhausted. -/// The DLQ processor picks it up after this point. -const CONSUMER_MAX_DELIVER: i64 = 5; -/// How long NATS waits for an ack before redelivering. -const CONSUMER_ACK_WAIT_SECS: u64 = 30; -/// Timeout for the spawned ack/nack async task. -const ACK_TASK_TIMEOUT_SECS: u64 = 5; -``` - -- [ ] **Step 2: Replace the pull consumer config** - -Find the `get_or_create_consumer` call (around line 119). Replace `..Default::default()` with explicit settings: - -```rust -let consumer = match stream - .get_or_create_consumer( - CONSUMER_NAME, - jetstream::consumer::pull::Config { - durable_name: Some(CONSUMER_NAME.to_string()), - deliver_policy: jetstream::consumer::DeliverPolicy::New, - ack_policy: jetstream::consumer::AckPolicy::Explicit, - ack_wait: std::time::Duration::from_secs(CONSUMER_ACK_WAIT_SECS), - max_deliver: CONSUMER_MAX_DELIVER, - ..Default::default() - }, - ) - .await -``` - -You will need to add the `DeliverPolicy` and `AckPolicy` imports. Check what's already imported at the top of the file and add if needed: - -```rust -use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy}; -``` - -- [ ] **Step 3: Add timeout to the ack task** - -Find the `ack:` closure (around line 173). Replace it with a timeout-wrapped version: - -```rust -ack: Box::new(move || { - let m = Arc::clone(&msg); - tokio::spawn(async move { - let result = tokio::time::timeout( - std::time::Duration::from_secs(ACK_TASK_TIMEOUT_SECS), - m.ack(), - ) - .await; - match result { - Ok(Ok(())) => {} - Ok(Err(e)) => tracing::warn!("NATS ack failed: {e}"), - Err(_) => tracing::warn!("NATS ack timed out after {ACK_TASK_TIMEOUT_SECS}s"), - } - }); -}), -``` - -- [ ] **Step 4: Add timeout to the nack task** - -Find the `nack:` closure (around line 181). Same pattern: - -```rust -nack: Box::new(move || { - let m = Arc::clone(&msg_nack); - tokio::spawn(async move { - let result = tokio::time::timeout( - std::time::Duration::from_secs(ACK_TASK_TIMEOUT_SECS), - m.ack_with(AckKind::Nak(None)), - ) - .await; - match result { - Ok(Ok(())) => {} - Ok(Err(e)) => tracing::warn!("NATS nack failed: {e}"), - Err(_) => tracing::warn!("NATS nack timed out after {ACK_TASK_TIMEOUT_SECS}s"), - } - }); -}), -``` - -- [ ] **Step 5: Expose delivery count on RawMessage** - -Find where `RawMessage` is constructed (around line 170). Add the NATS message's delivery count before constructing it. Read the NATS message metadata via `msg.info()`: - -```rust -let delivery_count = msg - .info() - .map(|info| info.delivered) - .unwrap_or(1) as u64; - -let raw = RawMessage { - subject, - payload, - delivery_count, - ack: Box::new(move || { ... }), - nack: Box::new(move || { ... }), -}; -``` - -Note: `msg.info()` returns `Result` where `Info.delivered: u64`. If it's unavailable, default to 1. - -- [ ] **Step 6: Compile check** - -```bash -cargo check -p nats 2>&1 | head -20 -``` - -Expected: compile errors about `RawMessage` missing `delivery_count` — that's fixed in Task 3. - -- [ ] **Step 7: Commit is deferred to after Task 3** (they touch the same type) - ---- - -### Task 3: Add `delivery_count` to `RawMessage` and `EventEnvelope`; ack unknown events - -**Files:** -- Modify: `crates/adapters/event-transport/src/lib.rs` -- Modify: `crates/domain/src/events.rs` - -- [ ] **Step 1: Add `delivery_count` to `RawMessage`** - -In `crates/adapters/event-transport/src/lib.rs`, find the `RawMessage` struct (around line 48). Add the field: - -```rust -pub struct RawMessage { - pub subject: String, - pub payload: Vec, - pub delivery_count: u64, // NEW - pub ack: Box, - pub nack: Box, -} -``` - -- [ ] **Step 2: Add `delivery_count` to `EventEnvelope` in domain** - -In `crates/domain/src/events.rs`, find `EventEnvelope` (around line 83). Add the field: - -```rust -pub struct EventEnvelope { - pub event: DomainEvent, - pub delivery_count: u64, // NEW - pub ack: Box, - pub nack: Box, -} -``` - -Also update the `Debug` impl (which is manual because closures aren't Debug). Find it and add `delivery_count` to the struct debug output: - -```rust -impl std::fmt::Debug for EventEnvelope { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("EventEnvelope") - .field("event", &self.event) - .field("delivery_count", &self.delivery_count) - .finish() - } -} -``` - -- [ ] **Step 3: Pass `delivery_count` through `EventConsumerAdapter`** - -In `crates/adapters/event-transport/src/lib.rs`, find where `EventEnvelope` is constructed in the `consume()` method (around line 97). Update it: - -```rust -Some(Ok(EventEnvelope { - event, - delivery_count: msg.delivery_count, // NEW - ack: msg.ack, - nack: msg.nack, -})) -``` - -- [ ] **Step 4: Ack unknown event types instead of orphaning them** - -In the same `consume()` method, find the `Err(e)` arm for unknown event types (around line 92): - -```rust -Err(e) => { - tracing::warn!("unknown event type: {e}"); - return None; -} -``` - -Replace with an explicit ack before dropping: - -```rust -Err(e) => { - tracing::warn!("unknown or malformed event type — acking to prevent orphan: {e}"); - (msg.ack)(); - return None; -} -``` - -Similarly for the deserialization error arm (around line 85): - -```rust -Err(e) => { - tracing::warn!("failed to deserialize event payload — acking to prevent orphan: {e}"); - (msg.ack)(); - return None; -} -``` - -- [ ] **Step 5: Update test stubs for `RawMessage`** - -In the tests inside `event-transport/src/lib.rs`, `RawMessage` is constructed with `ack` and `nack`. Add `delivery_count: 1` to each: - -```rust -let msg = RawMessage { - subject: "thoughts.created".to_string(), - payload: self.bytes.clone(), - delivery_count: 1, - ack: Box::new(|| {}), - nack: Box::new(|| {}), -}; -``` - -Find all `RawMessage { ... }` constructions in the test module and add `delivery_count: 1`. - -- [ ] **Step 6: Full workspace compile check** - -```bash -cargo check --workspace 2>&1 | head -40 -``` - -Fix any remaining construction sites for `RawMessage` or `EventEnvelope` that are missing `delivery_count`. - -- [ ] **Step 7: Commit Tasks 2 and 3 together** - -```bash -git add crates/adapters/nats/src/lib.rs \ - crates/adapters/event-transport/src/lib.rs \ - crates/domain/src/events.rs -git commit -m "fix(nats): explicit consumer config, ack timeouts, unknown-event acking, delivery_count" -``` - ---- - -### Task 4: `failed_events` migration - -**Files:** -- Create: `crates/adapters/postgres/migrations/009_failed_events.sql` - -- [ ] **Step 1: Create the migration file** - -Create `crates/adapters/postgres/migrations/009_failed_events.sql`: - -```sql -CREATE TABLE failed_events ( - id UUID NOT NULL DEFAULT gen_random_uuid(), - event_type TEXT NOT NULL, - payload JSONB NOT NULL, - failed_at TIMESTAMPTZ NOT NULL DEFAULT now(), - retry_at TIMESTAMPTZ NOT NULL, - retry_count INT NOT NULL DEFAULT 0, - last_error TEXT NOT NULL, - - CONSTRAINT failed_events_pkey PRIMARY KEY (id) -); - --- Partial index: only rows actively due for retry are in this index. -CREATE INDEX failed_events_due_idx - ON failed_events (retry_at) - WHERE retry_count < 3; -``` - -- [ ] **Step 2: Verify the migration file is syntactically correct** - -```bash -cargo check -p postgres 2>&1 | head -10 -``` - -(The postgres crate auto-discovers migrations via `sqlx::migrate!` — the file just needs to exist with valid SQL. Syntax is validated at runtime in integration tests.) - -- [ ] **Step 3: Commit** - -```bash -git add crates/adapters/postgres/migrations/009_failed_events.sql -git commit -m "feat(db): add failed_events table for dead-letter queue" -``` - ---- - -### Task 5: `PgFailedEventStore` — Postgres DLQ repository - -**Files:** -- Create: `crates/adapters/postgres/src/failed_event.rs` -- Modify: `crates/adapters/postgres/src/lib.rs` - -- [ ] **Step 1: Create `failed_event.rs`** - -Create `crates/adapters/postgres/src/failed_event.rs`: - -```rust -use crate::db_error::IntoDbResult; -use chrono::{DateTime, Utc}; -use sqlx::PgPool; - -/// How many times a failed event is retried by the DLQ processor. -pub const DLQ_MAX_RETRIES: i32 = 3; -/// Quarantine period for the first DLQ retry (seconds). Doubles each retry. -pub const DLQ_INITIAL_BACKOFF_SECS: i64 = 300; // 5 minutes -/// How often the DLQ processor polls for due retries (seconds). -pub const DLQ_POLL_INTERVAL_SECS: u64 = 60; - -#[derive(sqlx::FromRow)] -pub struct FailedEvent { - pub id: uuid::Uuid, - pub event_type: String, - pub payload: serde_json::Value, - pub failed_at: DateTime, - pub retry_at: DateTime, - pub retry_count: i32, - pub last_error: String, -} - -pub struct PgFailedEventStore { - pool: PgPool, -} - -impl PgFailedEventStore { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } - - /// Insert a newly exhausted event into the DLQ. - pub async fn insert( - &self, - event_type: &str, - payload: &serde_json::Value, - last_error: &str, - ) -> Result<(), sqlx::Error> { - let retry_at = Utc::now() - + chrono::Duration::seconds(DLQ_INITIAL_BACKOFF_SECS); - sqlx::query( - "INSERT INTO failed_events \ - (event_type, payload, retry_at, last_error) \ - VALUES ($1, $2, $3, $4)", - ) - .bind(event_type) - .bind(payload) - .bind(retry_at) - .bind(last_error) - .execute(&self.pool) - .await?; - Ok(()) - } - - /// Fetch all events due for retry (retry_at <= now, retry_count < DLQ_MAX_RETRIES). - pub async fn poll_due(&self) -> Result, sqlx::Error> { - sqlx::query_as::<_, FailedEvent>( - "SELECT id, event_type, payload, failed_at, retry_at, retry_count, last_error \ - FROM failed_events \ - WHERE retry_at <= now() AND retry_count < $1 \ - ORDER BY retry_at \ - LIMIT 100", - ) - .bind(DLQ_MAX_RETRIES) - .fetch_all(&self.pool) - .await - } - - /// Advance a row after a republish attempt. - /// Uses exponential backoff: next_retry = now + initial * 2^retry_count. - pub async fn advance( - &self, - id: uuid::Uuid, - error: Option<&str>, - ) -> Result<(), sqlx::Error> { - // Fetch current retry_count to compute backoff. - let current: i32 = sqlx::query_scalar( - "SELECT retry_count FROM failed_events WHERE id = $1", - ) - .bind(id) - .fetch_one(&self.pool) - .await?; - - let new_count = current + 1; - let backoff_secs = DLQ_INITIAL_BACKOFF_SECS * (1_i64 << new_count.min(10)); - let retry_at = Utc::now() + chrono::Duration::seconds(backoff_secs); - let last_error = error.unwrap_or("republish succeeded"); - - sqlx::query( - "UPDATE failed_events \ - SET retry_count = $1, retry_at = $2, last_error = $3 \ - WHERE id = $4", - ) - .bind(new_count) - .bind(retry_at) - .bind(last_error) - .bind(id) - .execute(&self.pool) - .await?; - Ok(()) - } - - /// Park a permanently failed event (retry_count >= DLQ_MAX_RETRIES). - /// Sets retry_at 1 year out so it falls out of the active index. - pub async fn park_permanently(&self, id: uuid::Uuid) -> Result<(), sqlx::Error> { - let far_future = Utc::now() + chrono::Duration::days(365); - sqlx::query( - "UPDATE failed_events SET retry_at = $1 WHERE id = $2", - ) - .bind(far_future) - .bind(id) - .execute(&self.pool) - .await?; - Ok(()) - } -} -``` - -- [ ] **Step 2: Export from `postgres/src/lib.rs`** - -In `crates/adapters/postgres/src/lib.rs`, add: - -```rust -pub mod failed_event; -``` - -- [ ] **Step 3: Compile check** - -```bash -cargo check -p postgres 2>&1 | head -20 -``` - -Expected: 0 errors. - -- [ ] **Step 4: Commit** - -```bash -git add crates/adapters/postgres/src/failed_event.rs \ - crates/adapters/postgres/src/lib.rs -git commit -m "feat(postgres): PgFailedEventStore for dead-letter queue" -``` - ---- - -### Task 6: DLQ processor in worker - -**Files:** -- Create: `crates/worker/src/dlq.rs` -- Modify: `crates/worker/src/factory.rs` -- Modify: `crates/worker/src/main.rs` - -The `delivery_count` on `EventEnvelope` (added in Task 3) tells the worker when a message is on its last attempt. The main loop inserts to the DLQ when handlers fail at `delivery_count >= CONSUMER_MAX_DELIVER`. A separate background task polls the DLQ and republishes due events. - -- [ ] **Step 1: Create `dlq.rs`** - -Create `crates/worker/src/dlq.rs`: - -```rust -use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher}; -use postgres::failed_event::{ - DLQ_MAX_RETRIES, DLQ_POLL_INTERVAL_SECS, PgFailedEventStore, -}; -use std::sync::Arc; - -/// Background task: polls `failed_events` and republishes due rows to the event bus. -/// Runs on a fixed interval for the lifetime of the worker. -pub async fn run_dlq_processor( - store: Arc, - publisher: Arc, -) { - let interval = std::time::Duration::from_secs(DLQ_POLL_INTERVAL_SECS); - loop { - tokio::time::sleep(interval).await; - if let Err(e) = process_due(&store, &*publisher).await { - tracing::error!("DLQ processor error: {e}"); - } - } -} - -async fn process_due( - store: &PgFailedEventStore, - publisher: &dyn EventPublisher, -) -> Result<(), sqlx::Error> { - let due = store.poll_due().await?; - if due.is_empty() { - return Ok(()); - } - tracing::info!(count = due.len(), "DLQ: processing due events"); - - for row in due { - if row.retry_count >= DLQ_MAX_RETRIES { - tracing::error!( - id = %row.id, - event_type = %row.event_type, - retry_count = row.retry_count, - "DLQ: event permanently failed — parking", - ); - store.park_permanently(row.id).await?; - continue; - } - - // Attempt to republish the raw payload as a domain event. - let republish_result = republish(&row.payload, publisher).await; - - match republish_result { - Ok(()) => { - tracing::info!(id = %row.id, "DLQ: republished successfully"); - store.advance(row.id, None).await?; - } - Err(e) => { - tracing::warn!(id = %row.id, error = %e, "DLQ: republish failed"); - store.advance(row.id, Some(&e.to_string())).await?; - } - } - } - Ok(()) -} - -async fn republish( - payload: &serde_json::Value, - publisher: &dyn EventPublisher, -) -> Result<(), DomainError> { - use event_payload::EventPayload; - let ep: EventPayload = serde_json::from_value(payload.clone()) - .map_err(|e| DomainError::Internal(format!("DLQ deserialize: {e}")))?; - let event = DomainEvent::try_from(ep) - .map_err(|e| DomainError::Internal(format!("DLQ event conversion: {e}")))?; - publisher.publish(&event).await -} -``` - -- [ ] **Step 2: Add `PgFailedEventStore` to `factory.rs`** - -In `crates/worker/src/factory.rs`, add to the imports: - -```rust -use postgres::failed_event::PgFailedEventStore; -use std::sync::Arc; -``` - -Then return it from `build` alongside the consumer and handlers. Change the return type to a new struct: - -```rust -pub struct WorkerInfra { - pub consumer: event_transport::EventConsumerAdapter, - pub handlers: WorkerHandlers, - pub dlq_store: Arc, - pub event_publisher: Arc, -} -``` - -At the end of `build`, construct and return: - -```rust -let dlq_store = Arc::new(PgFailedEventStore::new(pool.clone())); - -// ... existing consumer construction ... - -WorkerInfra { - consumer, - handlers, - dlq_store, - event_publisher, // the NATS publisher already constructed in factory -} -``` - -Note: the factory currently doesn't return the event publisher. Add an `event_publisher` field to `WorkerInfra` and thread the existing `Arc` through (it's used for the ActivityPub handler — reuse the same instance). - -Read the existing `factory.rs` to see how the NATS publisher is currently constructed and reuse it for both the ActivityPub handler and the returned publisher. - -- [ ] **Step 3: Update `main.rs` to use the DLQ** - -In `crates/worker/src/main.rs`, update to use the new `WorkerInfra`: - -```rust -mod dlq; -mod factory; -mod handlers; - -use domain::ports::EventConsumer; -use futures::StreamExt; -use nats::CONSUMER_MAX_DELIVER; - -#[tokio::main] -async fn main() { - dotenvy::dotenv().ok(); - tracing_subscriber::fmt() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .init(); - - let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL required"); - let nats_url = std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into()); - let base_url = std::env::var("BASE_URL").expect("BASE_URL required"); - - tracing::info!("Building worker..."); - let infra = factory::build(&database_url, &base_url, &nats_url).await; - - // Spawn DLQ processor as a background task. - tokio::spawn(dlq::run_dlq_processor( - infra.dlq_store.clone(), - infra.event_publisher.clone(), - )); - - tracing::info!("Worker started, consuming events..."); - let mut stream = infra.consumer.consume(); - while let Some(result) = stream.next().await { - match result { - Ok(envelope) => { - let event = &envelope.event; - tracing::debug!(?event, "received event"); - - let n = infra.handlers.notification.handle(event).await; - let f = infra.handlers.federation.handle(event).await; - - if n.is_ok() && f.is_ok() { - (envelope.ack)(); - } else { - // Log errors. - if let Err(e) = &n { - tracing::error!("notification handler: {e}"); - } - if let Err(e) = &f { - tracing::error!("federation handler: {e}"); - } - - // On last delivery attempt — insert to DLQ then ack. - // On earlier attempts — nack so NATS retries. - if envelope.delivery_count >= CONSUMER_MAX_DELIVER as u64 { - let error_msg = n - .err() - .or(f.err()) - .map(|e| e.to_string()) - .unwrap_or_else(|| "unknown error".into()); - - let payload = serde_json::to_value(&event_payload::EventPayload::from(event)) - .unwrap_or(serde_json::Value::Null); - - let event_type = format!("{:?}", event) - .split_whitespace() - .next() - .unwrap_or("Unknown") - .to_string(); - - if let Err(e) = infra - .dlq_store - .insert(&event_type, &payload, &error_msg) - .await - { - tracing::error!("DLQ insert failed: {e} — message lost"); - } else { - tracing::warn!( - event_type, - delivery_count = envelope.delivery_count, - "event exhausted — moved to DLQ" - ); - } - (envelope.ack)(); // ack from NATS — DLQ owns it now - } else { - (envelope.nack)(); // let NATS retry - } - } - } - Err(e) => tracing::error!("consumer error: {e}"), - } - } -} -``` - -Note: `CONSUMER_MAX_DELIVER` must be exported from `crates/adapters/nats/src/lib.rs`. Add `pub` to that constant in Task 2. - -- [ ] **Step 4: Export `CONSUMER_MAX_DELIVER` from nats crate** - -In `crates/adapters/nats/src/lib.rs`, change the constant to `pub`: - -```rust -pub const CONSUMER_MAX_DELIVER: i64 = 5; -``` - -- [ ] **Step 5: Full workspace compile check** - -```bash -cargo check --workspace 2>&1 | head -40 -``` - -Fix all errors. Common issues: -- Missing imports in `main.rs` for `event_payload` -- `event` variable lifetime in the DLQ insert block — may need to clone `event` -- `WorkerInfra` construction in `factory.rs` missing fields - -- [ ] **Step 6: Verify tests still pass** - -```bash -cargo test --workspace 2>&1 | tail -5 -``` - -Expected: all tests pass. - -- [ ] **Step 7: Commit** - -```bash -git add crates/worker/src/dlq.rs \ - crates/worker/src/factory.rs \ - crates/worker/src/main.rs \ - crates/adapters/nats/src/lib.rs -git commit -m "feat(worker): DLQ processor — exhausted events moved to failed_events with exponential retry" -``` - ---- - -## Self-Review - -**Spec coverage:** - -| Spec requirement | Task | -|---|---| -| `CONSUMER_MAX_DELIVER = 5` constant | Task 2 | -| `CONSUMER_ACK_WAIT_SECS = 30` constant | Task 2 | -| `ACK_TASK_TIMEOUT_SECS = 5` constant | Task 2 | -| Explicit consumer config (deliver_policy, ack_policy, ack_wait, max_deliver) | Task 2 | -| Ack task with timeout | Task 2 | -| Nack task with timeout | Task 2 | -| Unknown event types acked before drop | Task 3 | -| `delivery_count` threaded through to worker | Tasks 2, 3 | -| `failed_events` migration | Task 4 | -| `PgFailedEventStore` (insert, poll_due, advance, park_permanently) | Task 5 | -| DLQ processor (poll interval, exponential backoff, park permanently) | Task 6 | -| Worker inserts to DLQ at `delivery_count >= max_deliver` | Task 6 | -| `JWT_SECRET_MIN_BYTES = 32` constant | Task 1 | -| Panic on weak secret at startup | Task 1 | -| `JWT_TTL_SECS = 86_400` (24h) | Task 1 | -| Timing equalization on failed login | Task 1 | - -**No placeholders found.** - -**Type consistency:** `CONSUMER_MAX_DELIVER: i64` in Task 2; cast to `u64` for comparison with `envelope.delivery_count: u64` in Task 6 (`>= CONSUMER_MAX_DELIVER as u64`). Consistent. `DLQ_MAX_RETRIES: i32` matches `retry_count: i32` in `FailedEvent`. `DLQ_INITIAL_BACKOFF_SECS: i64` used with `chrono::Duration::seconds(i64)`. All consistent. diff --git a/docs/superpowers/specs/2026-05-15-feedentry-decoupling-design.md b/docs/superpowers/specs/2026-05-15-feedentry-decoupling-design.md deleted file mode 100644 index a746724..0000000 --- a/docs/superpowers/specs/2026-05-15-feedentry-decoupling-design.md +++ /dev/null @@ -1,80 +0,0 @@ -# FeedEntry Decoupling Design - -**Goal:** Fix search viewer context (functional), restructure `FeedEntry` for clarity (structural), and make viewer presence explicit via `Option` (type-safe). - -**Priority:** C (search fix) → B (struct clarity) → A (type safety). All three land in one pass. - ---- - -## Data Model - -Replace flat fields on `FeedEntry` with two named sub-structs in `crates/domain/src/models/feed.rs`: - -```rust -#[derive(Debug, Clone)] -pub struct EngagementStats { - pub like_count: i64, - pub boost_count: i64, - pub reply_count: i64, -} - -#[derive(Debug, Clone)] -pub struct ViewerContext { - pub liked: bool, - pub boosted: bool, -} - -#[derive(Debug, Clone)] -pub struct FeedEntry { - pub thought: Thought, - pub author: User, - pub stats: EngagementStats, - pub viewer: Option, // None when no authenticated viewer -} -``` - -`viewer: None` means the request was anonymous or viewer state is unavailable (e.g. search without auth). `viewer: Some(ViewerContext { liked: false, boosted: false })` means a viewer is known and they have not liked or boosted the thought. These two states are now distinct at the type level. - ---- - -## Search Adapter Fix - -`SearchPort::search_thoughts` already accepts `viewer_id: Option<&UserId>` but `postgres-search/src/lib.rs` ignores it, always hardcoding `false` for viewer fields. - -Fix: conditionally inject EXISTS subqueries into the search SQL, identical to the pattern used in `postgres/src/feed.rs`: - -```sql --- viewer_id = None (anonymous) -false AS liked_by_viewer, -false AS boosted_by_viewer - --- viewer_id = Some(uid) -EXISTS(SELECT 1 FROM likes WHERE user_id='{uid}' AND thought_id=t.id) AS liked_by_viewer, -EXISTS(SELECT 1 FROM boosts WHERE user_id='{uid}' AND thought_id=t.id) AS boosted_by_viewer -``` - -The `FeedRow` struct in postgres-search already has `liked_by_viewer: bool` and `boosted_by_viewer: bool` columns — they just need to be populated correctly. No schema change required. - ---- - -## Callsite Migration - -| File | Change | -|---|---| -| `crates/domain/src/models/feed.rs` | Replace flat stats/viewer fields with `EngagementStats` and `Option` | -| `crates/adapters/postgres/src/feed.rs` — `row_to_entry` | Construct `EngagementStats { ... }` and `viewer: Some/None` based on `FeedRow` | -| `crates/adapters/postgres-search/src/lib.rs` — `row_to_entry` + SQL | Fix SQL to use viewer_id; build `Option` from result | -| `crates/presentation/src/handlers/feed.rs` — `to_thought_response` | `e.stats.like_count`, `e.viewer.as_ref().map(|v| v.liked).unwrap_or(false)` | -| `crates/domain/src/testing.rs` — `TestStore` feed impl | Build `FeedEntry` with `stats:` and `viewer:` fields | - -`ThoughtResponse` in `api-types/src/responses.rs` keeps `liked_by_viewer: bool` and `boosted_by_viewer: bool` — the wire format is unchanged. `viewer: None` serialises as `false` in `to_thought_response`. - ---- - -## What Does Not Change - -- `FeedRepository` port signatures (still returns `Paginated`) -- HTTP response shape (`ThoughtResponse`) -- Database schema -- Pagination, filtering, or query logic -- Any code path that doesn't touch `FeedEntry` fields directly diff --git a/docs/superpowers/specs/2026-05-15-nats-dlq-auth-hardening-design.md b/docs/superpowers/specs/2026-05-15-nats-dlq-auth-hardening-design.md deleted file mode 100644 index 753673b..0000000 --- a/docs/superpowers/specs/2026-05-15-nats-dlq-auth-hardening-design.md +++ /dev/null @@ -1,193 +0,0 @@ -# NATS Hardening, Dead-Letter Queue & Auth Security Design - -**Goal:** Fix five reliability issues in the NATS adapter, introduce an automatic-retry dead-letter queue backed by Postgres, and close three auth security gaps. - ---- - -## Section 1: NATS Consumer Hardening - -### Consumer configuration - -All magic numbers become named constants in `crates/adapters/nats/src/lib.rs`: - -```rust -const CONSUMER_MAX_DELIVER: i64 = 5; -const CONSUMER_ACK_WAIT_SECS: u64 = 30; -const ACK_TASK_TIMEOUT_SECS: u64 = 5; -``` - -The pull consumer config changes from `..Default::default()` to explicit settings: - -```rust -pull::Config { - durable_name: Some(CONSUMER_NAME.to_string()), - deliver_policy: DeliverPolicy::New, - ack_policy: AckPolicy::Explicit, - ack_wait: Duration::from_secs(CONSUMER_ACK_WAIT_SECS), - max_deliver: CONSUMER_MAX_DELIVER, - ..Default::default() -} -``` - -- `DeliverPolicy::New` — worker restarts from the current position, not from the beginning of the stream -- `AckPolicy::Explicit` — explicit (already the default, but now documented) -- `ack_wait` — if the worker hangs for 30s without acking, NATS redelivers -- `max_deliver` — after 5 failed deliveries the message is exhausted; the DLQ picks it up - -### Ack task timeout - -Spawned ack/nack tasks currently have no timeout. If NATS is stuck, they hang forever. Wrap with `tokio::time::timeout`: - -```rust -ack: Box::new(move || { - let m = Arc::clone(&msg); - tokio::spawn(async move { - let result = tokio::time::timeout( - Duration::from_secs(ACK_TASK_TIMEOUT_SECS), - m.ack(), - ).await; - match result { - Ok(Ok(())) => {} - Ok(Err(e)) => tracing::warn!("NATS ack failed: {e}"), - Err(_) => tracing::warn!("NATS ack timed out"), - } - }); -}), -``` - -Same pattern for nack. - -### Unknown event type acking - -Currently unknown event types are silently dropped via `filter_map` and never acked — they orphan in the stream until `max_deliver` is exceeded. Fix: ack unknown messages explicitly before discarding: - -In `event-transport/src/lib.rs`, when deserialization fails, ack the raw NATS message before returning `None`: - -```rust -Err(e) => { - tracing::warn!("unknown or malformed event, acking to prevent orphan: {e}"); - // ack the message so it doesn't loop forever - msg.ack(); - None -} -``` - ---- - -## Section 2: Dead-Letter Queue - -### Schema - -New migration `009_failed_events.sql`: - -```sql -CREATE TABLE failed_events ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - event_type TEXT NOT NULL, - payload JSONB NOT NULL, - failed_at TIMESTAMPTZ NOT NULL DEFAULT now(), - retry_at TIMESTAMPTZ NOT NULL, - retry_count INT NOT NULL DEFAULT 0, - last_error TEXT NOT NULL -); - -CREATE INDEX failed_events_retry_at_idx ON failed_events (retry_at) - WHERE retry_count < 3; -``` - -### Constants - -In `crates/adapters/nats/src/lib.rs` (or a new `dlq.rs` module): - -```rust -const DLQ_INITIAL_BACKOFF_SECS: u64 = 300; // 5 minutes -const DLQ_MAX_RETRIES: i32 = 3; -const DLQ_POLL_INTERVAL_SECS: u64 = 60; // check every minute -``` - -### Worker flow - -**On exhausted message** (detected when `num_delivered >= CONSUMER_MAX_DELIVER`): -1. Worker inserts row to `failed_events` with `retry_at = now() + DLQ_INITIAL_BACKOFF_SECS` -2. Worker **acks** the NATS message (removes it from the stream) -3. Message will be retried by the DLQ processor, not by NATS - -**DlqProcessor** — runs in the worker on a `DLQ_POLL_INTERVAL_SECS` tick: -1. Query: `SELECT * FROM failed_events WHERE retry_at <= now() AND retry_count < DLQ_MAX_RETRIES` -2. For each row: republish the `payload` JSONB to the NATS `thoughts-events` main stream -3. Update row: `retry_count += 1`, `retry_at = now() + DLQ_INITIAL_BACKOFF_SECS * 2^retry_count` (exponential backoff: 5m, 10m, 20m) -4. If republish fails: update `last_error`, leave row for next poll -5. When the republished message is processed successfully by the worker, the event handler completes normally — the `failed_events` row is deleted on success (see below) - -**On DLQ retry success detection**: After republishing, the DLQ processor subscribes to the ack signal OR the processor marks rows as `retry_count = DLQ_MAX_RETRIES` optimistically and lets the event handler delete the row if the event type matches. Simpler: the DLQ row is deleted when `retry_count` reaches the threshold and the message is republished for the final time. If the final attempt also fails, it stays in the table as a permanently failed record with `retry_count = DLQ_MAX_RETRIES` for manual inspection. - -### Permanently failed messages - -Rows with `retry_count >= DLQ_MAX_RETRIES AND retry_at <= now()` are permanently failed. The DLQ processor: -- Logs them at `ERROR` level with full payload -- Sets `retry_at = now() + 365 days` (parking them out of the active query range) -- Does NOT delete them — they remain visible for manual inspection - -A future admin endpoint can query and replay them, but that is out of scope for this spec. - ---- - -## Section 3: Auth Hardening - -### JWT secret validation - -In `crates/bootstrap/src/factory.rs`, before constructing `JwtAuthService`: - -```rust -const JWT_SECRET_MIN_BYTES: usize = 32; - -if cfg.jwt_secret.len() < JWT_SECRET_MIN_BYTES { - panic!( - "JWT_SECRET is too short ({} bytes). \ - Minimum is {} bytes for HS256 security.", - cfg.jwt_secret.len(), - JWT_SECRET_MIN_BYTES - ); -} -``` - -Startup panics are appropriate here — running with a weak secret is a security failure. - -### Timing equalization on failed login - -In `crates/application/src/use_cases/auth.rs`, in the `login` function, when the user is not found by email: - -```rust -fn dummy_hash() -> argon2::PasswordHash<'static> { - // Pre-computed Argon2 hash of empty string. Used only to equalize timing - // on failed lookups so attackers cannot enumerate valid emails. - argon2::PasswordHash::new( - "$argon2id$v=19$m=19456,t=2,p=1$\ - AAAAAAAAAAAAAAAAAAAAAA$\ - AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" - ).expect("dummy hash is valid") -} - -// In login(): -if user.is_none() { - let _ = Argon2::default().verify_password(plain.as_bytes(), &dummy_hash()); - return Err(DomainError::Unauthorized); -} -``` - -### JWT TTL reduction - -In `crates/bootstrap/src/factory.rs`, the existing `JWT_TTL_SECS` constant: - -```rust -const JWT_TTL_SECS: i64 = 86_400; // 24 hours (was 30 days) -``` - ---- - -## What does NOT change - -- NATS subject naming (`thoughts-events.>`) — unchanged -- `MAX_MESSAGES` stream limit (100k) — unchanged; monitoring is out of scope -- API surface, domain events, application layer — unchanged -- Auth extractor, claims structure (`sub`, `exp`) — unchanged