diff --git a/docs/superpowers/plans/2026-05-15-nats-dlq-auth-hardening.md b/docs/superpowers/plans/2026-05-15-nats-dlq-auth-hardening.md new file mode 100644 index 0000000..82136c7 --- /dev/null +++ b/docs/superpowers/plans/2026-05-15-nats-dlq-auth-hardening.md @@ -0,0 +1,836 @@ +# NATS Hardening, Dead-Letter Queue & Auth Security + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Harden the NATS consumer (explicit config, ack timeouts, unknown-event acking), add an automatic-retry dead-letter queue backed by Postgres, and close three auth security gaps (weak secret validation, timing oracle, excessive JWT TTL). + +**Architecture:** Seven sequential tasks. Tasks 1–3 are independent of each other and of 4–7 — they can be reviewed in any order. Tasks 4–7 form a dependency chain: delivery-count metadata (Task 4) → migration (Task 5) → Postgres store (Task 6) → worker DLQ loop (Task 7). + +**Tech Stack:** Rust, Tokio, async-nats 0.48, SQLx, Postgres, argon2, jsonwebtoken. + +--- + +### Task 1: Auth hardening — secret validation, timing equalization, TTL + +**Files:** +- Modify: `crates/bootstrap/src/factory.rs` +- Modify: `crates/application/src/use_cases/auth.rs` +- Modify: `crates/adapters/auth/src/lib.rs` + +- [ ] **Step 1: Replace the magic JWT TTL constant and add secret length check in `factory.rs`** + +In `crates/bootstrap/src/factory.rs`, find the current `const JWT_TTL_SECS: i64 = 86_400 * 30;` at the top of the file. Replace it and add a secret minimum constant, then add validation before the `JwtAuthService` is constructed: + +```rust +const JWT_TTL_SECS: i64 = 86_400; // 24 hours +const JWT_SECRET_MIN_BYTES: usize = 32; // 256 bits minimum for HS256 +``` + +Then, just before `auth: Arc::new(auth::JwtAuthService::new(...))` in the `build` function, add: + +```rust +if cfg.jwt_secret.len() < JWT_SECRET_MIN_BYTES { + panic!( + "JWT_SECRET is {} bytes — minimum is {} bytes for HS256 security", + cfg.jwt_secret.len(), + JWT_SECRET_MIN_BYTES, + ); +} +``` + +- [ ] **Step 2: Update the auth test to use a 32-byte secret** + +In `crates/adapters/auth/src/lib.rs`, find the tests that use `"secret".into()`: + +```rust +// Before (lines ~98, ~107): +let svc = JwtAuthService::new("secret".into(), 3600); + +// After (use 32+ byte secret): +let svc = JwtAuthService::new("a-secret-that-is-at-least-32-bytes".into(), 3600); +``` + +Update both test cases (`generate_and_validate_token` and `invalid_token_returns_unauthorized`). + +- [ ] **Step 3: Add timing equalization to the login use case** + +In `crates/application/src/use_cases/auth.rs`, find the `login` function. Currently it early-returns when the user is not found: + +```rust +let user = users + .find_by_email(&email) + .await? + .ok_or(DomainError::Unauthorized)?; +``` + +Replace with a timing-safe version that runs the hasher even when no user is found: + +```rust +let user = users.find_by_email(&email).await?; +if user.is_none() { + // Timing equalization — prevents email enumeration via response-time oracle. + // Running the hasher on a miss makes "no such user" take the same time as + // "wrong password", so attackers cannot distinguish the two cases. + let _ = hasher.hash(&input.password).await; + return Err(DomainError::Unauthorized); +} +let user = user.unwrap(); +``` + +- [ ] **Step 4: Compile check** + +```bash +cargo check --workspace 2>&1 | head -20 +``` + +Expected: 0 errors. + +- [ ] **Step 5: Commit** + +```bash +git add crates/bootstrap/src/factory.rs \ + crates/application/src/use_cases/auth.rs \ + crates/adapters/auth/src/lib.rs +git commit -m "fix(auth): validate JWT secret length, equalize login timing, reduce TTL to 24h" +``` + +--- + +### Task 2: NATS consumer hardening — explicit config + ack timeouts + +**Files:** +- Modify: `crates/adapters/nats/src/lib.rs` + +- [ ] **Step 1: Add named constants** + +At the top of `crates/adapters/nats/src/lib.rs`, after the existing constants, add: + +```rust +/// Maximum delivery attempts before a message is considered exhausted. +/// The DLQ processor picks it up after this point. +const CONSUMER_MAX_DELIVER: i64 = 5; +/// How long NATS waits for an ack before redelivering. +const CONSUMER_ACK_WAIT_SECS: u64 = 30; +/// Timeout for the spawned ack/nack async task. +const ACK_TASK_TIMEOUT_SECS: u64 = 5; +``` + +- [ ] **Step 2: Replace the pull consumer config** + +Find the `get_or_create_consumer` call (around line 119). Replace `..Default::default()` with explicit settings: + +```rust +let consumer = match stream + .get_or_create_consumer( + CONSUMER_NAME, + jetstream::consumer::pull::Config { + durable_name: Some(CONSUMER_NAME.to_string()), + deliver_policy: jetstream::consumer::DeliverPolicy::New, + ack_policy: jetstream::consumer::AckPolicy::Explicit, + ack_wait: std::time::Duration::from_secs(CONSUMER_ACK_WAIT_SECS), + max_deliver: CONSUMER_MAX_DELIVER, + ..Default::default() + }, + ) + .await +``` + +You will need to add the `DeliverPolicy` and `AckPolicy` imports. Check what's already imported at the top of the file and add if needed: + +```rust +use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy}; +``` + +- [ ] **Step 3: Add timeout to the ack task** + +Find the `ack:` closure (around line 173). Replace it with a timeout-wrapped version: + +```rust +ack: Box::new(move || { + let m = Arc::clone(&msg); + tokio::spawn(async move { + let result = tokio::time::timeout( + std::time::Duration::from_secs(ACK_TASK_TIMEOUT_SECS), + m.ack(), + ) + .await; + match result { + Ok(Ok(())) => {} + Ok(Err(e)) => tracing::warn!("NATS ack failed: {e}"), + Err(_) => tracing::warn!("NATS ack timed out after {ACK_TASK_TIMEOUT_SECS}s"), + } + }); +}), +``` + +- [ ] **Step 4: Add timeout to the nack task** + +Find the `nack:` closure (around line 181). Same pattern: + +```rust +nack: Box::new(move || { + let m = Arc::clone(&msg_nack); + tokio::spawn(async move { + let result = tokio::time::timeout( + std::time::Duration::from_secs(ACK_TASK_TIMEOUT_SECS), + m.ack_with(AckKind::Nak(None)), + ) + .await; + match result { + Ok(Ok(())) => {} + Ok(Err(e)) => tracing::warn!("NATS nack failed: {e}"), + Err(_) => tracing::warn!("NATS nack timed out after {ACK_TASK_TIMEOUT_SECS}s"), + } + }); +}), +``` + +- [ ] **Step 5: Expose delivery count on RawMessage** + +Find where `RawMessage` is constructed (around line 170). Add the NATS message's delivery count before constructing it. Read the NATS message metadata via `msg.info()`: + +```rust +let delivery_count = msg + .info() + .map(|info| info.delivered) + .unwrap_or(1) as u64; + +let raw = RawMessage { + subject, + payload, + delivery_count, + ack: Box::new(move || { ... }), + nack: Box::new(move || { ... }), +}; +``` + +Note: `msg.info()` returns `Result` where `Info.delivered: u64`. If it's unavailable, default to 1. + +- [ ] **Step 6: Compile check** + +```bash +cargo check -p nats 2>&1 | head -20 +``` + +Expected: compile errors about `RawMessage` missing `delivery_count` — that's fixed in Task 3. + +- [ ] **Step 7: Commit is deferred to after Task 3** (they touch the same type) + +--- + +### Task 3: Add `delivery_count` to `RawMessage` and `EventEnvelope`; ack unknown events + +**Files:** +- Modify: `crates/adapters/event-transport/src/lib.rs` +- Modify: `crates/domain/src/events.rs` + +- [ ] **Step 1: Add `delivery_count` to `RawMessage`** + +In `crates/adapters/event-transport/src/lib.rs`, find the `RawMessage` struct (around line 48). Add the field: + +```rust +pub struct RawMessage { + pub subject: String, + pub payload: Vec, + pub delivery_count: u64, // NEW + pub ack: Box, + pub nack: Box, +} +``` + +- [ ] **Step 2: Add `delivery_count` to `EventEnvelope` in domain** + +In `crates/domain/src/events.rs`, find `EventEnvelope` (around line 83). Add the field: + +```rust +pub struct EventEnvelope { + pub event: DomainEvent, + pub delivery_count: u64, // NEW + pub ack: Box, + pub nack: Box, +} +``` + +Also update the `Debug` impl (which is manual because closures aren't Debug). Find it and add `delivery_count` to the struct debug output: + +```rust +impl std::fmt::Debug for EventEnvelope { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("EventEnvelope") + .field("event", &self.event) + .field("delivery_count", &self.delivery_count) + .finish() + } +} +``` + +- [ ] **Step 3: Pass `delivery_count` through `EventConsumerAdapter`** + +In `crates/adapters/event-transport/src/lib.rs`, find where `EventEnvelope` is constructed in the `consume()` method (around line 97). Update it: + +```rust +Some(Ok(EventEnvelope { + event, + delivery_count: msg.delivery_count, // NEW + ack: msg.ack, + nack: msg.nack, +})) +``` + +- [ ] **Step 4: Ack unknown event types instead of orphaning them** + +In the same `consume()` method, find the `Err(e)` arm for unknown event types (around line 92): + +```rust +Err(e) => { + tracing::warn!("unknown event type: {e}"); + return None; +} +``` + +Replace with an explicit ack before dropping: + +```rust +Err(e) => { + tracing::warn!("unknown or malformed event type — acking to prevent orphan: {e}"); + (msg.ack)(); + return None; +} +``` + +Similarly for the deserialization error arm (around line 85): + +```rust +Err(e) => { + tracing::warn!("failed to deserialize event payload — acking to prevent orphan: {e}"); + (msg.ack)(); + return None; +} +``` + +- [ ] **Step 5: Update test stubs for `RawMessage`** + +In the tests inside `event-transport/src/lib.rs`, `RawMessage` is constructed with `ack` and `nack`. Add `delivery_count: 1` to each: + +```rust +let msg = RawMessage { + subject: "thoughts.created".to_string(), + payload: self.bytes.clone(), + delivery_count: 1, + ack: Box::new(|| {}), + nack: Box::new(|| {}), +}; +``` + +Find all `RawMessage { ... }` constructions in the test module and add `delivery_count: 1`. + +- [ ] **Step 6: Full workspace compile check** + +```bash +cargo check --workspace 2>&1 | head -40 +``` + +Fix any remaining construction sites for `RawMessage` or `EventEnvelope` that are missing `delivery_count`. + +- [ ] **Step 7: Commit Tasks 2 and 3 together** + +```bash +git add crates/adapters/nats/src/lib.rs \ + crates/adapters/event-transport/src/lib.rs \ + crates/domain/src/events.rs +git commit -m "fix(nats): explicit consumer config, ack timeouts, unknown-event acking, delivery_count" +``` + +--- + +### Task 4: `failed_events` migration + +**Files:** +- Create: `crates/adapters/postgres/migrations/009_failed_events.sql` + +- [ ] **Step 1: Create the migration file** + +Create `crates/adapters/postgres/migrations/009_failed_events.sql`: + +```sql +CREATE TABLE failed_events ( + id UUID NOT NULL DEFAULT gen_random_uuid(), + event_type TEXT NOT NULL, + payload JSONB NOT NULL, + failed_at TIMESTAMPTZ NOT NULL DEFAULT now(), + retry_at TIMESTAMPTZ NOT NULL, + retry_count INT NOT NULL DEFAULT 0, + last_error TEXT NOT NULL, + + CONSTRAINT failed_events_pkey PRIMARY KEY (id) +); + +-- Partial index: only rows actively due for retry are in this index. +CREATE INDEX failed_events_due_idx + ON failed_events (retry_at) + WHERE retry_count < 3; +``` + +- [ ] **Step 2: Verify the migration file is syntactically correct** + +```bash +cargo check -p postgres 2>&1 | head -10 +``` + +(The postgres crate auto-discovers migrations via `sqlx::migrate!` — the file just needs to exist with valid SQL. Syntax is validated at runtime in integration tests.) + +- [ ] **Step 3: Commit** + +```bash +git add crates/adapters/postgres/migrations/009_failed_events.sql +git commit -m "feat(db): add failed_events table for dead-letter queue" +``` + +--- + +### Task 5: `PgFailedEventStore` — Postgres DLQ repository + +**Files:** +- Create: `crates/adapters/postgres/src/failed_event.rs` +- Modify: `crates/adapters/postgres/src/lib.rs` + +- [ ] **Step 1: Create `failed_event.rs`** + +Create `crates/adapters/postgres/src/failed_event.rs`: + +```rust +use crate::db_error::IntoDbResult; +use chrono::{DateTime, Utc}; +use sqlx::PgPool; + +/// How many times a failed event is retried by the DLQ processor. +pub const DLQ_MAX_RETRIES: i32 = 3; +/// Quarantine period for the first DLQ retry (seconds). Doubles each retry. +pub const DLQ_INITIAL_BACKOFF_SECS: i64 = 300; // 5 minutes +/// How often the DLQ processor polls for due retries (seconds). +pub const DLQ_POLL_INTERVAL_SECS: u64 = 60; + +#[derive(sqlx::FromRow)] +pub struct FailedEvent { + pub id: uuid::Uuid, + pub event_type: String, + pub payload: serde_json::Value, + pub failed_at: DateTime, + pub retry_at: DateTime, + pub retry_count: i32, + pub last_error: String, +} + +pub struct PgFailedEventStore { + pool: PgPool, +} + +impl PgFailedEventStore { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } + + /// Insert a newly exhausted event into the DLQ. + pub async fn insert( + &self, + event_type: &str, + payload: &serde_json::Value, + last_error: &str, + ) -> Result<(), sqlx::Error> { + let retry_at = Utc::now() + + chrono::Duration::seconds(DLQ_INITIAL_BACKOFF_SECS); + sqlx::query( + "INSERT INTO failed_events \ + (event_type, payload, retry_at, last_error) \ + VALUES ($1, $2, $3, $4)", + ) + .bind(event_type) + .bind(payload) + .bind(retry_at) + .bind(last_error) + .execute(&self.pool) + .await?; + Ok(()) + } + + /// Fetch all events due for retry (retry_at <= now, retry_count < DLQ_MAX_RETRIES). + pub async fn poll_due(&self) -> Result, sqlx::Error> { + sqlx::query_as::<_, FailedEvent>( + "SELECT id, event_type, payload, failed_at, retry_at, retry_count, last_error \ + FROM failed_events \ + WHERE retry_at <= now() AND retry_count < $1 \ + ORDER BY retry_at \ + LIMIT 100", + ) + .bind(DLQ_MAX_RETRIES) + .fetch_all(&self.pool) + .await + } + + /// Advance a row after a republish attempt. + /// Uses exponential backoff: next_retry = now + initial * 2^retry_count. + pub async fn advance( + &self, + id: uuid::Uuid, + error: Option<&str>, + ) -> Result<(), sqlx::Error> { + // Fetch current retry_count to compute backoff. + let current: i32 = sqlx::query_scalar( + "SELECT retry_count FROM failed_events WHERE id = $1", + ) + .bind(id) + .fetch_one(&self.pool) + .await?; + + let new_count = current + 1; + let backoff_secs = DLQ_INITIAL_BACKOFF_SECS * (1_i64 << new_count.min(10)); + let retry_at = Utc::now() + chrono::Duration::seconds(backoff_secs); + let last_error = error.unwrap_or("republish succeeded"); + + sqlx::query( + "UPDATE failed_events \ + SET retry_count = $1, retry_at = $2, last_error = $3 \ + WHERE id = $4", + ) + .bind(new_count) + .bind(retry_at) + .bind(last_error) + .bind(id) + .execute(&self.pool) + .await?; + Ok(()) + } + + /// Park a permanently failed event (retry_count >= DLQ_MAX_RETRIES). + /// Sets retry_at 1 year out so it falls out of the active index. + pub async fn park_permanently(&self, id: uuid::Uuid) -> Result<(), sqlx::Error> { + let far_future = Utc::now() + chrono::Duration::days(365); + sqlx::query( + "UPDATE failed_events SET retry_at = $1 WHERE id = $2", + ) + .bind(far_future) + .bind(id) + .execute(&self.pool) + .await?; + Ok(()) + } +} +``` + +- [ ] **Step 2: Export from `postgres/src/lib.rs`** + +In `crates/adapters/postgres/src/lib.rs`, add: + +```rust +pub mod failed_event; +``` + +- [ ] **Step 3: Compile check** + +```bash +cargo check -p postgres 2>&1 | head -20 +``` + +Expected: 0 errors. + +- [ ] **Step 4: Commit** + +```bash +git add crates/adapters/postgres/src/failed_event.rs \ + crates/adapters/postgres/src/lib.rs +git commit -m "feat(postgres): PgFailedEventStore for dead-letter queue" +``` + +--- + +### Task 6: DLQ processor in worker + +**Files:** +- Create: `crates/worker/src/dlq.rs` +- Modify: `crates/worker/src/factory.rs` +- Modify: `crates/worker/src/main.rs` + +The `delivery_count` on `EventEnvelope` (added in Task 3) tells the worker when a message is on its last attempt. The main loop inserts to the DLQ when handlers fail at `delivery_count >= CONSUMER_MAX_DELIVER`. A separate background task polls the DLQ and republishes due events. + +- [ ] **Step 1: Create `dlq.rs`** + +Create `crates/worker/src/dlq.rs`: + +```rust +use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher}; +use postgres::failed_event::{ + DLQ_MAX_RETRIES, DLQ_POLL_INTERVAL_SECS, PgFailedEventStore, +}; +use std::sync::Arc; + +/// Background task: polls `failed_events` and republishes due rows to the event bus. +/// Runs on a fixed interval for the lifetime of the worker. +pub async fn run_dlq_processor( + store: Arc, + publisher: Arc, +) { + let interval = std::time::Duration::from_secs(DLQ_POLL_INTERVAL_SECS); + loop { + tokio::time::sleep(interval).await; + if let Err(e) = process_due(&store, &*publisher).await { + tracing::error!("DLQ processor error: {e}"); + } + } +} + +async fn process_due( + store: &PgFailedEventStore, + publisher: &dyn EventPublisher, +) -> Result<(), sqlx::Error> { + let due = store.poll_due().await?; + if due.is_empty() { + return Ok(()); + } + tracing::info!(count = due.len(), "DLQ: processing due events"); + + for row in due { + if row.retry_count >= DLQ_MAX_RETRIES { + tracing::error!( + id = %row.id, + event_type = %row.event_type, + retry_count = row.retry_count, + "DLQ: event permanently failed — parking", + ); + store.park_permanently(row.id).await?; + continue; + } + + // Attempt to republish the raw payload as a domain event. + let republish_result = republish(&row.payload, publisher).await; + + match republish_result { + Ok(()) => { + tracing::info!(id = %row.id, "DLQ: republished successfully"); + store.advance(row.id, None).await?; + } + Err(e) => { + tracing::warn!(id = %row.id, error = %e, "DLQ: republish failed"); + store.advance(row.id, Some(&e.to_string())).await?; + } + } + } + Ok(()) +} + +async fn republish( + payload: &serde_json::Value, + publisher: &dyn EventPublisher, +) -> Result<(), DomainError> { + use event_payload::EventPayload; + let ep: EventPayload = serde_json::from_value(payload.clone()) + .map_err(|e| DomainError::Internal(format!("DLQ deserialize: {e}")))?; + let event = DomainEvent::try_from(ep) + .map_err(|e| DomainError::Internal(format!("DLQ event conversion: {e}")))?; + publisher.publish(&event).await +} +``` + +- [ ] **Step 2: Add `PgFailedEventStore` to `factory.rs`** + +In `crates/worker/src/factory.rs`, add to the imports: + +```rust +use postgres::failed_event::PgFailedEventStore; +use std::sync::Arc; +``` + +Then return it from `build` alongside the consumer and handlers. Change the return type to a new struct: + +```rust +pub struct WorkerInfra { + pub consumer: event_transport::EventConsumerAdapter, + pub handlers: WorkerHandlers, + pub dlq_store: Arc, + pub event_publisher: Arc, +} +``` + +At the end of `build`, construct and return: + +```rust +let dlq_store = Arc::new(PgFailedEventStore::new(pool.clone())); + +// ... existing consumer construction ... + +WorkerInfra { + consumer, + handlers, + dlq_store, + event_publisher, // the NATS publisher already constructed in factory +} +``` + +Note: the factory currently doesn't return the event publisher. Add an `event_publisher` field to `WorkerInfra` and thread the existing `Arc` through (it's used for the ActivityPub handler — reuse the same instance). + +Read the existing `factory.rs` to see how the NATS publisher is currently constructed and reuse it for both the ActivityPub handler and the returned publisher. + +- [ ] **Step 3: Update `main.rs` to use the DLQ** + +In `crates/worker/src/main.rs`, update to use the new `WorkerInfra`: + +```rust +mod dlq; +mod factory; +mod handlers; + +use domain::ports::EventConsumer; +use futures::StreamExt; +use nats::CONSUMER_MAX_DELIVER; + +#[tokio::main] +async fn main() { + dotenvy::dotenv().ok(); + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL required"); + let nats_url = std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into()); + let base_url = std::env::var("BASE_URL").expect("BASE_URL required"); + + tracing::info!("Building worker..."); + let infra = factory::build(&database_url, &base_url, &nats_url).await; + + // Spawn DLQ processor as a background task. + tokio::spawn(dlq::run_dlq_processor( + infra.dlq_store.clone(), + infra.event_publisher.clone(), + )); + + tracing::info!("Worker started, consuming events..."); + let mut stream = infra.consumer.consume(); + while let Some(result) = stream.next().await { + match result { + Ok(envelope) => { + let event = &envelope.event; + tracing::debug!(?event, "received event"); + + let n = infra.handlers.notification.handle(event).await; + let f = infra.handlers.federation.handle(event).await; + + if n.is_ok() && f.is_ok() { + (envelope.ack)(); + } else { + // Log errors. + if let Err(e) = &n { + tracing::error!("notification handler: {e}"); + } + if let Err(e) = &f { + tracing::error!("federation handler: {e}"); + } + + // On last delivery attempt — insert to DLQ then ack. + // On earlier attempts — nack so NATS retries. + if envelope.delivery_count >= CONSUMER_MAX_DELIVER as u64 { + let error_msg = n + .err() + .or(f.err()) + .map(|e| e.to_string()) + .unwrap_or_else(|| "unknown error".into()); + + let payload = serde_json::to_value(&event_payload::EventPayload::from(event)) + .unwrap_or(serde_json::Value::Null); + + let event_type = format!("{:?}", event) + .split_whitespace() + .next() + .unwrap_or("Unknown") + .to_string(); + + if let Err(e) = infra + .dlq_store + .insert(&event_type, &payload, &error_msg) + .await + { + tracing::error!("DLQ insert failed: {e} — message lost"); + } else { + tracing::warn!( + event_type, + delivery_count = envelope.delivery_count, + "event exhausted — moved to DLQ" + ); + } + (envelope.ack)(); // ack from NATS — DLQ owns it now + } else { + (envelope.nack)(); // let NATS retry + } + } + } + Err(e) => tracing::error!("consumer error: {e}"), + } + } +} +``` + +Note: `CONSUMER_MAX_DELIVER` must be exported from `crates/adapters/nats/src/lib.rs`. Add `pub` to that constant in Task 2. + +- [ ] **Step 4: Export `CONSUMER_MAX_DELIVER` from nats crate** + +In `crates/adapters/nats/src/lib.rs`, change the constant to `pub`: + +```rust +pub const CONSUMER_MAX_DELIVER: i64 = 5; +``` + +- [ ] **Step 5: Full workspace compile check** + +```bash +cargo check --workspace 2>&1 | head -40 +``` + +Fix all errors. Common issues: +- Missing imports in `main.rs` for `event_payload` +- `event` variable lifetime in the DLQ insert block — may need to clone `event` +- `WorkerInfra` construction in `factory.rs` missing fields + +- [ ] **Step 6: Verify tests still pass** + +```bash +cargo test --workspace 2>&1 | tail -5 +``` + +Expected: all tests pass. + +- [ ] **Step 7: Commit** + +```bash +git add crates/worker/src/dlq.rs \ + crates/worker/src/factory.rs \ + crates/worker/src/main.rs \ + crates/adapters/nats/src/lib.rs +git commit -m "feat(worker): DLQ processor — exhausted events moved to failed_events with exponential retry" +``` + +--- + +## Self-Review + +**Spec coverage:** + +| Spec requirement | Task | +|---|---| +| `CONSUMER_MAX_DELIVER = 5` constant | Task 2 | +| `CONSUMER_ACK_WAIT_SECS = 30` constant | Task 2 | +| `ACK_TASK_TIMEOUT_SECS = 5` constant | Task 2 | +| Explicit consumer config (deliver_policy, ack_policy, ack_wait, max_deliver) | Task 2 | +| Ack task with timeout | Task 2 | +| Nack task with timeout | Task 2 | +| Unknown event types acked before drop | Task 3 | +| `delivery_count` threaded through to worker | Tasks 2, 3 | +| `failed_events` migration | Task 4 | +| `PgFailedEventStore` (insert, poll_due, advance, park_permanently) | Task 5 | +| DLQ processor (poll interval, exponential backoff, park permanently) | Task 6 | +| Worker inserts to DLQ at `delivery_count >= max_deliver` | Task 6 | +| `JWT_SECRET_MIN_BYTES = 32` constant | Task 1 | +| Panic on weak secret at startup | Task 1 | +| `JWT_TTL_SECS = 86_400` (24h) | Task 1 | +| Timing equalization on failed login | Task 1 | + +**No placeholders found.** + +**Type consistency:** `CONSUMER_MAX_DELIVER: i64` in Task 2; cast to `u64` for comparison with `envelope.delivery_count: u64` in Task 6 (`>= CONSUMER_MAX_DELIVER as u64`). Consistent. `DLQ_MAX_RETRIES: i32` matches `retry_count: i32` in `FailedEvent`. `DLQ_INITIAL_BACKOFF_SECS: i64` used with `chrono::Duration::seconds(i64)`. All consistent.