# NATS Hardening, Dead-Letter Queue & Auth Security Design

**Goal:** Fix five reliability issues in the NATS adapter, introduce an automatic-retry dead-letter queue backed by Postgres, and close three auth security gaps.

---

## Section 1: NATS Consumer Hardening

### Consumer configuration

All magic numbers become named constants in `crates/adapters/nats/src/lib.rs`:

```rust
const CONSUMER_MAX_DELIVER: i64 = 5;
const CONSUMER_ACK_WAIT_SECS: u64 = 30;
const ACK_TASK_TIMEOUT_SECS: u64 = 5;
```

The pull consumer config changes from `..Default::default()` to explicit settings:

```rust
pull::Config {
    durable_name: Some(CONSUMER_NAME.to_string()),
    deliver_policy: DeliverPolicy::New,
    ack_policy: AckPolicy::Explicit,
    ack_wait: Duration::from_secs(CONSUMER_ACK_WAIT_SECS),
    max_deliver: CONSUMER_MAX_DELIVER,
    ..Default::default()
}
```

- `DeliverPolicy::New`: the worker restarts from the current position, not from the beginning of the stream
- `AckPolicy::Explicit`: every message must be acked explicitly (already the default, but now documented)
- `ack_wait`: if the worker hangs for 30s without acking, NATS redelivers
- `max_deliver`: after 5 failed deliveries the message is exhausted; the DLQ picks it up

### Ack task timeout

Spawned ack/nack tasks currently have no timeout. If NATS is stuck, they hang forever. Wrap them with `tokio::time::timeout`:

```rust
ack: Box::new(move || {
    let m = Arc::clone(&msg);
    tokio::spawn(async move {
        let result = tokio::time::timeout(
            Duration::from_secs(ACK_TASK_TIMEOUT_SECS),
            m.ack(),
        ).await;
        match result {
            Ok(Ok(())) => {}
            Ok(Err(e)) => tracing::warn!("NATS ack failed: {e}"),
            Err(_) => tracing::warn!("NATS ack timed out"),
        }
    });
}),
```

The same pattern applies to nack.

### Unknown event type acking

Currently, unknown event types are silently dropped via `filter_map` and never acked, so they stay orphaned in the stream until `max_deliver` is exceeded.

Fix: ack unknown messages explicitly before discarding them. In `event-transport/src/lib.rs`, when deserialization fails, ack the raw NATS message before returning `None`:

```rust
Err(e) => {
    tracing::warn!("unknown or malformed event, acking to prevent orphan: {e}");
    // Ack the message so it doesn't loop forever. The ack future must be
    // awaited (this assumes an async closure); dropping it sends nothing.
    if let Err(ack_err) = msg.ack().await {
        tracing::warn!("NATS ack failed for unknown event: {ack_err}");
    }
    None
}
```

---

## Section 2: Dead-Letter Queue

### Schema

New migration `009_failed_events.sql`:

```sql
CREATE TABLE failed_events (
    id          UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
    event_type  TEXT        NOT NULL,
    payload     JSONB       NOT NULL,
    failed_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
    retry_at    TIMESTAMPTZ NOT NULL,
    retry_count INT         NOT NULL DEFAULT 0,
    last_error  TEXT        NOT NULL
);

-- Partial index over rows that are still retryable.
-- The literal 3 must stay in sync with DLQ_MAX_RETRIES.
CREATE INDEX failed_events_retry_at_idx
    ON failed_events (retry_at)
    WHERE retry_count < 3;
```

### Constants

In `crates/adapters/nats/src/lib.rs` (or a new `dlq.rs` module):

```rust
const DLQ_INITIAL_BACKOFF_SECS: u64 = 300; // 5 minutes
const DLQ_MAX_RETRIES: i32 = 3;
const DLQ_POLL_INTERVAL_SECS: u64 = 60; // check every minute
```

### Worker flow

**On exhausted message** (detected when `num_delivered >= CONSUMER_MAX_DELIVER`):

1. The worker inserts a row into `failed_events` with `retry_at = now() + DLQ_INITIAL_BACKOFF_SECS`
2. The worker **acks** the NATS message (removes it from the stream)
3. From this point the message is retried by the DLQ processor, not by NATS

**DlqProcessor**: runs in the worker on a `DLQ_POLL_INTERVAL_SECS` tick:

1. Query: `SELECT * FROM failed_events WHERE retry_at <= now() AND retry_count < DLQ_MAX_RETRIES`
2. For each row: republish the `payload` JSONB to the NATS `thoughts-events` main stream
3. Update the row: `retry_count += 1`, `retry_at = now() + DLQ_INITIAL_BACKOFF_SECS * 2^retry_count` (exponential backoff: 5m, 10m, 20m)
4. If the republish fails: update `last_error` and leave the row for the next poll
5. When the republished message is processed successfully by the worker, the event handler completes normally and the `failed_events` row is deleted on success (see below)

**On DLQ retry success detection**: After republishing, either the DLQ processor subscribes to the ack signal, or it optimistically marks rows as `retry_count = DLQ_MAX_RETRIES` and lets the event handler delete the row if the event type matches. Simpler: the DLQ row is deleted when `retry_count` reaches the threshold and the message is republished for the final time. If the final attempt also fails, it stays in the table as a permanently failed record with `retry_count = DLQ_MAX_RETRIES` for manual inspection.

### Permanently failed messages

Rows with `retry_count >= DLQ_MAX_RETRIES AND retry_at <= now()` are permanently failed. The DLQ processor:

- Logs them at `ERROR` level with the full payload
- Sets `retry_at = now() + 365 days` (parking them out of the active query range)
- Does NOT delete them; they remain visible for manual inspection

A future admin endpoint can query and replay them, but that is out of scope for this spec.

---

## Section 3: Auth Hardening

### JWT secret validation

In `crates/bootstrap/src/factory.rs`, before constructing `JwtAuthService`:

```rust
const JWT_SECRET_MIN_BYTES: usize = 32;

if cfg.jwt_secret.len() < JWT_SECRET_MIN_BYTES {
    panic!(
        "JWT_SECRET is too short ({} bytes). \
         Minimum is {} bytes for HS256 security.",
        cfg.jwt_secret.len(),
        JWT_SECRET_MIN_BYTES
    );
}
```

A startup panic is appropriate here: running with a weak secret is a security failure.

### Timing equalization on failed login

In `crates/application/src/use_cases/auth.rs`, in the `login` function, when the user is not found by email, verify the supplied password against a dummy hash so the failure path takes about as long as a real password check:

```rust
use argon2::{Argon2, PasswordVerifier};

fn dummy_hash() -> argon2::PasswordHash<'static> {
    // Placeholder Argon2id hash in PHC string format. It is not expected to
    // match any password; it is used only to equalize timing on failed
    // lookups so attackers cannot enumerate valid emails.
    argon2::PasswordHash::new(
        "$argon2id$v=19$m=19456,t=2,p=1$\
         AAAAAAAAAAAAAAAAAAAAAA$\
         AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
    ).expect("dummy hash is valid")
}

// In login():
if user.is_none() {
    // The result is discarded on purpose; only the time spent matters.
    let _ = Argon2::default().verify_password(plain.as_bytes(), &dummy_hash());
    return Err(DomainError::Unauthorized);
}
```

### JWT TTL reduction

In `crates/bootstrap/src/factory.rs`, the existing `JWT_TTL_SECS` constant becomes:

```rust
const JWT_TTL_SECS: i64 = 86_400; // 24 hours (was 30 days)
```

---

## What does NOT change

- NATS subject naming (`thoughts-events.>`): unchanged
- `MAX_MESSAGES` stream limit (100k): unchanged; monitoring is out of scope
- API surface, domain events, application layer: unchanged
- Auth extractor, claims structure (`sub`, `exp`): unchanged
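
---

## Appendix: DLQ retry schedule sketch

The backoff rule from Section 2 (`DLQ_INITIAL_BACKOFF_SECS * 2^retry_count`) is easy to get off by one, so a small dependency-free sketch may help when checking the arithmetic. The helpers `next_backoff` and `is_permanently_failed` are hypothetical names, not part of this spec. The sketch assumes the exponent is the number of retries already attempted when the delay is scheduled (0 for a freshly inserted row), which is the reading that reproduces the documented 5m, 10m, 20m schedule.

```rust
use std::time::Duration;

// Constants copied from Section 2 of this spec.
const DLQ_INITIAL_BACKOFF_SECS: u64 = 300; // 5 minutes
const DLQ_MAX_RETRIES: i32 = 3;

/// Hypothetical helper: the delay to add to `now()` when scheduling the next
/// retry. `attempts_made` is the number of republish attempts already made
/// (assumption: 0 for a row that was just inserted).
fn next_backoff(attempts_made: i32) -> Duration {
    // Clamp so a negative or corrupt count cannot produce an invalid shift.
    let exp = attempts_made.clamp(0, DLQ_MAX_RETRIES) as u32;
    Duration::from_secs(DLQ_INITIAL_BACKOFF_SECS * (1u64 << exp))
}

/// Hypothetical helper: true once a row has used up all of its retries and
/// should be parked for manual inspection instead of republished.
fn is_permanently_failed(retry_count: i32) -> bool {
    retry_count >= DLQ_MAX_RETRIES
}
```

Under these assumptions, `next_backoff(0)`, `next_backoff(1)` and `next_backoff(2)` give 5, 10 and 20 minutes, and `is_permanently_failed(3)` marks the point at which the processor parks the row.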