# NATS Hardening, Dead-Letter Queue & Auth Security

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Harden the NATS consumer (explicit config, ack timeouts, unknown-event acking), add an automatic-retry dead-letter queue backed by Postgres, and close three auth security gaps (weak secret validation, timing oracle, excessive JWT TTL).

**Architecture:** Six sequential tasks. Task 1 (auth) is independent of the rest and can be reviewed on its own. Tasks 2 and 3 both change `RawMessage`, so they are committed together. Tasks 4–6 form a dependency chain: migration (Task 4) → Postgres store (Task 5) → worker DLQ processor (Task 6). Task 6 also relies on the `delivery_count` metadata added in Tasks 2–3.

**Tech Stack:** Rust, Tokio, async-nats 0.48, SQLx, Postgres, argon2, jsonwebtoken.

---

### Task 1: Auth hardening (secret validation, timing equalization, TTL)

**Files:**

- Modify: `crates/bootstrap/src/factory.rs`
- Modify: `crates/application/src/use_cases/auth.rs`
- Modify: `crates/adapters/auth/src/lib.rs`

- [ ] **Step 1: Replace the magic JWT TTL constant and add a secret length check in `factory.rs`**

In `crates/bootstrap/src/factory.rs`, find `const JWT_TTL_SECS: i64 = 86_400 * 30;` at the top of the file. Replace it with the constants below, then add validation before the `JwtAuthService` is constructed:

```rust
const JWT_TTL_SECS: i64 = 86_400; // 24 hours
const JWT_SECRET_MIN_BYTES: usize = 32; // 256 bits minimum for HS256
```

Then, just before `auth: Arc::new(auth::JwtAuthService::new(...))` in the `build` function, add:

```rust
// `String::len` counts bytes, which is what HS256 key strength depends on.
if cfg.jwt_secret.len() < JWT_SECRET_MIN_BYTES {
    panic!(
        "JWT_SECRET is {} bytes; minimum is {} bytes for HS256 security",
        cfg.jwt_secret.len(),
        JWT_SECRET_MIN_BYTES,
    );
}
```
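
If a clean startup error is preferable to a panic backtrace, the same check can live in a helper that returns `Result` and let `build` decide how to abort. This is a minimal sketch: `validate_jwt_secret` is a hypothetical name, not an existing function in this codebase. Note that `str::len` counts UTF-8 bytes, not characters.

```rust
/// Minimum HS256 secret length in bytes (same value as `JWT_SECRET_MIN_BYTES` above).
const JWT_SECRET_MIN_BYTES: usize = 32;

/// Hypothetical helper: reports a too-short secret as an error instead of panicking.
fn validate_jwt_secret(secret: &str) -> Result<(), String> {
    // `len()` is the UTF-8 byte length, which is what matters for key strength.
    let len = secret.len();
    if len < JWT_SECRET_MIN_BYTES {
        return Err(format!(
            "JWT_SECRET is {len} bytes; minimum is {JWT_SECRET_MIN_BYTES} bytes for HS256"
        ));
    }
    Ok(())
}
```

A caller could then log the error and exit, for example `if let Err(e) = validate_jwt_secret(&cfg.jwt_secret) { tracing::error!("{e}"); std::process::exit(1); }`, assuming `build` keeps its current non-`Result` signature.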

- [ ] **Step 2: Update the auth tests to use a 32-byte secret**

In `crates/adapters/auth/src/lib.rs`, find the tests that use `"secret".into()`:

```rust
// Before (lines ~98, ~107):
let svc = JwtAuthService::new("secret".into(), 3600);

// After (use a secret of at least 32 bytes):
let svc = JwtAuthService::new("a-secret-that-is-at-least-32-bytes".into(), 3600);
```

Update both test cases (`generate_and_validate_token` and `invalid_token_returns_unauthorized`). The length check lives in `factory.rs`, not in `JwtAuthService::new`, so the old tests would still pass. The change keeps them representative of a valid configuration.

- [ ] **Step 3: Add timing equalization to the login use case**

In `crates/application/src/use_cases/auth.rs`, find the `login` function. It currently returns early, before any password hashing, when the user is not found:

```rust
let user = users
    .find_by_email(&email)
    .await?
    .ok_or(DomainError::Unauthorized)?;
```

Replace it with a version that runs the hasher even when no user is found:

```rust
let Some(user) = users.find_by_email(&email).await? else {
    // Timing equalization: prevents email enumeration via a response-time oracle.
    // Hashing on a miss makes "no such user" take about as long as "wrong password",
    // so attackers cannot distinguish the two cases.
    let _ = hasher.hash(&input.password).await;
    return Err(DomainError::Unauthorized);
};
```

This only equalizes timing if `hasher.hash` costs about the same as the password verification on the found-user path, i.e. both use the same Argon2 parameters.
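
To check the equalization property without a database or a real KDF, the control flow can be modeled synchronously. In this sketch `CountingHasher`, `User` and the simplified `DomainError` are hypothetical stand-ins; what it demonstrates is that every `Unauthorized` path runs the hasher exactly once.

```rust
use std::cell::Cell;

/// Simplified stand-in for the domain error type (assumption for this sketch).
#[derive(Debug, PartialEq)]
enum DomainError {
    Unauthorized,
}

/// Hypothetical user record holding a stored password hash.
struct User {
    password_hash: String,
}

/// Fake hasher that counts how many times the expensive KDF would run.
struct CountingHasher {
    calls: Cell<u32>,
}

impl CountingHasher {
    fn hash(&self, password: &str) -> String {
        self.calls.set(self.calls.get() + 1);
        format!("hashed:{password}") // placeholder, NOT a real KDF
    }

    fn verify(&self, password: &str, stored: &str) -> bool {
        // A real verify re-derives the hash, so it costs about as much as `hash`.
        self.hash(password) == stored
    }
}

/// Synchronous model of the login flow from Step 3.
fn login(user: Option<User>, password: &str, hasher: &CountingHasher) -> Result<(), DomainError> {
    let Some(user) = user else {
        // Miss: still pay for one hash so the response time matches a hit.
        let _ = hasher.hash(password);
        return Err(DomainError::Unauthorized);
    };
    if hasher.verify(password, &user.password_hash) {
        Ok(())
    } else {
        Err(DomainError::Unauthorized)
    }
}
```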

- [ ] **Step 4: Compile check**

```bash
cargo check --workspace 2>&1 | head -20
```

Expected: 0 errors.

- [ ] **Step 5: Commit**

```bash
git add crates/bootstrap/src/factory.rs \
        crates/application/src/use_cases/auth.rs \
        crates/adapters/auth/src/lib.rs
git commit -m "fix(auth): validate JWT secret length, equalize login timing, reduce TTL to 24h"
```

---

### Task 2: NATS consumer hardening (explicit config + ack timeouts)

**Files:**

- Modify: `crates/adapters/nats/src/lib.rs`

- [ ] **Step 1: Add named constants**

At the top of `crates/adapters/nats/src/lib.rs`, after the existing constants, add:

```rust
/// Maximum delivery attempts before a message is considered exhausted.
/// On the final attempt the worker moves a still-failing event into the DLQ (Task 6).
const CONSUMER_MAX_DELIVER: i64 = 5;
/// How long NATS waits for an ack before redelivering.
const CONSUMER_ACK_WAIT_SECS: u64 = 30;
/// Timeout for the spawned ack/nack async task.
const ACK_TASK_TIMEOUT_SECS: u64 = 5;
```
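
As a rough sanity check on these values: when a message is never acked or nacked, JetStream waits the full `ack_wait` before each redelivery, so the final attempt expires about `max_deliver × ack_wait` after the first delivery (ignoring fetch latency). The hypothetical helper below only does that arithmetic; a non-positive `max_deliver` leaves redelivery unlimited, so there is no bound.

```rust
use std::time::Duration;

const CONSUMER_MAX_DELIVER: i64 = 5;
const CONSUMER_ACK_WAIT_SECS: u64 = 30;

/// Rough upper bound on how long JetStream keeps redelivering an unacked message:
/// each of the `max_deliver` attempts waits the full `ack_wait`.
fn worst_case_unacked_window(max_deliver: i64, ack_wait_secs: u64) -> Option<Duration> {
    // max_deliver <= 0 means no finite delivery limit, so there is no bound to report.
    let attempts = u64::try_from(max_deliver).ok().filter(|&n| n > 0)?;
    Some(Duration::from_secs(attempts * ack_wait_secs))
}
```

With the constants above that is 150 seconds. Note that Task 6 only writes to the DLQ when a handler returns an error on the final delivery; a final delivery that simply times out is not recorded in `failed_events`.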

- [ ] **Step 2: Replace the pull consumer config**

Find the `get_or_create_consumer` call (around line 119). Replace the bare `..Default::default()` config with explicit settings, keeping the default for every field not listed:

```rust
let consumer = match stream
    .get_or_create_consumer(
        CONSUMER_NAME,
        jetstream::consumer::pull::Config {
            durable_name: Some(CONSUMER_NAME.to_string()),
            deliver_policy: DeliverPolicy::New,
            ack_policy: AckPolicy::Explicit,
            ack_wait: std::time::Duration::from_secs(CONSUMER_ACK_WAIT_SECS),
            max_deliver: CONSUMER_MAX_DELIVER,
            ..Default::default()
        },
    )
    .await
    // ...existing match arms follow unchanged...
```

The config refers to `DeliverPolicy` and `AckPolicy` by their short names. Check what is already imported at the top of the file and add if needed:

```rust
use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy};
```

`get_or_create_consumer` returns an existing durable consumer as-is, so on a server where `CONSUMER_NAME` already exists these settings only take effect once that consumer is deleted or updated.

- [ ] **Step 3: Add a timeout to the ack task**

Find the `ack:` closure (around line 173). Replace it with a timeout-wrapped version:

```rust
ack: Box::new(move || {
    let m = Arc::clone(&msg);
    tokio::spawn(async move {
        let result = tokio::time::timeout(
            std::time::Duration::from_secs(ACK_TASK_TIMEOUT_SECS),
            m.ack(),
        )
        .await;
        match result {
            Ok(Ok(())) => {}
            Ok(Err(e)) => tracing::warn!("NATS ack failed: {e}"),
            Err(_) => tracing::warn!("NATS ack timed out after {ACK_TASK_TIMEOUT_SECS}s"),
        }
    });
}),
```
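
The three match arms above (acked, ack error, timed out) can be exercised without a runtime using a std-only analog. This sketch runs a hypothetical `ack` closure on a helper thread and waits with `recv_timeout`. One difference: `tokio::time::timeout` drops (and so cancels) the inner future when the limit elapses, whereas a thread cannot be cancelled and is simply abandoned. In both cases a timed-out ack may or may not have reached the server.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Outcome of a bounded ack attempt, mirroring the three arms in Step 3.
#[derive(Debug, PartialEq)]
enum AckOutcome {
    Acked,
    Failed(String),
    TimedOut,
}

/// Std-only analog of wrapping an ack in `tokio::time::timeout`.
fn ack_with_timeout<F>(ack: F, limit: Duration) -> AckOutcome
where
    F: FnOnce() -> Result<(), String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Sending fails only if the receiver is gone because we already timed out.
        let _ = tx.send(ack());
    });
    match rx.recv_timeout(limit) {
        Ok(Ok(())) => AckOutcome::Acked,
        Ok(Err(e)) => AckOutcome::Failed(e),
        Err(RecvTimeoutError::Timeout) => AckOutcome::TimedOut,
        // The sender was dropped without sending, i.e. the ack closure panicked.
        Err(RecvTimeoutError::Disconnected) => AckOutcome::Failed("ack thread panicked".to_string()),
    }
}
```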

- [ ] **Step 4: Add a timeout to the nack task**

Find the `nack:` closure (around line 181) and apply the same pattern:

```rust
nack: Box::new(move || {
    let m = Arc::clone(&msg_nack);
    tokio::spawn(async move {
        let result = tokio::time::timeout(
            std::time::Duration::from_secs(ACK_TASK_TIMEOUT_SECS),
            m.ack_with(AckKind::Nak(None)),
        )
        .await;
        match result {
            Ok(Ok(())) => {}
            Ok(Err(e)) => tracing::warn!("NATS nack failed: {e}"),
            Err(_) => tracing::warn!("NATS nack timed out after {ACK_TASK_TIMEOUT_SECS}s"),
        }
    });
}),
```

`AckKind::Nak(None)` asks for immediate redelivery; passing `Some(delay)` instead would space out the NATS-level retries.

- [ ] **Step 5: Expose the delivery count on `RawMessage`**

Find where `RawMessage` is constructed (around line 170). Read the delivery count from the NATS message metadata via `msg.info()` before constructing it:

```rust
// JetStream's per-message delivery counter (1 on the first delivery).
let delivery_count = msg
    .info()
    .ok()
    .and_then(|info| u64::try_from(info.delivered).ok())
    .unwrap_or(1);

let raw = RawMessage {
    subject,
    payload,
    delivery_count,
    ack: Box::new(move || { ... }),  // closure from Step 3
    nack: Box::new(move || { ... }), // closure from Step 4
};
```

Note: `msg.info()` returns a `Result` wrapping the parsed JetStream metadata, and async-nats declares `Info::delivered` as an `i64`, not a `u64`, hence the `u64::try_from`. If the metadata is unavailable or out of range, the count defaults to 1.
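
The conversion can be isolated as a pure, unit-testable function. `delivery_count_from` is a hypothetical helper that takes the already-extracted `delivered` value (assumed to be an `i64`), so the sketch needs no async-nats types. Unlike an `as u64` cast, it never turns a negative value into a huge count that would send the message straight to the DLQ.

```rust
/// Convert JetStream's `delivered` counter into the `u64` stored on `RawMessage`.
/// `None` means the metadata could not be read (e.g. `msg.info()` returned an error).
fn delivery_count_from(delivered: Option<i64>) -> u64 {
    delivered
        .and_then(|d| u64::try_from(d).ok()) // reject negative values instead of wrapping
        .filter(|&d| d >= 1) // a message is always on at least its first delivery
        .unwrap_or(1)
}
```

At the call site this would read `delivery_count_from(msg.info().ok().map(|info| info.delivered))`.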

- [ ] **Step 6: Compile check**

```bash
cargo check -p nats 2>&1 | head -20
```

Expected: compile errors about `RawMessage` missing `delivery_count`; Task 3 fixes them.

- [ ] **Step 7: Commit is deferred until after Task 3** (both tasks touch the same type)

---

### Task 3: Add `delivery_count` to `RawMessage` and `EventEnvelope`; ack unknown events

**Files:**

- Modify: `crates/adapters/event-transport/src/lib.rs`
- Modify: `crates/domain/src/events.rs`

- [ ] **Step 1: Add `delivery_count` to `RawMessage`**

In `crates/adapters/event-transport/src/lib.rs`, find the `RawMessage` struct (around line 48). Add the field:

```rust
pub struct RawMessage {
    pub subject: String,
    pub payload: Vec<u8>,
    pub delivery_count: u64, // NEW
    pub ack: Box<dyn Fn() + Send + Sync>,
    pub nack: Box<dyn Fn() + Send + Sync>,
}
```

- [ ] **Step 2: Add `delivery_count` to `EventEnvelope` in domain**

In `crates/domain/src/events.rs`, find `EventEnvelope` (around line 83). Add the field:

```rust
pub struct EventEnvelope {
    pub event: DomainEvent,
    pub delivery_count: u64, // NEW
    pub ack: Box<dyn Fn() + Send + Sync>,
    pub nack: Box<dyn Fn() + Send + Sync>,
}
```

Also update the `Debug` impl (it is written by hand because closures don't implement `Debug`) to include `delivery_count`:

```rust
impl std::fmt::Debug for EventEnvelope {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EventEnvelope")
            .field("event", &self.event)
            .field("delivery_count", &self.delivery_count)
            .finish()
    }
}
```

- [ ] **Step 3: Pass `delivery_count` through `EventConsumerAdapter`**

In `crates/adapters/event-transport/src/lib.rs`, find where `EventEnvelope` is constructed in the `consume()` method (around line 97). Update it:

```rust
Some(Ok(EventEnvelope {
    event,
    delivery_count: msg.delivery_count, // NEW
    ack: msg.ack,
    nack: msg.nack,
}))
```

- [ ] **Step 4: Ack unknown event types instead of letting them redeliver**

Without an ack, JetStream redelivers such a message after every `ack_wait` until `max_deliver` is reached, even though decoding can never succeed. In the same `consume()` method, find the `Err(e)` arm for unknown event types (around line 92):

```rust
Err(e) => {
    tracing::warn!("unknown event type: {e}");
    return None;
}
```

Replace it with an explicit ack before dropping the message:

```rust
Err(e) => {
    tracing::warn!("unknown or malformed event type, acking to prevent redelivery: {e}");
    (msg.ack)();
    return None;
}
```

Do the same for the deserialization error arm (around line 85):

```rust
Err(e) => {
    tracing::warn!("failed to deserialize event payload, acking to prevent redelivery: {e}");
    (msg.ack)();
    return None;
}
```
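
The invariant behind Step 4 is that every path out of `consume()` either hands `ack` to the envelope or calls it. Below is a dependency-free model of that, using simplified hypothetical `RawMessage`/`EventEnvelope` types and routing on `subject` instead of deserializing `payload`:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Simplified stand-ins (assumptions) for the real transport and domain types.
struct RawMessage {
    subject: String,
    delivery_count: u64,
    ack: Box<dyn Fn() + Send + Sync>,
}

#[derive(Debug, PartialEq)]
enum DomainEvent {
    ThoughtCreated,
}

struct EventEnvelope {
    event: DomainEvent,
    delivery_count: u64,
    ack: Box<dyn Fn() + Send + Sync>,
}

/// Either move `ack` into the envelope or call it before dropping the message,
/// so JetStream never waits out `ack_wait` on an event that can't be decoded.
fn decode(msg: RawMessage) -> Option<EventEnvelope> {
    let event = match msg.subject.as_str() {
        "thoughts.created" => DomainEvent::ThoughtCreated,
        other => {
            eprintln!("unknown event type, acking to prevent redelivery: {other}");
            (msg.ack)();
            return None;
        }
    };
    Some(EventEnvelope { event, delivery_count: msg.delivery_count, ack: msg.ack })
}
```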

- [ ] **Step 5: Update test stubs for `RawMessage`**

The tests inside `event-transport/src/lib.rs` construct `RawMessage` with `ack` and `nack`. Add `delivery_count: 1` to every `RawMessage { ... }` construction in the test module, for example:

```rust
let msg = RawMessage {
    subject: "thoughts.created".to_string(),
    payload: self.bytes.clone(),
    delivery_count: 1,
    ack: Box::new(|| {}),
    nack: Box::new(|| {}),
};
```

- [ ] **Step 6: Full workspace compile check**

```bash
cargo check --workspace 2>&1 | head -40
```

Fix any remaining construction sites for `RawMessage` or `EventEnvelope` that are missing `delivery_count`.

- [ ] **Step 7: Commit Tasks 2 and 3 together**

```bash
git add crates/adapters/nats/src/lib.rs \
        crates/adapters/event-transport/src/lib.rs \
        crates/domain/src/events.rs
git commit -m "fix(nats): explicit consumer config, ack timeouts, unknown-event acking, delivery_count"
```

---

### Task 4: `failed_events` migration

**Files:**

- Create: `crates/adapters/postgres/migrations/009_failed_events.sql`

- [ ] **Step 1: Create the migration file**

Create `crates/adapters/postgres/migrations/009_failed_events.sql`:

```sql
CREATE TABLE failed_events (
    id          UUID        NOT NULL DEFAULT gen_random_uuid(),
    event_type  TEXT        NOT NULL,
    payload     JSONB       NOT NULL,
    failed_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
    retry_at    TIMESTAMPTZ NOT NULL,
    retry_count INT         NOT NULL DEFAULT 0,
    last_error  TEXT        NOT NULL,

    CONSTRAINT failed_events_pkey PRIMARY KEY (id)
);

-- Partial index: only rows still eligible for retry (retry_count below DLQ_MAX_RETRIES) are indexed.
CREATE INDEX failed_events_due_idx
    ON failed_events (retry_at)
    WHERE retry_count < 3;
```

The literal `3` must stay in sync with `DLQ_MAX_RETRIES` in Task 5. `gen_random_uuid()` is built into Postgres 13 and later; older versions need the `pgcrypto` extension.
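
One way to keep the index literal and the Rust constant from drifting is a small guard test. This is a hypothetical sketch: `index_predicate_matches` is not an existing helper, and the check is whitespace-sensitive, so it assumes the predicate is written exactly as in the migration above.

```rust
/// Must match `DLQ_MAX_RETRIES` in `failed_event.rs` (Task 5).
const DLQ_MAX_RETRIES: i32 = 3;

/// Hypothetical guard: does the migration's partial-index predicate use the
/// same retry limit as the Rust constant?
fn index_predicate_matches(migration_sql: &str, max_retries: i32) -> bool {
    let expected = format!("WHERE retry_count < {max_retries}");
    migration_sql.contains(&expected)
}
```

In a real test inside `crates/adapters/postgres/src/failed_event.rs`, the SQL could be loaded with `include_str!("../migrations/009_failed_events.sql")`.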

- [ ] **Step 2: Confirm the crate still builds with the new migration**

```bash
cargo check -p postgres 2>&1 | head -10
```

`sqlx::migrate!` embeds the migrations directory at compile time, but `cargo check` does not validate the SQL; syntax errors only surface when the migration runs (e.g. in integration tests). Cargo also does not rebuild the crate just because a new migration file appeared unless a `build.rs` emits `cargo:rerun-if-changed=migrations` (`sqlx migrate build-script` generates one).

- [ ] **Step 3: Commit**

```bash
git add crates/adapters/postgres/migrations/009_failed_events.sql
git commit -m "feat(db): add failed_events table for dead-letter queue"
```

---

### Task 5: `PgFailedEventStore` (Postgres DLQ repository)

**Files:**

- Create: `crates/adapters/postgres/src/failed_event.rs`
- Modify: `crates/adapters/postgres/src/lib.rs`

- [ ] **Step 1: Create `failed_event.rs`**

Create `crates/adapters/postgres/src/failed_event.rs`:

```rust
use chrono::{DateTime, Utc};
use sqlx::PgPool;

/// How many times a failed event is retried by the DLQ processor.
/// Must match the `retry_count < 3` predicate of `failed_events_due_idx`.
pub const DLQ_MAX_RETRIES: i32 = 3;
/// Quarantine period before the first DLQ retry (seconds). Doubles after each failed retry.
pub const DLQ_INITIAL_BACKOFF_SECS: i64 = 300; // 5 minutes
/// How often the DLQ processor polls for due retries (seconds).
pub const DLQ_POLL_INTERVAL_SECS: u64 = 60;

#[derive(sqlx::FromRow)]
pub struct FailedEvent {
    pub id: uuid::Uuid,
    pub event_type: String,
    pub payload: serde_json::Value,
    pub failed_at: DateTime<Utc>,
    pub retry_at: DateTime<Utc>,
    pub retry_count: i32,
    pub last_error: String,
}

pub struct PgFailedEventStore {
    pool: PgPool,
}

impl PgFailedEventStore {
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }

    /// Insert a newly exhausted event into the DLQ.
    pub async fn insert(
        &self,
        event_type: &str,
        payload: &serde_json::Value,
        last_error: &str,
    ) -> Result<(), sqlx::Error> {
        let retry_at = Utc::now() + chrono::Duration::seconds(DLQ_INITIAL_BACKOFF_SECS);
        sqlx::query(
            "INSERT INTO failed_events \
             (event_type, payload, retry_at, last_error) \
             VALUES ($1, $2, $3, $4)",
        )
        .bind(event_type)
        .bind(payload)
        .bind(retry_at)
        .bind(last_error)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Fetch up to 100 events due for retry (retry_at <= now, retry_count < DLQ_MAX_RETRIES).
    pub async fn poll_due(&self) -> Result<Vec<FailedEvent>, sqlx::Error> {
        sqlx::query_as::<_, FailedEvent>(
            "SELECT id, event_type, payload, failed_at, retry_at, retry_count, last_error \
             FROM failed_events \
             WHERE retry_at <= now() AND retry_count < $1 \
             ORDER BY retry_at \
             LIMIT 100",
        )
        .bind(DLQ_MAX_RETRIES)
        .fetch_all(&self.pool)
        .await
    }

    /// Record the outcome of a republish attempt.
    ///
    /// - `None` (success): the event is back on the bus, so the row is deleted;
    ///   keeping it due would republish the same event again on the next poll.
    /// - `Some(err)` (failure): increment `retry_count` and reschedule with
    ///   exponential backoff: `now + initial * 2^new_retry_count`.
    pub async fn advance(&self, id: uuid::Uuid, error: Option<&str>) -> Result<(), sqlx::Error> {
        let Some(error) = error else {
            sqlx::query("DELETE FROM failed_events WHERE id = $1")
                .bind(id)
                .execute(&self.pool)
                .await?;
            return Ok(());
        };

        // Fetch the current retry_count to compute the backoff.
        let current: i32 =
            sqlx::query_scalar("SELECT retry_count FROM failed_events WHERE id = $1")
                .bind(id)
                .fetch_one(&self.pool)
                .await?;

        let new_count = current + 1;
        let backoff_secs = DLQ_INITIAL_BACKOFF_SECS * (1_i64 << new_count.min(10));
        let retry_at = Utc::now() + chrono::Duration::seconds(backoff_secs);

        sqlx::query(
            "UPDATE failed_events \
             SET retry_count = $1, retry_at = $2, last_error = $3 \
             WHERE id = $4",
        )
        .bind(new_count)
        .bind(retry_at)
        .bind(error)
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Park an event permanently. Raising `retry_count` to `DLQ_MAX_RETRIES`
    /// removes it from `poll_due` results and from the partial index.
    pub async fn park_permanently(&self, id: uuid::Uuid) -> Result<(), sqlx::Error> {
        sqlx::query(
            "UPDATE failed_events SET retry_count = GREATEST(retry_count, $1) WHERE id = $2",
        )
        .bind(DLQ_MAX_RETRIES)
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
}
```

Deleting the row on success means an event that is republished and then exhausts its NATS deliveries again is inserted as a fresh row with `retry_count = 0`, so a message that always fails can cycle through the DLQ indefinitely. Bounding that would require carrying a retry counter on the republished message (for example, in a NATS header).
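
The retry delays implied by `insert` and the failure path of `advance` can be tabulated with a pure function. This hypothetical `backoff_secs` mirrors the store's arithmetic but is not part of it:

```rust
const DLQ_MAX_RETRIES: i32 = 3;
const DLQ_INITIAL_BACKOFF_SECS: i64 = 300;

/// Delay before the next DLQ attempt for a row with the given `retry_count`:
/// 0 right after `insert`, n after the n-th failed republish.
fn backoff_secs(retry_count: i32) -> i64 {
    // Same cap as the store: never shift by more than 10.
    DLQ_INITIAL_BACKOFF_SECS * (1_i64 << retry_count.clamp(0, 10))
}

/// Delays for every attempt the processor actually makes; rows at
/// `retry_count >= DLQ_MAX_RETRIES` are no longer returned by `poll_due`.
fn schedule() -> Vec<i64> {
    (0..DLQ_MAX_RETRIES).map(backoff_secs).collect()
}
```

So a row is retried after waits of 5, 10 and 20 minutes, after which `poll_due` stops returning it.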

- [ ] **Step 2: Export from `postgres/src/lib.rs`**

In `crates/adapters/postgres/src/lib.rs`, add:

```rust
pub mod failed_event;
```

- [ ] **Step 3: Compile check**

```bash
cargo check -p postgres 2>&1 | head -20
```

Expected: 0 errors.

- [ ] **Step 4: Commit**

```bash
git add crates/adapters/postgres/src/failed_event.rs \
        crates/adapters/postgres/src/lib.rs
git commit -m "feat(postgres): PgFailedEventStore for dead-letter queue"
```

---

### Task 6: DLQ processor in worker

**Files:**

- Create: `crates/worker/src/dlq.rs`
- Modify: `crates/worker/src/factory.rs`
- Modify: `crates/worker/src/main.rs`

The `delivery_count` on `EventEnvelope` (added in Task 3) tells the worker when a message is on its last attempt. The main loop inserts an event into the DLQ when its handlers fail with `delivery_count >= CONSUMER_MAX_DELIVER`. A separate background task polls the DLQ and republishes due events.

- [ ] **Step 1: Create `dlq.rs`**

Create `crates/worker/src/dlq.rs`:

```rust
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
use postgres::failed_event::{DLQ_MAX_RETRIES, DLQ_POLL_INTERVAL_SECS, PgFailedEventStore};
use std::sync::Arc;

/// Background task: polls `failed_events` and republishes due rows to the event bus.
/// Runs on a fixed interval for the lifetime of the worker.
pub async fn run_dlq_processor(
    store: Arc<PgFailedEventStore>,
    publisher: Arc<dyn EventPublisher>,
) {
    let interval = std::time::Duration::from_secs(DLQ_POLL_INTERVAL_SECS);
    loop {
        tokio::time::sleep(interval).await;
        if let Err(e) = process_due(&store, &*publisher).await {
            tracing::error!("DLQ processor error: {e}");
        }
    }
}

async fn process_due(
    store: &PgFailedEventStore,
    publisher: &dyn EventPublisher,
) -> Result<(), sqlx::Error> {
    // `poll_due` only returns rows with retry_count < DLQ_MAX_RETRIES, so
    // exhaustion has to be detected after a failed attempt, not before it.
    let due = store.poll_due().await?;
    if due.is_empty() {
        return Ok(());
    }
    tracing::info!(count = due.len(), "DLQ: processing due events");

    for row in due {
        // Attempt to republish the stored payload as a domain event.
        match republish(&row.payload, publisher).await {
            Ok(()) => {
                tracing::info!(id = %row.id, "DLQ: republished successfully");
                // Record the successful republish.
                store.advance(row.id, None).await?;
            }
            Err(e) => {
                tracing::warn!(id = %row.id, error = %e, "DLQ: republish failed");
                // Increments retry_count; at DLQ_MAX_RETRIES the row drops out of `poll_due`.
                store.advance(row.id, Some(&e.to_string())).await?;
                if row.retry_count + 1 >= DLQ_MAX_RETRIES {
                    tracing::error!(
                        id = %row.id,
                        event_type = %row.event_type,
                        retry_count = row.retry_count + 1,
                        "DLQ: event permanently failed; parked in failed_events"
                    );
                }
            }
        }
    }
    Ok(())
}

async fn republish(
    payload: &serde_json::Value,
    publisher: &dyn EventPublisher,
) -> Result<(), DomainError> {
    use event_payload::EventPayload;

    let ep: EventPayload = serde_json::from_value(payload.clone())
        .map_err(|e| DomainError::Internal(format!("DLQ deserialize: {e}")))?;
    let event = DomainEvent::try_from(ep)
        .map_err(|e| DomainError::Internal(format!("DLQ event conversion: {e}")))?;
    publisher.publish(&event).await
}
```
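
A minimal in-memory model of the failure bookkeeping in `process_due` makes the ordering visible: because `poll_due` filters out rows at the limit, exhaustion can only be detected right after a failed attempt, so a check placed before republishing would never fire. `MemStore` and this synchronous `process_due` are hypothetical stand-ins, not the Postgres-backed code.

```rust
use std::collections::HashMap;

const DLQ_MAX_RETRIES: i32 = 3;

/// Hypothetical in-memory stand-in for `PgFailedEventStore`: row id -> retry_count.
#[derive(Default)]
struct MemStore {
    retry_counts: HashMap<u32, i32>,
}

impl MemStore {
    /// Same filter as `poll_due`: rows still below the retry limit, sorted by id.
    fn poll_due(&self) -> Vec<(u32, i32)> {
        let mut due: Vec<(u32, i32)> = self
            .retry_counts
            .iter()
            .filter(|entry| *entry.1 < DLQ_MAX_RETRIES)
            .map(|(id, count)| (*id, *count))
            .collect();
        due.sort();
        due
    }
}

/// One synchronous pass; returns the ids that reached the retry limit in this pass.
fn process_due(store: &mut MemStore, republish: impl Fn(u32) -> Result<(), String>) -> Vec<u32> {
    let mut exhausted = Vec::new();
    for (id, count) in store.poll_due() {
        match republish(id) {
            // In this model a republished row simply stops being retried.
            Ok(()) => {
                store.retry_counts.remove(&id);
            }
            Err(_) => {
                // Failure path of `advance`: bump retry_count.
                store.retry_counts.insert(id, count + 1);
                // Check after the increment; `poll_due` never returns rows already at the limit.
                if count + 1 >= DLQ_MAX_RETRIES {
                    exhausted.push(id);
                }
            }
        }
    }
    exhausted
}
```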

- [ ] **Step 2: Add `PgFailedEventStore` to `factory.rs`**

In `crates/worker/src/factory.rs`, add to the imports (skip `Arc` if it is already imported):

```rust
use postgres::failed_event::PgFailedEventStore;
use std::sync::Arc;
```

Then return the store from `build` alongside the consumer and handlers by changing the return type to a new struct:

```rust
pub struct WorkerInfra {
    pub consumer: event_transport::EventConsumerAdapter<nats::NatsMessageSource>,
    pub handlers: WorkerHandlers,
    pub dlq_store: Arc<PgFailedEventStore>,
    pub event_publisher: Arc<dyn domain::ports::EventPublisher>,
}
```

At the end of `build`, construct and return it:

```rust
let dlq_store = Arc::new(PgFailedEventStore::new(pool.clone()));

// ... existing consumer construction ...

WorkerInfra {
    consumer,
    handlers,
    dlq_store,
    event_publisher, // the NATS publisher already constructed in the factory
}
```

`build` does not currently return the event publisher. Read the existing `factory.rs` to see how the NATS publisher is constructed for the ActivityPub handler, and pass that same `Arc<dyn EventPublisher>` instance into `WorkerInfra` rather than building a second one.

- [ ] **Step 3: Update `main.rs` to use the DLQ**

In `crates/worker/src/main.rs`, update to use the new `WorkerInfra`:

```rust
mod dlq;
mod factory;
mod handlers;

use domain::ports::EventConsumer;
use futures::StreamExt;
use nats::CONSUMER_MAX_DELIVER;

#[tokio::main]
async fn main() {
    dotenvy::dotenv().ok();
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .init();

    let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL required");
    let nats_url = std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into());
    let base_url = std::env::var("BASE_URL").expect("BASE_URL required");

    tracing::info!("Building worker...");
    let infra = factory::build(&database_url, &base_url, &nats_url).await;

    // Spawn the DLQ processor as a background task.
    tokio::spawn(dlq::run_dlq_processor(
        infra.dlq_store.clone(),
        infra.event_publisher.clone(),
    ));

    tracing::info!("Worker started, consuming events...");
    let mut stream = infra.consumer.consume();
    while let Some(result) = stream.next().await {
        match result {
            Ok(envelope) => {
                let event = &envelope.event;
                tracing::debug!(?event, "received event");

                let n = infra.handlers.notification.handle(event).await;
                let f = infra.handlers.federation.handle(event).await;

                if n.is_ok() && f.is_ok() {
                    (envelope.ack)();
                } else {
                    if let Err(e) = &n {
                        tracing::error!("notification handler: {e}");
                    }
                    if let Err(e) = &f {
                        tracing::error!("federation handler: {e}");
                    }

                    // Last delivery attempt: insert into the DLQ, then ack.
                    // Earlier attempts: nack so NATS retries.
                    if envelope.delivery_count >= CONSUMER_MAX_DELIVER as u64 {
                        // Stringify each error separately: the two handlers may
                        // return different error types.
                        let error_msg = n
                            .err()
                            .map(|e| e.to_string())
                            .or_else(|| f.err().map(|e| e.to_string()))
                            .unwrap_or_else(|| "unknown error".into());

                        let payload = serde_json::to_value(&event_payload::EventPayload::from(event))
                            .unwrap_or(serde_json::Value::Null);

                        // Variant name from the Debug output; handles struct, tuple and unit variants.
                        let event_type = format!("{event:?}")
                            .split(|c: char| !(c.is_alphanumeric() || c == '_'))
                            .next()
                            .filter(|s| !s.is_empty())
                            .unwrap_or("Unknown")
                            .to_string();

                        if let Err(e) = infra
                            .dlq_store
                            .insert(&event_type, &payload, &error_msg)
                            .await
                        {
                            tracing::error!("DLQ insert failed, message lost: {e}");
                        } else {
                            tracing::warn!(
                                event_type = %event_type,
                                delivery_count = envelope.delivery_count,
                                "event exhausted; moved to DLQ"
                            );
                        }
                        (envelope.ack)(); // ack to NATS: the DLQ owns the event now
                    } else {
                        (envelope.nack)(); // let NATS retry
                    }
                }
            }
            Err(e) => tracing::error!("consumer error: {e}"),
        }
    }
}
```
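
The ack/nack/DLQ branch and the variant-name extraction are easier to test as pure functions. A sketch with hypothetical names (`Disposition`, `disposition`, `variant_name`); the loop above inlines this logic rather than calling them:

```rust
/// What the worker loop does with an envelope once its handlers have run.
#[derive(Debug, PartialEq)]
enum Disposition {
    Ack,
    Nack,
    DeadLetterThenAck,
}

/// Pure version of the branch in `main.rs`. `max_deliver` is the consumer's
/// `CONSUMER_MAX_DELIVER` (an `i64`); `delivery_count` comes from the envelope.
fn disposition(handlers_ok: bool, delivery_count: u64, max_deliver: i64) -> Disposition {
    if handlers_ok {
        Disposition::Ack
    } else if delivery_count >= u64::try_from(max_deliver).unwrap_or(0) {
        Disposition::DeadLetterThenAck
    } else {
        Disposition::Nack
    }
}

/// Variant name from a `Debug` string: works for struct (`A { .. }`),
/// tuple (`A(..)`) and unit (`A`) variants alike.
fn variant_name(debug: &str) -> &str {
    debug
        .split(|c: char| !(c.is_alphanumeric() || c == '_'))
        .next()
        .filter(|s| !s.is_empty())
        .unwrap_or("Unknown")
}
```

Keeping these as free functions in `main.rs` (or a small module) would let the branch be unit-tested without NATS or Postgres.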

Note: `main.rs` imports `CONSUMER_MAX_DELIVER`, so the constant in `crates/adapters/nats/src/lib.rs` must be `pub`; Step 4 makes that change.

- [ ] **Step 4: Export `CONSUMER_MAX_DELIVER` from the nats crate**

In `crates/adapters/nats/src/lib.rs`, make the constant `pub`:

```rust
pub const CONSUMER_MAX_DELIVER: i64 = 5;
```

- [ ] **Step 5: Full workspace compile check**

```bash
cargo check --workspace 2>&1 | head -40
```

Fix all errors. Common issues:

- Missing imports in `main.rs` for `event_payload`
- The borrow of `event` in the DLQ insert block (you may need to clone `event`)
- `WorkerInfra` construction in `factory.rs` missing fields
- The worker's `Cargo.toml` missing dependencies used by `dlq.rs` or `main.rs` (`postgres`, `sqlx`, `serde_json`, `event_payload`)
- Missing `From<&DomainEvent> for EventPayload` or `TryFrom<EventPayload> for DomainEvent` conversions, if `event_payload` does not already provide them

- [ ] **Step 6: Verify tests still pass**

```bash
cargo test --workspace 2>&1 | tail -5
```

Expected: all tests pass.

- [ ] **Step 7: Commit**

```bash
git add crates/worker/src/dlq.rs \
        crates/worker/src/factory.rs \
        crates/worker/src/main.rs \
        crates/adapters/nats/src/lib.rs
git commit -m "feat(worker): DLQ processor; exhausted events moved to failed_events with exponential retry"
```

---

## Self-Review

**Spec coverage:**

| Spec requirement | Task |
|---|---|
| `CONSUMER_MAX_DELIVER = 5` constant | Task 2 |
| `CONSUMER_ACK_WAIT_SECS = 30` constant | Task 2 |
| `ACK_TASK_TIMEOUT_SECS = 5` constant | Task 2 |
| Explicit consumer config (deliver_policy, ack_policy, ack_wait, max_deliver) | Task 2 |
| Ack task with timeout | Task 2 |
| Nack task with timeout | Task 2 |
| Unknown event types acked before drop | Task 3 |
| `delivery_count` threaded through to worker | Tasks 2, 3 |
| `failed_events` migration | Task 4 |
| `PgFailedEventStore` (insert, poll_due, advance, park_permanently) | Task 5 |
| DLQ processor (poll interval, exponential backoff, park permanently) | Task 6 |
| Worker inserts to DLQ at `delivery_count >= max_deliver` | Task 6 |
| `JWT_SECRET_MIN_BYTES = 32` constant | Task 1 |
| Panic on weak secret at startup | Task 1 |
| `JWT_TTL_SECS = 86_400` (24h) | Task 1 |
| Timing equalization on failed login | Task 1 |

**No placeholders found.** The `...` markers in code excerpts stand for existing code or for code given in an earlier step.

**Type consistency:** `CONSUMER_MAX_DELIVER: i64` in Task 2 is cast to `u64` for the comparison with `envelope.delivery_count: u64` in Task 6 (`>= CONSUMER_MAX_DELIVER as u64`). `DLQ_MAX_RETRIES: i32` matches `retry_count: i32` in `FailedEvent`, and its value must match the literal `3` in the Task 4 partial index. `DLQ_INITIAL_BACKOFF_SECS: i64` is passed to `chrono::Duration::seconds(i64)`. All consistent.