NATS Hardening, Dead-Letter Queue & Auth Security Design
Goal: Fix five reliability issues in the NATS adapter, introduce an automatic-retry dead-letter queue backed by Postgres, and close three auth security gaps.
Section 1: NATS Consumer Hardening
Consumer configuration
All magic numbers become named constants in crates/adapters/nats/src/lib.rs:
const CONSUMER_MAX_DELIVER: i64 = 5;
const CONSUMER_ACK_WAIT_SECS: u64 = 30;
const ACK_TASK_TIMEOUT_SECS: u64 = 5;
The pull consumer config changes from ..Default::default() to explicit settings:
pull::Config {
durable_name: Some(CONSUMER_NAME.to_string()),
deliver_policy: DeliverPolicy::New,
ack_policy: AckPolicy::Explicit,
ack_wait: Duration::from_secs(CONSUMER_ACK_WAIT_SECS),
max_deliver: CONSUMER_MAX_DELIVER,
..Default::default()
}
- DeliverPolicy::New: worker restarts from the current position, not from the beginning of the stream
- AckPolicy::Explicit: explicit (already the default, but now documented)
- ack_wait: if the worker hangs for 30s without acking, NATS redelivers
- max_deliver: after 5 failed deliveries the message is exhausted; the DLQ picks it up
Ack task timeout
Spawned ack/nack tasks currently have no timeout. If NATS is stuck, they hang forever. Wrap with tokio::time::timeout:
ack: Box::new(move || {
let m = Arc::clone(&msg);
tokio::spawn(async move {
let result = tokio::time::timeout(
Duration::from_secs(ACK_TASK_TIMEOUT_SECS),
m.ack(),
).await;
match result {
Ok(Ok(())) => {}
Ok(Err(e)) => tracing::warn!("NATS ack failed: {e}"),
Err(_) => tracing::warn!("NATS ack timed out"),
}
});
}),
Same pattern for nack.
Unknown event type acking
Currently, unknown event types are silently dropped via filter_map and never acked, so they stay unacknowledged in the stream and are redelivered until max_deliver is exceeded.
Fix: in event-transport/src/lib.rs, when deserialization fails, ack the raw NATS message before returning None:
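Err(e) => {
tracing::warn!("unknown or malformed event, acking to prevent orphan: {e}");
// ack the message so it doesn't loop forever.
// ack() returns a future, so it must be awaited or nothing is sent.
// Assumes an async context; otherwise spawn it as in the ack task above.
if let Err(ack_err) = msg.ack().await {
tracing::warn!("NATS ack of unknown event failed: {ack_err}");
}
None
}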
Section 2: Dead-Letter Queue
Schema
New migration 009_failed_events.sql:
CREATE TABLE failed_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
failed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
retry_at TIMESTAMPTZ NOT NULL,
retry_count INT NOT NULL DEFAULT 0,
last_error TEXT NOT NULL
);
CREATE INDEX failed_events_retry_at_idx ON failed_events (retry_at)
WHERE retry_count < 3;
Constants
In crates/adapters/nats/src/lib.rs (or a new dlq.rs module):
const DLQ_INITIAL_BACKOFF_SECS: u64 = 300; // 5 minutes
const DLQ_MAX_RETRIES: i32 = 3;
const DLQ_POLL_INTERVAL_SECS: u64 = 60; // check every minute
Worker flow
On exhausted message (detected when num_delivered >= CONSUMER_MAX_DELIVER):
- Worker inserts row to failed_events with retry_at = now() + DLQ_INITIAL_BACKOFF_SECS
- Worker acks the NATS message (removes it from the stream)
- Message will be retried by the DLQ processor, not by NATS
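The exhaustion check and the initial retry_at computation can be sketched with the standard library alone. This is a minimal illustration, not the worker's actual code: the function names is_exhausted and initial_retry_at are hypothetical, and the real worker would read the delivery count from the NATS message metadata and write the timestamp to Postgres.

```rust
use std::time::{Duration, SystemTime};

const CONSUMER_MAX_DELIVER: i64 = 5;
const DLQ_INITIAL_BACKOFF_SECS: u64 = 300; // 5 minutes

/// Hypothetical helper: true once NATS has delivered the message
/// as many times as the consumer configuration allows.
fn is_exhausted(num_delivered: i64) -> bool {
    num_delivered >= CONSUMER_MAX_DELIVER
}

/// Hypothetical helper: the retry_at value for a freshly inserted
/// failed_events row (now + initial backoff).
fn initial_retry_at(now: SystemTime) -> SystemTime {
    now + Duration::from_secs(DLQ_INITIAL_BACKOFF_SECS)
}
```

Passing now in as a parameter, rather than calling SystemTime::now() inside the helper, keeps the computation deterministic under test.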
DlqProcessor — runs in the worker on a DLQ_POLL_INTERVAL_SECS tick:
- Query: SELECT * FROM failed_events WHERE retry_at <= now() AND retry_count < DLQ_MAX_RETRIES
- For each row: republish the payload JSONB to the NATS thoughts-events main stream
- Update row: retry_count += 1, retry_at = now() + DLQ_INITIAL_BACKOFF_SECS * 2^retry_count (exponential backoff: 5m, 10m, 20m)
- If republish fails: update last_error, leave row for next poll
- When the republished message is processed successfully by the worker, the event handler completes normally; the failed_events row is deleted on success (see below)
On DLQ retry success detection: two options were considered. Either the DLQ processor subscribes to the ack signal after republishing, or it optimistically marks rows as retry_count = DLQ_MAX_RETRIES and lets the event handler delete the row if the event type matches. The simpler approach is preferred: the DLQ row is deleted when retry_count reaches the threshold and the message is republished for the final time. If the final attempt also fails, the row stays in the table as a permanently failed record with retry_count = DLQ_MAX_RETRIES for manual inspection.
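The backoff formula in the DlqProcessor steps can be read two ways, depending on whether 2^retry_count uses the count before or after the increment. The sketch below assumes the incremented count, which, together with the initial 5-minute delay set at insert time (retry_count = 0), reproduces the stated 5m, 10m, 20m schedule. The function name backoff_secs is hypothetical.

```rust
const DLQ_INITIAL_BACKOFF_SECS: u64 = 300; // 5 minutes

/// Delay in seconds before the next retry, where `retry_count` is the
/// row's count *after* the increment (assumption; 0 is the initial insert).
fn backoff_secs(retry_count: u32) -> u64 {
    // 2^retry_count, saturating instead of overflowing for absurd counts.
    let factor = 1u64.checked_shl(retry_count).unwrap_or(u64::MAX);
    DLQ_INITIAL_BACKOFF_SECS.saturating_mul(factor)
}
```

With DLQ_MAX_RETRIES = 3, only counts 0 through 2 produce delays that are actually waited out; the saturating arithmetic is purely defensive.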
Permanently failed messages
Rows with retry_count >= DLQ_MAX_RETRIES AND retry_at <= now() are permanently failed. The DLQ processor:
- Logs them at ERROR level with full payload
- Sets retry_at = now() + 365 days (parking them out of the active query range)
- Does NOT delete them; they remain visible for manual inspection
A future admin endpoint can query and replay them, but that is out of scope for this spec.
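The row states implied by the two queries (active retry versus permanently failed) can be sketched as a pure function. This is an illustrative model only: RowState, classify, parked_retry_at and PARK_SECS are hypothetical names, and timestamps are plain Unix seconds rather than the TIMESTAMPTZ values the real processor would read from Postgres.

```rust
const DLQ_MAX_RETRIES: i32 = 3;
const PARK_SECS: i64 = 365 * 24 * 60 * 60; // hypothetical name for "365 days"

#[derive(Debug, PartialEq)]
enum RowState {
    Waiting,           // retry_at is still in the future (includes parked rows)
    DueForRetry,       // matched by the active retry query
    PermanentlyFailed, // retries used up and retry_at reached
}

/// Classify a failed_events row; all timestamps are Unix seconds.
fn classify(retry_count: i32, retry_at: i64, now: i64) -> RowState {
    if retry_at > now {
        RowState::Waiting
    } else if retry_count < DLQ_MAX_RETRIES {
        RowState::DueForRetry
    } else {
        RowState::PermanentlyFailed
    }
}

/// retry_at for a parked row: far enough ahead to drop out of both queries.
fn parked_retry_at(now: i64) -> i64 {
    now + PARK_SECS
}
```

Under this model a parked row classifies as Waiting until its parked retry_at passes, which is why it is logged once rather than on every poll.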
Section 3: Auth Hardening
JWT secret validation
In crates/bootstrap/src/factory.rs, before constructing JwtAuthService:
const JWT_SECRET_MIN_BYTES: usize = 32;
if cfg.jwt_secret.len() < JWT_SECRET_MIN_BYTES {
panic!(
"JWT_SECRET is too short ({} bytes). \
Minimum is {} bytes for HS256 security.",
cfg.jwt_secret.len(),
JWT_SECRET_MIN_BYTES
);
}
Startup panics are appropriate here — running with a weak secret is a security failure.
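If the length check needs unit-test coverage, one option is to factor it into a function that returns a Result and let the factory panic on the error. This is a sketch of that alternative shape, not what the spec prescribes; validate_jwt_secret is a hypothetical name.

```rust
const JWT_SECRET_MIN_BYTES: usize = 32;

/// Hypothetical helper: checks the secret's length in bytes
/// (str::len counts bytes, not characters).
fn validate_jwt_secret(secret: &str) -> Result<(), String> {
    if secret.len() < JWT_SECRET_MIN_BYTES {
        return Err(format!(
            "JWT_SECRET is too short ({} bytes). Minimum is {} bytes for HS256 security.",
            secret.len(),
            JWT_SECRET_MIN_BYTES
        ));
    }
    Ok(())
}
```

The factory could then call validate_jwt_secret(&cfg.jwt_secret) and panic with the returned message on Err, preserving the fail-fast startup behavior described above.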
Timing equalization on failed login
In crates/application/src/use_cases/auth.rs, in the login function, when the user is not found by email:
fn dummy_hash() -> argon2::PasswordHash<'static> {
// Placeholder Argon2id PHC string (all-zero salt and digest), not a real hash
// of any password. Its parameters match Argon2::default(). Used only to equalize
// timing on failed lookups so attackers cannot enumerate valid emails.
argon2::PasswordHash::new(
"$argon2id$v=19$m=19456,t=2,p=1$\
AAAAAAAAAAAAAAAAAAAAAA$\
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
).expect("dummy hash is valid")
}
// In login():
if user.is_none() {
let _ = Argon2::default().verify_password(plain.as_bytes(), &dummy_hash());
return Err(DomainError::Unauthorized);
}
JWT TTL reduction
In crates/bootstrap/src/factory.rs, the existing JWT_TTL_SECS constant:
const JWT_TTL_SECS: i64 = 86_400; // 24 hours (was 30 days)
What does NOT change
- NATS subject naming (thoughts-events.>): unchanged
- MAX_MESSAGES stream limit (100k): unchanged; monitoring is out of scope
- API surface, domain events, application layer: unchanged
- Auth extractor, claims structure (sub, exp): unchanged