clean up
Some checks failed
lint / lint (push) Has been cancelled
test / unit (push) Has been cancelled
test / integration (push) Has been cancelled
lint / lint (pull_request) Failing after 5m2s
test / unit (pull_request) Successful in 16m56s
test / integration (pull_request) Failing after 17m15s
Some checks failed
lint / lint (push) Has been cancelled
test / unit (push) Has been cancelled
test / integration (push) Has been cancelled
lint / lint (pull_request) Failing after 5m2s
test / unit (pull_request) Successful in 16m56s
test / integration (pull_request) Failing after 17m15s
This commit is contained in:
@@ -1,779 +0,0 @@
|
||||
# ActivityPub Likes & Boost Notifications Implementation Plan
|
||||
|
||||
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
||||
|
||||
**Goal:** Wire local likes/unlikes to outbound Like/Undo(Like) AP activities, and handle inbound Like and Announce activities so Mastodon interactions create notifications.
|
||||
|
||||
**Architecture:** Four layers of change — domain port extension, ActivityPubService implementation, application-layer federation event routing, and inbox activity handler registration. Inbound likes/boosts publish domain events (LikeAdded/BoostAdded) so the existing notification service picks them up without duplication. A locality guard in `federation_event.rs` prevents re-broadcasting remote boosts.
|
||||
|
||||
**Tech Stack:** Rust, activitypub_federation crate, async-trait, serde, domain ports.
|
||||
|
||||
---
|
||||
|
||||
## Files
|
||||
|
||||
| Action | File | Purpose |
|
||||
|--------|------|---------|
|
||||
| Modify | `crates/domain/src/ports.rs` | Add `broadcast_like`, `broadcast_undo_like` to `OutboundFederationPort` |
|
||||
| Modify | `crates/application/src/services/federation_event.rs` | Add `liked`/`undo_liked` to SpyPort; add `LikeAdded`/`LikeRemoved` arms; add locality guard to `BoostAdded` |
|
||||
| Modify | `crates/adapters/activitypub-base/src/activities.rs` | Add `LikeActivity` struct; `LikeActivity::receive`; update `AnnounceActivity::receive`; register in `InboxActivities` |
|
||||
| Modify | `crates/adapters/activitypub-base/src/content.rs` | Add `on_like`, `on_announce_received` to `ApObjectHandler` trait |
|
||||
| Modify | `crates/adapters/activitypub-base/src/service.rs` | Add `broadcast_like_to_inbox`, `broadcast_undo_like_to_inbox`; implement port methods |
|
||||
| Modify | `crates/adapters/activitypub/src/handler.rs` | Implement `on_like`, `on_announce_received` in `ThoughtsObjectHandler` |
|
||||
|
||||
---
|
||||
|
||||
## Task 1: Extend OutboundFederationPort + SpyPort
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/domain/src/ports.rs`
|
||||
- Modify: `crates/application/src/services/federation_event.rs`
|
||||
|
||||
- [ ] **Step 1: Add two methods to `OutboundFederationPort` in `crates/domain/src/ports.rs`**
|
||||
|
||||
Find `OutboundFederationPort` (around line 417). Add after `broadcast_undo_announce`:
|
||||
|
||||
```rust
|
||||
/// Send a Like activity to a remote thought author's inbox.
|
||||
/// Only called when a LOCAL user likes a REMOTE thought (one with an ap_id).
|
||||
async fn broadcast_like(
|
||||
&self,
|
||||
liker_user_id: &UserId,
|
||||
object_ap_id: &str,
|
||||
author_inbox_url: &str,
|
||||
) -> Result<(), DomainError>;
|
||||
|
||||
/// Send Undo(Like) to a remote thought author's inbox.
|
||||
async fn broadcast_undo_like(
|
||||
&self,
|
||||
liker_user_id: &UserId,
|
||||
object_ap_id: &str,
|
||||
author_inbox_url: &str,
|
||||
) -> Result<(), DomainError>;
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Add stubs to `SpyPort` in `crates/application/src/services/federation_event.rs`**
|
||||
|
||||
Find `SpyPort` struct (around line 245). Add two fields:
|
||||
```rust
|
||||
liked: Mutex<Vec<String>>,
|
||||
undo_liked: Mutex<Vec<String>>,
|
||||
```
|
||||
|
||||
Find `impl OutboundFederationPort for SpyPort`. Add after `broadcast_undo_announce`:
|
||||
```rust
|
||||
async fn broadcast_like(
|
||||
&self,
|
||||
_: &UserId,
|
||||
ap_id: &str,
|
||||
_: &str,
|
||||
) -> Result<(), DomainError> {
|
||||
self.liked.lock().unwrap().push(ap_id.to_string());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn broadcast_undo_like(
|
||||
&self,
|
||||
_: &UserId,
|
||||
ap_id: &str,
|
||||
_: &str,
|
||||
) -> Result<(), DomainError> {
|
||||
self.undo_liked.lock().unwrap().push(ap_id.to_string());
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Verify compilation**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build -p domain -p application 2>&1 | grep "^error" | head -10
|
||||
```
|
||||
Expected: no errors (activitypub-base will fail until Task 3 — that's fine, build only those two crates).
|
||||
|
||||
- [ ] **Step 4: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/domain/src/ports.rs crates/application/src/services/federation_event.rs
|
||||
git commit -m "feat(domain): add broadcast_like/broadcast_undo_like to OutboundFederationPort"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 2: LikeActivity struct + ApObjectHandler trait methods
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/adapters/activitypub-base/src/activities.rs`
|
||||
- Modify: `crates/adapters/activitypub-base/src/content.rs`
|
||||
|
||||
### Part A — LikeActivity struct (activities.rs)
|
||||
|
||||
- [ ] **Step 1: Add `LikeType` and `LikeActivity` to `crates/adapters/activitypub-base/src/activities.rs`**
|
||||
|
||||
Find where `AnnounceType` is defined (around line 13). Add right after:
|
||||
|
||||
```rust
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
#[serde(rename = "Like")]
|
||||
pub struct LikeType;
|
||||
|
||||
impl Default for LikeType {
|
||||
fn default() -> Self {
|
||||
Self
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Find where `AnnounceActivity` struct is defined (around line 461). Add a `LikeActivity` struct after it:
|
||||
|
||||
```rust
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct LikeActivity {
|
||||
pub id: Url,
|
||||
#[serde(rename = "type")]
|
||||
pub kind: LikeType,
|
||||
pub actor: ObjectId<DbActor>,
|
||||
pub object: Url,
|
||||
}
|
||||
```
|
||||
|
||||
### Part B — ApObjectHandler trait (content.rs)
|
||||
|
||||
- [ ] **Step 2: Add `on_like` and `on_announce_received` to `ApObjectHandler` in `crates/adapters/activitypub-base/src/content.rs`**
|
||||
|
||||
Find the `ApObjectHandler` trait. Add after `on_actor_removed`:
|
||||
|
||||
```rust
|
||||
/// Called when a remote actor likes a local thought.
|
||||
/// `object_url` is the AP URL of the liked note (e.g. `{base}/thoughts/{uuid}`).
|
||||
/// `actor_url` is the AP URL of the remote actor who sent the Like.
|
||||
async fn on_like(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>;
|
||||
|
||||
/// Called when a remote actor boosts (Announce) a local thought.
|
||||
/// `object_url` is the AP URL of the announced note.
|
||||
/// `actor_url` is the AP URL of the remote actor who sent the Announce.
|
||||
async fn on_announce_received(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>;
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Verify compilation of activitypub-base**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build -p activitypub-base 2>&1 | grep "^error" | head -10
|
||||
```
|
||||
Expected: errors that `ThoughtsObjectHandler` in `activitypub` doesn't implement the new methods — that's fine. `activitypub-base` itself should compile.
|
||||
|
||||
- [ ] **Step 4: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/adapters/activitypub-base/src/activities.rs \
|
||||
crates/adapters/activitypub-base/src/content.rs
|
||||
git commit -m "feat(activitypub-base): LikeActivity struct + on_like/on_announce_received trait methods"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 3: Implement broadcast_like + LikeActivity::receive + AnnounceActivity update
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/adapters/activitypub-base/src/service.rs`
|
||||
- Modify: `crates/adapters/activitypub-base/src/activities.rs`
|
||||
|
||||
### Part A — ActivityPubService implementation (service.rs)
|
||||
|
||||
- [ ] **Step 1: Add `broadcast_like_to_inbox` private method to `impl ActivityPubService`**
|
||||
|
||||
Add this private method inside `impl ActivityPubService` (not inside the port impl block):
|
||||
|
||||
```rust
|
||||
pub async fn broadcast_like_to_inbox(
|
||||
&self,
|
||||
liker_user_id: uuid::Uuid,
|
||||
object_ap_id: url::Url,
|
||||
author_inbox_url: url::Url,
|
||||
) -> anyhow::Result<()> {
|
||||
let data = self.federation_config.to_request_data();
|
||||
let local_actor = get_local_actor(liker_user_id, &data)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
// Deterministic ID so Undo(Like) can reference the same activity.
|
||||
let like_id = url::Url::parse(&format!(
|
||||
"{}/activities/like/{}",
|
||||
self.base_url,
|
||||
uuid::Uuid::new_v5(
|
||||
&uuid::Uuid::NAMESPACE_URL,
|
||||
format!("{}/{}", liker_user_id, object_ap_id).as_bytes(),
|
||||
)
|
||||
))?;
|
||||
|
||||
let like = crate::activities::LikeActivity {
|
||||
id: like_id,
|
||||
kind: Default::default(),
|
||||
actor: activitypub_federation::fetch::object_id::ObjectId::from(
|
||||
local_actor.ap_id.clone(),
|
||||
),
|
||||
object: object_ap_id,
|
||||
};
|
||||
|
||||
let sends = activitypub_federation::activity_sending::SendActivityTask::prepare(
|
||||
&activitypub_federation::protocol::context::WithContext::new_default(like),
|
||||
&local_actor,
|
||||
vec![author_inbox_url],
|
||||
&data,
|
||||
)
|
||||
.await?;
|
||||
let failures = send_with_retry(sends, &data).await;
|
||||
if !failures.is_empty() {
|
||||
tracing::warn!(count = failures.len(), "some Like deliveries failed permanently");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Add `broadcast_undo_like_to_inbox` private method**
|
||||
|
||||
Add directly after `broadcast_like_to_inbox`:
|
||||
|
||||
```rust
|
||||
pub async fn broadcast_undo_like_to_inbox(
|
||||
&self,
|
||||
liker_user_id: uuid::Uuid,
|
||||
object_ap_id: url::Url,
|
||||
author_inbox_url: url::Url,
|
||||
) -> anyhow::Result<()> {
|
||||
let data = self.federation_config.to_request_data();
|
||||
let local_actor = get_local_actor(liker_user_id, &data)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
// Reconstruct the same deterministic like ID used when the like was sent.
|
||||
let like_id = url::Url::parse(&format!(
|
||||
"{}/activities/like/{}",
|
||||
self.base_url,
|
||||
uuid::Uuid::new_v5(
|
||||
&uuid::Uuid::NAMESPACE_URL,
|
||||
format!("{}/{}", liker_user_id, object_ap_id).as_bytes(),
|
||||
)
|
||||
))?;
|
||||
|
||||
let undo_id =
|
||||
crate::urls::activity_url(&self.base_url).map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
let undo = crate::activities::UndoActivity {
|
||||
id: undo_id,
|
||||
kind: Default::default(),
|
||||
actor: activitypub_federation::fetch::object_id::ObjectId::from(
|
||||
local_actor.ap_id.clone(),
|
||||
),
|
||||
object: serde_json::json!({
|
||||
"type": "Like",
|
||||
"id": like_id.to_string(),
|
||||
"actor": local_actor.ap_id.to_string(),
|
||||
"object": object_ap_id.to_string(),
|
||||
}),
|
||||
};
|
||||
|
||||
let sends = activitypub_federation::activity_sending::SendActivityTask::prepare(
|
||||
&activitypub_federation::protocol::context::WithContext::new_default(undo),
|
||||
&local_actor,
|
||||
vec![author_inbox_url],
|
||||
&data,
|
||||
)
|
||||
.await?;
|
||||
let failures = send_with_retry(sends, &data).await;
|
||||
if !failures.is_empty() {
|
||||
tracing::warn!(count = failures.len(), "some Undo(Like) deliveries failed permanently");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Implement `broadcast_like` and `broadcast_undo_like` in `impl domain::ports::OutboundFederationPort for ActivityPubService`**
|
||||
|
||||
Find the existing `broadcast_undo_announce` impl. Add directly after it:
|
||||
|
||||
```rust
|
||||
async fn broadcast_like(
|
||||
&self,
|
||||
liker_user_id: &domain::value_objects::UserId,
|
||||
object_ap_id: &str,
|
||||
author_inbox_url: &str,
|
||||
) -> Result<(), domain::errors::DomainError> {
|
||||
let object = url::Url::parse(object_ap_id)
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||
let inbox = url::Url::parse(author_inbox_url)
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||
self.broadcast_like_to_inbox(liker_user_id.as_uuid(), object, inbox)
|
||||
.await
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))
|
||||
}
|
||||
|
||||
async fn broadcast_undo_like(
|
||||
&self,
|
||||
liker_user_id: &domain::value_objects::UserId,
|
||||
object_ap_id: &str,
|
||||
author_inbox_url: &str,
|
||||
) -> Result<(), domain::errors::DomainError> {
|
||||
let object = url::Url::parse(object_ap_id)
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||
let inbox = url::Url::parse(author_inbox_url)
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
|
||||
self.broadcast_undo_like_to_inbox(liker_user_id.as_uuid(), object, inbox)
|
||||
.await
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))
|
||||
}
|
||||
```
|
||||
|
||||
### Part B — LikeActivity::receive + AnnounceActivity update (activities.rs)
|
||||
|
||||
- [ ] **Step 4: Implement `Activity` for `LikeActivity` in `crates/adapters/activitypub-base/src/activities.rs`**
|
||||
|
||||
Add after the `LikeActivity` struct definition:
|
||||
|
||||
```rust
|
||||
#[async_trait]
|
||||
impl Activity for LikeActivity {
|
||||
type DataType = FederationData;
|
||||
type Error = crate::error::Error;
|
||||
|
||||
fn actor(&self) -> &Url {
|
||||
self.actor.inner()
|
||||
}
|
||||
|
||||
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
|
||||
let domain = self.actor().host_str().unwrap_or("");
|
||||
if data.federation_repo.is_domain_blocked(domain).await? {
|
||||
tracing::info!(actor = %self.actor(), "ignoring Like from blocked domain");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Only process if the liked object is on our instance.
|
||||
if self.object.host_str().unwrap_or("") != data.domain {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
data.object_handler
|
||||
.on_like(&self.object, self.actor.inner())
|
||||
.await
|
||||
.map_err(|e| crate::error::Error::Other(e.to_string()))?;
|
||||
|
||||
tracing::info!(actor = %self.actor.inner(), object = %self.object, "received like");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 5: Update `AnnounceActivity::receive` to call `on_announce_received`**
|
||||
|
||||
Find `AnnounceActivity::receive`. After the `add_announce` call and before the `tracing::info!`, add:
|
||||
|
||||
```rust
|
||||
data.object_handler
|
||||
.on_announce_received(&self.object, self.actor.inner())
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
tracing::warn!(error = %e, "failed to process announce notification");
|
||||
});
|
||||
```
|
||||
|
||||
- [ ] **Step 6: Register `LikeActivity` in `InboxActivities` enum**
|
||||
|
||||
Find the `InboxActivities` enum. Add:
|
||||
|
||||
```rust
|
||||
#[serde(rename = "Like")]
|
||||
Like(LikeActivity),
|
||||
```
|
||||
|
||||
- [ ] **Step 7: Verify activitypub-base compiles**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build -p activitypub-base 2>&1 | grep "^error" | head -10
|
||||
```
|
||||
Expected: no errors from activitypub-base. (`activitypub` crate will fail until Task 4.)
|
||||
|
||||
- [ ] **Step 8: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/adapters/activitypub-base/src/service.rs \
|
||||
crates/adapters/activitypub-base/src/activities.rs
|
||||
git commit -m "feat(activitypub-base): broadcast_like/undo_like + LikeActivity inbox handler"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 4: Implement on_like and on_announce_received in ThoughtsObjectHandler
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/adapters/activitypub/src/handler.rs`
|
||||
|
||||
`ThoughtsObjectHandler` has `ap_repo: Arc<dyn ActivityPubRepository>` and `event_publisher: Option<Arc<dyn EventPublisher>>`. These are all we need.
|
||||
|
||||
Pattern for both methods:
|
||||
1. Parse the thought UUID out of the object URL path (`/thoughts/{uuid}`)
|
||||
2. Find the remote actor's local user ID via `ap_repo.find_remote_actor_id(actor_url)`
|
||||
3. Publish the appropriate domain event — the notification service already handles `LikeAdded` and `BoostAdded`
|
||||
|
||||
- [ ] **Step 1: Read `crates/adapters/activitypub/src/handler.rs` to understand the struct and existing impls**
|
||||
|
||||
Look for `struct ThoughtsObjectHandler` and `impl ApObjectHandler for ThoughtsObjectHandler`.
|
||||
|
||||
- [ ] **Step 2: Implement `on_like` in `impl ApObjectHandler for ThoughtsObjectHandler`**
|
||||
|
||||
Add:
|
||||
|
||||
```rust
|
||||
async fn on_like(&self, object_url: &url::Url, actor_url: &url::Url) -> anyhow::Result<()> {
|
||||
// Parse thought UUID from path like /thoughts/{uuid}
|
||||
let thought_uuid = object_url
|
||||
.path()
|
||||
.strip_prefix("/thoughts/")
|
||||
.and_then(|s| s.split('/').next())
|
||||
.and_then(|s| uuid::Uuid::parse_str(s).ok());
|
||||
|
||||
let thought_uuid = match thought_uuid {
|
||||
Some(u) => u,
|
||||
None => {
|
||||
tracing::debug!(object = %object_url, "on_like: not a local thought URL, skipping");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
// Resolve the remote actor to a local user ID.
|
||||
let actor_user_id = self
|
||||
.ap_repo
|
||||
.find_remote_actor_id(actor_url)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
let actor_user_id = match actor_user_id {
|
||||
Some(id) => id,
|
||||
None => {
|
||||
tracing::debug!(actor = %actor_url, "on_like: remote actor not interned, skipping notification");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(ep) = &self.event_publisher {
|
||||
let thought_id = domain::value_objects::ThoughtId::from_uuid(thought_uuid);
|
||||
let like_id = domain::value_objects::LikeId::new();
|
||||
ep.publish(&domain::events::DomainEvent::LikeAdded {
|
||||
like_id,
|
||||
user_id: actor_user_id,
|
||||
thought_id,
|
||||
})
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Implement `on_announce_received`**
|
||||
|
||||
Add directly after `on_like`:
|
||||
|
||||
```rust
|
||||
async fn on_announce_received(
|
||||
&self,
|
||||
object_url: &url::Url,
|
||||
actor_url: &url::Url,
|
||||
) -> anyhow::Result<()> {
|
||||
// Parse thought UUID from path like /thoughts/{uuid}
|
||||
let thought_uuid = object_url
|
||||
.path()
|
||||
.strip_prefix("/thoughts/")
|
||||
.and_then(|s| s.split('/').next())
|
||||
.and_then(|s| uuid::Uuid::parse_str(s).ok());
|
||||
|
||||
let thought_uuid = match thought_uuid {
|
||||
Some(u) => u,
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
let actor_user_id = self
|
||||
.ap_repo
|
||||
.find_remote_actor_id(actor_url)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
|
||||
let actor_user_id = match actor_user_id {
|
||||
Some(id) => id,
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
if let Some(ep) = &self.event_publisher {
|
||||
let thought_id = domain::value_objects::ThoughtId::from_uuid(thought_uuid);
|
||||
let boost_id = domain::value_objects::BoostId::new();
|
||||
ep.publish(&domain::events::DomainEvent::BoostAdded {
|
||||
boost_id,
|
||||
user_id: actor_user_id,
|
||||
thought_id,
|
||||
})
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Verify full workspace build**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build 2>&1 | grep "^error" | head -10
|
||||
```
|
||||
Expected: no errors.
|
||||
|
||||
- [ ] **Step 5: Run tests**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo test -p domain -p application 2>&1 | tail -5
|
||||
```
|
||||
|
||||
- [ ] **Step 6: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/adapters/activitypub/src/handler.rs
|
||||
git commit -m "feat(activitypub): implement on_like and on_announce_received in ThoughtsObjectHandler"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 5: federation_event.rs — LikeAdded/LikeRemoved arms + BoostAdded locality guard
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/application/src/services/federation_event.rs`
|
||||
|
||||
The federation service must:
|
||||
- **BoostAdded**: add a locality check so remote boosts (published by Task 4) don't get re-broadcast
|
||||
- **LikeAdded**: fan-out only when a LOCAL user likes a REMOTE thought (has ap_id)
|
||||
- **LikeRemoved**: Undo(Like) when a LOCAL user unlikes a REMOTE thought
|
||||
|
||||
- [ ] **Step 1: Write tests for the new arms**
|
||||
|
||||
Find the `#[cfg(test)]` block in `crates/application/src/services/federation_event.rs`. Add:
|
||||
|
||||
```rust
|
||||
#[tokio::test]
|
||||
async fn like_added_local_user_remote_thought_broadcasts_like() {
|
||||
let store = TestStore::default();
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
|
||||
// Set up a remote thought with ap_id
|
||||
let author = {
|
||||
let mut u = test_user("remote_author");
|
||||
u.local = false;
|
||||
u.inbox_url = Some("https://mastodon.social/users/author/inbox".into());
|
||||
u
|
||||
};
|
||||
let thought = {
|
||||
let mut t = test_thought(author.id.clone());
|
||||
t.ap_id = Some("https://mastodon.social/posts/123".into());
|
||||
t.in_reply_to_url = None;
|
||||
t
|
||||
};
|
||||
let liker = test_user("alice"); // local user
|
||||
|
||||
store.users.lock().unwrap().push(author);
|
||||
store.users.lock().unwrap().push(liker.clone());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let svc = test_service(store, spy.clone());
|
||||
svc.process(&DomainEvent::LikeAdded {
|
||||
like_id: LikeId::new(),
|
||||
user_id: liker.id,
|
||||
thought_id: thought.id,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(spy.liked.lock().unwrap().len(), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn like_added_remote_user_skips_broadcast() {
|
||||
let store = TestStore::default();
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
|
||||
let author = test_user("alice");
|
||||
let thought = test_thought(author.id.clone()); // local thought, no ap_id
|
||||
let remote_liker = {
|
||||
let mut u = test_user("bob");
|
||||
u.local = false;
|
||||
u
|
||||
};
|
||||
|
||||
store.users.lock().unwrap().push(author);
|
||||
store.users.lock().unwrap().push(remote_liker.clone());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let svc = test_service(store, spy.clone());
|
||||
svc.process(&DomainEvent::LikeAdded {
|
||||
like_id: LikeId::new(),
|
||||
user_id: remote_liker.id,
|
||||
thought_id: thought.id,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(spy.liked.lock().unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn boost_added_remote_user_skips_broadcast() {
|
||||
let store = TestStore::default();
|
||||
let spy = Arc::new(SpyPort::default());
|
||||
|
||||
let author = test_user("alice");
|
||||
let thought = test_thought(author.id.clone());
|
||||
let remote_booster = {
|
||||
let mut u = test_user("bob");
|
||||
u.local = false;
|
||||
u
|
||||
};
|
||||
|
||||
store.users.lock().unwrap().push(author);
|
||||
store.users.lock().unwrap().push(remote_booster.clone());
|
||||
store.thoughts.lock().unwrap().push(thought.clone());
|
||||
|
||||
let svc = test_service(store, spy.clone());
|
||||
svc.process(&DomainEvent::BoostAdded {
|
||||
boost_id: BoostId::new(),
|
||||
user_id: remote_booster.id,
|
||||
thought_id: thought.id,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(spy.announced.lock().unwrap().is_empty());
|
||||
}
|
||||
```
|
||||
|
||||
Note: these tests use `test_user`, `test_thought`, `test_service` helpers — read the existing tests in the same file to find these helpers and use the same pattern. If `User.local` field setters don't exist, set the field directly (it's `pub`).
|
||||
|
||||
- [ ] **Step 2: Run tests to confirm they fail**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo test -p application federation_event 2>&1 | grep "FAILED\|error" | head -10
|
||||
```
|
||||
Expected: tests fail (LikeAdded arm not handled, BoostAdded has no locality guard).
|
||||
|
||||
- [ ] **Step 3: Add locality guard to existing `BoostAdded` arm**
|
||||
|
||||
Find the `DomainEvent::BoostAdded` match arm. Add a locality check at the top:
|
||||
|
||||
```rust
|
||||
DomainEvent::BoostAdded {
|
||||
boost_id: _,
|
||||
user_id,
|
||||
thought_id,
|
||||
} => {
|
||||
// Only fan-out if the booster is a local user. Remote boosts (from inbound
|
||||
// Announce activities) must not be re-broadcast to avoid loops.
|
||||
let booster = match self.users.find_by_id(user_id).await? {
|
||||
Some(u) if u.local => u,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let _ = booster; // suppress unused warning — kept for the locality check
|
||||
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
||||
Some(t) => t,
|
||||
None => return Ok(()),
|
||||
};
|
||||
let object_ap_id = self.object_ap_id(&thought, thought_id);
|
||||
self.ap.broadcast_announce(user_id, &object_ap_id).await
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Add `LikeAdded` arm**
|
||||
|
||||
Find the `_ => Ok(())` catch-all at the end of the `match event` block. Add before it:
|
||||
|
||||
```rust
|
||||
DomainEvent::LikeAdded {
|
||||
like_id: _,
|
||||
user_id,
|
||||
thought_id,
|
||||
} => {
|
||||
// Only federate: local liker + remote thought (has ap_id + author has inbox).
|
||||
let liker = match self.users.find_by_id(user_id).await? {
|
||||
Some(u) if u.local => u,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let _ = liker;
|
||||
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
||||
Some(t) if t.ap_id.is_some() => t,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let author = match self.users.find_by_id(&thought.user_id).await? {
|
||||
Some(u) if u.inbox_url.is_some() => u,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let object_ap_id = thought.ap_id.unwrap();
|
||||
let inbox_url = author.inbox_url.unwrap();
|
||||
self.ap
|
||||
.broadcast_like(user_id, &object_ap_id, &inbox_url)
|
||||
.await
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 5: Add `LikeRemoved` arm**
|
||||
|
||||
Add directly after `LikeAdded`:
|
||||
|
||||
```rust
|
||||
DomainEvent::LikeRemoved {
|
||||
user_id,
|
||||
thought_id,
|
||||
} => {
|
||||
let liker = match self.users.find_by_id(user_id).await? {
|
||||
Some(u) if u.local => u,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let _ = liker;
|
||||
let thought = match self.thoughts.find_by_id(thought_id).await? {
|
||||
Some(t) if t.ap_id.is_some() => t,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let author = match self.users.find_by_id(&thought.user_id).await? {
|
||||
Some(u) if u.inbox_url.is_some() => u,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let object_ap_id = thought.ap_id.unwrap();
|
||||
let inbox_url = author.inbox_url.unwrap();
|
||||
self.ap
|
||||
.broadcast_undo_like(user_id, &object_ap_id, &inbox_url)
|
||||
.await
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 6: Run tests — all should pass**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo test -p application federation_event 2>&1 | tail -5
|
||||
```
|
||||
Expected: all pass.
|
||||
|
||||
- [ ] **Step 7: Full build + all unit tests**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build 2>&1 | grep "^error" | head -5
|
||||
cd /mnt/drive/dev/thoughts && cargo test -p domain -p application 2>&1 | tail -5
|
||||
```
|
||||
|
||||
- [ ] **Step 8: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/application/src/services/federation_event.rs
|
||||
git commit -m "feat(application): federate local likes + locality guard prevents remote boost re-broadcast"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Notes
|
||||
|
||||
- **No loop risk**: The `BoostAdded` locality guard (`u.local`) ensures remote boosts published by `on_announce_received` skip federation fan-out. Same guard applies to `LikeAdded`.
|
||||
- **Existing notification service**: `LikeAdded` and `BoostAdded` events published from inbound activity handlers are picked up by `NotificationEventService` unchanged — it already creates notifications for these events.
|
||||
- **Deterministic activity IDs**: Like and Undo(Like) use `Uuid::new_v5(NAMESPACE_URL, "{user}/{object}")` so the Undo can reference the original Like ID without DB storage.
|
||||
- **Only remote thoughts get likes federated**: Local thoughts liked by local users generate no outbound activity (the like is already recorded locally).
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,781 +0,0 @@
|
||||
# Federation Gaps — Round 2 Implementation Plan
|
||||
|
||||
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
||||
|
||||
**Goal:** Fix seven federation gaps: HTML content format, hashtag federation, Undo(Like) inbound, Update(Actor) on profile change, @mention notifications, remote posts in home feed, and orphaned reply parent display.
|
||||
|
||||
**Architecture:** Backend changes span the AP adapter layer (activities.rs, service.rs, handler.rs), application layer (use cases, event service), and postgres adapter (feed.rs). Frontend changes are limited to api.ts and thought-card.tsx. All changes follow the existing hexagonal pattern — no business logic in presentation, domain events for cross-cutting concerns.
|
||||
|
||||
**Tech Stack:** Rust / axum / sqlx / activitypub_federation crate; Next.js 15 / TypeScript / Zod.
|
||||
|
||||
---
|
||||
|
||||
## Files Modified
|
||||
|
||||
| Task | File | Change |
|
||||
|------|------|--------|
|
||||
| 1 | `crates/adapters/activitypub-base/src/service.rs` | Wrap content in `<p>` tags with HTML escaping |
|
||||
| 2 | `crates/adapters/activitypub/src/note.rs` | Add `tag` field to ThoughtNote |
|
||||
| 2 | `crates/adapters/activitypub-base/src/service.rs` | Extract hashtags and add to Note JSON |
|
||||
| 3 | `crates/adapters/activitypub-base/src/activities.rs` | Add "Like" arm to UndoActivity::receive |
|
||||
| 4 | `crates/domain/src/ports.rs` | Add `broadcast_actor_update` to OutboundFederationPort |
|
||||
| 4 | `crates/domain/src/events.rs` | Add `ProfileUpdated` variant |
|
||||
| 4 | `crates/domain/src/testing.rs` | Add SpyPort stub for broadcast_actor_update |
|
||||
| 4 | `crates/application/src/use_cases/profile.rs` | Publish ProfileUpdated from update_profile |
|
||||
| 4 | `crates/application/src/services/federation_event.rs` | Handle ProfileUpdated → broadcast_actor_update |
|
||||
| 4 | `crates/adapters/activitypub-base/src/service.rs` | Implement broadcast_actor_update port method |
|
||||
| 5 | `crates/adapters/activitypub/src/note.rs` | Add `tag` deserialization field |
|
||||
| 5 | `crates/adapters/activitypub-base/src/content.rs` | Add `on_mention` to ApObjectHandler |
|
||||
| 5 | `crates/adapters/activitypub/src/handler.rs` | Parse Mention tags, implement on_mention |
|
||||
| 5 | `crates/domain/src/events.rs` | Add `MentionReceived` variant |
|
||||
| 5 | `crates/domain/src/testing.rs` | No-op on_mention in TestStore impl |
|
||||
| 5 | `crates/application/src/services/notification_event.rs` | Handle MentionReceived |
|
||||
| 6 | `crates/adapters/postgres/src/feed.rs` | Extend home_feed SQL to include federation_following |
|
||||
| 7 | `crates/api-types/src/responses.rs` | Add `in_reply_to_url` to ThoughtResponse |
|
||||
| 7 | `crates/presentation/src/handlers/feed.rs` | Map in_reply_to_url into response |
|
||||
| 7 | `thoughts-frontend/lib/api.ts` | Add replyToUrl to ThoughtSchema |
|
||||
| 7 | `thoughts-frontend/components/thought-card.tsx` | Show external reply link when replyToUrl set |
|
||||
|
||||
---
|
||||
|
||||
## Task 1: HTML content in outbound Notes
|
||||
|
||||
Mastodon and other AP servers expect HTML, not plain text. Wrap content in `<p>` tags and escape HTML entities. Multi-paragraph posts (newlines) get multiple `<p>` elements.
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/adapters/activitypub-base/src/service.rs` (function `thought_note_json`)
|
||||
|
||||
- [ ] **Step 1: Add a private HTML-escaping helper near the top of service.rs**
|
||||
|
||||
Read `crates/adapters/activitypub-base/src/service.rs`. Find `fn thought_note_json`. Add this private function just before it:
|
||||
|
||||
```rust
|
||||
fn content_to_html(text: &str) -> String {
|
||||
let escaped = text
|
||||
.replace('&', "&")
|
||||
.replace('<', "<")
|
||||
.replace('>', ">")
|
||||
.replace('"', """);
|
||||
let paragraphs: Vec<&str> = escaped.split('\n').filter(|s| !s.is_empty()).collect();
|
||||
if paragraphs.is_empty() {
|
||||
format!("<p>{}</p>", escaped)
|
||||
} else {
|
||||
paragraphs
|
||||
.iter()
|
||||
.map(|p| format!("<p>{}</p>", p))
|
||||
.collect::<Vec<_>>()
|
||||
.join("")
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Use `content_to_html` in `thought_note_json`**
|
||||
|
||||
In `thought_note_json`, find:
|
||||
```rust
|
||||
"content": thought.content.as_str(),
|
||||
```
|
||||
Replace with:
|
||||
```rust
|
||||
"content": content_to_html(thought.content.as_str()),
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Verify compilation**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build -p activitypub-base 2>&1 | grep "^error"
|
||||
```
|
||||
Expected: no errors.
|
||||
|
||||
- [ ] **Step 4: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/adapters/activitypub-base/src/service.rs
|
||||
git commit -m "fix(ap): wrap outbound Note content in HTML paragraph tags"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 2: Hashtag federation
|
||||
|
||||
Outbound Notes must include a `tag` array with Hashtag objects so Mastodon can index posts by hashtag. Extract `#word` patterns from content and add to the Note JSON.
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/adapters/activitypub-base/src/service.rs` (`thought_note_json`)
|
||||
|
||||
- [ ] **Step 1: Add a hashtag-extraction helper in service.rs**
|
||||
|
||||
Add this function near `content_to_html` (already added in Task 1):
|
||||
|
||||
```rust
|
||||
fn extract_hashtag_tags(content: &str, base_url: &str) -> Vec<serde_json::Value> {
|
||||
let mut seen = std::collections::HashSet::new();
|
||||
let mut tags = Vec::new();
|
||||
for word in content.split_whitespace() {
|
||||
let tag = word.trim_matches(|c: char| !c.is_alphanumeric() && c != '#');
|
||||
if let Some(name) = tag.strip_prefix('#') {
|
||||
if !name.is_empty() && seen.insert(name.to_lowercase()) {
|
||||
let lower = name.to_lowercase();
|
||||
tags.push(serde_json::json!({
|
||||
"type": "Hashtag",
|
||||
"name": format!("#{}", lower),
|
||||
"href": format!("{}/tags/{}", base_url, lower),
|
||||
}));
|
||||
}
|
||||
}
|
||||
}
|
||||
tags
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Add hashtag tags to the Note JSON in `thought_note_json`**
|
||||
|
||||
In `thought_note_json`, after the closing `}` of `let mut note = serde_json::json!({...})`, add:
|
||||
|
||||
```rust
|
||||
let hashtag_tags = extract_hashtag_tags(thought.content.as_str(), base_url);
|
||||
if !hashtag_tags.is_empty() {
|
||||
note["tag"] = serde_json::json!(hashtag_tags);
|
||||
}
|
||||
```
|
||||
|
||||
Note: `base_url` is already a parameter of `thought_note_json(&self, thought, local_actor, base_url)` — use it directly.
|
||||
|
||||
- [ ] **Step 3: Verify compilation**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build -p activitypub-base 2>&1 | grep "^error"
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/adapters/activitypub-base/src/service.rs
|
||||
git commit -m "feat(ap): add hashtag tag array to outbound Notes"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 3: Undo(Like) inbound handler
|
||||
|
||||
When a remote user unlikes a local post, we should acknowledge it. Add a "Like" arm to `UndoActivity::receive` that calls `on_unlike` on the object handler. The `on_unlike` impl will be a no-op (we don't store remote likes in the likes table, only notifications — removing them requires more infrastructure). This prevents the "ignoring Undo of unknown activity type" log spam.
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/adapters/activitypub-base/src/activities.rs`
|
||||
- Modify: `crates/adapters/activitypub-base/src/content.rs`
|
||||
- Modify: `crates/adapters/activitypub/src/handler.rs`
|
||||
|
||||
- [ ] **Step 1: Add `on_unlike` to `ApObjectHandler` trait in `content.rs`**
|
||||
|
||||
Read `crates/adapters/activitypub-base/src/content.rs`. Find `ApObjectHandler`. Add after `on_announce_received`:
|
||||
|
||||
```rust
|
||||
/// Called when a remote actor removes a Like from a local thought.
|
||||
async fn on_unlike(&self, object_url: &Url, actor_url: &Url) -> anyhow::Result<()>;
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Add no-op `on_unlike` to `ThoughtsObjectHandler` in `handler.rs`**
|
||||
|
||||
Read `crates/adapters/activitypub/src/handler.rs`. Add after `on_announce_received`:
|
||||
|
||||
```rust
|
||||
async fn on_unlike(&self, _object_url: &Url, _actor_url: &Url) -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Add "Like" arm to `UndoActivity::receive` in `activities.rs`**
|
||||
|
||||
Read `crates/adapters/activitypub-base/src/activities.rs`. Find `UndoActivity::receive`. Find the `match obj_type` block. Add before the `other =>` catch-all:
|
||||
|
||||
```rust
|
||||
"Like" => {
|
||||
if let Some(obj_url_str) = self.object.get("object").and_then(|o| o.as_str())
|
||||
&& let Ok(obj_url) = Url::parse(obj_url_str)
|
||||
&& obj_url.host_str().unwrap_or("") == data.domain
|
||||
{
|
||||
data.object_handler
|
||||
.on_unlike(&obj_url, self.actor.inner())
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
tracing::warn!(error = %e, "failed to process unlike");
|
||||
});
|
||||
}
|
||||
tracing::info!(actor = %self.actor.inner(), "received Undo(Like)");
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Verify compilation**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build 2>&1 | grep "^error"
|
||||
```
|
||||
|
||||
- [ ] **Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/adapters/activitypub-base/src/activities.rs \
|
||||
crates/adapters/activitypub-base/src/content.rs \
|
||||
crates/adapters/activitypub/src/handler.rs
|
||||
git commit -m "feat(ap): handle Undo(Like) inbound activity"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 4: Update(Actor) outbound on profile change
|
||||
|
||||
When a user updates their profile (display name, bio, avatar), broadcast an `Update(Actor)` activity to their AP followers. The `broadcast_actor_update` method already exists on `ActivityPubService` — it just needs to be exposed as a port method and wired through the event system.
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/domain/src/ports.rs` — add to OutboundFederationPort
|
||||
- Modify: `crates/domain/src/events.rs` — add ProfileUpdated variant
|
||||
- Modify: `crates/domain/src/testing.rs` — SpyPort stub
|
||||
- Modify: `crates/application/src/use_cases/profile.rs` — publish event
|
||||
- Modify: `crates/application/src/services/federation_event.rs` — handle event
|
||||
- Modify: `crates/adapters/activitypub-base/src/service.rs` — implement port method
|
||||
|
||||
- [ ] **Step 1: Add `ProfileUpdated` to `DomainEvent` in `crates/domain/src/events.rs`**
|
||||
|
||||
Read the file. Add to the enum:
|
||||
|
||||
```rust
|
||||
ProfileUpdated { user_id: UserId },
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Add `broadcast_actor_update` to `OutboundFederationPort` in `crates/domain/src/ports.rs`**
|
||||
|
||||
Find `OutboundFederationPort`. Add after `broadcast_undo_like`:
|
||||
|
||||
```rust
|
||||
/// Broadcast Update(Actor) to all accepted followers when a user updates their profile.
|
||||
async fn broadcast_actor_update(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
) -> Result<(), DomainError>;
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Add stub to `SpyPort` in `crates/application/src/services/federation_event.rs`**
|
||||
|
||||
Find `SpyPort` struct. Add field:
|
||||
```rust
|
||||
actor_updated: Mutex<Vec<UserId>>,
|
||||
```
|
||||
|
||||
Find `impl OutboundFederationPort for SpyPort`. Add:
|
||||
```rust
|
||||
async fn broadcast_actor_update(&self, user_id: &UserId) -> Result<(), DomainError> {
|
||||
self.actor_updated.lock().unwrap().push(user_id.clone());
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Add `EventPublisher` to `update_profile` use case in `crates/application/src/use_cases/profile.rs`**
|
||||
|
||||
Read the file. Find `pub async fn update_profile(...)`. Add `events: &dyn EventPublisher` as a parameter and import it. Publish `ProfileUpdated` after the update:
|
||||
|
||||
```rust
|
||||
pub async fn update_profile(
|
||||
users: &dyn UserRepository,
|
||||
events: &dyn EventPublisher,
|
||||
user_id: &UserId,
|
||||
display_name: Option<String>,
|
||||
bio: Option<String>,
|
||||
avatar_url: Option<String>,
|
||||
header_url: Option<String>,
|
||||
custom_css: Option<String>,
|
||||
) -> Result<(), DomainError> {
|
||||
users
|
||||
.update_profile(user_id, display_name, bio, avatar_url, header_url, custom_css)
|
||||
.await?;
|
||||
events
|
||||
.publish(&DomainEvent::ProfileUpdated {
|
||||
user_id: user_id.clone(),
|
||||
})
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
Make sure `DomainEvent` and `EventPublisher` are imported at the top of profile.rs. Check the existing imports and add what's missing.
|
||||
|
||||
- [ ] **Step 5: Update all callers of `update_profile` to pass `&*s.events`**
|
||||
|
||||
`update_profile` is called from `crates/presentation/src/handlers/users.rs`. Read that file. Find the `patch_profile` handler call to `update_profile`. Add `&*s.events` as the second argument:
|
||||
|
||||
```rust
|
||||
update_profile(
|
||||
&*s.users,
|
||||
&*s.events,
|
||||
&uid,
|
||||
body.display_name,
|
||||
body.bio,
|
||||
body.avatar_url,
|
||||
body.header_url,
|
||||
body.custom_css,
|
||||
)
|
||||
.await?;
|
||||
```
|
||||
|
||||
- [ ] **Step 6: Implement `broadcast_actor_update` port method in `ActivityPubService` in `crates/adapters/activitypub-base/src/service.rs`**
|
||||
|
||||
Find `impl domain::ports::OutboundFederationPort for ActivityPubService`. Add after `broadcast_undo_like`:
|
||||
|
||||
```rust
|
||||
async fn broadcast_actor_update(
|
||||
&self,
|
||||
user_id: &domain::value_objects::UserId,
|
||||
) -> Result<(), domain::errors::DomainError> {
|
||||
self.broadcast_actor_update(user_id.as_uuid())
|
||||
.await
|
||||
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))
|
||||
}
|
||||
```
|
||||
|
||||
Note: this calls the existing private `broadcast_actor_update(uuid)` method on `ActivityPubService`.
|
||||
|
||||
- [ ] **Step 7: Handle `ProfileUpdated` in `federation_event.rs`**
|
||||
|
||||
Find the `match event` block. Add before the catch-all `_ => Ok(())`:
|
||||
|
||||
```rust
|
||||
DomainEvent::ProfileUpdated { user_id } => {
|
||||
self.ap.broadcast_actor_update(user_id).await
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 8: Verify build and tests**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build 2>&1 | grep "^error" | head -5
|
||||
cargo test -p domain -p application 2>&1 | tail -5
|
||||
```
|
||||
|
||||
- [ ] **Step 9: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/domain/src/events.rs \
|
||||
crates/domain/src/ports.rs \
|
||||
crates/domain/src/testing.rs \
|
||||
crates/application/src/use_cases/profile.rs \
|
||||
crates/application/src/services/federation_event.rs \
|
||||
crates/presentation/src/handlers/users.rs \
|
||||
crates/adapters/activitypub-base/src/service.rs
|
||||
git commit -m "feat(ap): broadcast Update(Actor) when user updates their profile"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 5: @mention notification
|
||||
|
||||
When a remote Note arrives with a Mention tag pointing to a local user, create a notification. The Note's `tag` array contains objects like `{"type":"Mention","href":"https://our.instance/users/{uuid}","name":"@user@domain"}`.
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/adapters/activitypub/src/note.rs` — add tag field
|
||||
- Modify: `crates/adapters/activitypub-base/src/content.rs` — add on_mention to trait
|
||||
- Modify: `crates/adapters/activitypub/src/handler.rs` — parse tags, implement on_mention
|
||||
- Modify: `crates/domain/src/events.rs` — add MentionReceived
|
||||
- Modify: `crates/domain/src/testing.rs` — no-op on_mention
|
||||
- Modify: `crates/application/src/services/notification_event.rs` — handle MentionReceived
|
||||
|
||||
- [ ] **Step 1: Add `tag` field to `ThoughtNote` in `crates/adapters/activitypub/src/note.rs`**
|
||||
|
||||
Read the file. Add to the `ThoughtNote` struct:
|
||||
|
||||
```rust
|
||||
#[serde(skip_serializing_if = "Vec::is_empty", default)]
|
||||
pub tag: Vec<serde_json::Value>,
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Add `MentionReceived` to `DomainEvent` in `crates/domain/src/events.rs`**
|
||||
|
||||
Add to the enum:
|
||||
|
||||
```rust
|
||||
MentionReceived {
|
||||
thought_id: ThoughtId,
|
||||
mentioned_user_id: UserId,
|
||||
author_user_id: UserId,
|
||||
},
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Add `on_mention` to `ApObjectHandler` in `crates/adapters/activitypub-base/src/content.rs`**
|
||||
|
||||
Add after `on_unlike`:
|
||||
|
||||
```rust
|
||||
/// Called once per @mention of a local user in a remote Note.
|
||||
/// `thought_ap_id` is the AP URL of the Note, `mentioned_user_id` is the UUID
|
||||
/// of the local user being mentioned, `actor_url` is the remote author's AP URL.
|
||||
async fn on_mention(
|
||||
&self,
|
||||
thought_ap_id: &Url,
|
||||
mentioned_user_uuid: uuid::Uuid,
|
||||
actor_url: &Url,
|
||||
) -> anyhow::Result<()>;
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Add no-op `on_mention` to `TestStore`'s `ApObjectHandler` impl in `crates/domain/src/testing.rs`**
|
||||
|
||||
Note: `TestStore` does NOT implement `ApObjectHandler` — that's `ThoughtsObjectHandler` in the activitypub adapter. Instead, find if there is a test double or just implement in handler.rs directly (step 5 below covers it).
|
||||
|
||||
- [ ] **Step 5: Implement `on_mention` in `ThoughtsObjectHandler` in `crates/adapters/activitypub/src/handler.rs`**
|
||||
|
||||
Add after `on_unlike`:
|
||||
|
||||
```rust
|
||||
async fn on_mention(
|
||||
&self,
|
||||
thought_ap_id: &url::Url,
|
||||
mentioned_user_uuid: uuid::Uuid,
|
||||
actor_url: &url::Url,
|
||||
) -> anyhow::Result<()> {
|
||||
// Resolve remote author to a local user ID.
|
||||
let author_user_id = match self
|
||||
.repo
|
||||
.find_remote_actor_id(actor_url)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?
|
||||
{
|
||||
Some(id) => id,
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
// Extract thought UUID from /thoughts/{uuid} path.
|
||||
let thought_uuid = thought_ap_id
|
||||
.path()
|
||||
.strip_prefix("/thoughts/")
|
||||
.and_then(|s| s.split('/').next())
|
||||
.and_then(|s| uuid::Uuid::parse_str(s).ok());
|
||||
|
||||
let thought_uuid = match thought_uuid {
|
||||
Some(u) => u,
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
if let Some(ep) = &self.event_publisher {
|
||||
ep.publish(&domain::events::DomainEvent::MentionReceived {
|
||||
thought_id: domain::value_objects::ThoughtId::from_uuid(thought_uuid),
|
||||
mentioned_user_id: domain::value_objects::UserId::from_uuid(mentioned_user_uuid),
|
||||
author_user_id,
|
||||
})
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 6: Parse Mention tags and call `on_mention` in `ThoughtsObjectHandler::on_create`**
|
||||
|
||||
Find `on_create`. After the `accept_note(...)` call, add:
|
||||
|
||||
```rust
|
||||
// Fire mention notifications for any local @mentions in the note's tag array.
|
||||
let local_domain = self.urls.base_url().host_str().unwrap_or("");
|
||||
for tag in ¬e.tag {
|
||||
if tag.get("type").and_then(|t| t.as_str()) != Some("Mention") {
|
||||
continue;
|
||||
}
|
||||
let href = match tag.get("href").and_then(|h| h.as_str()) {
|
||||
Some(h) => h,
|
||||
None => continue,
|
||||
};
|
||||
let href_url = match url::Url::parse(href) {
|
||||
Ok(u) => u,
|
||||
Err(_) => continue,
|
||||
};
|
||||
// Only process mentions of local users (UUID-based /users/{uuid} paths).
|
||||
if href_url.host_str().unwrap_or("") != local_domain {
|
||||
continue;
|
||||
}
|
||||
let user_uuid = href_url
|
||||
.path()
|
||||
.strip_prefix("/users/")
|
||||
.and_then(|s| s.split('/').next())
|
||||
.and_then(|s| uuid::Uuid::parse_str(s).ok());
|
||||
if let Some(uuid) = user_uuid {
|
||||
self.on_mention(ap_id, uuid, actor_url)
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
tracing::warn!(error = %e, "failed to process mention notification");
|
||||
});
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Note: `self.urls.base_url()` — check `ThoughtsUrls` for how to get the base URL `Url`. If not available, parse `self.urls` fields or add a helper. Check the `ThoughtsUrls` struct in `crates/adapters/activitypub/src/urls.rs`.
|
||||
|
||||
- [ ] **Step 7: Handle `MentionReceived` in `crates/application/src/services/notification_event.rs`**
|
||||
|
||||
Find the `match event` block. Add before the `_ => Ok(())` catch-all:
|
||||
|
||||
```rust
|
||||
DomainEvent::MentionReceived {
|
||||
thought_id,
|
||||
mentioned_user_id,
|
||||
author_user_id,
|
||||
} => {
|
||||
self.notifications
|
||||
.save(&Notification {
|
||||
id: NotificationId::new(),
|
||||
user_id: mentioned_user_id.clone(),
|
||||
notification_type: NotificationType::Mention,
|
||||
from_user_id: Some(author_user_id.clone()),
|
||||
thought_id: Some(thought_id.clone()),
|
||||
read: false,
|
||||
created_at: Utc::now(),
|
||||
})
|
||||
.await
|
||||
}
|
||||
```
|
||||
|
||||
Make sure `NotificationType::Mention` is a variant — check `crates/domain/src/models/notification.rs`. It already has `Mention` variant.
|
||||
|
||||
- [ ] **Step 8: Verify build and tests**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build 2>&1 | grep "^error" | head -10
|
||||
cargo test -p domain -p application 2>&1 | tail -5
|
||||
```
|
||||
|
||||
- [ ] **Step 9: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/adapters/activitypub/src/note.rs \
|
||||
crates/adapters/activitypub-base/src/content.rs \
|
||||
crates/adapters/activitypub/src/handler.rs \
|
||||
crates/domain/src/events.rs \
|
||||
crates/application/src/services/notification_event.rs
|
||||
git commit -m "feat(ap): @mention notification from inbound remote Notes"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 6: Remote posts in home feed
|
||||
|
||||
The `home_feed` SQL currently only includes thoughts from users in the `follows` table (local follows). Remote follows are in `federation_following`, so remote users' posts never appear. Extend the SQL to also include thoughts from users whose AP URL is in `federation_following` for the viewer.
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/adapters/postgres/src/feed.rs`
|
||||
|
||||
- [ ] **Step 1: Read `crates/adapters/postgres/src/feed.rs` in full**
|
||||
|
||||
Focus on `fn feed_select(viewer: Option<uuid::Uuid>) -> String` and `async fn home_feed(...)`.
|
||||
|
||||
Key insight: `feed_select` embeds `viewer` UUID directly into the SQL string (not as a bind parameter). The home_feed SQL uses `$1` (following_ids), `$2` (limit), `$3` (offset) as bind params.
|
||||
|
||||
- [ ] **Step 2: Add `follower` parameter to `feed_select`**
|
||||
|
||||
Change the signature to:
|
||||
```rust
|
||||
fn feed_select(viewer: Option<uuid::Uuid>, follower: Option<uuid::Uuid>) -> String
|
||||
```
|
||||
|
||||
At the top of the function body, generate a federation following subquery:
|
||||
```rust
|
||||
let federation_clause = match follower {
|
||||
Some(fid) => format!(
|
||||
"OR t.user_id IN (
|
||||
SELECT u2.id FROM users u2
|
||||
JOIN federation_following ff ON u2.ap_id = ff.remote_actor_url
|
||||
WHERE ff.local_user_id = '{fid}'
|
||||
)"
|
||||
),
|
||||
None => String::new(),
|
||||
};
|
||||
```
|
||||
|
||||
This string is used in step 3's WHERE clause modification.
|
||||
|
||||
Since `feed_select` generates only the SELECT part (not WHERE), the `federation_clause` needs to be returned somehow. Options:
|
||||
- Return a tuple `(select_str, federation_clause)` from `feed_select`
|
||||
- Or add a separate helper `fn federation_following_clause(follower: Option<uuid::Uuid>) -> String`
|
||||
|
||||
Use option B — separate helper — to avoid changing `feed_select`'s return type:
|
||||
|
||||
```rust
|
||||
fn federation_following_clause(follower: Option<uuid::Uuid>) -> String {
|
||||
match follower {
|
||||
Some(fid) => format!(
|
||||
" OR t.user_id IN (
|
||||
SELECT u2.id FROM users u2
|
||||
JOIN federation_following ff ON u2.ap_id = ff.remote_actor_url
|
||||
WHERE ff.local_user_id = '{fid}'
|
||||
)"
|
||||
),
|
||||
None => String::new(),
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Leave `feed_select` signature unchanged.
|
||||
|
||||
- [ ] **Step 3: Modify `home_feed` to use `federation_following_clause`**
|
||||
|
||||
Find the `home_feed` method. The viewer_id is the feed owner (the logged-in user), which is also the person whose federation_following we want.
|
||||
|
||||
Replace:
|
||||
```rust
|
||||
let viewer = viewer_id.map(|v| v.as_uuid());
|
||||
// ...
|
||||
let total: i64 = sqlx::query_scalar(
|
||||
"SELECT COUNT(*) FROM thoughts t WHERE t.user_id=ANY($1) AND t.visibility != 'direct'",
|
||||
)
|
||||
```
|
||||
|
||||
With:
|
||||
```rust
|
||||
let viewer = viewer_id.map(|v| v.as_uuid());
|
||||
let fed_clause = federation_following_clause(viewer);
|
||||
let total: i64 = sqlx::query_scalar(&format!(
|
||||
"SELECT COUNT(*) FROM thoughts t WHERE (t.user_id=ANY($1){}) AND t.visibility != 'direct'",
|
||||
fed_clause
|
||||
))
|
||||
```
|
||||
|
||||
And replace:
|
||||
```rust
|
||||
let sql = format!("{sel} WHERE t.user_id=ANY($1) AND t.visibility != 'direct' ORDER BY t.created_at DESC LIMIT $2 OFFSET $3");
|
||||
```
|
||||
|
||||
With:
|
||||
```rust
|
||||
let sql = format!("{sel} WHERE (t.user_id=ANY($1){}) AND t.visibility != 'direct' ORDER BY t.created_at DESC LIMIT $2 OFFSET $3", fed_clause);
|
||||
```
|
||||
|
||||
The rest of the bindings (`$1`, `$2`, `$3`) stay unchanged.
|
||||
|
||||
- [ ] **Step 4: Verify compilation**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build -p postgres 2>&1 | grep "^error" | head -5
|
||||
```
|
||||
|
||||
- [ ] **Step 5: Run all tests**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo test -p domain -p application 2>&1 | tail -5
|
||||
```
|
||||
|
||||
- [ ] **Step 6: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/adapters/postgres/src/feed.rs
|
||||
git commit -m "feat(feed): include remote following posts in home feed"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Task 7: Reply parent display + API field
|
||||
|
||||
Remote posts that are replies show without context because:
|
||||
1. `ThoughtResponse` doesn't expose `in_reply_to_url` (the external URL of the parent)
|
||||
2. The frontend doesn't link to the parent when it's external
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/api-types/src/responses.rs`
|
||||
- Modify: `crates/presentation/src/handlers/feed.rs` (or wherever `to_thought_response` is defined)
|
||||
- Modify: `thoughts-frontend/lib/api.ts`
|
||||
- Modify: `thoughts-frontend/components/thought-card.tsx`
|
||||
|
||||
- [ ] **Step 1: Add `in_reply_to_url` to `ThoughtResponse` in `crates/api-types/src/responses.rs`**
|
||||
|
||||
Read the file. Find `ThoughtResponse` struct. Add after `reply_to_id`:
|
||||
|
||||
```rust
|
||||
#[serde(rename = "replyToUrl", skip_serializing_if = "Option::is_none")]
|
||||
pub in_reply_to_url: Option<String>,
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Map `in_reply_to_url` in the response builder**
|
||||
|
||||
Find where `ThoughtResponse` is constructed from a `Thought` (search for `ThoughtResponse {` or `to_thought_response`). Add the mapping:
|
||||
|
||||
```rust
|
||||
in_reply_to_url: thought.in_reply_to_url.clone(),
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Verify backend compilation**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build 2>&1 | grep "^error" | head -5
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Add `replyToUrl` to `ThoughtSchema` in `thoughts-frontend/lib/api.ts`**
|
||||
|
||||
Find `ThoughtSchema`. Add:
|
||||
|
||||
```typescript
|
||||
replyToUrl: z.string().url().nullable().optional(),
|
||||
```
|
||||
|
||||
- [ ] **Step 5: Update `thought-card.tsx` to show external reply link**
|
||||
|
||||
Read `thoughts-frontend/components/thought-card.tsx`. Find the section that renders `thought.replyToId`. It currently shows "Replying to parent thought" with a hash link only when `isReply` is true.
|
||||
|
||||
Add an external reply link for when the thought has a `replyToUrl` but no local `replyToId`:
|
||||
|
||||
```tsx
|
||||
{thought.replyToId && isReply && (
|
||||
<div className="text-sm text-muted-foreground flex items-center gap-2">
|
||||
<CornerUpLeft className="h-4 w-4 text-primary/70" />
|
||||
<span>
|
||||
Replying to{" "}
|
||||
<Link
|
||||
href={`#${thought.replyToId}`}
|
||||
className="hover:underline text-primary text-shadow-sm"
|
||||
>
|
||||
parent thought
|
||||
</Link>
|
||||
</span>
|
||||
</div>
|
||||
)}
|
||||
{!thought.replyToId && thought.replyToUrl && (
|
||||
<div className="text-sm text-muted-foreground flex items-center gap-2">
|
||||
<CornerUpLeft className="h-4 w-4 text-primary/70" />
|
||||
<span>
|
||||
Replying to{" "}
|
||||
<a
|
||||
href={thought.replyToUrl}
|
||||
target="_blank"
|
||||
rel="noopener noreferrer"
|
||||
className="hover:underline text-primary text-shadow-sm"
|
||||
>
|
||||
original post ↗
|
||||
</a>
|
||||
</span>
|
||||
</div>
|
||||
)}
|
||||
```
|
||||
|
||||
- [ ] **Step 6: Type check frontend**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts/thoughts-frontend && npx tsc --noEmit 2>&1 | grep "error TS" | head -5
|
||||
```
|
||||
|
||||
- [ ] **Step 7: Final build + tests**
|
||||
|
||||
```bash
|
||||
cd /mnt/drive/dev/thoughts && cargo build 2>&1 | grep "^error"
|
||||
cargo test -p domain -p application 2>&1 | tail -5
|
||||
```
|
||||
|
||||
- [ ] **Step 8: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/api-types/src/responses.rs \
|
||||
thoughts-frontend/lib/api.ts \
|
||||
thoughts-frontend/components/thought-card.tsx
|
||||
git commit -m "feat: expose replyToUrl in API + show external parent link on remote reply posts"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Notes
|
||||
|
||||
- **Task 5 (mentions)**: `self.urls.base_url()` — if `ThoughtsUrls` doesn't expose the base URL as a `Url`, parse it from `self.urls.base_url` string. Check `crates/adapters/activitypub/src/urls.rs` for the exact field.
|
||||
- **Task 6 (feed)**: The embedded UUID in the SQL is a UUID type (hex + hyphens only), safe to format-string without SQL injection risk.
|
||||
- **Task 7 (reply)**: The `to_thought_response` builder might be in `handlers/feed.rs`, `handlers/thoughts.rs`, or a shared module — search the codebase for where `ThoughtResponse` is constructed.
|
||||
- **Profile update (Task 4)**: If tests in `application` call `update_profile` directly, they'll need to pass a `TestStore` as the `events` parameter (TestStore implements EventPublisher).
|
||||
@@ -1,492 +0,0 @@
|
||||
# FeedEntry Decoupling Implementation Plan
|
||||
|
||||
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
||||
|
||||
**Goal:** Replace the flat `liked_by_viewer`/`boosted_by_viewer` booleans and inline stats fields on `FeedEntry` with two named sub-structs (`EngagementStats`, `Option<ViewerContext>`), and fix the search adapter to compute real viewer context instead of hardcoding `false`.
|
||||
|
||||
**Architecture:** Three sequential tasks. Task 1 changes the domain model, which breaks compilation. Task 2 fixes all downstream construction sites and restores compilation. Task 3 adds the functional improvement — viewer-aware SQL in the search adapter.
|
||||
|
||||
**Tech Stack:** Rust, SQLx, Postgres trigram search (`pg_trgm`).
|
||||
|
||||
---
|
||||
|
||||
### Task 1: Add `EngagementStats` and `ViewerContext` to the domain model
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/domain/src/models/feed.rs`
|
||||
|
||||
- [ ] **Step 1: Replace the flat fields on `FeedEntry` with two named sub-structs**
|
||||
|
||||
Replace the entire contents of `crates/domain/src/models/feed.rs` with:
|
||||
|
||||
```rust
|
||||
use crate::models::{thought::Thought, user::User};
|
||||
use crate::value_objects::UserId;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct EngagementStats {
|
||||
pub like_count: i64,
|
||||
pub boost_count: i64,
|
||||
pub reply_count: i64,
|
||||
}
|
||||
|
||||
/// Present only when an authenticated viewer made the request.
|
||||
/// `liked`/`boosted` are the viewer's interaction state with this thought.
|
||||
/// `None` means anonymous request or viewer context unavailable.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ViewerContext {
|
||||
pub liked: bool,
|
||||
pub boosted: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct FeedEntry {
|
||||
pub thought: Thought,
|
||||
pub author: User,
|
||||
pub stats: EngagementStats,
|
||||
pub viewer: Option<ViewerContext>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct UserSummary {
|
||||
pub id: UserId,
|
||||
pub username: String,
|
||||
pub display_name: Option<String>,
|
||||
pub avatar_url: Option<String>,
|
||||
pub bio: Option<String>,
|
||||
pub thought_count: i64,
|
||||
pub follower_count: i64,
|
||||
pub following_count: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PageParams {
|
||||
pub page: u64,
|
||||
pub per_page: u64,
|
||||
}
|
||||
impl PageParams {
|
||||
pub fn offset(&self) -> i64 {
|
||||
((self.page.saturating_sub(1)) * self.per_page) as i64
|
||||
}
|
||||
pub fn limit(&self) -> i64 {
|
||||
self.per_page as i64
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Paginated<T> {
|
||||
pub items: Vec<T>,
|
||||
pub total: i64,
|
||||
pub page: u64,
|
||||
pub per_page: u64,
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Verify the domain crate compiles (other crates will break)**
|
||||
|
||||
```bash
|
||||
cargo check -p domain 2>&1 | head -10
|
||||
```
|
||||
|
||||
Expected: `domain` compiles clean. Other crates (`postgres`, `postgres-search`, `presentation`) will show errors referencing the removed fields — that is expected and will be fixed in Task 2.
|
||||
|
||||
- [ ] **Step 3: Commit the domain model change**
|
||||
|
||||
```bash
|
||||
git add crates/domain/src/models/feed.rs
|
||||
git commit -m "refactor(domain): FeedEntry — EngagementStats + Option<ViewerContext> sub-structs"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 2: Fix downstream compilation — adapters and handler
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/adapters/postgres/src/feed.rs` (line 136 — `row_to_entry`)
|
||||
- Modify: `crates/adapters/postgres-search/src/lib.rs` (line 97 — `row_to_entry`)
|
||||
- Modify: `crates/presentation/src/handlers/feed.rs` (line 22 — `to_thought_response`)
|
||||
|
||||
- [ ] **Step 1: Update `row_to_entry` in `postgres/src/feed.rs`**
|
||||
|
||||
Find `row_to_entry` in `crates/adapters/postgres/src/feed.rs` (around line 109). Replace the `Ok(FeedEntry { ... })` block (currently lines 136–144) with:
|
||||
|
||||
```rust
|
||||
Ok(FeedEntry {
|
||||
thought,
|
||||
author,
|
||||
stats: domain::models::feed::EngagementStats {
|
||||
like_count: r.like_count,
|
||||
boost_count: r.boost_count,
|
||||
reply_count: r.reply_count,
|
||||
},
|
||||
viewer: Some(domain::models::feed::ViewerContext {
|
||||
liked: r.liked_by_viewer,
|
||||
boosted: r.boosted_by_viewer,
|
||||
}),
|
||||
})
|
||||
```
|
||||
|
||||
Note: `postgres/src/feed.rs` already builds `viewer = Some(...)` unconditionally here because its `feed_select(viewer)` function always produces `liked_by_viewer`/`boosted_by_viewer` columns — `false AS liked_by_viewer` when there is no viewer, and the real EXISTS result when there is one. The `Option<ViewerContext>` distinction (`None` = anonymous) is handled by the caller's knowledge of whether a viewer was passed. To preserve the `None`-when-no-viewer semantic, read how `viewer` is passed into the calling functions and thread it through.
|
||||
|
||||
Actually, the correct fix: the `row_to_entry` function doesn't know if a viewer was passed. Pass the viewer `Option<uuid::Uuid>` as a parameter so it can decide:
|
||||
|
||||
Replace the signature of `row_to_entry`:
|
||||
```rust
|
||||
fn row_to_entry(r: FeedRow, viewer: Option<uuid::Uuid>) -> Result<FeedEntry, DomainError> {
|
||||
```
|
||||
|
||||
And change the construction:
|
||||
```rust
|
||||
Ok(FeedEntry {
|
||||
thought,
|
||||
author,
|
||||
stats: domain::models::feed::EngagementStats {
|
||||
like_count: r.like_count,
|
||||
boost_count: r.boost_count,
|
||||
reply_count: r.reply_count,
|
||||
},
|
||||
viewer: viewer.map(|_| domain::models::feed::ViewerContext {
|
||||
liked: r.liked_by_viewer,
|
||||
boosted: r.boosted_by_viewer,
|
||||
}),
|
||||
})
|
||||
```
|
||||
|
||||
Then update all call sites of `row_to_entry` inside `postgres/src/feed.rs`. Each `FeedRepository` method already has a `viewer` variable of type `Option<uuid::Uuid>`. Pass it through:
|
||||
|
||||
```rust
|
||||
// Before:
|
||||
.map(row_to_entry)
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
|
||||
// After:
|
||||
.map(|r| row_to_entry(r, viewer))
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
```
|
||||
|
||||
Read `crates/adapters/postgres/src/feed.rs` to find all five `impl FeedRepository` methods and update each `.map(row_to_entry)` call.
|
||||
|
||||
- [ ] **Step 2: Update `row_to_entry` in `postgres-search/src/lib.rs`**
|
||||
|
||||
In `crates/adapters/postgres-search/src/lib.rs`, find `row_to_entry` (line 70). Change the `Ok(FeedEntry { ... })` block (lines 97–105) to:
|
||||
|
||||
```rust
|
||||
Ok(FeedEntry {
|
||||
thought,
|
||||
author,
|
||||
stats: domain::models::feed::EngagementStats {
|
||||
like_count: r.like_count,
|
||||
boost_count: r.boost_count,
|
||||
reply_count: r.reply_count,
|
||||
},
|
||||
viewer: None, // Task 3 will fix this to use real viewer data
|
||||
})
|
||||
```
|
||||
|
||||
Add `EngagementStats` and `ViewerContext` to the domain import at the top if needed (they're in `domain::models::feed`). The existing import already pulls in `FeedEntry` from that module.
|
||||
|
||||
- [ ] **Step 3: Update `to_thought_response` in `presentation/src/handlers/feed.rs`**
|
||||
|
||||
Find `to_thought_response` (line 22 in `crates/presentation/src/handlers/feed.rs`). Update it to read from the new sub-structs:
|
||||
|
||||
```rust
|
||||
pub fn to_thought_response(e: &domain::models::feed::FeedEntry) -> ThoughtResponse {
|
||||
ThoughtResponse {
|
||||
id: e.thought.id.as_uuid(),
|
||||
content: e.thought.content.as_str().to_string(),
|
||||
author: to_user_response(&e.author),
|
||||
in_reply_to_id: e.thought.in_reply_to_id.as_ref().map(|id| id.as_uuid()),
|
||||
in_reply_to_url: None,
|
||||
visibility: e.thought.visibility.as_str().to_string(),
|
||||
content_warning: e.thought.content_warning.clone(),
|
||||
sensitive: e.thought.sensitive,
|
||||
like_count: e.stats.like_count,
|
||||
boost_count: e.stats.boost_count,
|
||||
reply_count: e.stats.reply_count,
|
||||
liked_by_viewer: e.viewer.as_ref().map(|v| v.liked).unwrap_or(false),
|
||||
boosted_by_viewer: e.viewer.as_ref().map(|v| v.boosted).unwrap_or(false),
|
||||
created_at: e.thought.created_at,
|
||||
updated_at: e.thought.updated_at,
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
`ThoughtResponse` in `api-types/src/responses.rs` keeps `liked_by_viewer: bool` and `boosted_by_viewer: bool` — the wire format is unchanged.
|
||||
|
||||
- [ ] **Step 4: Compile check — full workspace must be clean**
|
||||
|
||||
```bash
|
||||
cargo check --workspace 2>&1 | head -20
|
||||
```
|
||||
|
||||
Expected: 0 errors. Fix any remaining references to the old flat fields (`e.like_count`, `e.liked_by_viewer`, etc.) — they must become `e.stats.like_count`, `e.viewer.as_ref().map(|v| v.liked).unwrap_or(false)`.
|
||||
|
||||
- [ ] **Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/adapters/postgres/src/feed.rs \
|
||||
crates/adapters/postgres-search/src/lib.rs \
|
||||
crates/presentation/src/handlers/feed.rs
|
||||
git commit -m "refactor(adapters): update FeedEntry construction to use EngagementStats + ViewerContext"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 3: Fix search adapter — real viewer context instead of hardcoded `false`
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/adapters/postgres-search/src/lib.rs`
|
||||
|
||||
The `SearchPort::search_thoughts` signature already takes `viewer_id: Option<&UserId>` (the parameter is named `_viewer_id` because it was ignored). This task makes it real.
|
||||
|
||||
- [ ] **Step 1: Add `liked_by_viewer` and `boosted_by_viewer` to `FeedRow`**
|
||||
|
||||
In `crates/adapters/postgres-search/src/lib.rs`, find the `FeedRow` struct (line 27). Add two fields at the end:
|
||||
|
||||
```rust
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct FeedRow {
|
||||
thought_id: uuid::Uuid,
|
||||
t_user_id: uuid::Uuid,
|
||||
content: String,
|
||||
in_reply_to_id: Option<uuid::Uuid>,
|
||||
visibility: String,
|
||||
content_warning: Option<String>,
|
||||
sensitive: bool,
|
||||
t_local: bool,
|
||||
thought_created_at: DateTime<Utc>,
|
||||
updated_at: Option<DateTime<Utc>>,
|
||||
author_id: uuid::Uuid,
|
||||
username: String,
|
||||
email: String,
|
||||
password_hash: String,
|
||||
display_name: Option<String>,
|
||||
bio: Option<String>,
|
||||
avatar_url: Option<String>,
|
||||
header_url: Option<String>,
|
||||
custom_css: Option<String>,
|
||||
author_local: bool,
|
||||
author_created_at: DateTime<Utc>,
|
||||
author_updated_at: DateTime<Utc>,
|
||||
like_count: i64,
|
||||
boost_count: i64,
|
||||
reply_count: i64,
|
||||
liked_by_viewer: bool, // NEW
|
||||
boosted_by_viewer: bool, // NEW
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Replace `FEED_SELECT` constant with a `feed_select(viewer)` function**
|
||||
|
||||
Delete the `const FEED_SELECT` and replace with a function that injects viewer-aware columns — identical pattern to `postgres/src/feed.rs`:
|
||||
|
||||
```rust
|
||||
fn feed_select(viewer: Option<uuid::Uuid>) -> String {
|
||||
let viewer_checks = match viewer {
|
||||
Some(uid) => format!(
|
||||
"EXISTS(SELECT 1 FROM likes WHERE user_id='{uid}' AND thought_id=t.id) AS liked_by_viewer,
|
||||
EXISTS(SELECT 1 FROM boosts WHERE user_id='{uid}' AND thought_id=t.id) AS boosted_by_viewer"
|
||||
),
|
||||
None => "false AS liked_by_viewer, false AS boosted_by_viewer".to_string(),
|
||||
};
|
||||
format!(
|
||||
"
|
||||
SELECT
|
||||
t.id AS thought_id, t.user_id AS t_user_id, t.content,
|
||||
t.in_reply_to_id,
|
||||
t.visibility, t.content_warning, t.sensitive, t.local AS t_local,
|
||||
t.created_at AS thought_created_at, t.updated_at,
|
||||
u.id AS author_id, u.username, u.email, u.password_hash,
|
||||
u.display_name, u.bio, u.avatar_url, u.header_url, u.custom_css,
|
||||
u.local AS author_local,
|
||||
u.created_at AS author_created_at, u.updated_at AS author_updated_at,
|
||||
(SELECT COUNT(*) FROM likes l WHERE l.thought_id=t.id) AS like_count,
|
||||
(SELECT COUNT(*) FROM boosts b WHERE b.thought_id=t.id) AS boost_count,
|
||||
(SELECT COUNT(*) FROM thoughts r WHERE r.in_reply_to_id=t.id) AS reply_count,
|
||||
{viewer_checks}
|
||||
FROM thoughts t JOIN users u ON u.id=t.user_id"
|
||||
)
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Update `row_to_entry` to use viewer fields**
|
||||
|
||||
Update `row_to_entry` to accept `viewer: Option<uuid::Uuid>` and build the `ViewerContext`:
|
||||
|
||||
```rust
|
||||
fn row_to_entry(r: FeedRow, viewer: Option<uuid::Uuid>) -> Result<FeedEntry, DomainError> {
|
||||
let thought = Thought {
|
||||
id: ThoughtId::from_uuid(r.thought_id),
|
||||
user_id: UserId::from_uuid(r.t_user_id),
|
||||
content: Content::new_remote(r.content),
|
||||
in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid),
|
||||
visibility: Visibility::from_db_str(&r.visibility)?,
|
||||
content_warning: r.content_warning,
|
||||
sensitive: r.sensitive,
|
||||
local: r.t_local,
|
||||
created_at: r.thought_created_at,
|
||||
updated_at: r.updated_at,
|
||||
};
|
||||
let author = User {
|
||||
id: UserId::from_uuid(r.author_id),
|
||||
username: Username::from_trusted(r.username),
|
||||
email: Email::from_trusted(r.email),
|
||||
password_hash: PasswordHash(r.password_hash),
|
||||
display_name: r.display_name,
|
||||
bio: r.bio,
|
||||
avatar_url: r.avatar_url,
|
||||
header_url: r.header_url,
|
||||
custom_css: r.custom_css,
|
||||
local: r.author_local,
|
||||
created_at: r.author_created_at,
|
||||
updated_at: r.author_updated_at,
|
||||
};
|
||||
Ok(FeedEntry {
|
||||
thought,
|
||||
author,
|
||||
stats: domain::models::feed::EngagementStats {
|
||||
like_count: r.like_count,
|
||||
boost_count: r.boost_count,
|
||||
reply_count: r.reply_count,
|
||||
},
|
||||
viewer: viewer.map(|_| domain::models::feed::ViewerContext {
|
||||
liked: r.liked_by_viewer,
|
||||
boosted: r.boosted_by_viewer,
|
||||
}),
|
||||
})
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Update `search_thoughts` to use viewer_id**
|
||||
|
||||
Find `search_thoughts` in `crates/adapters/postgres-search/src/lib.rs` (line 110). Rename `_viewer_id` → `viewer_id`, extract the viewer UUID, and thread it through `feed_select` and `row_to_entry`:
|
||||
|
||||
```rust
|
||||
async fn search_thoughts(
|
||||
&self,
|
||||
query: &str,
|
||||
page: &PageParams,
|
||||
viewer_id: Option<&UserId>, // was _viewer_id
|
||||
) -> Result<Paginated<FeedEntry>, DomainError> {
|
||||
let viewer = viewer_id.map(|v| v.as_uuid());
|
||||
let select = feed_select(viewer);
|
||||
|
||||
let total: i64 = sqlx::query_scalar(
|
||||
"SELECT COUNT(*) FROM thoughts t
|
||||
WHERE t.content % $1 AND t.visibility='public'",
|
||||
)
|
||||
.bind(query)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
let sql = format!(
|
||||
"{select}
|
||||
WHERE t.content % $1 AND t.visibility='public'
|
||||
ORDER BY similarity(t.content, $1) DESC
|
||||
LIMIT $2 OFFSET $3"
|
||||
);
|
||||
let rows = sqlx::query_as::<_, FeedRow>(&sql)
|
||||
.bind(query)
|
||||
.bind(page.limit())
|
||||
.bind(page.offset())
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
Ok(Paginated {
|
||||
items: rows
|
||||
.into_iter()
|
||||
.map(|r| row_to_entry(r, viewer))
|
||||
.collect::<Result<Vec<_>, _>>()?,
|
||||
total,
|
||||
page: page.page,
|
||||
per_page: page.per_page,
|
||||
})
|
||||
}
|
||||
```
|
||||
|
||||
Note: `USER_SELECT` from `postgres::user` is no longer used in this file after the switch from const to function. Remove the `use postgres::user::{UserRow, USER_SELECT};` import if `UserRow`/`USER_SELECT` are no longer referenced.
|
||||
|
||||
- [ ] **Step 5: Add an integration test for viewer-aware search**
|
||||
|
||||
In the `#[cfg(test)]` module in `postgres-search/src/lib.rs`, add after the existing tests:
|
||||
|
||||
```rust
|
||||
#[sqlx::test(migrations = "../postgres/migrations")]
|
||||
async fn search_thoughts_sets_viewer_context_when_authed(pool: sqlx::PgPool) {
|
||||
use domain::ports::{LikeRepository, UserWriter};
|
||||
use postgres::{like::PgLikeRepository, user::PgUserRepository};
|
||||
use domain::models::social::Like;
|
||||
use domain::value_objects::LikeId;
|
||||
|
||||
let (alice, thought) = seed_thought(&pool, "alice", "hello world").await;
|
||||
|
||||
// alice likes her own thought
|
||||
let like_repo = PgLikeRepository::new(pool.clone());
|
||||
like_repo.save(&Like {
|
||||
id: LikeId::new(),
|
||||
user_id: alice.id.clone(),
|
||||
thought_id: thought.id.clone(),
|
||||
ap_id: None,
|
||||
created_at: chrono::Utc::now(),
|
||||
}).await.unwrap();
|
||||
|
||||
let repo = PgSearchRepository::new(pool);
|
||||
|
||||
// with viewer — should see liked = true
|
||||
let authed = repo
|
||||
.search_thoughts("hello", &PageParams { page: 1, per_page: 20 }, Some(&alice.id))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(authed.items.len(), 1);
|
||||
let ctx = authed.items[0].viewer.as_ref().expect("viewer context present");
|
||||
assert!(ctx.liked, "alice should see the thought as liked");
|
||||
assert!(!ctx.boosted);
|
||||
|
||||
// without viewer — viewer should be None
|
||||
let anon = repo
|
||||
.search_thoughts("hello", &PageParams { page: 1, per_page: 20 }, None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(anon.items.len(), 1);
|
||||
assert!(anon.items[0].viewer.is_none(), "anonymous request has no viewer context");
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 6: Compile check**
|
||||
|
||||
```bash
|
||||
cargo check --workspace 2>&1 | head -20
|
||||
```
|
||||
|
||||
Expected: 0 errors.
|
||||
|
||||
- [ ] **Step 7: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/adapters/postgres-search/src/lib.rs
|
||||
git commit -m "fix(search): viewer-aware SQL in search_thoughts — ViewerContext now real instead of hardcoded false"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Self-Review
|
||||
|
||||
**Spec coverage:**
|
||||
|
||||
| Spec requirement | Task |
|
||||
|---|---|
|
||||
| Add `EngagementStats` struct | Task 1 |
|
||||
| Add `ViewerContext` struct | Task 1 |
|
||||
| `FeedEntry.viewer: Option<ViewerContext>` | Task 1 |
|
||||
| postgres feed adapter uses new structs | Task 2 |
|
||||
| Handler `to_thought_response` uses new fields | Task 2 |
|
||||
| search adapter `viewer: None` (structural fix) | Task 2 |
|
||||
| search adapter uses real viewer SQL (functional fix) | Task 3 |
|
||||
| `viewer: None` = anonymous; `Some(...)` = viewer present | Tasks 2 + 3 |
|
||||
| Wire format (`ThoughtResponse`) unchanged | Task 2 step 3 |
|
||||
|
||||
**No placeholders found.**
|
||||
|
||||
**Type consistency:** `EngagementStats` and `ViewerContext` defined in Task 1, used by name in Tasks 2 and 3. `row_to_entry(r, viewer)` signature matches in both Task 2 and Task 3. `viewer: Option<uuid::Uuid>` threaded consistently.
|
||||
@@ -1,836 +0,0 @@
|
||||
# NATS Hardening, Dead-Letter Queue & Auth Security
|
||||
|
||||
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
||||
|
||||
**Goal:** Harden the NATS consumer (explicit config, ack timeouts, unknown-event acking), add an automatic-retry dead-letter queue backed by Postgres, and close three auth security gaps (weak secret validation, timing oracle, excessive JWT TTL).
|
||||
|
||||
**Architecture:** Seven sequential tasks. Tasks 1–3 are independent of each other and of 4–7 — they can be reviewed in any order. Tasks 4–7 form a dependency chain: delivery-count metadata (Task 4) → migration (Task 5) → Postgres store (Task 6) → worker DLQ loop (Task 7).
|
||||
|
||||
**Tech Stack:** Rust, Tokio, async-nats 0.48, SQLx, Postgres, argon2, jsonwebtoken.
|
||||
|
||||
---
|
||||
|
||||
### Task 1: Auth hardening — secret validation, timing equalization, TTL
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/bootstrap/src/factory.rs`
|
||||
- Modify: `crates/application/src/use_cases/auth.rs`
|
||||
- Modify: `crates/adapters/auth/src/lib.rs`
|
||||
|
||||
- [ ] **Step 1: Replace the magic JWT TTL constant and add secret length check in `factory.rs`**
|
||||
|
||||
In `crates/bootstrap/src/factory.rs`, find the current `const JWT_TTL_SECS: i64 = 86_400 * 30;` at the top of the file. Replace it and add a secret minimum constant, then add validation before the `JwtAuthService` is constructed:
|
||||
|
||||
```rust
|
||||
const JWT_TTL_SECS: i64 = 86_400; // 24 hours
|
||||
const JWT_SECRET_MIN_BYTES: usize = 32; // 256 bits minimum for HS256
|
||||
```
|
||||
|
||||
Then, just before `auth: Arc::new(auth::JwtAuthService::new(...))` in the `build` function, add:
|
||||
|
||||
```rust
|
||||
if cfg.jwt_secret.len() < JWT_SECRET_MIN_BYTES {
|
||||
panic!(
|
||||
"JWT_SECRET is {} bytes — minimum is {} bytes for HS256 security",
|
||||
cfg.jwt_secret.len(),
|
||||
JWT_SECRET_MIN_BYTES,
|
||||
);
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Update the auth test to use a 32-byte secret**
|
||||
|
||||
In `crates/adapters/auth/src/lib.rs`, find the tests that use `"secret".into()`:
|
||||
|
||||
```rust
|
||||
// Before (lines ~98, ~107):
|
||||
let svc = JwtAuthService::new("secret".into(), 3600);
|
||||
|
||||
// After (use 32+ byte secret):
|
||||
let svc = JwtAuthService::new("a-secret-that-is-at-least-32-bytes".into(), 3600);
|
||||
```
|
||||
|
||||
Update both test cases (`generate_and_validate_token` and `invalid_token_returns_unauthorized`).
|
||||
|
||||
- [ ] **Step 3: Add timing equalization to the login use case**
|
||||
|
||||
In `crates/application/src/use_cases/auth.rs`, find the `login` function. Currently it early-returns when the user is not found:
|
||||
|
||||
```rust
|
||||
let user = users
|
||||
.find_by_email(&email)
|
||||
.await?
|
||||
.ok_or(DomainError::Unauthorized)?;
|
||||
```
|
||||
|
||||
Replace with a timing-safe version that runs the hasher even when no user is found:
|
||||
|
||||
```rust
|
||||
let user = users.find_by_email(&email).await?;
|
||||
if user.is_none() {
|
||||
// Timing equalization — prevents email enumeration via response-time oracle.
|
||||
// Running the hasher on a miss makes "no such user" take the same time as
|
||||
// "wrong password", so attackers cannot distinguish the two cases.
|
||||
let _ = hasher.hash(&input.password).await;
|
||||
return Err(DomainError::Unauthorized);
|
||||
}
|
||||
let user = user.unwrap();
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Compile check**
|
||||
|
||||
```bash
|
||||
cargo check --workspace 2>&1 | head -20
|
||||
```
|
||||
|
||||
Expected: 0 errors.
|
||||
|
||||
- [ ] **Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/bootstrap/src/factory.rs \
|
||||
crates/application/src/use_cases/auth.rs \
|
||||
crates/adapters/auth/src/lib.rs
|
||||
git commit -m "fix(auth): validate JWT secret length, equalize login timing, reduce TTL to 24h"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 2: NATS consumer hardening — explicit config + ack timeouts
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/adapters/nats/src/lib.rs`
|
||||
|
||||
- [ ] **Step 1: Add named constants**
|
||||
|
||||
At the top of `crates/adapters/nats/src/lib.rs`, after the existing constants, add:
|
||||
|
||||
```rust
|
||||
/// Maximum delivery attempts before a message is considered exhausted.
|
||||
/// The DLQ processor picks it up after this point.
|
||||
const CONSUMER_MAX_DELIVER: i64 = 5;
|
||||
/// How long NATS waits for an ack before redelivering.
|
||||
const CONSUMER_ACK_WAIT_SECS: u64 = 30;
|
||||
/// Timeout for the spawned ack/nack async task.
|
||||
const ACK_TASK_TIMEOUT_SECS: u64 = 5;
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Replace the pull consumer config**
|
||||
|
||||
Find the `get_or_create_consumer` call (around line 119). Replace `..Default::default()` with explicit settings:
|
||||
|
||||
```rust
|
||||
let consumer = match stream
|
||||
.get_or_create_consumer(
|
||||
CONSUMER_NAME,
|
||||
jetstream::consumer::pull::Config {
|
||||
durable_name: Some(CONSUMER_NAME.to_string()),
|
||||
deliver_policy: jetstream::consumer::DeliverPolicy::New,
|
||||
ack_policy: jetstream::consumer::AckPolicy::Explicit,
|
||||
ack_wait: std::time::Duration::from_secs(CONSUMER_ACK_WAIT_SECS),
|
||||
max_deliver: CONSUMER_MAX_DELIVER,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await
|
||||
```
|
||||
|
||||
You will need to add the `DeliverPolicy` and `AckPolicy` imports. Check what's already imported at the top of the file and add if needed:
|
||||
|
||||
```rust
|
||||
use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy};
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Add timeout to the ack task**
|
||||
|
||||
Find the `ack:` closure (around line 173). Replace it with a timeout-wrapped version:
|
||||
|
||||
```rust
|
||||
ack: Box::new(move || {
|
||||
let m = Arc::clone(&msg);
|
||||
tokio::spawn(async move {
|
||||
let result = tokio::time::timeout(
|
||||
std::time::Duration::from_secs(ACK_TASK_TIMEOUT_SECS),
|
||||
m.ack(),
|
||||
)
|
||||
.await;
|
||||
match result {
|
||||
Ok(Ok(())) => {}
|
||||
Ok(Err(e)) => tracing::warn!("NATS ack failed: {e}"),
|
||||
Err(_) => tracing::warn!("NATS ack timed out after {ACK_TASK_TIMEOUT_SECS}s"),
|
||||
}
|
||||
});
|
||||
}),
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Add timeout to the nack task**
|
||||
|
||||
Find the `nack:` closure (around line 181). Same pattern:
|
||||
|
||||
```rust
|
||||
nack: Box::new(move || {
|
||||
let m = Arc::clone(&msg_nack);
|
||||
tokio::spawn(async move {
|
||||
let result = tokio::time::timeout(
|
||||
std::time::Duration::from_secs(ACK_TASK_TIMEOUT_SECS),
|
||||
m.ack_with(AckKind::Nak(None)),
|
||||
)
|
||||
.await;
|
||||
match result {
|
||||
Ok(Ok(())) => {}
|
||||
Ok(Err(e)) => tracing::warn!("NATS nack failed: {e}"),
|
||||
Err(_) => tracing::warn!("NATS nack timed out after {ACK_TASK_TIMEOUT_SECS}s"),
|
||||
}
|
||||
});
|
||||
}),
|
||||
```
|
||||
|
||||
- [ ] **Step 5: Expose delivery count on RawMessage**
|
||||
|
||||
Find where `RawMessage` is constructed (around line 170). Add the NATS message's delivery count before constructing it. Read the NATS message metadata via `msg.info()`:
|
||||
|
||||
```rust
|
||||
let delivery_count = msg
|
||||
.info()
|
||||
.map(|info| info.delivered)
|
||||
.unwrap_or(1) as u64;
|
||||
|
||||
let raw = RawMessage {
|
||||
subject,
|
||||
payload,
|
||||
delivery_count,
|
||||
ack: Box::new(move || { ... }),
|
||||
nack: Box::new(move || { ... }),
|
||||
};
|
||||
```
|
||||
|
||||
Note: `msg.info()` returns `Result<Info, _>` where `Info.delivered: u64`. If it's unavailable, default to 1.
|
||||
|
||||
- [ ] **Step 6: Compile check**
|
||||
|
||||
```bash
|
||||
cargo check -p nats 2>&1 | head -20
|
||||
```
|
||||
|
||||
Expected: compile errors about `RawMessage` missing `delivery_count` — that's fixed in Task 3.
|
||||
|
||||
- [ ] **Step 7: Commit is deferred to after Task 3** (they touch the same type)
|
||||
|
||||
---
|
||||
|
||||
### Task 3: Add `delivery_count` to `RawMessage` and `EventEnvelope`; ack unknown events
|
||||
|
||||
**Files:**
|
||||
- Modify: `crates/adapters/event-transport/src/lib.rs`
|
||||
- Modify: `crates/domain/src/events.rs`
|
||||
|
||||
- [ ] **Step 1: Add `delivery_count` to `RawMessage`**
|
||||
|
||||
In `crates/adapters/event-transport/src/lib.rs`, find the `RawMessage` struct (around line 48). Add the field:
|
||||
|
||||
```rust
|
||||
pub struct RawMessage {
|
||||
pub subject: String,
|
||||
pub payload: Vec<u8>,
|
||||
pub delivery_count: u64, // NEW
|
||||
pub ack: Box<dyn Fn() + Send + Sync>,
|
||||
pub nack: Box<dyn Fn() + Send + Sync>,
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Add `delivery_count` to `EventEnvelope` in domain**
|
||||
|
||||
In `crates/domain/src/events.rs`, find `EventEnvelope` (around line 83). Add the field:
|
||||
|
||||
```rust
|
||||
pub struct EventEnvelope {
|
||||
pub event: DomainEvent,
|
||||
pub delivery_count: u64, // NEW
|
||||
pub ack: Box<dyn Fn() + Send + Sync>,
|
||||
pub nack: Box<dyn Fn() + Send + Sync>,
|
||||
}
|
||||
```
|
||||
|
||||
Also update the `Debug` impl (which is manual because closures aren't Debug). Find it and add `delivery_count` to the struct debug output:
|
||||
|
||||
```rust
|
||||
impl std::fmt::Debug for EventEnvelope {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("EventEnvelope")
|
||||
.field("event", &self.event)
|
||||
.field("delivery_count", &self.delivery_count)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Pass `delivery_count` through `EventConsumerAdapter`**
|
||||
|
||||
In `crates/adapters/event-transport/src/lib.rs`, find where `EventEnvelope` is constructed in the `consume()` method (around line 97). Update it:
|
||||
|
||||
```rust
|
||||
Some(Ok(EventEnvelope {
|
||||
event,
|
||||
delivery_count: msg.delivery_count, // NEW
|
||||
ack: msg.ack,
|
||||
nack: msg.nack,
|
||||
}))
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Ack unknown event types instead of orphaning them**
|
||||
|
||||
In the same `consume()` method, find the `Err(e)` arm for unknown event types (around line 92):
|
||||
|
||||
```rust
|
||||
Err(e) => {
|
||||
tracing::warn!("unknown event type: {e}");
|
||||
return None;
|
||||
}
|
||||
```
|
||||
|
||||
Replace with an explicit ack before dropping:
|
||||
|
||||
```rust
|
||||
Err(e) => {
|
||||
tracing::warn!("unknown or malformed event type — acking to prevent orphan: {e}");
|
||||
(msg.ack)();
|
||||
return None;
|
||||
}
|
||||
```
|
||||
|
||||
Similarly for the deserialization error arm (around line 85):
|
||||
|
||||
```rust
|
||||
Err(e) => {
|
||||
tracing::warn!("failed to deserialize event payload — acking to prevent orphan: {e}");
|
||||
(msg.ack)();
|
||||
return None;
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 5: Update test stubs for `RawMessage`**
|
||||
|
||||
In the tests inside `event-transport/src/lib.rs`, `RawMessage` is constructed with `ack` and `nack`. Add `delivery_count: 1` to each:
|
||||
|
||||
```rust
|
||||
let msg = RawMessage {
|
||||
subject: "thoughts.created".to_string(),
|
||||
payload: self.bytes.clone(),
|
||||
delivery_count: 1,
|
||||
ack: Box::new(|| {}),
|
||||
nack: Box::new(|| {}),
|
||||
};
|
||||
```
|
||||
|
||||
Find all `RawMessage { ... }` constructions in the test module and add `delivery_count: 1`.
|
||||
|
||||
- [ ] **Step 6: Full workspace compile check**
|
||||
|
||||
```bash
|
||||
cargo check --workspace 2>&1 | head -40
|
||||
```
|
||||
|
||||
Fix any remaining construction sites for `RawMessage` or `EventEnvelope` that are missing `delivery_count`.
|
||||
|
||||
- [ ] **Step 7: Commit Tasks 2 and 3 together**
|
||||
|
||||
```bash
|
||||
git add crates/adapters/nats/src/lib.rs \
|
||||
crates/adapters/event-transport/src/lib.rs \
|
||||
crates/domain/src/events.rs
|
||||
git commit -m "fix(nats): explicit consumer config, ack timeouts, unknown-event acking, delivery_count"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 4: `failed_events` migration
|
||||
|
||||
**Files:**
|
||||
- Create: `crates/adapters/postgres/migrations/009_failed_events.sql`
|
||||
|
||||
- [ ] **Step 1: Create the migration file**
|
||||
|
||||
Create `crates/adapters/postgres/migrations/009_failed_events.sql`:
|
||||
|
||||
```sql
|
||||
CREATE TABLE failed_events (
|
||||
id UUID NOT NULL DEFAULT gen_random_uuid(),
|
||||
event_type TEXT NOT NULL,
|
||||
payload JSONB NOT NULL,
|
||||
failed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
retry_at TIMESTAMPTZ NOT NULL,
|
||||
retry_count INT NOT NULL DEFAULT 0,
|
||||
last_error TEXT NOT NULL,
|
||||
|
||||
CONSTRAINT failed_events_pkey PRIMARY KEY (id)
|
||||
);
|
||||
|
||||
-- Partial index: only rows actively due for retry are in this index.
|
||||
CREATE INDEX failed_events_due_idx
|
||||
ON failed_events (retry_at)
|
||||
WHERE retry_count < 3;
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Verify the migration file is syntactically correct**
|
||||
|
||||
```bash
|
||||
cargo check -p postgres 2>&1 | head -10
|
||||
```
|
||||
|
||||
(The postgres crate auto-discovers migrations via `sqlx::migrate!` — the file just needs to exist with valid SQL. Syntax is validated at runtime in integration tests.)
|
||||
|
||||
- [ ] **Step 3: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/adapters/postgres/migrations/009_failed_events.sql
|
||||
git commit -m "feat(db): add failed_events table for dead-letter queue"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 5: `PgFailedEventStore` — Postgres DLQ repository
|
||||
|
||||
**Files:**
|
||||
- Create: `crates/adapters/postgres/src/failed_event.rs`
|
||||
- Modify: `crates/adapters/postgres/src/lib.rs`
|
||||
|
||||
- [ ] **Step 1: Create `failed_event.rs`**
|
||||
|
||||
Create `crates/adapters/postgres/src/failed_event.rs`:
|
||||
|
||||
```rust
|
||||
use crate::db_error::IntoDbResult;
|
||||
use chrono::{DateTime, Utc};
|
||||
use sqlx::PgPool;
|
||||
|
||||
/// How many times a failed event is retried by the DLQ processor.
|
||||
pub const DLQ_MAX_RETRIES: i32 = 3;
|
||||
/// Quarantine period for the first DLQ retry (seconds). Doubles each retry.
|
||||
pub const DLQ_INITIAL_BACKOFF_SECS: i64 = 300; // 5 minutes
|
||||
/// How often the DLQ processor polls for due retries (seconds).
|
||||
pub const DLQ_POLL_INTERVAL_SECS: u64 = 60;
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
pub struct FailedEvent {
|
||||
pub id: uuid::Uuid,
|
||||
pub event_type: String,
|
||||
pub payload: serde_json::Value,
|
||||
pub failed_at: DateTime<Utc>,
|
||||
pub retry_at: DateTime<Utc>,
|
||||
pub retry_count: i32,
|
||||
pub last_error: String,
|
||||
}
|
||||
|
||||
pub struct PgFailedEventStore {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PgFailedEventStore {
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
|
||||
/// Insert a newly exhausted event into the DLQ.
|
||||
pub async fn insert(
|
||||
&self,
|
||||
event_type: &str,
|
||||
payload: &serde_json::Value,
|
||||
last_error: &str,
|
||||
) -> Result<(), sqlx::Error> {
|
||||
let retry_at = Utc::now()
|
||||
+ chrono::Duration::seconds(DLQ_INITIAL_BACKOFF_SECS);
|
||||
sqlx::query(
|
||||
"INSERT INTO failed_events \
|
||||
(event_type, payload, retry_at, last_error) \
|
||||
VALUES ($1, $2, $3, $4)",
|
||||
)
|
||||
.bind(event_type)
|
||||
.bind(payload)
|
||||
.bind(retry_at)
|
||||
.bind(last_error)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Fetch all events due for retry (retry_at <= now, retry_count < DLQ_MAX_RETRIES).
|
||||
pub async fn poll_due(&self) -> Result<Vec<FailedEvent>, sqlx::Error> {
|
||||
sqlx::query_as::<_, FailedEvent>(
|
||||
"SELECT id, event_type, payload, failed_at, retry_at, retry_count, last_error \
|
||||
FROM failed_events \
|
||||
WHERE retry_at <= now() AND retry_count < $1 \
|
||||
ORDER BY retry_at \
|
||||
LIMIT 100",
|
||||
)
|
||||
.bind(DLQ_MAX_RETRIES)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Advance a row after a republish attempt.
|
||||
/// Uses exponential backoff: next_retry = now + initial * 2^retry_count.
|
||||
pub async fn advance(
|
||||
&self,
|
||||
id: uuid::Uuid,
|
||||
error: Option<&str>,
|
||||
) -> Result<(), sqlx::Error> {
|
||||
// Fetch current retry_count to compute backoff.
|
||||
let current: i32 = sqlx::query_scalar(
|
||||
"SELECT retry_count FROM failed_events WHERE id = $1",
|
||||
)
|
||||
.bind(id)
|
||||
.fetch_one(&self.pool)
|
||||
.await?;
|
||||
|
||||
let new_count = current + 1;
|
||||
let backoff_secs = DLQ_INITIAL_BACKOFF_SECS * (1_i64 << new_count.min(10));
|
||||
let retry_at = Utc::now() + chrono::Duration::seconds(backoff_secs);
|
||||
let last_error = error.unwrap_or("republish succeeded");
|
||||
|
||||
sqlx::query(
|
||||
"UPDATE failed_events \
|
||||
SET retry_count = $1, retry_at = $2, last_error = $3 \
|
||||
WHERE id = $4",
|
||||
)
|
||||
.bind(new_count)
|
||||
.bind(retry_at)
|
||||
.bind(last_error)
|
||||
.bind(id)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Park a permanently failed event (retry_count >= DLQ_MAX_RETRIES).
|
||||
/// Sets retry_at 1 year out so it falls out of the active index.
|
||||
pub async fn park_permanently(&self, id: uuid::Uuid) -> Result<(), sqlx::Error> {
|
||||
let far_future = Utc::now() + chrono::Duration::days(365);
|
||||
sqlx::query(
|
||||
"UPDATE failed_events SET retry_at = $1 WHERE id = $2",
|
||||
)
|
||||
.bind(far_future)
|
||||
.bind(id)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Export from `postgres/src/lib.rs`**
|
||||
|
||||
In `crates/adapters/postgres/src/lib.rs`, add:
|
||||
|
||||
```rust
|
||||
pub mod failed_event;
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Compile check**
|
||||
|
||||
```bash
|
||||
cargo check -p postgres 2>&1 | head -20
|
||||
```
|
||||
|
||||
Expected: 0 errors.
|
||||
|
||||
- [ ] **Step 4: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/adapters/postgres/src/failed_event.rs \
|
||||
crates/adapters/postgres/src/lib.rs
|
||||
git commit -m "feat(postgres): PgFailedEventStore for dead-letter queue"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 6: DLQ processor in worker
|
||||
|
||||
**Files:**
|
||||
- Create: `crates/worker/src/dlq.rs`
|
||||
- Modify: `crates/worker/src/factory.rs`
|
||||
- Modify: `crates/worker/src/main.rs`
|
||||
|
||||
The `delivery_count` on `EventEnvelope` (added in Task 3) tells the worker when a message is on its last attempt. The main loop inserts to the DLQ when handlers fail at `delivery_count >= CONSUMER_MAX_DELIVER`. A separate background task polls the DLQ and republishes due events.
|
||||
|
||||
- [ ] **Step 1: Create `dlq.rs`**
|
||||
|
||||
Create `crates/worker/src/dlq.rs`:
|
||||
|
||||
```rust
|
||||
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
|
||||
use postgres::failed_event::{
|
||||
DLQ_MAX_RETRIES, DLQ_POLL_INTERVAL_SECS, PgFailedEventStore,
|
||||
};
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Background task: polls `failed_events` and republishes due rows to the event bus.
|
||||
/// Runs on a fixed interval for the lifetime of the worker.
|
||||
pub async fn run_dlq_processor(
|
||||
store: Arc<PgFailedEventStore>,
|
||||
publisher: Arc<dyn EventPublisher>,
|
||||
) {
|
||||
let interval = std::time::Duration::from_secs(DLQ_POLL_INTERVAL_SECS);
|
||||
loop {
|
||||
tokio::time::sleep(interval).await;
|
||||
if let Err(e) = process_due(&store, &*publisher).await {
|
||||
tracing::error!("DLQ processor error: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_due(
|
||||
store: &PgFailedEventStore,
|
||||
publisher: &dyn EventPublisher,
|
||||
) -> Result<(), sqlx::Error> {
|
||||
let due = store.poll_due().await?;
|
||||
if due.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
tracing::info!(count = due.len(), "DLQ: processing due events");
|
||||
|
||||
for row in due {
|
||||
if row.retry_count >= DLQ_MAX_RETRIES {
|
||||
tracing::error!(
|
||||
id = %row.id,
|
||||
event_type = %row.event_type,
|
||||
retry_count = row.retry_count,
|
||||
"DLQ: event permanently failed — parking",
|
||||
);
|
||||
store.park_permanently(row.id).await?;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Attempt to republish the raw payload as a domain event.
|
||||
let republish_result = republish(&row.payload, publisher).await;
|
||||
|
||||
match republish_result {
|
||||
Ok(()) => {
|
||||
tracing::info!(id = %row.id, "DLQ: republished successfully");
|
||||
store.advance(row.id, None).await?;
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(id = %row.id, error = %e, "DLQ: republish failed");
|
||||
store.advance(row.id, Some(&e.to_string())).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn republish(
|
||||
payload: &serde_json::Value,
|
||||
publisher: &dyn EventPublisher,
|
||||
) -> Result<(), DomainError> {
|
||||
use event_payload::EventPayload;
|
||||
let ep: EventPayload = serde_json::from_value(payload.clone())
|
||||
.map_err(|e| DomainError::Internal(format!("DLQ deserialize: {e}")))?;
|
||||
let event = DomainEvent::try_from(ep)
|
||||
.map_err(|e| DomainError::Internal(format!("DLQ event conversion: {e}")))?;
|
||||
publisher.publish(&event).await
|
||||
}
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Add `PgFailedEventStore` to `factory.rs`**
|
||||
|
||||
In `crates/worker/src/factory.rs`, add to the imports:
|
||||
|
||||
```rust
|
||||
use postgres::failed_event::PgFailedEventStore;
|
||||
use std::sync::Arc;
|
||||
```
|
||||
|
||||
Then return it from `build` alongside the consumer and handlers. Change the return type to a new struct:
|
||||
|
||||
```rust
|
||||
pub struct WorkerInfra {
|
||||
pub consumer: event_transport::EventConsumerAdapter<nats::NatsMessageSource>,
|
||||
pub handlers: WorkerHandlers,
|
||||
pub dlq_store: Arc<PgFailedEventStore>,
|
||||
pub event_publisher: Arc<dyn domain::ports::EventPublisher>,
|
||||
}
|
||||
```
|
||||
|
||||
At the end of `build`, construct and return:
|
||||
|
||||
```rust
|
||||
let dlq_store = Arc::new(PgFailedEventStore::new(pool.clone()));
|
||||
|
||||
// ... existing consumer construction ...
|
||||
|
||||
WorkerInfra {
|
||||
consumer,
|
||||
handlers,
|
||||
dlq_store,
|
||||
event_publisher, // the NATS publisher already constructed in factory
|
||||
}
|
||||
```
|
||||
|
||||
Note: the factory currently doesn't return the event publisher. Add an `event_publisher` field to `WorkerInfra` and thread the existing `Arc<dyn EventPublisher>` through (it's used for the ActivityPub handler — reuse the same instance).
|
||||
|
||||
Read the existing `factory.rs` to see how the NATS publisher is currently constructed and reuse it for both the ActivityPub handler and the returned publisher.
|
||||
|
||||
- [ ] **Step 3: Update `main.rs` to use the DLQ**
|
||||
|
||||
In `crates/worker/src/main.rs`, update to use the new `WorkerInfra`:
|
||||
|
||||
```rust
|
||||
mod dlq;
|
||||
mod factory;
|
||||
mod handlers;
|
||||
|
||||
use domain::ports::EventConsumer;
|
||||
use futures::StreamExt;
|
||||
use nats::CONSUMER_MAX_DELIVER;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
dotenvy::dotenv().ok();
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||
.init();
|
||||
|
||||
let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL required");
|
||||
let nats_url = std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into());
|
||||
let base_url = std::env::var("BASE_URL").expect("BASE_URL required");
|
||||
|
||||
tracing::info!("Building worker...");
|
||||
let infra = factory::build(&database_url, &base_url, &nats_url).await;
|
||||
|
||||
// Spawn DLQ processor as a background task.
|
||||
tokio::spawn(dlq::run_dlq_processor(
|
||||
infra.dlq_store.clone(),
|
||||
infra.event_publisher.clone(),
|
||||
));
|
||||
|
||||
tracing::info!("Worker started, consuming events...");
|
||||
let mut stream = infra.consumer.consume();
|
||||
while let Some(result) = stream.next().await {
|
||||
match result {
|
||||
Ok(envelope) => {
|
||||
let event = &envelope.event;
|
||||
tracing::debug!(?event, "received event");
|
||||
|
||||
let n = infra.handlers.notification.handle(event).await;
|
||||
let f = infra.handlers.federation.handle(event).await;
|
||||
|
||||
if n.is_ok() && f.is_ok() {
|
||||
(envelope.ack)();
|
||||
} else {
|
||||
// Log errors.
|
||||
if let Err(e) = &n {
|
||||
tracing::error!("notification handler: {e}");
|
||||
}
|
||||
if let Err(e) = &f {
|
||||
tracing::error!("federation handler: {e}");
|
||||
}
|
||||
|
||||
// On last delivery attempt — insert to DLQ then ack.
|
||||
// On earlier attempts — nack so NATS retries.
|
||||
if envelope.delivery_count >= CONSUMER_MAX_DELIVER as u64 {
|
||||
let error_msg = n
|
||||
.err()
|
||||
.or(f.err())
|
||||
.map(|e| e.to_string())
|
||||
.unwrap_or_else(|| "unknown error".into());
|
||||
|
||||
let payload = serde_json::to_value(&event_payload::EventPayload::from(event))
|
||||
.unwrap_or(serde_json::Value::Null);
|
||||
|
||||
let event_type = format!("{:?}", event)
|
||||
.split_whitespace()
|
||||
.next()
|
||||
.unwrap_or("Unknown")
|
||||
.to_string();
|
||||
|
||||
if let Err(e) = infra
|
||||
.dlq_store
|
||||
.insert(&event_type, &payload, &error_msg)
|
||||
.await
|
||||
{
|
||||
tracing::error!("DLQ insert failed: {e} — message lost");
|
||||
} else {
|
||||
tracing::warn!(
|
||||
event_type,
|
||||
delivery_count = envelope.delivery_count,
|
||||
"event exhausted — moved to DLQ"
|
||||
);
|
||||
}
|
||||
(envelope.ack)(); // ack from NATS — DLQ owns it now
|
||||
} else {
|
||||
(envelope.nack)(); // let NATS retry
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => tracing::error!("consumer error: {e}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Note: `CONSUMER_MAX_DELIVER` must be exported from `crates/adapters/nats/src/lib.rs`. Add `pub` to that constant in Task 2.
|
||||
|
||||
- [ ] **Step 4: Export `CONSUMER_MAX_DELIVER` from nats crate**
|
||||
|
||||
In `crates/adapters/nats/src/lib.rs`, change the constant to `pub`:
|
||||
|
||||
```rust
|
||||
pub const CONSUMER_MAX_DELIVER: i64 = 5;
|
||||
```
|
||||
|
||||
- [ ] **Step 5: Full workspace compile check**
|
||||
|
||||
```bash
|
||||
cargo check --workspace 2>&1 | head -40
|
||||
```
|
||||
|
||||
Fix all errors. Common issues:
|
||||
- Missing imports in `main.rs` for `event_payload`
|
||||
- `event` variable lifetime in the DLQ insert block — may need to clone `event`
|
||||
- `WorkerInfra` construction in `factory.rs` missing fields
|
||||
|
||||
- [ ] **Step 6: Verify tests still pass**
|
||||
|
||||
```bash
|
||||
cargo test --workspace 2>&1 | tail -5
|
||||
```
|
||||
|
||||
Expected: all tests pass.
|
||||
|
||||
- [ ] **Step 7: Commit**
|
||||
|
||||
```bash
|
||||
git add crates/worker/src/dlq.rs \
|
||||
crates/worker/src/factory.rs \
|
||||
crates/worker/src/main.rs \
|
||||
crates/adapters/nats/src/lib.rs
|
||||
git commit -m "feat(worker): DLQ processor — exhausted events moved to failed_events with exponential retry"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Self-Review
|
||||
|
||||
**Spec coverage:**
|
||||
|
||||
| Spec requirement | Task |
|
||||
|---|---|
|
||||
| `CONSUMER_MAX_DELIVER = 5` constant | Task 2 |
|
||||
| `CONSUMER_ACK_WAIT_SECS = 30` constant | Task 2 |
|
||||
| `ACK_TASK_TIMEOUT_SECS = 5` constant | Task 2 |
|
||||
| Explicit consumer config (deliver_policy, ack_policy, ack_wait, max_deliver) | Task 2 |
|
||||
| Ack task with timeout | Task 2 |
|
||||
| Nack task with timeout | Task 2 |
|
||||
| Unknown event types acked before drop | Task 3 |
|
||||
| `delivery_count` threaded through to worker | Tasks 2, 3 |
|
||||
| `failed_events` migration | Task 4 |
|
||||
| `PgFailedEventStore` (insert, poll_due, advance, park_permanently) | Task 5 |
|
||||
| DLQ processor (poll interval, exponential backoff, park permanently) | Task 6 |
|
||||
| Worker inserts to DLQ at `delivery_count >= max_deliver` | Task 6 |
|
||||
| `JWT_SECRET_MIN_BYTES = 32` constant | Task 1 |
|
||||
| Panic on weak secret at startup | Task 1 |
|
||||
| `JWT_TTL_SECS = 86_400` (24h) | Task 1 |
|
||||
| Timing equalization on failed login | Task 1 |
|
||||
|
||||
**No placeholders found.**
|
||||
|
||||
**Type consistency:** `CONSUMER_MAX_DELIVER: i64` in Task 2; cast to `u64` for comparison with `envelope.delivery_count: u64` in Task 6 (`>= CONSUMER_MAX_DELIVER as u64`). Consistent. `DLQ_MAX_RETRIES: i32` matches `retry_count: i32` in `FailedEvent`. `DLQ_INITIAL_BACKOFF_SECS: i64` used with `chrono::Duration::seconds(i64)`. All consistent.
|
||||
@@ -1,80 +0,0 @@
|
||||
# FeedEntry Decoupling Design
|
||||
|
||||
**Goal:** Fix search viewer context (functional), restructure `FeedEntry` for clarity (structural), and make viewer presence explicit via `Option<ViewerContext>` (type-safe).
|
||||
|
||||
**Priority:** C (search fix) → B (struct clarity) → A (type safety). All three land in one pass.
|
||||
|
||||
---
|
||||
|
||||
## Data Model
|
||||
|
||||
Replace flat fields on `FeedEntry` with two named sub-structs in `crates/domain/src/models/feed.rs`:
|
||||
|
||||
```rust
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct EngagementStats {
|
||||
pub like_count: i64,
|
||||
pub boost_count: i64,
|
||||
pub reply_count: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ViewerContext {
|
||||
pub liked: bool,
|
||||
pub boosted: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct FeedEntry {
|
||||
pub thought: Thought,
|
||||
pub author: User,
|
||||
pub stats: EngagementStats,
|
||||
pub viewer: Option<ViewerContext>, // None when no authenticated viewer
|
||||
}
|
||||
```
|
||||
|
||||
`viewer: None` means the request was anonymous or viewer state is unavailable (e.g. search without auth). `viewer: Some(ViewerContext { liked: false, boosted: false })` means a viewer is known and they have not liked or boosted the thought. These two states are now distinct at the type level.
|
||||
|
||||
---
|
||||
|
||||
## Search Adapter Fix
|
||||
|
||||
`SearchPort::search_thoughts` already accepts `viewer_id: Option<&UserId>` but `postgres-search/src/lib.rs` ignores it, always hardcoding `false` for viewer fields.
|
||||
|
||||
Fix: conditionally inject EXISTS subqueries into the search SQL, identical to the pattern used in `postgres/src/feed.rs`:
|
||||
|
||||
```sql
|
||||
-- viewer_id = None (anonymous)
|
||||
false AS liked_by_viewer,
|
||||
false AS boosted_by_viewer
|
||||
|
||||
-- viewer_id = Some(uid)
|
||||
EXISTS(SELECT 1 FROM likes WHERE user_id='{uid}' AND thought_id=t.id) AS liked_by_viewer,
|
||||
EXISTS(SELECT 1 FROM boosts WHERE user_id='{uid}' AND thought_id=t.id) AS boosted_by_viewer
|
||||
```
|
||||
|
||||
The `FeedRow` struct in postgres-search already has `liked_by_viewer: bool` and `boosted_by_viewer: bool` columns — they just need to be populated correctly. No schema change required.
|
||||
|
||||
---
|
||||
|
||||
## Callsite Migration
|
||||
|
||||
| File | Change |
|
||||
|---|---|
|
||||
| `crates/domain/src/models/feed.rs` | Replace flat stats/viewer fields with `EngagementStats` and `Option<ViewerContext>` |
|
||||
| `crates/adapters/postgres/src/feed.rs` — `row_to_entry` | Construct `EngagementStats { ... }` and `viewer: Some/None` based on `FeedRow` |
|
||||
| `crates/adapters/postgres-search/src/lib.rs` — `row_to_entry` + SQL | Fix SQL to use viewer_id; build `Option<ViewerContext>` from result |
|
||||
| `crates/presentation/src/handlers/feed.rs` — `to_thought_response` | `e.stats.like_count`, `e.viewer.as_ref().map(|v| v.liked).unwrap_or(false)` |
|
||||
| `crates/domain/src/testing.rs` — `TestStore` feed impl | Build `FeedEntry` with `stats:` and `viewer:` fields |
|
||||
|
||||
`ThoughtResponse` in `api-types/src/responses.rs` keeps `liked_by_viewer: bool` and `boosted_by_viewer: bool` — the wire format is unchanged. `viewer: None` serialises as `false` in `to_thought_response`.
|
||||
|
||||
---
|
||||
|
||||
## What Does Not Change
|
||||
|
||||
- `FeedRepository` port signatures (still returns `Paginated<FeedEntry>`)
|
||||
- HTTP response shape (`ThoughtResponse`)
|
||||
- Database schema
|
||||
- Pagination, filtering, or query logic
|
||||
- Any code path that doesn't touch `FeedEntry` fields directly
|
||||
@@ -1,193 +0,0 @@
|
||||
# NATS Hardening, Dead-Letter Queue & Auth Security Design
|
||||
|
||||
**Goal:** Fix five reliability issues in the NATS adapter, introduce an automatic-retry dead-letter queue backed by Postgres, and close three auth security gaps.
|
||||
|
||||
---
|
||||
|
||||
## Section 1: NATS Consumer Hardening
|
||||
|
||||
### Consumer configuration
|
||||
|
||||
All magic numbers become named constants in `crates/adapters/nats/src/lib.rs`:
|
||||
|
||||
```rust
|
||||
const CONSUMER_MAX_DELIVER: i64 = 5;
|
||||
const CONSUMER_ACK_WAIT_SECS: u64 = 30;
|
||||
const ACK_TASK_TIMEOUT_SECS: u64 = 5;
|
||||
```
|
||||
|
||||
The pull consumer config changes from `..Default::default()` to explicit settings:
|
||||
|
||||
```rust
|
||||
pull::Config {
|
||||
durable_name: Some(CONSUMER_NAME.to_string()),
|
||||
deliver_policy: DeliverPolicy::New,
|
||||
ack_policy: AckPolicy::Explicit,
|
||||
ack_wait: Duration::from_secs(CONSUMER_ACK_WAIT_SECS),
|
||||
max_deliver: CONSUMER_MAX_DELIVER,
|
||||
..Default::default()
|
||||
}
|
||||
```
|
||||
|
||||
- `DeliverPolicy::New` — worker restarts from the current position, not from the beginning of the stream
|
||||
- `AckPolicy::Explicit` — explicit (already the default, but now documented)
|
||||
- `ack_wait` — if the worker hangs for 30s without acking, NATS redelivers
|
||||
- `max_deliver` — after 5 failed deliveries the message is exhausted; the DLQ picks it up
|
||||
|
||||
### Ack task timeout
|
||||
|
||||
Spawned ack/nack tasks currently have no timeout. If NATS is stuck, they hang forever. Wrap with `tokio::time::timeout`:
|
||||
|
||||
```rust
|
||||
ack: Box::new(move || {
|
||||
let m = Arc::clone(&msg);
|
||||
tokio::spawn(async move {
|
||||
let result = tokio::time::timeout(
|
||||
Duration::from_secs(ACK_TASK_TIMEOUT_SECS),
|
||||
m.ack(),
|
||||
).await;
|
||||
match result {
|
||||
Ok(Ok(())) => {}
|
||||
Ok(Err(e)) => tracing::warn!("NATS ack failed: {e}"),
|
||||
Err(_) => tracing::warn!("NATS ack timed out"),
|
||||
}
|
||||
});
|
||||
}),
|
||||
```
|
||||
|
||||
Same pattern for nack.
|
||||
|
||||
### Unknown event type acking
|
||||
|
||||
Currently unknown event types are silently dropped via `filter_map` and never acked — they orphan in the stream until `max_deliver` is exceeded. Fix: ack unknown messages explicitly before discarding:
|
||||
|
||||
In `event-transport/src/lib.rs`, when deserialization fails, ack the raw NATS message before returning `None`:
|
||||
|
||||
```rust
|
||||
Err(e) => {
|
||||
tracing::warn!("unknown or malformed event, acking to prevent orphan: {e}");
|
||||
// ack the message so it doesn't loop forever
|
||||
msg.ack();
|
||||
None
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Section 2: Dead-Letter Queue
|
||||
|
||||
### Schema
|
||||
|
||||
New migration `009_failed_events.sql`:
|
||||
|
||||
```sql
|
||||
CREATE TABLE failed_events (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
event_type TEXT NOT NULL,
|
||||
payload JSONB NOT NULL,
|
||||
failed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
retry_at TIMESTAMPTZ NOT NULL,
|
||||
retry_count INT NOT NULL DEFAULT 0,
|
||||
last_error TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX failed_events_retry_at_idx ON failed_events (retry_at)
|
||||
WHERE retry_count < 3;
|
||||
```
|
||||
|
||||
### Constants
|
||||
|
||||
In `crates/adapters/nats/src/lib.rs` (or a new `dlq.rs` module):
|
||||
|
||||
```rust
|
||||
const DLQ_INITIAL_BACKOFF_SECS: u64 = 300; // 5 minutes
|
||||
const DLQ_MAX_RETRIES: i32 = 3;
|
||||
const DLQ_POLL_INTERVAL_SECS: u64 = 60; // check every minute
|
||||
```
|
||||
|
||||
### Worker flow
|
||||
|
||||
**On exhausted message** (detected when `num_delivered >= CONSUMER_MAX_DELIVER`):
|
||||
1. Worker inserts row to `failed_events` with `retry_at = now() + DLQ_INITIAL_BACKOFF_SECS`
|
||||
2. Worker **acks** the NATS message (removes it from the stream)
|
||||
3. Message will be retried by the DLQ processor, not by NATS
|
||||
|
||||
**DlqProcessor** — runs in the worker on a `DLQ_POLL_INTERVAL_SECS` tick:
|
||||
1. Query: `SELECT * FROM failed_events WHERE retry_at <= now() AND retry_count < DLQ_MAX_RETRIES`
|
||||
2. For each row: republish the `payload` JSONB to the NATS `thoughts-events` main stream
|
||||
3. Update row: `retry_count += 1`, `retry_at = now() + DLQ_INITIAL_BACKOFF_SECS * 2^retry_count` (exponential backoff: 5m, 10m, 20m)
|
||||
4. If republish fails: update `last_error`, leave row for next poll
|
||||
5. When the republished message is processed successfully by the worker, the event handler completes normally — the `failed_events` row is deleted on success (see below)
|
||||
|
||||
**On DLQ retry success detection**: After republishing, the DLQ processor subscribes to the ack signal OR the processor marks rows as `retry_count = DLQ_MAX_RETRIES` optimistically and lets the event handler delete the row if the event type matches. Simpler: the DLQ row is deleted when `retry_count` reaches the threshold and the message is republished for the final time. If the final attempt also fails, it stays in the table as a permanently failed record with `retry_count = DLQ_MAX_RETRIES` for manual inspection.
|
||||
|
||||
### Permanently failed messages
|
||||
|
||||
Rows with `retry_count >= DLQ_MAX_RETRIES AND retry_at <= now()` are permanently failed. The DLQ processor:
|
||||
- Logs them at `ERROR` level with full payload
|
||||
- Sets `retry_at = now() + 365 days` (parking them out of the active query range)
|
||||
- Does NOT delete them — they remain visible for manual inspection
|
||||
|
||||
A future admin endpoint can query and replay them, but that is out of scope for this spec.
|
||||
|
||||
---
|
||||
|
||||
## Section 3: Auth Hardening
|
||||
|
||||
### JWT secret validation
|
||||
|
||||
In `crates/bootstrap/src/factory.rs`, before constructing `JwtAuthService`:
|
||||
|
||||
```rust
|
||||
const JWT_SECRET_MIN_BYTES: usize = 32;
|
||||
|
||||
if cfg.jwt_secret.len() < JWT_SECRET_MIN_BYTES {
|
||||
panic!(
|
||||
"JWT_SECRET is too short ({} bytes). \
|
||||
Minimum is {} bytes for HS256 security.",
|
||||
cfg.jwt_secret.len(),
|
||||
JWT_SECRET_MIN_BYTES
|
||||
);
|
||||
}
|
||||
```
|
||||
|
||||
Startup panics are appropriate here — running with a weak secret is a security failure.
|
||||
|
||||
### Timing equalization on failed login
|
||||
|
||||
In `crates/application/src/use_cases/auth.rs`, in the `login` function, when the user is not found by email:
|
||||
|
||||
```rust
|
||||
fn dummy_hash() -> argon2::PasswordHash<'static> {
|
||||
// Pre-computed Argon2 hash of empty string. Used only to equalize timing
|
||||
// on failed lookups so attackers cannot enumerate valid emails.
|
||||
argon2::PasswordHash::new(
|
||||
"$argon2id$v=19$m=19456,t=2,p=1$\
|
||||
AAAAAAAAAAAAAAAAAAAAAA$\
|
||||
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
|
||||
).expect("dummy hash is valid")
|
||||
}
|
||||
|
||||
// In login():
|
||||
if user.is_none() {
|
||||
let _ = Argon2::default().verify_password(plain.as_bytes(), &dummy_hash());
|
||||
return Err(DomainError::Unauthorized);
|
||||
}
|
||||
```
|
||||
|
||||
### JWT TTL reduction
|
||||
|
||||
In `crates/bootstrap/src/factory.rs`, the existing `JWT_TTL_SECS` constant:
|
||||
|
||||
```rust
|
||||
const JWT_TTL_SECS: i64 = 86_400; // 24 hours (was 30 days)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## What does NOT change
|
||||
|
||||
- NATS subject naming (`thoughts-events.>`) — unchanged
|
||||
- `MAX_MESSAGES` stream limit (100k) — unchanged; monitoring is out of scope
|
||||
- API surface, domain events, application layer — unchanged
|
||||
- Auth extractor, claims structure (`sub`, `exp`) — unchanged
|
||||
Reference in New Issue
Block a user