From e278e4e2ccc7a586b308336ff57064af138fcfe3 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Fri, 15 May 2026 16:10:11 +0200 Subject: [PATCH] docs: NATS hardening, DLQ, and auth security design spec --- ...26-05-15-nats-dlq-auth-hardening-design.md | 193 ++++++++++++++++++ 1 file changed, 193 insertions(+) create mode 100644 docs/superpowers/specs/2026-05-15-nats-dlq-auth-hardening-design.md diff --git a/docs/superpowers/specs/2026-05-15-nats-dlq-auth-hardening-design.md b/docs/superpowers/specs/2026-05-15-nats-dlq-auth-hardening-design.md new file mode 100644 index 0000000..753673b --- /dev/null +++ b/docs/superpowers/specs/2026-05-15-nats-dlq-auth-hardening-design.md @@ -0,0 +1,193 @@ +# NATS Hardening, Dead-Letter Queue & Auth Security Design + +**Goal:** Fix five reliability issues in the NATS adapter, introduce an automatic-retry dead-letter queue backed by Postgres, and close three auth security gaps. + +--- + +## Section 1: NATS Consumer Hardening + +### Consumer configuration + +All magic numbers become named constants in `crates/adapters/nats/src/lib.rs`: + +```rust +const CONSUMER_MAX_DELIVER: i64 = 5; +const CONSUMER_ACK_WAIT_SECS: u64 = 30; +const ACK_TASK_TIMEOUT_SECS: u64 = 5; +``` + +The pull consumer config changes from `..Default::default()` to explicit settings: + +```rust +pull::Config { + durable_name: Some(CONSUMER_NAME.to_string()), + deliver_policy: DeliverPolicy::New, + ack_policy: AckPolicy::Explicit, + ack_wait: Duration::from_secs(CONSUMER_ACK_WAIT_SECS), + max_deliver: CONSUMER_MAX_DELIVER, + ..Default::default() +} +``` + +- `DeliverPolicy::New` — worker restarts from the current position, not from the beginning of the stream +- `AckPolicy::Explicit` — every message must be acked explicitly (already the default, but now stated in the config) +- `ack_wait` — if the worker hangs for 30s without acking, NATS redelivers the message +- `max_deliver` — after 5 failed deliveries the message is exhausted; the DLQ picks it up + +### Ack task timeout + +Spawned ack/nack tasks currently 
have no timeout. If NATS is stuck, they hang forever. Wrap them with `tokio::time::timeout`: + +```rust +ack: Box::new(move || { + let m = Arc::clone(&msg); + tokio::spawn(async move { + let result = tokio::time::timeout( + Duration::from_secs(ACK_TASK_TIMEOUT_SECS), + m.ack(), + ).await; + match result { + Ok(Ok(())) => {} + Ok(Err(e)) => tracing::warn!("NATS ack failed: {e}"), + Err(_) => tracing::warn!("NATS ack timed out"), + } + }); +}), +``` + +Same pattern for nack. + +### Unknown event type acking + +Currently, unknown event types are silently dropped via `filter_map` and never acked — they are orphaned in the stream until `max_deliver` is exceeded. Fix: ack unknown messages explicitly before discarding them. + +In `event-transport/src/lib.rs`, when deserialization fails, ack the raw NATS message before returning `None`. As the ack task snippet above shows, `ack()` returns a future, so the implementation must await it (or spawn it with the same timeout wrapper); the bare call below is shorthand: + +```rust +Err(e) => { + tracing::warn!("unknown or malformed event, acking to prevent orphan: {e}"); + // ack the message so it doesn't loop forever + msg.ack(); + None +} +``` + +--- + +## Section 2: Dead-Letter Queue + +### Schema + +New migration `009_failed_events.sql`: + +```sql +CREATE TABLE failed_events ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + event_type TEXT NOT NULL, + payload JSONB NOT NULL, + failed_at TIMESTAMPTZ NOT NULL DEFAULT now(), + retry_at TIMESTAMPTZ NOT NULL, + retry_count INT NOT NULL DEFAULT 0, + last_error TEXT NOT NULL +); + +CREATE INDEX failed_events_retry_at_idx ON failed_events (retry_at) + WHERE retry_count < 3; +``` + +### Constants + +In `crates/adapters/nats/src/lib.rs` (or a new `dlq.rs` module): + +```rust +const DLQ_INITIAL_BACKOFF_SECS: u64 = 300; // 5 minutes +const DLQ_MAX_RETRIES: i32 = 3; +const DLQ_POLL_INTERVAL_SECS: u64 = 60; // check every minute +``` + +### Worker flow + +**On exhausted message** (detected when `num_delivered >= CONSUMER_MAX_DELIVER`): +1. Worker inserts a row into `failed_events` with `retry_at = now() + DLQ_INITIAL_BACKOFF_SECS` +2. 
Worker **acks** the NATS message (removes it from the stream) +3. Message will be retried by the DLQ processor, not by NATS + +**DlqProcessor** — runs in the worker on a `DLQ_POLL_INTERVAL_SECS` tick: +1. Query: `SELECT * FROM failed_events WHERE retry_at <= now() AND retry_count < DLQ_MAX_RETRIES` +2. For each row: republish the `payload` JSONB to the NATS `thoughts-events` main stream +3. Update row: `retry_count += 1`, `retry_at = now() + DLQ_INITIAL_BACKOFF_SECS * 2^retry_count`, where the exponent is the value of `retry_count` before the increment (exponential backoff: 5m, 10m, 20m) +4. If republish fails: update `last_error`, leave row for next poll +5. When the republished message is processed successfully by the worker, the event handler completes normally — the `failed_events` row is deleted on success (see below) + +**On DLQ retry success detection**: After republishing, the DLQ processor subscribes to the ack signal OR the processor marks rows as `retry_count = DLQ_MAX_RETRIES` optimistically and lets the event handler delete the row if the event type matches. Simpler: the DLQ row is deleted when `retry_count` reaches the threshold and the message is republished for the final time. If the final attempt also fails, it stays in the table as a permanently failed record with `retry_count = DLQ_MAX_RETRIES` for manual inspection. + +### Permanently failed messages + +Rows with `retry_count >= DLQ_MAX_RETRIES AND retry_at <= now()` are permanently failed. The DLQ processor: +- Logs them at `ERROR` level with full payload +- Sets `retry_at = now() + 365 days` (parking them out of the active query range) +- Does NOT delete them — they remain visible for manual inspection + +A future admin endpoint can query and replay them, but that is out of scope for this spec. 
+ +--- + +## Section 3: Auth Hardening + +### JWT secret validation + +In `crates/bootstrap/src/factory.rs`, before constructing `JwtAuthService`: + +```rust +const JWT_SECRET_MIN_BYTES: usize = 32; + +if cfg.jwt_secret.len() < JWT_SECRET_MIN_BYTES { + panic!( + "JWT_SECRET is too short ({} bytes). \ + Minimum is {} bytes for HS256 security.", + cfg.jwt_secret.len(), + JWT_SECRET_MIN_BYTES + ); +} +``` + +Startup panics are appropriate here — running with a weak secret is a security failure. + +### Timing equalization on failed login + +In `crates/application/src/use_cases/auth.rs`, in the `login` function, when the user is not found by email, verify the supplied password against a fixed dummy hash before returning, so the failure path performs the same Argon2 work as a real password check: + +```rust +fn dummy_hash() -> argon2::PasswordHash<'static> { + // Fixed placeholder Argon2id hash; it is not expected to match any password. Used only to equalize timing + // on failed lookups so attackers cannot enumerate valid emails. + argon2::PasswordHash::new( + "$argon2id$v=19$m=19456,t=2,p=1$\ + AAAAAAAAAAAAAAAAAAAAAA$\ + AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" + ).expect("dummy hash is valid") +} + +// In login(): +if user.is_none() { + let _ = Argon2::default().verify_password(plain.as_bytes(), &dummy_hash()); + return Err(DomainError::Unauthorized); +} +``` + +### JWT TTL reduction + +In `crates/bootstrap/src/factory.rs`, the existing `JWT_TTL_SECS` constant is reduced from 30 days to 24 hours: + +```rust +const JWT_TTL_SECS: i64 = 86_400; // 24 hours (was 30 days) +``` + +--- + +## What does NOT change + +- NATS subject naming (`thoughts-events.>`) — unchanged +- `MAX_MESSAGES` stream limit (100k) — unchanged; monitoring is out of scope +- API surface, domain events, application layer — unchanged +- Auth extractor, claims structure (`sub`, `exp`) — unchanged